# Source code for openfactory.connectors.shdr.shdr_connector

"""
SHDRConnector Class

Provides orchestration for deploying SHDR-based devices into the OpenFactory
environment. This includes:

- Registering the device with OpenFactory.
- Registering an SHDR device with the SHDR coordinator to publish device data into the Kafka cluster.
- Tearing down all associated components when the device is removed.

.. seealso::

   The schema of the SHDRConnector is :class:`openfactory.schemas.connectors.shdr.SHDRConnectorSchema`.
"""

import openfactory.config as config
from openfactory.models.user_notifications import user_notify
from openfactory.connectors.base_connector import Connector
from openfactory.assets import Asset
from openfactory.openfactory_deploy_strategy import OpenFactoryServiceDeploymentStrategy
from openfactory.kafka.ksql import KSQLDBClient
from openfactory.utils import register_asset, deregister_asset
from openfactory.schemas.devices import Device
from openfactory.schemas.connectors.shdr import SHDRConnectorSchema
from openfactory.exceptions import OFAException
from openfactory.connectors.registry import register_connector


@register_connector(SHDRConnectorSchema)
class SHDRConnector(Connector):
    """
    Connector for SHDR devices that manages deployment and registration of SHDR devices.

    Responsibilities include:

    - Registering the device as an OpenFactory asset.
    - Registering the SHDR device with the SHDR coordinator to stream device data to Kafka.

    .. seealso::

       The schema of the SHDRConnector is :class:`openfactory.schemas.connectors.shdr.SHDRConnectorSchema`.
    """

    # Prefix used to look up the coordinator asset type ("<CONNECTOR_NAME>.Coordinator").
    CONNECTOR_NAME = "SHDR"

    def __init__(self,
                 deployment_strategy: OpenFactoryServiceDeploymentStrategy,
                 ksqlClient: KSQLDBClient,
                 bootstrap_servers: str = config.KAFKA_BROKER):
        """
        Initializes the SHDRConnector.

        Args:
            deployment_strategy (OpenFactoryServiceDeploymentStrategy): The deployment strategy to use.
            ksqlClient (KSQLDBClient): The client for interacting with ksqlDB.
            bootstrap_servers (str): The Kafka bootstrap server address. Defaults to config.KAFKA_BROKER.
        """
        self.deployment_strategy = deployment_strategy
        self.ksql = ksqlClient
        self.bootstrap_servers = bootstrap_servers

    def _get_coordinator(self) -> Asset:
        """
        Discover and return the SHDR coordinator asset.

        Queries the ``ASSETS_TYPE`` table for an asset of type
        ``<CONNECTOR_NAME>.Coordinator`` and uses the first row returned.

        Returns:
            Asset: The SHDR coordinator asset.

        Raises:
            OFAException: If no coordinator is found or if its availability is not ``AVAILABLE``.
        """
        query = f"select ASSET_UUID FROM ASSETS_TYPE WHERE TYPE='{self.CONNECTOR_NAME}.Coordinator';"
        res = self.ksql.query(query)
        if not res:
            raise OFAException("SHDR Coordinator is not deployed")

        # Only the first matching coordinator is considered.
        coordinator = Asset(asset_uuid=res[0]["ASSET_UUID"], ksqlClient=self.ksql)
        if coordinator.avail.value != "AVAILABLE":
            raise OFAException("SHDR Coordinator is not AVAILABLE")
        return coordinator

    def deploy(self, device: Device, yaml_config_file: str) -> None:
        """
        Deploy a device based on its configuration.

        The device is first registered with the SHDR coordinator and then
        registered as an OpenFactory asset of type ``Device``.

        Args:
            device (Device): Device to deploy.
            yaml_config_file (str): Path to the YAML configuration file. Not used by this connector.

        Raises:
            OFAException: If the device cannot be deployed.
        """
        if device.connector.type != "shdr":
            raise OFAException(f"Device {device.uuid} is not configured with an SHDR connector")

        coordinator = self._get_coordinator()
        try:
            # NOTE(review): sender_uuid here is 'ofa-cli' while tear_down uses
            # 'shdr-connector' - confirm whether the difference is intentional.
            coordinator.register_device(sender_uuid='ofa-cli',
                                        device_config=str(device.model_dump_json()))
        except TypeError as err:
            # Presumably raised when the asset does not expose a callable
            # `register_device` method - confirm against Asset.
            raise OFAException(
                f"Asset '{coordinator.asset_uuid}' does not appear to be a valid SHDR coordinator."
            ) from err
        user_notify.success(f"SHDR device {device.uuid} registered successfully")

        # Register device asset
        register_asset(
            asset_uuid=device.uuid,
            uns=device.uns,
            asset_type="Device",
            ksqlClient=self.ksql,
            bootstrap_servers=self.bootstrap_servers
        )

    def tear_down(self, device_uuid: str) -> None:
        """
        Tear down a deployed device given its UUID.

        The device is first deregistered from the SHDR coordinator and then
        deregistered as an OpenFactory asset.

        Args:
            device_uuid (str): Unique identifier of the device to be torn down.

        Raises:
            OFAException: If the device cannot be torn down.
        """
        coordinator = self._get_coordinator()
        try:
            coordinator.deregister_device(sender_uuid='shdr-connector', device_uuid=device_uuid)
        except TypeError as err:
            # Presumably raised when the asset does not expose a callable
            # `deregister_device` method - confirm against Asset.
            raise OFAException(
                f"Asset {coordinator.asset_uuid} does not appear to be a valid SHDR coordinator"
            ) from err
        user_notify.success(f"SHDR device {device_uuid} deregistered successfully")

        # De-register device asset
        deregister_asset(
            asset_uuid=device_uuid,
            ksqlClient=self.ksql,
            bootstrap_servers=self.bootstrap_servers
        )