Source code for openfactory.connectors.opcua.opcua_connector
"""
**OPCUAConnector Class**
Provides orchestration for deploying OPC UA-based devices into the OpenFactory
environment. This includes:
- Registering the device with OpenFactory.
- Deploying an OPC UA producer to publish device data into the Kafka cluster.
- Tearing down all associated components when the device is removed.
.. seealso::
The schema of the OPCUAConnector is :class:`openfactory.schemas.connectors.opcua.OPCUAConnectorSchema`.
"""
import docker
import json
import openfactory.config as config
from openfactory.models.user_notifications import user_notify
from openfactory.connectors.base_connector import Connector
from openfactory.assets import Asset
from openfactory.openfactory_deploy_strategy import OpenFactoryServiceDeploymentStrategy
from openfactory.kafka.ksql import KSQLDBClient
from openfactory.utils import register_asset, deregister_asset
from openfactory.schemas.devices import Device
from openfactory.schemas.connectors.opcua import OPCUAConnectorSchema
from openfactory.exceptions import OFAException
from openfactory.connectors.registry import register_connector
[docs]
@register_connector(OPCUAConnectorSchema)
class OPCUAConnector(Connector):
"""
Connector for OPC UA devices that manages deployment of OPC UA producers.
Responsibilities include:
- Registering the device as an OpenFactory asset.
- Deploying an OPC UA producer to stream device data to Kafka.
- Managing references between the device and its producer in OpenFactory.
- Tearing down the producer when the device is removed.
.. seealso::
The schema of the OPCUAConnector is :class:`openfactory.schemas.connectors.opcua.OPCUAConnectorSchema`.
"""
[docs]
def __init__(self,
deployment_strategy: OpenFactoryServiceDeploymentStrategy,
ksqlClient: KSQLDBClient,
bootstrap_servers: str = config.KAFKA_BROKER):
"""
Initializes the OPCUAConnector.
Args:
deployment_strategy (OpenFactoryServiceDeploymentStrategy) : The deployment strategy to use.
ksqlClient (KSQLDBClient): The client for interacting with ksqlDB.
bootstrap_servers (str): The Kafka bootstrap server address. Defaults to config.KAFKA_BROKER.
"""
self.deployment_strategy = deployment_strategy
self.ksql = ksqlClient
self.bootstrap_servers = bootstrap_servers
[docs]
def deploy(self, device: Device, yaml_config_file: str) -> None:
"""
Deploy a device based on its configuration.
Args:
device (Device): Device to deploy.
yaml_config_file (str): Path to the YAML configuration file.
"""
if device.connector.type != 'opcua':
raise OFAException(f"Device {device.uuid} is not configured with an OPC UA connector")
# Register device asset
register_asset(device.uuid, uns=device.uns, asset_type="Device",
ksqlClient=self.ksql, docker_service="")
# Deploy OPC UA connector
self.deploy_opcua_producer(device)
[docs]
def deploy_opcua_producer(self, device: Device) -> None:
"""
Deploy an OPC UA producer.
Args:
device (Device): The device for which the producer is to be deployed.
Raises:
OFAException: If the producer cannot be deployed.
"""
service_name = device.uuid.lower() + '-producer'
producer_uuid = device.uuid.upper() + '-PRODUCER'
connector_dict = device.connector.model_dump(exclude_none=True)
try:
self.deployment_strategy.deploy(
image=config.OPCUA_PRODUCER_IMAGE,
name=service_name,
mode={"Replicated": {"Replicas": 1}},
env=[f'KAFKA_BROKER={config.KAFKA_BROKER}',
f"KSQLDB_URL={self.ksql.ksqldb_url}",
f'OPCUA_CONNECTOR={json.dumps(connector_dict)}',
f'OPCUA_PRODUCER_UUID={producer_uuid}',
f"DOCKER_SERVICE={service_name}",
f"DEVICE_UUID={device.uuid}",
"APPLICATION_MANUFACTURER=OpenFactory",
"APPLICATION_LICENSE=Polyform Noncommercial License 1.0.0"],
networks=[config.OPENFACTORY_NETWORK]
)
except docker.errors.APIError as err:
raise OFAException(f"Connector {service_name} could not be created\n{err}")
# register producer in OpenFactory
register_asset(producer_uuid, uns=None, asset_type="KafkaProducer",
ksqlClient=self.ksql, bootstrap_servers=self.bootstrap_servers, docker_service=service_name)
dev = Asset(device.uuid, ksqlClient=self.ksql, bootstrap_servers=self.bootstrap_servers)
dev.add_reference_below(producer_uuid)
producer = Asset(producer_uuid, ksqlClient=self.ksql, bootstrap_servers=self.bootstrap_servers)
producer.add_reference_above(device.uuid)
[docs]
def tear_down(self, device_uuid: str) -> None:
"""
Tear down a deployed device given its UUID.
Args:
device_uuid (str): Unique identifier of the device to be torn down.
"""
try:
self.deployment_strategy.remove(device_uuid.lower() + '-producer')
deregister_asset(device_uuid + '-PRODUCER', ksqlClient=self.ksql, bootstrap_servers=self.bootstrap_servers)
user_notify.success(f"OPC UA producer for device {device_uuid} shut down successfully")
except docker.errors.NotFound:
user_notify.info(f"OPC UA producer for device {device_uuid} was not running")
except docker.errors.APIError as err:
raise OFAException(err)