KafkaAssetUNSConsumer#

Kafka consumer for the OpenFactory stream ASSETS_STREAM_UNS.

class openfactory.kafka.asset_uns_consumer.KafkaAssetUNSConsumer(asset_uns_id, consumer_group_id, on_message, ksqlClient, bootstrap_servers='${KAFKA_BROKER}')[source]#

Bases: KafkaAssetConsumer

Kafka consumer for OpenFactory’s ASSETS_STREAM_UNS.

Consumes messages for an Asset with UNS_ID asset_uns_id Messages are filtered by key, and optionally via the filter_messages method.

KSQL_ASSET_STREAM = 'ASSETS_STREAM_UNS'#
__init__(asset_uns_id, consumer_group_id, on_message, ksqlClient, bootstrap_servers='${KAFKA_BROKER}')[source]#

Initialize the Kafka consumer.

Parameters:
  • asset_uns_id (str) – UNS_ID of the asset to filter messages by.

  • consumer_group_id (str) – Kafka consumer group ID.

  • on_message (Callable) – Callback to process each valid message.

  • ksqlClient (KSQLDBClient) – Client object to get Kafka topic information.

  • bootstrap_servers (str) – Kafka bootstrap servers (default from config).