KafkaAssetConsumer#

Kafka consumer for the OpenFactory stream ASSETS_STREAM.

class openfactory.kafka.asset_consumer.KafkaAssetConsumer(asset_uuid, consumer_group_id, on_message, ksqlClient, bootstrap_servers='${KAFKA_BROKER}')[source]#

Bases: object

Kafka consumer for OpenFactory’s ASSETS_STREAM.

Consumes messages for an Asset with UUID asset_uuid Messages are filtered by key, and optionally via the filter_messages method.

Example usage:
from openfactory.kafka import KafkaAssetConsumer, KSQLDBClient

def on_message(msg_key, msg_value):
    print(f"[{msg_key}] {msg_value}")

class PROVER3018_EventsConsumer(KafkaAssetConsumer):

    def filter_messages(self, msg_value):
        # filter Events messages
        if msg_value['type'] == 'Events':
            return msg_value
        else:
            return None

consumer = PROVER3018_EventsConsumer(
    asset_uuid="PROVER3018",
    consumer_group_id="demo_ofa_assets_consumer_group",
    on_message=on_message,
    ksqlClient=KSQLDBClient('http://localhost:8088'),
    bootstrap_servers="localhost:9092"
)

consumer.consume()
KSQL_ASSET_STREAM = 'ASSETS_STREAM'#
__init__(asset_uuid, consumer_group_id, on_message, ksqlClient, bootstrap_servers='${KAFKA_BROKER}')[source]#

Initialize the Kafka consumer.

Parameters:
  • asset_uuid (str) – UUID of the asset to filter messages by.

  • consumer_group_id (str) – Kafka consumer group ID.

  • on_message (Callable) – Callback to process each valid message.

  • ksqlClient (KSQLDBClient) – Client object to get Kafka topic information.

  • bootstrap_servers (str) – Kafka bootstrap servers (default from config).

consume()[source]#

Start consuming messages from the Kafka topic.

Polls for new messages, applies key and value filtering, and dispatches valid messages to the on_message callback.

Return type:

None

Raises:
consumer_timeout = 0.1#
filter_messages(msg_value)[source]#

Optional filter hook to refine messages before processing.

Can be overridden in subclasses to apply custom filtering logic.

Return type:

dict

Parameters:

msg_value (dict) – The parsed Kafka message value.

Returns:

dict – The (possibly filtered) message value.

stop()[source]#

Signal the consumer to stop.

Return type:

None