Source code for openfactory.assets.utils.nats_subscriber
""" NATS releated methods. """
import json
import nats
from typing import Protocol
from .async_loop import AsyncLoopThread
[docs]
class AssetNATSCallback(Protocol):
"""
Interface for callback used to handle NATS asset messages.
Args:
msg_subject (str): The NATS subject of the message (e.g., 'OPCUA-SENSOR-001.*').
msg_value (dict): The JSON-decoded payload of the message.
"""
def __call__(self, msg_subject: str, msg_value: dict) -> None:
""" Method to handle incoming NATS asset messages. """
...
[docs]
class NATSSubscriber:
"""
Represents a NATS subscriber that runs in a dedicated asyncio event loop thread.
This class connects to a NATS server, subscribes to a subject, and calls
the provided callback for each received message.
"""
[docs]
def __init__(self, loop_thread: AsyncLoopThread,
servers: str | list[str], subject: str,
on_message: AssetNATSCallback) -> None:
"""
Initializes the NATS subscriber.
Args:
loop_thread (AsyncLoopThread): Event loop thread used to run async operations.
servers (str | list[str]): NATS server URL(s).
subject (str): NATS subject to subscribe to.
on_message (AssetNATSCallback): Callback to invoke when a message is received.
"""
self.loop_thread = loop_thread
self.subject = subject
self.on_message = on_message
self.servers = servers
self.nc = None
self.sub = None
self._closing = False
async def _connect_and_subscribe(self) -> None:
""" Connects to NATS and subscribes to the subject with an internal handler. """
async def error_handler(e):
print("NATS client error:", e)
async def disconnected_cb():
if not self._closing:
print("NATS disconnected — will attempt reconnect...")
async def reconnected_cb():
print("NATS reconnected successfully.")
self.nc = await nats.connect(
servers=self.servers,
max_reconnect_attempts=-1,
connect_timeout=2,
reconnect_time_wait=1,
error_cb=error_handler,
disconnected_cb=disconnected_cb,
reconnected_cb=reconnected_cb,
)
async def _handler(msg):
data = json.loads(msg.data.decode())
self.on_message(msg.subject, data)
self.sub = await self.nc.subscribe(self.subject, cb=_handler)
[docs]
def start(self) -> None:
""" Starts the NATS subscriber in the background event loop. """
future = self.loop_thread.run_coro(self._connect_and_subscribe())
try:
future.result() # <-- this will raise the exception if connection fails
except nats.errors.NoServersError as e:
print(f"❌ Failed to connect to NATS servers {self.servers}: {e}")
except Exception as e:
print(f"❌ Unexpected error connecting to NATS: {e}")
[docs]
def stop(self) -> None:
""" Stops the NATS subscription and closes the connection. """
self._closing = True
async def _stop_all():
if self.sub:
try:
await self.sub.unsubscribe()
except Exception as e:
print(f"Warning: unsub error: {e}")
if self.nc:
try:
await self.nc.close()
except Exception as e:
print(f"Warning: nc close error: {e}")
# Schedule the coroutine on the loop thread and wait
try:
self.loop_thread.run_coro(_stop_all()).result()
except Exception as e:
print(f"Warning: NATS stop failed: {e}")