KafkaAssetUNSConsumer
Kafka consumer for the OpenFactory stream ASSETS_STREAM_UNS.
- class openfactory.kafka.asset_uns_consumer.KafkaAssetUNSConsumer(asset_uns_id, consumer_group_id, on_message, ksqlClient, bootstrap_servers='${KAFKA_BROKER}')
Bases: KafkaAssetConsumer
Kafka consumer for OpenFactory’s ASSETS_STREAM_UNS.
Consumes messages for the Asset whose UNS_ID is asset_uns_id. Messages are filtered by key and, optionally, by the filter_messages method.
- KSQL_ASSET_STREAM = 'ASSETS_STREAM_UNS'
- __init__(asset_uns_id, consumer_group_id, on_message, ksqlClient, bootstrap_servers='${KAFKA_BROKER}')
Initialize the Kafka consumer.
- Parameters:
asset_uns_id (str) – UNS_ID of the asset to filter messages by.
consumer_group_id (str) – Kafka consumer group ID.
on_message (Callable) – Callback to process each valid message.
ksqlClient (KSQLDBClient) – Client object to get Kafka topic information.
bootstrap_servers (str) – Kafka bootstrap servers (default from config).
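The two-stage filtering described for this class (first by message key, then optionally by filter_messages) can be illustrated with a small stand-in that runs without Kafka. This is only a sketch of the idea, not OpenFactory's implementation: the class KeyFilteredDispatcher, its handle method, the (key, value) callback signature, the filter_messages signature, and the sample UNS_IDs and message fields are all assumptions made for illustration.

```python
from typing import Callable, Optional


class KeyFilteredDispatcher:
    """Hypothetical stand-in (not OpenFactory code) for key-based filtering."""

    def __init__(self, asset_uns_id: str, on_message: Callable[[str, dict], None]):
        self.asset_uns_id = asset_uns_id
        self.on_message = on_message

    def filter_messages(self, value: dict) -> Optional[dict]:
        """Optional second-stage filter; this default accepts every message."""
        return value

    def handle(self, key: str, value: dict) -> bool:
        """Process one record; return True if the callback was invoked."""
        # Stage 1: drop records whose key is not the asset's UNS_ID.
        if key != self.asset_uns_id:
            return False
        # Stage 2: let a subclass reject the message by returning None.
        filtered = self.filter_messages(value)
        if filtered is None:
            return False
        self.on_message(key, filtered)
        return True


class SamplesOnly(KeyFilteredDispatcher):
    """Example subclass that keeps only messages of an assumed type 'Samples'."""

    def filter_messages(self, value: dict) -> Optional[dict]:
        return value if value.get("type") == "Samples" else None


received = []
consumer = SamplesOnly("plant/line1/cnc", lambda k, v: received.append((k, v)))

# Made-up records standing in for messages read from the stream.
records = [
    ("plant/line1/cnc", {"id": "temp", "value": "21.5", "type": "Samples"}),
    ("plant/line1/robot", {"id": "temp", "value": "30.0", "type": "Samples"}),
    ("plant/line1/cnc", {"id": "avail", "value": "AVAILABLE", "type": "Events"}),
]
for key, value in records:
    consumer.handle(key, value)
```

Only the first record reaches the callback: the second has a different key, and the third is rejected by the overridden filter_messages. With the real class, the same pattern would presumably be expressed by subclassing KafkaAssetUNSConsumer and overriding filter_messages, but the exact method signature should be checked against the OpenFactory source.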