Source code for openfactory.kafka.asset_consumer

""" Kafka consumer for the OpenFactory stream ASSETS_STREAM. """

import json
import threading
import traceback
from typing import Callable, Optional

from confluent_kafka import Consumer, KafkaError

from openfactory.kafka.case_insensitive_dict import CaseInsensitiveDict
import openfactory.config as config
from openfactory.kafka.ksql import KSQLDBClient
from openfactory.kafka.kafka_logger import kafka_logger


class KafkaAssetConsumer:
    """
    Kafka consumer for OpenFactory's ASSETS_STREAM.

    Consumes the messages of the Asset with UUID ``asset_uuid``.
    Messages are filtered by key, and optionally via the `filter_messages` method.

    Example usage:
        .. code-block:: python

            from openfactory.kafka import KafkaAssetConsumer, KSQLDBClient

            def on_message(msg_key, msg_value):
                print(f"[{msg_key}] {msg_value}")

            class PROVER3018_EventsConsumer(KafkaAssetConsumer):

                def filter_messages(self, msg_value):
                    # filter Events messages
                    if msg_value['type'] == 'Events':
                        return msg_value
                    else:
                        return None

            consumer = PROVER3018_EventsConsumer(
                asset_uuid="PROVER3018",
                consumer_group_id="demo_ofa_assets_consumer_group",
                on_message=on_message,
                ksqlClient=KSQLDBClient('http://localhost:8088'),
                bootstrap_servers="localhost:9092"
            )
            consumer.consume()
    """

    # Timeout handed to Consumer.poll(). The consume loop re-checks the stop
    # flag after every poll, so this also paces how quickly stop() is noticed.
    consumer_timeout = 0.1

    # Name of the ksqlDB stream whose backing Kafka topic is consumed.
    KSQL_ASSET_STREAM = 'ASSETS_STREAM'

    def __init__(
        self,
        asset_uuid: str,
        consumer_group_id: str,
        on_message: Callable[[str, dict], None],
        ksqlClient: KSQLDBClient,
        bootstrap_servers: str = config.KAFKA_BROKER
    ):
        """
        Initialize the Kafka consumer and subscribe it to the assets topic.

        Args:
            asset_uuid (str): UUID of the asset to filter messages by.
            consumer_group_id (str): Kafka consumer group ID.
            on_message (Callable): Callback invoked as ``on_message(msg_key, msg_value)``
                                   for each message that passes the filters.
            ksqlClient (KSQLDBClient): Client object to get Kafka topic information.
            bootstrap_servers (str): Kafka bootstrap servers (default from config).
        """
        self.ksql = ksqlClient
        # Resolve the Kafka topic backing the ksqlDB stream.
        self.topic = self.ksql.get_kafka_topic(self.KSQL_ASSET_STREAM)
        self.bootstrap_servers = bootstrap_servers
        self.group_id = consumer_group_id
        self.key = asset_uuid
        self.on_message = on_message

        # Set while the consume loop is allowed to run; cleared by stop().
        self.running = threading.Event()
        self.running.set()

        self.consumer = Consumer({
            'bootstrap.servers': self.bootstrap_servers,
            'group.id': self.group_id,
            'auto.offset.reset': 'latest',
        })
        self.consumer.subscribe([self.topic])

    def filter_messages(self, msg_value: dict) -> Optional[dict]:
        """
        Optional filter hook to refine messages before processing.

        Can be overridden in subclasses to apply custom filtering logic.
        The default implementation lets every message through.

        Args:
            msg_value (dict): The parsed Kafka message value.

        Returns:
            Optional[dict]: The (possibly modified) message value, or a falsy
            value such as ``None`` to drop the message.
        """
        return msg_value

    def consume(self) -> None:
        """
        Start consuming messages from the Kafka topic.

        Polls for new messages, applies key and value filtering, and
        dispatches valid messages to the `on_message` callback. The method
        blocks until `stop` is called, a Kafka error is reported, or an
        exception occurs. The underlying consumer is always closed on exit.

        Messages without a value are skipped. Exceptions, including a
        ``json.JSONDecodeError`` for a value that is not valid JSON, are
        logged and end the consumption; they are not propagated to the caller.
        """
        kafka_logger.info(f"Starting consuming messages for asset {self.key}")
        raw_value = None
        try:
            while self.running.is_set():
                msg = self.consumer.poll(timeout=self.consumer_timeout)
                if msg is None:
                    continue

                error = msg.error()
                if error:
                    # Reaching the end of a partition is informational only.
                    if error.code() == KafkaError._PARTITION_EOF:
                        continue
                    kafka_logger.error(f"Error: {error}")
                    break

                # Decode the message key and keep only this asset's messages
                raw_key = msg.key()
                msg_key = raw_key.decode('utf-8') if raw_key else None
                if msg_key != self.key:
                    continue

                # A record may carry no payload; skipping it avoids an
                # AttributeError that would otherwise end the whole loop.
                raw_value = msg.value()
                if raw_value is None:
                    kafka_logger.debug(f"[{msg_key}] Skipping message without value")
                    continue

                msg_value = CaseInsensitiveDict(json.loads(raw_value.decode('utf-8')))

                # Apply additional filter
                msg_value = self.filter_messages(msg_value)
                if msg_value:
                    kafka_logger.debug(f"[{msg_key}] {msg_value}")
                    self.on_message(msg_key, msg_value)

        except json.JSONDecodeError as e:
            kafka_logger.error(f"Topic contained a none JSON value: {e} - raw: {raw_value.decode('utf-8')}")
        except Exception as e:
            # Report through the module logger, like every other message of
            # this class, and keep the traceback for diagnosis.
            kafka_logger.error(f"Exception in KafkaAssetConsumer: {e}\n{traceback.format_exc()}")
        finally:
            kafka_logger.info("Closing KafkaAssetConsumer ...")
            self.consumer.close()

    def stop(self) -> None:
        """ Signal the consume loop to stop after its current iteration. """
        self.running.clear()
if __name__ == "__main__":
    # Example usage of KafkaAssetConsumer for processing asset-related events.
    #
    # This example demonstrates how to:
    #   - Subclass KafkaAssetConsumer (`PROVER3018_EventsConsumer`) to keep
    #     only specific messages.
    #   - Handle the selected messages with a custom callback (`on_message`).
    #   - Consume the Kafka messages of the asset `PROVER3018`.
    #
    # To run the example:
    #     python openfactory/kafka/asset_consumer.py

    kafka_logger.level = 'INFO'
    ksql_client = KSQLDBClient(config.KSQLDB_URL)

    def on_message(msg_key, msg_value):
        """ Print every received message, prefixed by its key. """
        print(f"[{msg_key}] {msg_value}")

    class PROVER3018_EventsConsumer(KafkaAssetConsumer):
        """ Example consumer that only forwards messages of type 'Events'. """

        def filter_messages(self, msg_value):
            """ Return the message when its 'type' is 'Events', otherwise None. """
            return msg_value if msg_value['type'] == 'Events' else None

    events_consumer = PROVER3018_EventsConsumer(
        asset_uuid="PROVER3018",
        consumer_group_id="demo_ofa_assets_consumer_group",
        on_message=on_message,
        ksqlClient=ksql_client,
        bootstrap_servers="localhost:9092",
    )
    events_consumer.consume()