# Source code for openfactory.kafka.asset_producer
""" OpenFactory Assets Kafka Producers. """
import json
from confluent_kafka import Producer
from openfactory.kafka.ksql import KSQLDBClient
from openfactory.assets.utils import AssetAttribute
import openfactory.config as config
# [docs]
class AssetProducer(Producer):
    """
    Kafka producer that publishes OpenFactory asset attributes.

    Extends the Kafka ``Producer`` and, at construction time, looks up the
    Kafka topic returned by ``ksqlClient.get_kafka_topic('ASSETS_STREAM')``.
    Every message sent through :meth:`send_asset_attribute` goes to that topic.

    Attributes:
        ksql (KSQLDBClient): Client used to interact with ksqlDB.
        topic (str): Kafka topic to which asset data is produced.
    """

    def __init__(self, ksqlClient: KSQLDBClient, bootstrap_servers: str = config.KAFKA_BROKER) -> None:
        """
        Creates the producer and resolves the target topic.

        Args:
            ksqlClient (KSQLDBClient): Client queried for the Kafka topic of ``ASSETS_STREAM``.
            bootstrap_servers (str): Kafka bootstrap server address. Defaults to ``config.KAFKA_BROKER``.
        """
        producer_conf = {'bootstrap.servers': bootstrap_servers}
        super().__init__(producer_conf)

        self.ksql = ksqlClient
        # Looked up once here so each send does not have to query ksqlDB again
        self.topic = ksqlClient.get_kafka_topic('ASSETS_STREAM')

    def send_asset_attribute(self, asset_uuid: str, assetAttribute: AssetAttribute, attributes: dict | None = None) -> None:
        """
        Publishes one asset attribute as a JSON-encoded Kafka message.

        The message carries the attribute's ``id``, ``value``, ``tag`` and ``type``
        plus an ``"attributes"`` object that always holds the attribute's timestamp.
        It is produced to ``self.topic`` keyed by ``asset_uuid``, after which the
        producer is flushed.

        Args:
            asset_uuid (str): UUID of the asset; used as the Kafka message key.
            assetAttribute (AssetAttribute): Attribute supplying id, value, tag, type and timestamp.
            attributes (dict, optional): Extra entries merged into the ``"attributes"`` object.
                An entry keyed ``"timestamp"`` replaces the attribute's own timestamp.
        """
        # Start from the timestamp, then layer any caller-supplied entries on top
        extra_fields = {"timestamp": assetAttribute.timestamp}
        extra_fields.update(attributes or {})

        record = {
            "ID": assetAttribute.id,
            "VALUE": assetAttribute.value,
            "TAG": assetAttribute.tag,
            "TYPE": assetAttribute.type,
            "attributes": extra_fields,
        }
        encoded = json.dumps(record)

        self.produce(topic=self.topic, key=asset_uuid, value=encoded)
        self.flush()