OpenFactory Asset Forwarder#
This module implements the AssetForwarderService, an OpenFactory application responsible for forwarding asset updates from Kafka to NATS clusters.
The service consumes messages from the configured Kafka topic, enriches them with forwarding metadata, determines the target NATS cluster using consistent hashing of the asset UUID, and publishes the message to the corresponding NATS subject.
Architecture#
The service is built on OpenFactoryFastAPIApp and provides:
Kafka consumer integration.
NATS cluster management.
Consistent hash based routing.
Prometheus metrics.
Health and lifecycle management.
Asynchronous worker processing.
Message Flow#
Consume asset messages from Kafka.
Decode and validate payloads.
Add Kafka and forwarder timestamps.
Queue messages for worker processing.
Determine destination NATS cluster.
Publish to the target NATS subject.
Store Kafka offsets after successful handling.
Routing#
Assets are routed using a ConsistentHashRing based on the Kafka message key (asset UUID). This ensures stable routing of a given asset to the same NATS cluster while minimizing redistribution when clusters are added or removed.
Observability#
The service exposes Prometheus metrics including:
Kafka consumption rate.
Kafka consumer errors.
Invalid message count.
NATS publish success rate.
NATS publish failure rate.
Queue backlog.
Queue waiting time.
Message processing latency.
The Prometheus endpoint is exposed through the OpenFactory application framework.
- class openfactory.fanoutlayer.asset_forwarder.src.asset_forwarder_service.AssetForwarderService(*args, **kwargs)[source]#
Bases:
OpenFactoryFastAPIApp- __init__(*args, **kwargs)[source]#
Initialize the AssetForwarderService.
This constructor forwards all parameters to
OpenFactoryFastAPIApp- Parameters:
ksqlClient – KSQL client instance.
bootstrap_servers – Kafka bootstrap server address.
asset_router_url – Asset Router URL.
loglevel – Logging level (e.g.,
INFO,DEBUG).test_mode – Enables test mode (disables live Kafka/ksql interaction).
See also
OpenFactoryFastAPIAppfor full initialization details and environment variable handling.
- async async_main_loop()[source]#
Main service loop.
Starts worker tasks, subscribes to Kafka, consumes messages, and queues them for asynchronous processing.
- Return type:
- build_nats_message(msg, value)[source]#
Determine the target NATS cluster and build a NATS message.
- Return type:
tuple[NatsCluster,str,bytes]- Parameters:
msg – Kafka message.
value – Decoded message payload.
- Returns:
Tuple containing – - Target NATS cluster - NATS subject - Serialized payload
- Raises:
ValueError – If the asset UUID is missing or the message does not contain a valid ID field.
- decode_message_value(raw_value)[source]#
Decode a Kafka message payload from JSON.
- Return type:
- Parameters:
raw_value – Raw payload returned by Kafka.
- Returns:
Parsed JSON payload as a dictionary.
- Raises:
ValueError – If the payload type is unsupported.
json.JSONDecodeError – If the payload is not valid JSON.
- setup_consumer()[source]#
Build the Kafka consumer.
Reads consumer configuration from environment variables and initializes the underlying confluent-kafka Consumer.
- Return type:
- Environment variables:
KAFKA_TOPIC: Kafka topic to consume from. KAFKA_GROUP: Kafka consumer group. KAFKA_CONSUMER_BATCH_SIZE: Maximum messages to consume per poll. KAFKA_CONSUMER_TIME_OUT_MS: Poll timeout in milliseconds. KAFKA_CONSUMER_COMMIT_INTERVAL_MS: Auto commit interval. KAFKA_AUTO_OFFSET_RESET: Offset reset policy.
- Raises:
KeyError – If KAFKA_BROKER is not defined.