Source code for openfactory.kafka.asset_uns_consumer
""" Kafka consumer for the OpenFactory stream ASSETS_STREAM_UNS. """
from openfactory.kafka import KafkaAssetConsumer
from openfactory.kafka.ksql import KSQLDBClient
import openfactory.config as config
[docs]
class KafkaAssetUNSConsumer(KafkaAssetConsumer):
"""
Kafka consumer for OpenFactory's ASSETS_STREAM_UNS.
Consumes messages for an Asset with UNS_ID asset_uns_id
Messages are filtered by key, and optionally via the `filter_messages` method.
"""
KSQL_ASSET_STREAM = 'ASSETS_STREAM_UNS'
[docs]
def __init__(
self,
asset_uns_id: str,
consumer_group_id: str,
on_message: callable,
ksqlClient: KSQLDBClient,
bootstrap_servers: str = config.KAFKA_BROKER
):
"""
Initialize the Kafka consumer.
Args:
asset_uns_id (str): UNS_ID of the asset to filter messages by.
consumer_group_id (str): Kafka consumer group ID.
on_message (Callable): Callback to process each valid message.
ksqlClient (KSQLDBClient): Client object to get Kafka topic information.
bootstrap_servers (str): Kafka bootstrap servers (default from config).
"""
super().__init__(asset_uns_id, consumer_group_id, on_message, ksqlClient, bootstrap_servers)
if __name__ == "__main__":
"""
Example usage of KafkaAssetUNSConsumer for processing asset-related events.
This example demonstrates how to:
- Instantiate the KafkaAssetUNSConsumer class with a custom asset UNS_ID.
- Create a subclass of KafkaAssetUNSConsumer (`CNC_EventsConsumer`) to filter out specific messages.
- Use a custom callback (`on_message`) to handle the messages.
- Consume Kafka messages related to the asset with UNS ID `cnc` using the configured consumer.
To run the example:
python openfactory/kafka/asset_uns_consumer.py
"""
from openfactory.kafka.ksql import KSQLDBClient
from openfactory.kafka.kafka_logger import kafka_logger
kafka_logger.level = 'INFO'
ksql = KSQLDBClient(config.KSQLDB_URL)
def on_message(msg_key, msg_value):
""" Callback to process received messages. """
print(f"[{msg_key}] {msg_value}")
class CNC_EventsConsumer(KafkaAssetUNSConsumer):
""" Example KafkaAssetUNSConsumer. """
def filter_messages(self, msg_value):
""" Filters out Events. """
if msg_value['type'] == 'Events':
return msg_value
else:
return None
consumer = CNC_EventsConsumer(
asset_uns_id="cnc",
consumer_group_id="demo_ofa_assets_uns_consumer_group",
on_message=on_message,
ksqlClient=ksql,
bootstrap_servers="localhost:9092"
)
consumer.consume()