Source code for openfactory.kafka.asset_producer

""" OpenFactory Assets Kafka Producers. """

import json
from confluent_kafka import Producer
from openfactory.kafka.ksql import KSQLDBClient
from openfactory.assets.utils import AssetAttribute
import openfactory.config as config


class AssetProducer(Producer):
    """
    Kafka producer dedicated to OpenFactory asset messages.

    Extends the confluent_kafka ``Producer`` and resolves, once at
    construction time, the Kafka topic that backs the ``ASSETS_STREAM``
    ksqlDB stream. Every message sent through this class goes to that topic.

    Attributes:
        ksql (KSQLDBClient): Client used to look up the Kafka topic.
        topic (str): Kafka topic returned by ``ksql.get_kafka_topic('ASSETS_STREAM')``.
    """

    def __init__(self, ksqlClient: KSQLDBClient, bootstrap_servers: str = config.KAFKA_BROKER) -> None:
        """
        Configure the underlying Kafka producer and resolve the target topic.

        Args:
            ksqlClient (KSQLDBClient): Client whose ``get_kafka_topic`` method
                is queried for the topic of ``ASSETS_STREAM``.
            bootstrap_servers (str): Value passed as the ``bootstrap.servers``
                producer setting. Defaults to ``config.KAFKA_BROKER``.
        """
        producer_settings = {'bootstrap.servers': bootstrap_servers}
        super().__init__(producer_settings)

        self.ksql = ksqlClient
        # Looked up once here so each send does not have to query ksqlDB again.
        self.topic = self.ksql.get_kafka_topic('ASSETS_STREAM')

    def send_asset_attribute(self, asset_uuid: str, assetAttribute: AssetAttribute) -> None:
        """
        Publish one asset attribute as a JSON-encoded Kafka message.

        The message is keyed by ``asset_uuid``, sent to ``self.topic``, and the
        producer is flushed before this method returns.

        Args:
            asset_uuid (str): UUID of the asset; used as the Kafka message key.
            assetAttribute (AssetAttribute): Source of the ``id``, ``value``,
                ``tag``, ``type`` and ``timestamp`` fields written to the message.
        """
        # The timestamp is nested under "attributes"; all other fields are top-level.
        record = {
            "ID": assetAttribute.id,
            "VALUE": assetAttribute.value,
            "TAG": assetAttribute.tag,
            "TYPE": assetAttribute.type,
            "attributes": {
                "timestamp": assetAttribute.timestamp
            }
        }
        payload = json.dumps(record)

        self.produce(topic=self.topic, key=asset_uuid, value=payload)
        # flush() is called after every message rather than batching sends.
        self.flush()