KafkaAssetConsumer
Kafka consumer for the OpenFactory stream ASSETS_STREAM.
- class openfactory.kafka.asset_consumer.KafkaAssetConsumer(asset_uuid, consumer_group_id, on_message, ksqlClient, bootstrap_servers='${KAFKA_BROKER}')
Bases: object
Kafka consumer for OpenFactory’s ASSETS_STREAM.
Consumes messages for the Asset with UUID asset_uuid. Messages are filtered by key and, optionally, by the filter_messages method.
- Example usage:
    from openfactory.kafka import KafkaAssetConsumer, KSQLDBClient

    def on_message(msg_key, msg_value):
        print(f"[{msg_key}] {msg_value}")

    class PROVER3018_EventsConsumer(KafkaAssetConsumer):

        def filter_messages(self, msg_value):
            # filter Events messages
            if msg_value['type'] == 'Events':
                return msg_value
            else:
                return None

    consumer = PROVER3018_EventsConsumer(
        asset_uuid="PROVER3018",
        consumer_group_id="demo_ofa_assets_consumer_group",
        on_message=on_message,
        ksqlClient=KSQLDBClient('http://localhost:8088'),
        bootstrap_servers="localhost:9092"
    )
    consumer.consume()
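The filtering contract used in the example (return the value to keep a message, return None to drop it) can be tried without a broker. The sketch below is a stand-alone illustration, not library code: `events_only` and the sample payloads (including the `id` and `value` fields) are hypothetical. That a None return causes the message to be skipped is inferred from the example rather than confirmed against the library source.

```python
def events_only(msg_value):
    """Stand-alone mirror of the example's filter_messages (hypothetical helper)."""
    # .get() avoids a KeyError when a payload has no 'type' field
    if msg_value.get('type') == 'Events':
        return msg_value
    return None

# Hypothetical payloads; real ASSETS_STREAM messages may carry other fields.
samples = [
    {'id': 'door', 'type': 'Events', 'value': 'OPEN'},
    {'id': 'temp', 'type': 'Samples', 'value': '21.5'},
    {'id': 'note', 'value': 'no type field'},
]

# Keep only the payloads the filter lets through
kept = [m for m in samples if events_only(m) is not None]
print(kept)
```

Using `.get('type')` instead of `['type']` is a defensive choice for payloads that lack the field. The original example indexes the key directly.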
- KSQL_ASSET_STREAM = 'ASSETS_STREAM'
- __init__(asset_uuid, consumer_group_id, on_message, ksqlClient, bootstrap_servers='${KAFKA_BROKER}')
Initialize the Kafka consumer.
- Parameters:
asset_uuid (str) – UUID of the asset to filter messages by.
consumer_group_id (str) – Kafka consumer group ID.
on_message (Callable) – Callback to process each valid message.
ksqlClient (KSQLDBClient) – Client object to get Kafka topic information.
bootstrap_servers (str) – Kafka bootstrap servers (default from config).
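The on_message parameter is typed only as Callable, and the example usage passes a function that takes (msg_key, msg_value). Assuming any callable with that two-argument shape is accepted, a callable object can hold state between messages. The class below is a hypothetical helper and not part of OpenFactory. It is exercised here by calling it directly rather than through a consumer.

```python
from collections import deque

class RecentMessages:
    """Hypothetical callback object: remembers the last `maxlen` messages."""

    def __init__(self, maxlen=100):
        # A bounded deque discards the oldest entry once it is full
        self.buffer = deque(maxlen=maxlen)

    def __call__(self, msg_key, msg_value):
        # Same (msg_key, msg_value) shape as the on_message example
        self.buffer.append((msg_key, msg_value))

recent = RecentMessages(maxlen=2)
for i in range(3):
    recent('PROVER3018', {'type': 'Events', 'value': i})

print(list(recent.buffer))
```

An instance such as `recent` could then be passed as `on_message=recent` when constructing the consumer, under the assumption stated above.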
- consume()
Start consuming messages from the Kafka topic.
Polls for new messages, applies key and value filtering, and dispatches valid messages to the on_message callback.
- Return type:
- Raises:
json.JSONDecodeError – If a message value is not valid JSON.
Exception – For any other unexpected errors during consumption.
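The steps described for consume() (key filtering, value filtering, dispatch to the callback) can be sketched for a single record without a Kafka connection. This is an illustration under stated assumptions, not the library's implementation: `process_record` is a hypothetical function, and it assumes that keys are UTF-8 encoded asset UUIDs and that values are UTF-8 encoded JSON.

```python
import json

def process_record(asset_uuid, raw_key, raw_value, on_message,
                   filter_messages=lambda value: value):
    """Hypothetical single-record version of the documented consume() steps."""
    key = raw_key.decode('utf-8')
    if key != asset_uuid:
        return False  # key filter: record belongs to another asset

    # json.loads raises json.JSONDecodeError on malformed input
    value = json.loads(raw_value.decode('utf-8'))

    value = filter_messages(value)
    if value is None:
        return False  # value filter rejected the message

    on_message(key, value)
    return True

received = []
records = [
    (b'PROVER3018', b'{"type": "Events", "value": "OPEN"}'),
    (b'OTHER', b'{"type": "Events", "value": "CLOSED"}'),
]
for raw_key, raw_value in records:
    process_record('PROVER3018', raw_key, raw_value,
                   lambda k, v: received.append((k, v)))

print(received)
```

In this sketch a malformed value propagates json.JSONDecodeError to the caller, matching the first entry of the Raises list. Whether the real consumer stops or continues after such an error is not stated in this page.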
- consumer_timeout = 0.1