Source code for openfactory.connectors.mtconnect.mtc_connector

"""
**MTConnectConnector Class**

Provides orchestration for deploying MTConnect-based devices into the OpenFactory
environment. This includes:

- Registering the device with OpenFactory.
- Deploying the MTConnect agent and its adapter (if required) via `MTConnectAgentDeployer`.
- Deploying a Kafka producer to publish MTConnect data into the Kafka cluster.
- Tearing down all associated components when the device is removed.

Note:
    This connector delegates agent and adapter deployment to
    `MTConnectAgentDeployer`, ensuring separation between orchestration logic
    and MTConnect-specific deployment details.

.. seealso::

   The schema of the MTConnectConnector is :class:`openfactory.schemas.connectors.mtconnect.MTConnectConnectorSchema`.
"""

import docker
import openfactory.config as config
from openfactory.connectors.base_connector import Connector
from openfactory.assets import Asset
from openfactory.exceptions import OFAException
from openfactory.models.user_notifications import user_notify
from openfactory.utils import register_asset, deregister_asset
from openfactory.kafka.ksql import KSQLDBClient
from openfactory.schemas.devices import Device
from openfactory.schemas.common import constraints, cpus_limit, cpus_reservation
from openfactory.schemas.connectors.mtconnect import MTConnectConnectorSchema
from openfactory.openfactory_deploy_strategy import OpenFactoryServiceDeploymentStrategy
from openfactory.connectors.registry import register_connector
from openfactory.connectors.mtconnect.mtcagent_deployer import MTConnectAgentDeployer


@register_connector(MTConnectConnectorSchema)
class MTConnectConnector(Connector):
    """
    Connector for MTConnect devices.

    Manages deployment of MTConnect agents, adapters, and Kafka producers,
    leveraging a pluggable deployment strategy.

    .. seealso::

       The schema of the MTConnectConnector is :class:`openfactory.schemas.connectors.mtconnect.MTConnectConnectorSchema`.
    """

    def __init__(self,
                 deployment_strategy: OpenFactoryServiceDeploymentStrategy,
                 ksqlClient: KSQLDBClient,
                 bootstrap_servers: str = config.KAFKA_BROKER):
        """
        Initializes the MTConnectConnector.

        Args:
            deployment_strategy (OpenFactoryServiceDeploymentStrategy): The deployment strategy to use.
            ksqlClient (KSQLDBClient): The client for interacting with ksqlDB.
            bootstrap_servers (str): The Kafka bootstrap server address. Defaults to config.KAFKA_BROKER.
        """
        self.deployment_strategy = deployment_strategy
        self.ksql = ksqlClient
        self.bootstrap_servers = bootstrap_servers

    def deploy(self, device: Device, yaml_config_file: str) -> None:
        """
        Deploy a device based on its configuration.

        Registers the device asset, deploys the MTConnect agent through
        `MTConnectAgentDeployer`, then deploys the Kafka producer.

        Args:
            device (Device): Device to deploy.
            yaml_config_file (str): Path to the YAML configuration file.

        Raises:
            OFAException: If the device is not configured with an MTConnect connector.
        """
        if device.connector.type != 'mtconnect':
            raise OFAException(f"Device {device.uuid} is not configured with an MTConnect connector")

        # Register device asset
        register_asset(device.uuid,
                       uns=device.uns,
                       asset_type="Device",
                       ksqlClient=self.ksql,
                       bootstrap_servers=self.bootstrap_servers,
                       docker_service="")

        # Deploy MTConnect agent (which itself deploys adapter if needed)
        agent_deployer = MTConnectAgentDeployer(device,
                                                yaml_config_file,
                                                self.deployment_strategy,
                                                self.ksql,
                                                self.bootstrap_servers)
        agent_deployer.deploy()

        # Deploy Kafka producer
        self.deploy_kafka_producer(device)

    def _get_mtc_agent_url(self, device: Device) -> str:
        """
        Returns the URL to use for the MTConnect Agent.

        When the agent configuration carries an IP, that address is used
        (HTTPS if the port is 443, HTTP otherwise). Without an IP, the URL
        points to the ``<device-uuid>-agent`` service on port 5000.

        Args:
            device (Device): The device whose agent URL is requested.

        Returns:
            str: URL of the MTConnect agent.
        """
        agent = device.connector.agent
        if agent.ip:
            if agent.port == 443:
                return f"https://{agent.ip}:443"
            return f"http://{agent.ip}:{agent.port}"
        return f"http://{device.uuid.lower()}-agent:5000"

    def deploy_kafka_producer(self, device: Device) -> None:
        """
        Deploy a Kafka producer.

        Deploys the producer service through the deployment strategy,
        registers it as a ``KafkaProducer`` asset and cross-references it
        with the device asset.

        Args:
            device (Device): The device for which the producer is to be deployed.

        Raises:
            OFAException: If the producer cannot be deployed.
        """
        service_name = device.uuid.lower() + '-producer'
        producer_uuid = device.uuid.upper() + '-PRODUCER'
        deploy_cfg = device.connector.agent.deploy

        try:
            self.deployment_strategy.deploy(
                image=config.MTCONNECT_PRODUCER_IMAGE,
                name=service_name,
                mode={"Replicated": {"Replicas": 1}},
                # NOTE(review): the producer container receives config.KAFKA_BROKER,
                # not self.bootstrap_servers - confirm this is intended when a
                # non-default bootstrap server is passed to the connector.
                env=[f'KAFKA_BROKER={config.KAFKA_BROKER}',
                     f'KAFKA_PRODUCER_UUID={producer_uuid}',
                     f'MTC_AGENT={self._get_mtc_agent_url(device)}'],
                constraints=constraints(deploy_cfg),
                # CPU values are scaled by 1e9 for the "NanoCPUs" fields
                resources={
                    "Limits": {"NanoCPUs": int(1000000000*cpus_limit(deploy_cfg, 1.0))},
                    "Reservations": {"NanoCPUs": int(1000000000*cpus_reservation(deploy_cfg, 0.5))}
                },
                networks=[config.OPENFACTORY_NETWORK]
            )
        except docker.errors.APIError as err:
            # Chain the Docker error so the original traceback is preserved
            raise OFAException(f"Producer {service_name} could not be created\n{err}") from err

        # register producer in OpenFactory
        register_asset(producer_uuid,
                       uns=None,
                       asset_type="KafkaProducer",
                       ksqlClient=self.ksql,
                       bootstrap_servers=self.bootstrap_servers,
                       docker_service=service_name)

        # link device and producer assets in both directions
        dev = Asset(device.uuid, ksqlClient=self.ksql, bootstrap_servers=self.bootstrap_servers)
        dev.add_reference_below(producer_uuid)
        producer = Asset(producer_uuid, ksqlClient=self.ksql, bootstrap_servers=self.bootstrap_servers)
        producer.add_reference_above(device.uuid)

        user_notify.success(f"Kafka producer {producer_uuid} deployed successfully")

    def tear_down(self, device_uuid: str) -> None:
        """
        Tear down a deployed device given its UUID.

        Removes, in this order, the adapter, the Kafka producer and the
        MTConnect agent services, and deregisters the producer and agent
        assets. Services that are not found are skipped.

        Args:
            device_uuid (str): Unique identifier of the device to be torn down.

        Raises:
            OFAException: If the deployment strategy reports a Docker API error
                other than "not found" while removing a service.
        """
        service_prefix = device_uuid.lower()
        # Asset UUIDs are built from the upper-cased device UUID at deployment time
        asset_prefix = device_uuid.upper()

        # Tear down adapter
        try:
            self.deployment_strategy.remove(service_prefix + '-adapter')
            user_notify.success(f"Adapter for device {device_uuid} shut down successfully")
        except docker.errors.NotFound:
            # no adapter running as a Docker swarm service
            pass
        except docker.errors.APIError as err:
            raise OFAException(err) from err

        # Tear down Producer
        try:
            self.deployment_strategy.remove(service_prefix + '-producer')
            # use the same upper-cased UUID the producer was registered with
            deregister_asset(asset_prefix + '-PRODUCER',
                             ksqlClient=self.ksql,
                             bootstrap_servers=self.bootstrap_servers)
            user_notify.success(f"Kafka producer for device {device_uuid} shut down successfully")
        except docker.errors.NotFound:
            user_notify.info(f"Kafka producer for device {device_uuid} was not running")
        except docker.errors.APIError as err:
            raise OFAException(err) from err

        # Tear down MTConnect agent
        try:
            self.deployment_strategy.remove(service_prefix + '-agent')
            deregister_asset(asset_prefix + '-AGENT',
                             ksqlClient=self.ksql,
                             bootstrap_servers=self.bootstrap_servers)
            user_notify.success(f"MTConnect Agent for device {device_uuid} shut down successfully")
        except docker.errors.NotFound:
            # no agent running as a Docker swarm service
            pass
        except docker.errors.APIError as err:
            raise OFAException(err) from err