Source code for openfactory.assets.utils.get_nats_cluster

import requests
import openfactory.config as config
from openfactory.exceptions import OFAConfigurationException


[docs] def get_nats_cluster_url(asset_uuid: str) -> str: """ Fetch the NATS URL for a given asset UUID. Args: asset_uuid (str): The asset identifier (e.g., "toto"). Returns: str: The nats_url of the asset. Raises: requests.exceptions.RequestException: If the HTTP request fails. ValueError: If the response is not valid JSON or nats_url is missing. """ url = f"{config.ASSET_ROUTER_URL}/asset/{asset_uuid}" try: response = requests.get(url) except Exception as e: raise OFAConfigurationException(f'Could not connect to Asset Router. Is ASSET_ROUTER_URL={config.ASSET_ROUTER_URL} defined correctly?\n{e}') response.raise_for_status() data = response.json() nats_url = data.get("nats_url") if not nats_url: raise ValueError(f"'nats_url' not found in response for asset {asset_uuid}") return nats_url