KafkaCommandsConsumer#
Kafka consumer for OpenFactory’s CMDS_STREAM.
- class openfactory.kafka.commands_consumer.KafkaCommandsConsumer(consumer_group_id, asset_uuid, on_command, ksqlClient, bootstrap_servers=None)[source]#
Bases:
objectKafka consumer for OpenFactory CMDS_STREAM.
Consumes command messages intended for a specific asset identified by asset_uuid. A user-provided callback (on_command) is called when a valid message is received. You can override filter_messages to apply custom filtering on received messages.
- Example usage:
from openfactory.kafka import KafkaCommandsConsumer, KSQLDBClient def on_command(msg_key, msg_value): # Callback to process received messages. print(f"[{msg_key}] {msg_value}") consumer = KafkaCommandsConsumer( consumer_group_id="demo_ofa_commands_consumer_group", asset_uuid="PROVER3018", on_command=on_command, ksqlClient=KSQLDBClient('http://localhost:8088'), bootstrap_servers="localhost:9092" ) consumer.consume()
- __init__(consumer_group_id, asset_uuid, on_command, ksqlClient, bootstrap_servers=None)[source]#
Initialize the KafkaCommandsConsumer.
- Parameters:
consumer_group_id (str) – Kafka consumer group ID.
asset_uuid (str) – UUID of the asset to filter messages for.
on_command (Callable) – Callback to process valid messages.
ksqlClient (KSQLDBClient) – Client to retrieve Kafka topic metadata.
bootstrap_servers (str) – Kafka broker address.
- consume()[source]#
Start consuming messages from Kafka.
This method runs a loop, polling the Kafka topic for new messages, filtering them by key and (optionally) content, and dispatching them to the on_command callback.
- Return type:
- Raises:
json.JSONDecodeError – If a message is not valid JSON.
Exception – For all other runtime errors during consumption.
- consumer_timeout = 0.1#