OpenFactory Asset Forwarder#

This module implements the AssetForwarderService, an OpenFactory application responsible for forwarding asset updates from Kafka to NATS clusters.

The service consumes messages from the configured Kafka topic, enriches them with forwarding metadata, determines the target NATS cluster using consistent hashing of the asset UUID, and publishes the message to the corresponding NATS subject.

Architecture#

The service is built on OpenFactoryFastAPIApp and provides:

  • Kafka consumer integration.

  • NATS cluster management.

  • Consistent hash based routing.

  • Prometheus metrics.

  • Health and lifecycle management.

  • Asynchronous worker processing.

Message Flow#

  1. Consume asset messages from Kafka.

  2. Decode and validate payloads.

  3. Add Kafka and forwarder timestamps.

  4. Queue messages for worker processing.

  5. Determine destination NATS cluster.

  6. Publish to the target NATS subject.

  7. Store Kafka offsets after successful handling.

Routing#

Assets are routed using a ConsistentHashRing based on the Kafka message key (asset UUID). This ensures stable routing of a given asset to the same NATS cluster while minimizing redistribution when clusters are added or removed.

Observability#

The service exposes Prometheus metrics including:

  • Kafka consumption rate.

  • Kafka consumer errors.

  • Invalid message count.

  • NATS publish success rate.

  • NATS publish failure rate.

  • Queue backlog.

  • Queue waiting time.

  • Message processing latency.

The Prometheus endpoint is exposed through the OpenFactory application framework.

class openfactory.fanoutlayer.asset_forwarder.src.asset_forwarder_service.AssetForwarderService(*args, **kwargs)[source]#

Bases: OpenFactoryFastAPIApp

__init__(*args, **kwargs)[source]#

Initialize the AssetForwarderService.

This constructor forwards all parameters to OpenFactoryFastAPIApp

Parameters:
  • ksqlClient – KSQL client instance.

  • bootstrap_servers – Kafka bootstrap server address.

  • asset_router_url – Asset Router URL.

  • loglevel – Logging level (e.g., INFO, DEBUG).

  • test_mode – Enables test mode (disables live Kafka/ksql interaction).

See also

OpenFactoryFastAPIApp for full initialization details and environment variable handling.

add_timestamps(value, msg)[source]#

Add Kafka and forwarder timestamps to a message payload.

Return type:

dict[str, Any]

Parameters:
  • value – Decoded message payload.

  • msg – Kafka message.

Returns:

Updated payload.

async async_main_loop()[source]#

Main service loop.

Starts worker tasks, subscribes to Kafka, consumes messages, and queues them for asynchronous processing.

Return type:

None

build_nats_message(msg, value)[source]#

Determine the target NATS cluster and build a NATS message.

Return type:

tuple[NatsCluster, str, bytes]

Parameters:
  • msg – Kafka message.

  • value – Decoded message payload.

Returns:

Tuple containing – - Target NATS cluster - NATS subject - Serialized payload

Raises:

ValueError – If the asset UUID is missing or the message does not contain a valid ID field.

decode_message_value(raw_value)[source]#

Decode a Kafka message payload from JSON.

Return type:

dict[str, Any]

Parameters:

raw_value – Raw payload returned by Kafka.

Returns:

Parsed JSON payload as a dictionary.

Raises:
setup_consumer()[source]#

Build the Kafka consumer.

Reads consumer configuration from environment variables and initializes the underlying confluent-kafka Consumer.

Return type:

None

Environment variables:

KAFKA_TOPIC: Kafka topic to consume from. KAFKA_GROUP: Kafka consumer group. KAFKA_CONSUMER_BATCH_SIZE: Maximum messages to consume per poll. KAFKA_CONSUMER_TIME_OUT_MS: Poll timeout in milliseconds. KAFKA_CONSUMER_COMMIT_INTERVAL_MS: Auto commit interval. KAFKA_AUTO_OFFSET_RESET: Offset reset policy.

Raises:

KeyError – If KAFKA_BROKER is not defined.

setup_nats_clusters()[source]#

Build NATS cluster connections and hash ring.

Discovers NATS clusters from environment variables, creates NatsCluster instances, and initializes the consistent hash ring used for routing assets.

Return type:

None

async worker()[source]#

Process queued Kafka messages and forward them to NATS.

The worker validates messages, publishes them to the appropriate NATS cluster, updates Prometheus metrics, and stores Kafka offsets after successful processing.

Return type:

None