KafkaCommandsConsumer#

Kafka consumer for OpenFactory’s CMDS_STREAM.

class openfactory.kafka.commands_consumer.KafkaCommandsConsumer(consumer_group_id, asset_uuid, on_command, ksqlClient, bootstrap_servers='${KAFKA_BROKER}')[source]#

Bases: object

Kafka consumer for OpenFactory CMDS_STREAM.

Consumes command messages intended for a specific asset identified by asset_uuid. A user-provided callback (on_command) is called when a valid message is received. You can override filter_messages to apply custom filtering on received messages.

Example usage:
from openfactory.kafka import KafkaCommandsConsumer, KSQLDBClient

def on_command(msg_key, msg_value):
    # Callback to process received messages.
    print(f"[{msg_key}] {msg_value}")

consumer = KafkaCommandsConsumer(
    consumer_group_id="demo_ofa_commands_consumer_group",
    asset_uuid="PROVER3018",
    on_command=on_command,
    ksqlClient=KSQLDBClient('http://localhost:8088'),
    bootstrap_servers="localhost:9092"
)

consumer.consume()
__init__(consumer_group_id, asset_uuid, on_command, ksqlClient, bootstrap_servers='${KAFKA_BROKER}')[source]#

Initialize the KafkaCommandsConsumer.

Parameters:
  • consumer_group_id (str) – Kafka consumer group ID.

  • asset_uuid (str) – UUID of the asset to filter messages for.

  • on_command (Callable) – Callback to process valid messages.

  • ksqlClient (KSQLDBClient) – Client to retrieve Kafka topic metadata.

  • bootstrap_servers (str) – Kafka broker address.

consume()[source]#

Start consuming messages from Kafka.

This method runs a loop, polling the Kafka topic for new messages, filtering them by key and (optionally) content, and dispatching them to the on_command callback.

Return type:

None

Raises:
consumer_timeout = 0.1#
filter_messages(msg_value)[source]#

Optional message filter method.

Override this method to filter or transform command messages before they are passed to the on_command callback.

Return type:

Dict

Parameters:

msg_value (dict) – The incoming message value as a dictionary.

Returns:

dict or None – The message value to process, or None to discard it.

stop()[source]#

Signal the consumer to stop.

Return type:

None