# Source code for openfactory.kafka.commands_consumer
""" Kafka consumer for OpenFactory's CMDS_STREAM. """
import json
import threading
from typing import Callable, Dict, Optional

from confluent_kafka import Consumer, KafkaError

import openfactory.config as config
from openfactory.kafka import CaseInsensitiveDict
from openfactory.kafka.ksql import KSQLDBClient
from openfactory.kafka.kafka_logger import kafka_logger
class KafkaCommandsConsumer:
    """
    Kafka consumer for OpenFactory CMDS_STREAM.

    Consumes command messages intended for a specific asset identified by `asset_uuid`.
    A user-provided callback (`on_command`) is called when a valid message is received.
    You can override `filter_messages` to apply custom filtering on received messages.

    Example usage:
        .. code-block:: python

            from openfactory.kafka import KafkaCommandsConsumer, KSQLDBClient

            def on_command(msg_key, msg_value):
                # Callback to process received messages.
                print(f"[{msg_key}] {msg_value}")

            consumer = KafkaCommandsConsumer(
                consumer_group_id="demo_ofa_commands_consumer_group",
                asset_uuid="PROVER3018",
                on_command=on_command,
                ksqlClient=KSQLDBClient('http://localhost:8088'),
                bootstrap_servers="localhost:9092"
            )
            consumer.consume()
    """

    # Timeout passed to ``Consumer.poll`` on every loop iteration (seconds, per
    # the confluent_kafka API). It also bounds how long it takes for a call to
    # ``stop()`` to be noticed by the polling loop.
    consumer_timeout = 0.1

    def __init__(
        self,
        consumer_group_id: str,
        asset_uuid: str,
        on_command: Callable,
        ksqlClient: KSQLDBClient,
        bootstrap_servers: str = config.KAFKA_BROKER
    ):
        """
        Initialize the KafkaCommandsConsumer.

        The Kafka topic backing ``CMDS_STREAM`` is resolved through `ksqlClient`,
        and the underlying Kafka consumer is created and subscribed to that
        topic right away (i.e. before `consume` is called).

        Args:
            consumer_group_id (str): Kafka consumer group ID.
            asset_uuid (str): UUID of the asset to filter messages for.
            on_command (Callable): Callback to process valid messages. It is
                called as ``on_command(msg_key, msg_value)``.
            ksqlClient (KSQLDBClient): Client to retrieve Kafka topic metadata.
            bootstrap_servers (str): Kafka broker address.
        """
        self.ksql = ksqlClient
        self.topic = self.ksql.get_kafka_topic('CMDS_STREAM')
        self.bootstrap_servers = bootstrap_servers
        self.group_id = consumer_group_id
        self.key = asset_uuid
        self.on_command = on_command

        # Set while the polling loop is allowed to run; cleared by `stop`.
        self.running = threading.Event()
        self.running.set()

        self.consumer = Consumer({
            'bootstrap.servers': self.bootstrap_servers,
            'group.id': self.group_id,
            'auto.offset.reset': 'latest',
        })
        self.consumer.subscribe([self.topic])

    def filter_messages(self, msg_value: Dict) -> Optional[Dict]:
        """
        Optional message filter method.

        Override this method to filter or transform command messages before they
        are passed to the `on_command` callback. The default implementation
        returns the message unchanged.

        Args:
            msg_value (dict): The incoming message value as a dictionary.

        Returns:
            dict or None: The message value to process, or None to discard it.
            Any falsy return value (e.g. an empty dict) is discarded as well.
        """
        return msg_value

    def consume(self) -> None:
        """
        Start consuming messages from Kafka.

        This method runs a loop, polling the Kafka topic for new messages,
        filtering them by key and (optionally) content, and dispatching
        them to the `on_command` callback. The loop runs until `stop` is
        called, a Kafka error other than end-of-partition is reported, or an
        unexpected exception occurs (including one raised by `on_command`).

        Messages whose key does not match the asset UUID are dropped before
        their payload is parsed. Messages for this asset that carry no payload
        or a payload that is not valid JSON are logged and skipped; they do not
        stop the consumer.

        No exception is propagated to the caller: errors are logged through
        `kafka_logger`. The Kafka consumer is always closed when this method
        returns, so the instance cannot be used to consume again afterwards.
        """
        kafka_logger.info(f"Starting consuming messages for asset {self.key}")
        try:
            while self.running.is_set():
                msg = self.consumer.poll(timeout=self.consumer_timeout)
                if msg is None:
                    continue

                error = msg.error()
                if error:
                    if error.code() == KafkaError._PARTITION_EOF:
                        continue
                    kafka_logger.error(f"Error: {error}")
                    break

                # Filter on the key first: payloads addressed to other assets
                # are never parsed, so a malformed one cannot affect us.
                raw_key = msg.key()
                msg_key = raw_key.decode('utf-8') if raw_key else None
                if msg_key != self.key:
                    continue

                raw_value = msg.value()
                if raw_value is None:
                    kafka_logger.debug(f"[{msg_key}] Ignoring message without payload")
                    continue

                text = raw_value.decode('utf-8')
                try:
                    decoded = json.loads(text)
                except json.JSONDecodeError as e:
                    # One bad payload must not end command handling for the asset
                    kafka_logger.error(f"Commands topic contained a none JSON value: {e} - raw: {text}")
                    continue
                msg_value = CaseInsensitiveDict(decoded)

                # Apply additional filter
                msg_value = self.filter_messages(msg_value)
                if msg_value:
                    kafka_logger.debug(f"[{msg_key}] {msg_value}")
                    self.on_command(msg_key, msg_value)

        except Exception as e:
            # Top-level boundary of the polling loop: report and shut down
            kafka_logger.error(f"Exception in KafkaCommandsConsumer: {e}")

        finally:
            kafka_logger.info("Closing KafkaCommandsConsumer ...")
            self.consumer.close()

    def stop(self) -> None:
        """ Signal the consumer to stop. """
        self.running.clear()
if __name__ == "__main__":
    # Demo: print every command addressed to the asset PROVER3018

    def on_command(msg_key, msg_value):
        """ Print a received command together with its key. """
        print(f"[{msg_key}] {msg_value}")

    kafka_logger.level = 'INFO'

    demo_settings = {
        'consumer_group_id': "demo_ofa_commands_consumer_group",
        'asset_uuid': "PROVER3018",
        'on_command': on_command,
        'ksqlClient': KSQLDBClient(config.KSQLDB_URL),
        'bootstrap_servers': "localhost:9092",
    }
    KafkaCommandsConsumer(**demo_settings).consume()