Source code for openfactory.kafka.asset_uns_consumer

""" Kafka consumer for the OpenFactory  stream ASSETS_STREAM_UNS. """

from typing import Callable

from openfactory.kafka import KafkaAssetConsumer
from openfactory.kafka.ksql import KSQLDBClient
import openfactory.config as config


class KafkaAssetUNSConsumer(KafkaAssetConsumer):
    """
    Kafka consumer for OpenFactory's ASSETS_STREAM_UNS.

    Consumes messages for the Asset whose UNS_ID is ``asset_uns_id``.
    Messages are filtered by key, and optionally via the `filter_messages` method.

    Attributes:
        KSQL_ASSET_STREAM (str): Name of the stream this consumer is bound to.
    """

    # NOTE(review): presumably read by the KafkaAssetConsumer base class to
    # select the stream/topic to consume from - confirm against the base class.
    KSQL_ASSET_STREAM = 'ASSETS_STREAM_UNS'

    def __init__(
        self,
        asset_uns_id: str,
        consumer_group_id: str,
        on_message: Callable,
        ksqlClient: KSQLDBClient,
        bootstrap_servers: str = config.KAFKA_BROKER,
    ) -> None:
        """
        Initialize the Kafka consumer.

        All arguments are forwarded unchanged, and positionally, to the
        ``KafkaAssetConsumer`` initializer.

        Args:
            asset_uns_id (str): UNS_ID of the asset to filter messages by.
            consumer_group_id (str): Kafka consumer group ID.
            on_message (Callable): Callback to process each valid message.
            ksqlClient (KSQLDBClient): Client object to get Kafka topic information.
            bootstrap_servers (str): Kafka bootstrap servers (default from config).
        """
        super().__init__(asset_uns_id, consumer_group_id, on_message, ksqlClient, bootstrap_servers)


if __name__ == "__main__": """ Example usage of KafkaAssetUNSConsumer for processing asset-related events. This example demonstrates how to: - Instantiate the KafkaAssetUNSConsumer class with a custom asset UNS_ID. - Create a subclass of KafkaAssetUNSConsumer (`CNC_EventsConsumer`) to filter out specific messages. - Use a custom callback (`on_message`) to handle the messages. - Consume Kafka messages related to the asset with UNS ID `cnc` using the configured consumer. To run the example: python openfactory/kafka/asset_uns_consumer.py """ from openfactory.kafka.ksql import KSQLDBClient from openfactory.kafka.kafka_logger import kafka_logger kafka_logger.level = 'INFO' ksql = KSQLDBClient(config.KSQLDB_URL) def on_message(msg_key, msg_value): """ Callback to process received messages. """ print(f"[{msg_key}] {msg_value}") class CNC_EventsConsumer(KafkaAssetUNSConsumer): """ Example KafkaAssetUNSConsumer. """ def filter_messages(self, msg_value): """ Filters out Events. """ if msg_value['type'] == 'Events': return msg_value else: return None consumer = CNC_EventsConsumer( asset_uns_id="cnc", consumer_group_id="demo_ofa_assets_uns_consumer_group", on_message=on_message, ksqlClient=ksql, bootstrap_servers="localhost:9092" ) consumer.consume()