Source code for openfactory.connectors.opcua.opcua_connector

"""
**OPCUAConnector Class**

Provides orchestration for deploying OPC UA-based devices into the OpenFactory
environment. This includes:

- Registering the device with OpenFactory.
- Deploying an OPC UA producer to publish device data into the Kafka cluster.
- Tearing down all associated components when the device is removed.

.. seealso::

   The schema of the OPCUAConnector is :class:`openfactory.schemas.connectors.opcua.OPCUAConnectorSchema`.
"""

import openfactory.config as config
from openfactory.models.user_notifications import user_notify
from openfactory.connectors.base_connector import Connector
from openfactory.assets import Asset
from openfactory.openfactory_deploy_strategy import OpenFactoryServiceDeploymentStrategy
from openfactory.kafka.ksql import KSQLDBClient
from openfactory.utils import register_asset, deregister_asset
from openfactory.schemas.devices import Device
from openfactory.schemas.connectors.opcua import OPCUAConnectorSchema
from openfactory.exceptions import OFAException
from openfactory.connectors.registry import register_connector


[docs] @register_connector(OPCUAConnectorSchema) class OPCUAConnector(Connector): """ Connector for OPC UA devices that manages deployment of OPC UA producers. Responsibilities include: - Registering the device as an OpenFactory asset. - Deploying an OPC UA producer to stream device data to Kafka. - Managing references between the device and its producer in OpenFactory. - Tearing down the producer when the device is removed. .. seealso:: The schema of the OPCUAConnector is :class:`openfactory.schemas.connectors.opcua.OPCUAConnectorSchema`. """ CONNECTOR_NAME = "OPCUA"
[docs] def __init__(self, deployment_strategy: OpenFactoryServiceDeploymentStrategy, ksqlClient: KSQLDBClient, bootstrap_servers: str = config.KAFKA_BROKER): """ Initializes the OPCUAConnector. Args: deployment_strategy (OpenFactoryServiceDeploymentStrategy) : The deployment strategy to use. ksqlClient (KSQLDBClient): The client for interacting with ksqlDB. bootstrap_servers (str): The Kafka bootstrap server address. Defaults to config.KAFKA_BROKER. """ self.deployment_strategy = deployment_strategy self.ksql = ksqlClient self.bootstrap_servers = bootstrap_servers
def _get_coordinator(self) -> Asset: """ Discover and returns the OPC UA coordinator asset. Returns: Asset: The OPC UA coordinator asset. Raises: OFAException: If the OPC UA coordinator is not configured or unavailable. """ query = f"select ASSET_UUID FROM ASSETS_TYPE WHERE TYPE='{self.CONNECTOR_NAME}.Coordinator';" res = self.ksql.query(query) if res: coordinator = Asset(asset_uuid=res[0]["ASSET_UUID"], ksqlClient=self.ksql) else: raise OFAException("OPC UA Coordinator is not deployed") if coordinator.avail.value != "AVAILABLE": raise OFAException("OPC UA Coordinator is not AVAILABLE") return coordinator
[docs] def deploy(self, device: Device, yaml_config_file: str) -> None: """ Deploy a device based on its configuration. Args: device (Device): Device to deploy. yaml_config_file (str): Path to the YAML configuration file. """ if device.connector.type != 'opcua': raise OFAException(f"Device {device.uuid} is not configured with an OPC UA connector") coordinator = self._get_coordinator() try: coordinator.register_device(sender_uuid='opcua-connector', device_config=str(device.model_dump_json())) except TypeError: raise OFAException(f"Asset '{coordinator.asset_uuid}' does not appear to be a valid OPC UA coordinator.") user_notify.success(f"OPC UA device {device.uuid} registered successfully") # Register device asset register_asset( asset_uuid=device.uuid, uns=device.uns, asset_type="Device", ksqlClient=self.ksql, bootstrap_servers=self.bootstrap_servers )
[docs] def tear_down(self, device_uuid: str) -> None: """ Tear down a deployed device given its UUID. Args: device_uuid (str): Unique identifier of the device to be torn down. """ coordinator = self._get_coordinator() try: coordinator.deregister_device(sender_uuid='opcua-connector', device_uuid=device_uuid) except TypeError: raise OFAException(f"Asset {coordinator.asset_uuid} does not appear to be a valid OPC UA coordinator") user_notify.success(f"OPC UA device {device_uuid} deregistered successfully") # De-register device asset deregister_asset( asset_uuid=device_uuid, ksqlClient=self.ksql, bootstrap_servers=self.bootstrap_servers ) return