"""
OpenFactory Manager API.
This module provides the `OpenFactoryManager` class, which manages the deployment, configuration,
and teardown of devices and applications within an OpenFactory environment.
Core responsibilities:
- Deploy MTConnect agents, and OpenFactory applications
- Manage Docker-based services via the configured deployment strategy
- Register and deregister assets in the OpenFactory environment
- Integrate deployed services with Kafka, ksqlDB, and other OpenFactory components
- Validate configuration files against the UNS schema
- Notify users of deployment results, warnings, and failures
Key integrations:
- Docker for container lifecycle management
- Kafka and ksqlDB for data streaming and querying
- UNS Schema for configuration validation
- User notifications for communicating operational outcomes
- Plugin system for selecting deployment strategies
Usage Example:
.. code-block:: python
from openfactory import OpenFactoryManager
from openfactory.kafka.ksql import KSQLDBClient
import openfactory.config as config
ofa_manager = OpenFactoryManager(ksqlClient=KSQLDBClient(config.KSQLDB_URL))
# deploy some devices
ofa_manager.deploy_devices_from_config_file('/path/to/device_config.yml')
Error handling:
- Raises `OFAException` for critical operational failures
- Catches and logs Docker API errors
- Skips existing or invalid deployments without stopping other deployments
Important:
User requires Docker access on the OpenFactory cluster.
"""
import os
import docker
import json
import openfactory.config as config
from openfactory import OpenFactory
from openfactory.schemas.devices import get_devices_from_config_file
from openfactory.schemas.apps import OpenFactoryAppSchema, get_apps_from_config_file, normalize_name
from openfactory.schemas.uns import UNSSchema
from openfactory.schemas.common import constraints, resources
from openfactory.assets import Asset
from openfactory.exceptions import OFAException
from openfactory.models.user_notifications import user_notify
from openfactory.utils import register_asset, deregister_asset, load_plugin, register_device_connector, deregister_device_connector
from openfactory.kafka.ksql import KSQLDBClient
from openfactory.openfactory_deploy_strategy import OpenFactoryServiceDeploymentStrategy, SwarmDeploymentStrategy
from openfactory.connectors.registry import build_connector
[docs]
class OpenFactoryManager(OpenFactory):
"""
OpenFactory Manager API.
Allows to deploy services on OpenFactory.
Important:
User requires Docker access on the OpenFactory cluster.
Attributes:
deployment_strategy (OpenFactoryServiceDeploymentStrategy): The strategy used to deploy services.
"""
[docs]
def __init__(self, ksqlClient: KSQLDBClient,
bootstrap_servers: str | None = None,
asset_url: str | None = None):
"""
Initializes the OpenFactoryManager.
Args:
ksqlClient (KSQLDBClient): The client for interacting with ksqlDB.
bootstrap_servers (str | None):
Kafka bootstrap server address.
If not provided, the value from
``openfactory.config.KAFKA_BROKER`` is used.
asset_url (str | None):
URL of the Asset Router.
If not provided, the value from
``openfactory.config.ASSET_ROUTER_URL`` is used.
Raises:
OFAException:
If no Asset Router URL is available (neither explicitly provided nor configured via ``.ofaenv``).
Note:
The deployment strategy to use (e.g., swarm or docker) is selected based on ``config.DEPLOYMENT_PLATFORM``
"""
super().__init__(ksqlClient, bootstrap_servers, asset_url)
platform_cls = load_plugin("openfactory.deployment_platforms", config.DEPLOYMENT_PLATFORM)
if not issubclass(platform_cls, OpenFactoryServiceDeploymentStrategy):
raise TypeError(
f"Plugin '{config.DEPLOYMENT_PLATFORM}' must inherit from OpenFactoryServiceDeploymentStrategy"
)
self.deployment_strategy: OpenFactoryServiceDeploymentStrategy = platform_cls()
self.deployment_strategy = platform_cls()
def _build_traefik_labels(self, application: OpenFactoryAppSchema) -> dict[str, str]:
"""
Build Traefik labels for exposing an OpenFactory application.
This method generates the set of Docker labels required by Traefik to route
HTTP traffic to the given application based on its routing configuration.
Routing is performed using host-based rules derived from the application's
canonical hostname and optional alias hostname.
If routing is not defined or not exposed, no labels are generated.
Args:
application (OpenFactoryAppSchema): The application configuration containing
routing information and metadata.
Returns:
dict[str, str]: A dictionary of Traefik labels to be applied to the container
or service. Returns an empty dictionary if routing is disabled.
Notes:
- Routing uses host-based rules only (no path-based routing).
- The canonical hostname is always included.
- If an alias hostname is defined, it is added as an alternative rule.
- The router and service names are derived from the normalized application UUID.
"""
routing = application.routing
if not routing or not routing.expose:
return {}
name = f"ofa-{normalize_name(application.uuid)}"
# build rule
rule = f"Host(`{routing.canonical_hostname}`)"
if routing.alias_hostname:
if routing.alias_hostname != routing.canonical_hostname:
rule += f" || Host(`{routing.alias_hostname}`)"
labels = {
"traefik.enable": "true",
f"traefik.http.routers.{name}.service": name,
f"traefik.http.routers.{name}.rule": rule,
f"traefik.http.routers.{name}.entrypoints": "web",
f"traefik.http.services.{name}.loadbalancer.server.port": str(routing.port),
}
# in development environment add localhost + path-based access
if os.environ.get("OPENFACTORY_ENV") == "dev":
path_prefix = f"/{normalize_name(application.uuid)}"
external_router = f"{name}-external"
middleware = f"{name}-strip"
redirect_middleware = f"{name}-slash"
labels.update({
# external router (localhost + path)
f"traefik.http.routers.{external_router}.rule":
f"Host(`localhost`) && PathPrefix(`{path_prefix}`)",
f"traefik.http.routers.{external_router}.entrypoints": "web",
f"traefik.http.routers.{external_router}.service": name,
# redirect first, then strip
f"traefik.http.routers.{external_router}.middlewares":
f"{redirect_middleware},{middleware}",
# strip prefix middleware
f"traefik.http.middlewares.{middleware}.stripprefix.prefixes":
path_prefix,
# trailing slash redirect middleware
f"traefik.http.middlewares.{redirect_middleware}.redirectregex.regex":
f"^({path_prefix})(\\?.*)?$",
f"traefik.http.middlewares.{redirect_middleware}.redirectregex.replacement":
"${1}/${2}",
})
return labels
[docs]
def deploy_openfactory_application(self, application: OpenFactoryAppSchema) -> None:
"""
Deploy an OpenFactory application.
Args:
application (OpenFactoryAppSchema): The application configuration.
Raises:
OFAException: If the application cannot be deployed.
"""
# runtime user override
runtime_user = None
if application.runtime_uid is not None:
runtime_user = (
f"{application.runtime_uid}:{application.runtime_gid}"
)
# build environment variables
env = [f"APP_UUID={application.uuid}",
f"KAFKA_BROKER={self.bootstrap_servers}",
f"KSQLDB_URL={self.ksql.ksqldb_url}",
f"ASSET_ROUTER_URL={config.ASSET_ROUTER_URL}",
f"DOCKER_SERVICE={application.uuid.lower()}"]
# if routing section is defined export PORT variable
routing = application.routing
if routing:
# check conflict
if application.environment:
for item in application.environment:
var, _ = item.split("=", 1)
if var.strip() == "PORT":
user_notify.fail(
f"Application {application.uuid}: "
"PORT must not be defined in 'environment' when routing is enabled. "
"It is managed automatically by OpenFactory.")
return
env.append(f"PORT={routing.port}")
# In dev environment, apps are exposed via a path prefix on localhost
# (e.g. http://localhost/<app-uuid> using Traefik PathPrefix routing).
# To work correctly behind such a prefix, applications must be aware of
# their external base path when generating URLs (e.g. links, redirects, API docs, static assets).
#
# OpenFactory provides this information via OPENFACTORY_ROOT_PATH.
# Framework-specific examples:
# - FastAPI: FastAPI(root_path=...)
# - Flask: APPLICATION_ROOT / SCRIPT_NAME
# - Django: FORCE_SCRIPT_NAME
# - WSGI apps: SCRIPT_NAME
if os.environ.get("OPENFACTORY_ENV") == "dev":
env.append(f"OPENFACTORY_ROOT_PATH=/{normalize_name(application.uuid)}")
# Add STORAGE only if not None
if application.storage is not None:
# Serialize named storage backends
storage_dict = {
name: storage.model_dump(exclude_none=True)
for name, storage in application.storage.items()
}
env.append(f"STORAGE={json.dumps(storage_dict)}")
if application.environment is not None:
for item in application.environment:
var, val = item.split('=')
env.append(f"{var.strip()}={val.strip()}")
# if KSQLDB_LOG_LEVEL is not set by user, set it to the default value
if not any(var.startswith("KSQLDB_LOG_LEVEL=") for var in env):
env.append(f"KSQLDB_LOG_LEVEL={config.KSQLDB_LOG_LEVEL}")
# add storage mounts
mounts = []
if application.storage:
for storage_name, storage_cfg in application.storage.items():
backend_instance = storage_cfg.create_backend_instance()
if isinstance(self.deployment_strategy, SwarmDeploymentStrategy):
if not backend_instance.compatible_with_swarm():
raise ValueError(
f"{type(backend_instance).__name__} "
"cannot be used with SwarmDeploymentStrategy"
)
mount_spec = backend_instance.get_mount_spec()
if mount_spec:
mounts.append(mount_spec)
try:
self.deployment_strategy.deploy(
image=application.image,
user=runtime_user,
name=application.uuid.lower(),
mode={"Replicated": {"Replicas": 1}},
env=env,
resources=resources(application.deploy),
constraints=constraints(application.deploy),
networks=application.networks,
mounts=mounts,
labels=self._build_traefik_labels(application),
)
except docker.errors.APIError as err:
user_notify.fail(f"Application {application.uuid} could not be deployed\n{err}")
return
register_asset(application.uuid, uns=application.uns, asset_type='OpenFactoryApp',
ksqlClient=self.ksql, bootstrap_servers=self.bootstrap_servers, docker_service=application.uuid.lower())
user_notify.success(f"Application {application.uuid} deployed successfully")
[docs]
def deploy_devices_from_config_file(self, yaml_config_file: str) -> None:
"""
Deploy OpenFactory devices from a YAML configuration file.
This method loads and validates the UNS schema, parses the device configurations
from the specified YAML file, and deploys each device that is not already deployed.
Deployment includes registering the device asset, deploying the MTConnect agent,
Kafka producer, and KSQLDB tables (if defined).
Args:
yaml_config_file (str): Path to the YAML configuration file containing device definitions.
Note:
- If the UNS schema is invalid, a failure notification will be triggered, and deployment will abort early.
- If device configurations fail to load or validate, deployment will abort early after notifying the user.
- Deployment skips devices that are already deployed.
"""
# load UNS schema and yaml description file
try:
uns_schema = UNSSchema(schema_yaml_file=config.OPENFACTORY_UNS_SCHEMA)
except ValueError as e:
user_notify.fail(f"The UNS schema '{config.OPENFACTORY_UNS_SCHEMA}' is invalid: {e}")
return
except FileNotFoundError:
user_notify.fail(f"The UNS schema '{config.OPENFACTORY_UNS_SCHEMA}' could not be found.")
return
# load devices
devices = get_devices_from_config_file(yaml_config_file, uns_schema)
if devices is None:
return
deployed_devices = self.assets_uuid()
for dev_name, device in devices.items():
user_notify.info(f"{dev_name} - {device.uuid}:")
if device.uuid in deployed_devices:
user_notify.info(f"Device {device.uuid} exists already and was not deployed")
continue
schema = device.connector
try:
connector = build_connector(schema, self.deployment_strategy, self.ksql, self.bootstrap_servers)
except ValueError:
user_notify.warning(f"Device {device.uuid} has an unknown connector {schema.type}")
continue
try:
connector.deploy(device, yaml_config_file)
register_device_connector(device, self.ksql)
user_notify.success(f"Device {device.uuid} deployed successfully")
except OFAException as e:
user_notify.fail(f"Device {device.uuid} not deployed: {e}")
[docs]
def deploy_apps_from_config_file(self, yaml_config_file: str) -> None:
"""
Deploy OpenFactory applications from a YAML configuration file.
This method loads and validates the UNS schema, parses the application
configurations from the specified YAML file, and deploys each application
that is not already deployed.
Args:
yaml_config_file (str): Path to the YAML configuration file containing application definitions.
Note:
- If the UNS schema is invalid, a failure notification will be triggered, and deployment will abort early.
- If application configurations fail to load or validate, deployment will abort early after notifying the user.
- Deployment skips applications that are already deployed.
"""
# load UNS schema and yaml description file
try:
uns_schema = UNSSchema(schema_yaml_file=config.OPENFACTORY_UNS_SCHEMA)
except ValueError as e:
user_notify.fail(f"The UNS schema '{config.OPENFACTORY_UNS_SCHEMA}' is invalid: {e}")
return
except FileNotFoundError:
user_notify.fail(f"The UNS schema '{config.OPENFACTORY_UNS_SCHEMA}' could not be found.")
return
# load apps
apps = get_apps_from_config_file(yaml_config_file, uns_schema)
if apps is None:
return
for app_name, app in apps.items():
user_notify.info(f"{app_name}:")
if app.uuid in self.applications_uuid():
user_notify.info(f"Application {app.uuid} exists already and was not deployed")
continue
self.deploy_openfactory_application(app)
[docs]
def shut_down_devices_from_config_file(self, yaml_config_file: str) -> None:
"""
Shut down devices based on a config file.
Args:
yaml_config_file (str): Path to the yaml configuration file.
Raises:
OFAException: If the device cannot be shut down.
"""
# Load yaml description file
try:
uns_schema = UNSSchema(schema_yaml_file=config.OPENFACTORY_UNS_SCHEMA)
except ValueError as e:
user_notify.fail(f"The UNS schema '{config.OPENFACTORY_UNS_SCHEMA}' is invalid: {e}")
return
except FileNotFoundError:
user_notify.fail(f"The UNS schema '{config.OPENFACTORY_UNS_SCHEMA}' could not be found.")
return
devices = get_devices_from_config_file(yaml_config_file, uns_schema=uns_schema)
if devices is None:
return
uuid_list = self.devices_uuid()
for dev_name, device in devices.items():
user_notify.info(f"{dev_name}:")
if device.uuid not in uuid_list:
user_notify.info(f"No device {device.uuid} deployed in OpenFactory")
continue
# Tear down Connector
schema = device.connector
try:
connector = build_connector(schema, self.deployment_strategy, self.ksql, self.bootstrap_servers)
except ValueError:
user_notify.warning(f"Device {device.uuid} has an unknown connector {schema.type}")
continue
try:
connector.tear_down(device.uuid)
except OFAException as err:
user_notify.fail(f"Device {device.uuid} could not be torn down: {err}")
deregister_device_connector(device.uuid, bootstrap_servers=self.bootstrap_servers)
deregister_asset(device.uuid, ksqlClient=self.ksql, bootstrap_servers=self.bootstrap_servers)
user_notify.success(f"Device {device.uuid} shut down successfully")
[docs]
def tear_down_application(self, app_uuid: str) -> None:
"""
Tear down a deployed OpenFactory application.
Args:
app_uuid (str): The UUID of the application to be torn down.
Raises:
OFAException: If the application cannot be torn down.
"""
try:
app = Asset(app_uuid, ksqlClient=self.ksql, bootstrap_servers=self.bootstrap_servers)
self.deployment_strategy.remove(app.DockerService.value)
app.close()
except docker.errors.NotFound:
# the application was not running as a Docker swarm service
deregister_asset(app_uuid, ksqlClient=self.ksql, bootstrap_servers=self.bootstrap_servers)
pass
except docker.errors.APIError as err:
raise OFAException(err)
deregister_asset(app_uuid, ksqlClient=self.ksql, bootstrap_servers=self.bootstrap_servers)
user_notify.success(f"OpenFactory application {app_uuid} shut down successfully")
[docs]
def shut_down_apps_from_config_file(self, yaml_config_file: str) -> None:
"""
Shut down OpenFactory applications based on a config file.
Args:
yaml_config_file (str): Path to the yaml configuration file.
Raises:
OFAException: If the application cannot be shut down.
"""
# Load yaml description file
try:
uns_schema = UNSSchema(schema_yaml_file=config.OPENFACTORY_UNS_SCHEMA)
except ValueError as e:
user_notify.fail(f"The UNS schema '{config.OPENFACTORY_UNS_SCHEMA}' is invalid: {e}")
return
except FileNotFoundError:
user_notify.fail(f"The UNS schema '{config.OPENFACTORY_UNS_SCHEMA}' could not be found.")
return
apps = get_apps_from_config_file(yaml_config_file, uns_schema)
if apps is None:
return
for app_name, app in apps.items():
user_notify.info(f"{app_name}:")
if app.uuid not in self.applications_uuid():
user_notify.info(f"No application {app.uuid} deployed in OpenFactory")
continue
self.tear_down_application(app.uuid)
[docs]
def get_asset_uuid_from_docker_service(self, docker_service_name: str) -> str:
"""
Return ASSET_UUID of the asset running on the Docker service docker_service_name.
Args:
docker_service_name (str): The name of the Docker service.
Returns:
str: The ASSET_UUID of the asset running on the Docker service.
"""
query = f"select ASSET_UUID from DOCKER_SERVICES where DOCKER_SERVICE='{docker_service_name}';"
df = self.ksql.query(query)
if df.empty:
return ""
return df['ASSET_UUID'][0]