Source code for openfactory.assets.utils.get_nats_cluster
import requests
from openfactory.exceptions import OFAConfigurationException
[docs]
def get_nats_cluster_url(asset_uuid: str, asset_router_url: str) -> str:
"""
Fetch the NATS URL for a given asset UUID.
Args:
asset_uuid (str): The asset UUID.
asset_router_url (str): Asset router URL.
Returns:
str: The nats_url of the asset.
Raises:
requests.exceptions.RequestException: If the HTTP request fails.
ValueError: If the response is not valid JSON or nats_url is missing.
"""
url = f"{asset_router_url}/asset/{asset_uuid}"
try:
response = requests.get(url)
except Exception as e:
raise OFAConfigurationException(f'Could not connect to Asset Router. Is ASSET_ROUTER_URL={asset_router_url} defined correctly?\n{e}')
response.raise_for_status()
data = response.json()
nats_url = data.get("nats_url")
if not nats_url:
raise ValueError(f"'nats_url' not found in response for asset {asset_uuid}")
return nats_url