NATS Cluster Wrapper

This module provides the NatsCluster class, which wraps a connection to a single NATS cluster for use by the OpenFactory Asset Forwarder.

The class handles:

  • Connection management and reconnection logic.

  • Safe publishing of messages to NATS subjects.

  • Graceful shutdown and closing of connections.

It is designed to ensure that asset messages are routed reliably to the correct NATS cluster.

Logging is delegated to the caller by supplying a logger instance during construction.
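
Because logging is delegated to the caller, a logger has to exist before the wrapper is constructed. The following is a minimal sketch using only the standard library. The helper name build_cluster_logger, the logger name "asset_forwarder.C1", and the format string are illustrative assumptions, not part of OpenFactory.

```python
import logging


def build_cluster_logger(cluster_name: str, level: int = logging.INFO) -> logging.Logger:
    """Return a logger for one cluster (hypothetical helper, not part of OpenFactory)."""
    logger = logging.getLogger(f"asset_forwarder.{cluster_name}")
    logger.setLevel(level)
    # Attach a handler only once so repeated calls do not duplicate output.
    if not logger.handlers:
        handler = logging.StreamHandler()
        handler.setFormatter(
            logging.Formatter("%(asctime)s %(name)s %(levelname)s %(message)s")
        )
        logger.addHandler(handler)
    return logger


logger = build_cluster_logger("C1")
# The result would then be supplied as the logger argument of NatsCluster.
```

Using one named logger per cluster keeps connection and publish events from different clusters easy to tell apart in the output.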

Usage

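import asyncio
import logging

from nats_cluster import NatsCluster

async def main():
    # Use a single event loop: a connection opened in one asyncio.run() call
    # cannot be reused after that loop has been closed.
    logger = logging.getLogger("asset_forwarder")
    cluster = NatsCluster(name="C1", servers=["nats://localhost:4222"], logger=logger)
    await cluster.connect()
    await cluster.publish("asset.123", b"payload")
    await cluster.close()

asyncio.run(main())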

Attributes

  • name (str): Logical name of the NATS cluster (e.g., “C1”).

  • servers (list[str]): List of NATS server URLs.

  • logger (Logger): Logger used for connection and publish events.

  • reconnect_time_wait (int): Seconds to wait between reconnect attempts.

  • nc (Optional[NATS]): Active NATS client connection.

  • _lock (asyncio.Lock): Ensures only one concurrent connection attempt.

class openfactory.fanoutlayer.asset_forwarder.src.nats_cluster.NatsCluster(name, servers, logger, reconnect_time_wait=2)

Bases: object

Wrapper around a NATS cluster connection.

This class manages a connection to a single NATS cluster, handling reconnection logic and providing safe publish/close operations.

It is designed to be used by the Asset Forwarder to route asset messages consistently to the correct cluster.

name

Logical name of the NATS cluster (e.g., “C1”).

Type:

str

servers

List of NATS server URLs for this cluster.

Type:

List[str]

logger

Logger used for connection and publish events.

Type:

logging.Logger

reconnect_time_wait

Time in seconds to wait before reconnect attempts.

Type:

int

nc

Active NATS client connection, or None if not connected.

_lock

Ensures only one connection attempt runs at a time.

Type:

asyncio.Lock

__init__(name, servers, logger, reconnect_time_wait=2)

Initialize a NATS cluster wrapper.

Parameters:
  • name (str) – Cluster name identifier (used for logging and routing).

  • servers (list[str]) – List of server URLs for connecting to this cluster.

  • logger (logging.Logger) – Logger used for connection and publish events.

  • reconnect_time_wait (int, optional) – Delay (in seconds) between reconnect attempts. Defaults to 2.

async close()

Close the connection to the NATS cluster.

Attempts a graceful drain before closing the client. Safe to call multiple times.

Return type:

None
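
The drain-then-close behavior, and the fact that repeated calls are harmless, can be illustrated with a small sketch. This is not the OpenFactory implementation: FakeClient is a hypothetical stand-in that records calls instead of talking to a server, and CloseSketch shows only the pattern.

```python
import asyncio


class FakeClient:
    """Stand-in for a NATS client (hypothetical; records calls, no network)."""

    def __init__(self):
        self.calls = []

    async def drain(self):
        self.calls.append("drain")

    async def close(self):
        self.calls.append("close")


class CloseSketch:
    """Illustrates an idempotent close only; not the real NatsCluster."""

    def __init__(self, client):
        self.nc = client

    async def close(self):
        if self.nc is None:
            return  # already closed: nothing to do
        # Clear the reference first so a second call becomes a no-op.
        client, self.nc = self.nc, None
        try:
            await client.drain()  # let pending messages flush
        except Exception:
            pass  # drain is best effort; a real wrapper would log the failure
        finally:
            await client.close()


async def demo():
    client = FakeClient()
    wrapper = CloseSketch(client)
    await wrapper.close()
    await wrapper.close()  # second call does nothing
    return client.calls


calls = asyncio.run(demo())
```

Clearing the client reference before draining is one way to make a second close() call return immediately; the actual class may achieve the same guarantee differently.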

async connect()

Establish a connection to the NATS cluster.

If already connected, this method does nothing. Ensures that only one connection attempt is active at a time.

Return type:

None

Raises:

Exception – If the connection attempt fails (NATS will retry internally).
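
The guarantee that only one connection attempt runs at a time is the kind of behavior an asyncio.Lock provides. The sketch below shows the general pattern under stated assumptions: FakeClient, ConnectSketch, and the attempts counter are hypothetical names used for illustration, and no real NATS client is involved.

```python
import asyncio


class FakeClient:
    """Stand-in for a NATS client (hypothetical; no network access)."""

    def __init__(self):
        self.is_connected = False

    async def connect(self):
        await asyncio.sleep(0.01)  # simulate network latency
        self.is_connected = True


class ConnectSketch:
    """Illustrates the lock-guarded connect pattern only."""

    def __init__(self):
        self.nc = None
        self._lock = asyncio.Lock()
        self.attempts = 0  # counts real connection attempts

    async def connect(self):
        async with self._lock:
            # Check inside the lock: another task may have connected already.
            if self.nc is not None and self.nc.is_connected:
                return
            self.attempts += 1
            client = FakeClient()
            await client.connect()
            self.nc = client


async def demo():
    sketch = ConnectSketch()
    # Five concurrent callers should trigger a single connection attempt.
    await asyncio.gather(*(sketch.connect() for _ in range(5)))
    return sketch.attempts


attempts = asyncio.run(demo())
```

Performing the "already connected" check while holding the lock is what prevents tasks that were waiting on the lock from starting a second attempt.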

async publish(subject, payload)

Publish a message to this NATS cluster.

Ensures the cluster is connected before publishing.

Return type:

None

Parameters:
  • subject (str) – The NATS subject (topic) to publish to.

  • payload (bytes) – The raw message payload.

Raises:

RuntimeError – If unable to connect to the cluster.
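
A caller may want to handle the documented RuntimeError rather than let it propagate. The sketch below shows one way to do that. UnreachableCluster is a hypothetical stub that mimics a cluster whose servers cannot be reached, and safe_publish is an illustrative helper, not part of the module.

```python
import asyncio


class UnreachableCluster:
    """Stub standing in for NatsCluster when no server is reachable (hypothetical)."""

    name = "C1"

    async def publish(self, subject: str, payload: bytes) -> None:
        raise RuntimeError(f"Cannot connect to cluster {self.name}")


async def safe_publish(cluster, subject: str, payload: bytes) -> bool:
    """Return True if publish() completed, False if the cluster was unreachable."""
    try:
        await cluster.publish(subject, payload)
        return True
    except RuntimeError:
        # The documented failure mode: the cluster could not be reached.
        return False


delivered = asyncio.run(
    safe_publish(UnreachableCluster(), "asset.123", b"payload")
)
```

Whether a failed message should be dropped, retried, or buffered is a policy decision for the caller; the sketch only reports the outcome.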