Source code for openfactory.fanoutlayer.asset_forwarder.src.nats_cluster
"""
This module provides the ``NatsCluster`` class, which wraps a connection
to a single NATS cluster for use by the OpenFactory Asset Forwarder.
The class handles:
- Connection management and reconnection logic.
- Safe publishing of messages to NATS subjects.
- Graceful shutdown and close of connections.
It is designed to ensure that asset messages are routed reliably to the
correct NATS cluster.
Logging is delegated to the caller by supplying a logger instance during
construction.
Usage
-----
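.. code-block:: python

    import asyncio
    import logging

    from nats_cluster import NatsCluster

    logger = logging.getLogger(__name__)

    async def main() -> None:
        # Keep the whole connection lifetime inside ONE event loop: every
        # asyncio.run() call creates and then closes its own loop, so a
        # client connected in one call cannot be reused in the next.
        cluster = NatsCluster(name="C1", servers=["nats://localhost:4222"], logger=logger)
        await cluster.connect()
        await cluster.publish("asset.123", b"payload")
        await cluster.close()

    asyncio.run(main())
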
Attributes
----------
- ``name`` (str): Logical name of the NATS cluster (e.g., "C1").
- ``servers`` (list[str]): List of NATS server URLs.
- ``logger`` (Logger): Logger used for connection and publish events.
- ``reconnect_time_wait`` (int): Seconds to wait between reconnect attempts.
- ``nc`` (Optional[NATS]): Active NATS client connection.
- ``_lock`` (asyncio.Lock): Ensures only one concurrent connection attempt.
"""
import asyncio
from nats.aio.client import Client as NATS
from logging import Logger
[docs]
class NatsCluster:
    """
    Wrapper around a NATS cluster connection.

    This class manages a connection to a single NATS cluster, handling
    reconnection logic and providing safe publish/close operations.
    It is designed to be used by the Asset Forwarder to route asset messages
    consistently to the correct cluster.

    Attributes:
        name (str): Logical name of the NATS cluster (e.g., "C1").
        servers (List[str]): List of NATS server URLs for this cluster.
        logger (logging.Logger): Logger used for connection and publish events.
        reconnect_time_wait (int): Time in seconds to wait before reconnect attempts.
        nc: Active NATS client connection, or None if not connected.
        _lock (asyncio.Lock): Ensures only one connection attempt runs at a time.
    """

    def __init__(self, name: str, servers: list[str], logger: Logger, reconnect_time_wait: int = 2) -> None:
        """
        Initialize a NATS cluster wrapper.

        No connection is opened here; call :meth:`connect` (or :meth:`publish`,
        which connects on demand).

        Args:
            name (str): Cluster name identifier (used for logging and routing).
            servers (list[str]): List of server URLs for connecting to this cluster.
            logger (logging.Logger): Logger used for connection and publish events.
            reconnect_time_wait (int, Optional): Delay (in seconds) between reconnect
                attempts. Defaults to 2.
        """
        self.name = name
        self.servers = servers
        self.logger: Logger = logger
        self.nc: NATS | None = None
        self._lock = asyncio.Lock()
        self.reconnect_time_wait = reconnect_time_wait

    async def connect(self) -> None:
        """
        Establish a connection to the NATS cluster.

        If a client exists and reports itself connected, this method does
        nothing. A client that exists but is not connected is closed
        (best effort) and replaced by a fresh one. The whole procedure runs
        under ``_lock`` so only one connection attempt is active at a time.

        Raises:
            Exception: Whatever the NATS client's ``connect()`` raises when the
                connection attempt fails. In that case ``nc`` is left as ``None``
                if a stale client had to be discarded.
        """
        async with self._lock:
            stale = self.nc
            if stale is not None:
                try:
                    if stale.is_connected:
                        return
                    # close dead client
                    await stale.close()
                except Exception as exc:
                    # Best effort: the client is being discarded anyway, so a
                    # failure here must not prevent the new connection attempt.
                    self.logger.debug(f"NATS[{self.name}] ignoring error while closing stale client: {exc}")
                # Drop the reference so a failed connect below does not leave
                # this instance pointing at an already-closed client.
                self.nc = None

            async def disconnected_cb() -> None:
                self.logger.warning(f"NATS[{self.name}] disconnected — will attempt reconnect...")

            async def reconnected_cb() -> None:
                self.logger.info(f"NATS[{self.name}] reconnected successfully.")

            async def closed_cb() -> None:
                self.logger.warning(f"NATS[{self.name}] connection permanently closed.")

            async def error_handler(e: Exception) -> None:
                self.logger.warning(f"NATS[{self.name}] client error: {e}")

            nc = NATS()
            self.logger.info(f"Connecting to NATS server {self.name} at {self.servers} ...")
            await nc.connect(
                servers=self.servers,
                reconnect_time_wait=self.reconnect_time_wait,
                max_reconnect_attempts=-1,  # retry indefinitely
                allow_reconnect=True,
                ping_interval=10,
                disconnected_cb=disconnected_cb,
                reconnected_cb=reconnected_cb,
                closed_cb=closed_cb,
                error_cb=error_handler,
                connect_timeout=5,  # don't block forever on connect
                verbose=False,
            )
            # Publish the client on the instance only once it is connected.
            self.nc = nc
            self.logger.info(f"NATS server {self.name} connected")

    async def publish(self, subject: str, payload: bytes) -> None:
        """
        Publish a message to this NATS cluster.

        Calls :meth:`connect` first, so a missing or disconnected client is
        (re)established before publishing.

        Args:
            subject (str): The NATS subject (topic) to publish to.
            payload (bytes): The raw message payload.

        Raises:
            RuntimeError: If no client is available after the connection attempt.
            Exception: Errors raised by :meth:`connect` or by the client's
                ``publish()`` propagate unchanged.
        """
        await self.connect()
        if self.nc is None:
            raise RuntimeError(f"NATS[{self.name}] not connected")
        await self.nc.publish(subject, payload)

    async def close(self) -> None:
        """
        Close the connection to the NATS cluster.

        Attempts a graceful drain before closing the client. Failures of
        either step are logged as warnings and never raised, and the client
        reference is cleared afterwards, so calling this method again is a
        no-op.
        """
        nc = self.nc
        if nc is None:
            return

        try:
            self.logger.info(f"Draining NATS server {self.name} ...")
            await nc.drain()
        except Exception as exc:
            # Draining is best effort; still try to close the client below.
            self.logger.warning(f"NATS[{self.name}] drain failed: {exc}")

        try:
            await nc.close()
            self.logger.info(f"Closed connection to NATS server {self.name}")
        except Exception as exc:
            self.logger.warning(f"NATS[{self.name}] close failed: {exc}")

        self.nc = None