NATS Cluster Wrapper#
This module provides the NatsCluster class, which wraps a connection
to a single NATS cluster for use by the OpenFactory Asset Forwarder.
The class handles:
Connection management and reconnection logic.
Safe publishing of messages to NATS subjects.
Graceful shutdown and close of connections.
It is designed to ensure that asset messages are routed reliably to the correct NATS cluster.
Logging is delegated to the caller by supplying a logger instance during construction.
Usage#
from nats_cluster import NatsCluster
import asyncio
cluster = NatsCluster(name="C1", servers=["nats://localhost:4222"], logger=logger)
asyncio.run(cluster.connect())
asyncio.run(cluster.publish("asset.123", b"payload"))
asyncio.run(cluster.close())
Attributes#
name(str): Logical name of the NATS cluster (e.g., “C1”).servers(list[str]): List of NATS server URLs.logger(Logger): Logger used for connection and publish events.reconnect_time_wait(int): Seconds to wait between reconnect attempts.nc(Optional[NATS]): Active NATS client connection._lock(asyncio.Lock): Ensures only one concurrent connection attempt.
- class openfactory.fanoutlayer.asset_forwarder.src.nats_cluster.NatsCluster(name, servers, logger, reconnect_time_wait=2)[source]#
Bases:
objectWrapper around a NATS cluster connection.
This class manages a connection to a single NATS cluster, handling reconnection logic and providing safe publish/close operations.
It is designed to be used by the Asset Forwarder to route asset messages consistently to the correct cluster.
- logger#
Logger used for connection and publish events.
- Type:
- nc#
Active NATS client connection, or None if not connected.
- _lock#
Ensures only one connection attempt runs at a time.
- Type:
- __init__(name, servers, logger, reconnect_time_wait=2)[source]#
Initialize a NATS cluster wrapper.
- Parameters:
name (str) – Cluster name identifier (used for logging and routing).
servers (list[str]) – List of server URLs for connecting to this cluster.
logger (logging.Logger) – Logger used for connection and publish events.
reconnect_time_wait (int, Optional) – Delay (in seconds) between reconnect attempts. Defaults to 2.
- async close()[source]#
Close the connection to the NATS cluster.
Attempts a graceful drain before closing the client. Safe to call multiple times.
- Return type:
- async connect()[source]#
Establish a connection to the NATS cluster.
If already connected, this method does nothing. Ensures that only one connection attempt is active at a time.
- async publish(subject, payload)[source]#
Publish a message to this NATS cluster.
Ensures the cluster is connected before publishing.
- Return type:
- Parameters:
- Raises:
RuntimeError – If unable to connect to the cluster.