"""
OpenFactory Cluster Manager.
This module provides the `OpenFactoryCluster` class, which manages the lifecycle
of an OpenFactory Docker Swarm cluster, including manager and worker nodes.
Core responsibilities:
- Validate the current host as a Swarm manager and part of a Docker Swarm cluster
- Provision manager and worker nodes from YAML configuration files
- Add labels to nodes for identification and role assignment
- Remove worker nodes from the cluster safely
- Map node IPs to Docker Swarm node IDs for orchestration
- Notify users of operational outcomes, warnings, and failures
Key integrations:
- Docker API for managing Swarm nodes and containers
- SSH access to remote nodes for initial provisioning
- OpenFactory YAML infrastructure schemas for configuration
- User notifications to report success, warnings, and errors
Usage Example:
.. code-block:: python
from openfactory import OpenFactoryCluster
cluster = OpenFactoryCluster()
# Provision managers and workers from config file
cluster.create_infrastack_from_config_file("infra.yaml")
# Remove workers safely
cluster.remove_infrastack_from_config_file("infra.yaml")
Error handling:
- Raises `OFAException` if the local host is not part of a Swarm or is not a manager
- Catches Docker API errors and SSH connection issues during node creation
- Skips already provisioned nodes without interrupting the orchestration
Important:
The user requires Docker access on all nodes of the OpenFactory cluster
and SSH access to all nodes as ``config.OPENFACTORY_USER``.
"""
import docker
import paramiko.ssh_exception
from socket import gaierror
from typing import Dict, Any
import openfactory.config as config
from openfactory.models.user_notifications import user_notify
from openfactory.docker.docker_access_layer import dal
from openfactory.schemas.infra import get_infrastructure_from_config_file
from openfactory.schemas.infra import Managers, Workers
from openfactory.exceptions import OFAException
[docs]
class OpenFactoryCluster:
    """
    OpenFactory Cluster Manager.

    Manages the nodes of the OpenFactory Docker Swarm cluster.

    Important:
        User requires Docker access on all nodes of the OpenFactory cluster
        and ssh access to all nodes using the config.OPENFACTORY_USER.
    """

    # Errors that may occur while reaching a remote node over SSH or while
    # talking to a Docker daemon. ``docker.errors.DockerException`` is the base
    # class of ``docker.errors.APIError`` and is also what ``docker.DockerClient``
    # raises when its SSH-backed connection cannot be established.
    _NODE_ERRORS = (
        gaierror,
        paramiko.ssh_exception.NoValidConnectionsError,
        docker.errors.DockerException,
    )

    def __init__(self):
        """
        Initialize the OpenFactoryCluster instance.

        Performs a check to ensure the current Docker host is part of a Swarm
        and is a Swarm Manager. This is required for cluster management operations.

        Raises:
            OFAException: If the Docker host is not part of a Swarm, is not a manager,
                          or if the Swarm status cannot be verified.
        """
        try:
            info = dal.docker_client.info()
        except Exception as err:
            # Any failure to query the local daemon means the Swarm status
            # cannot be verified; surface it as the package's domain error.
            raise OFAException(str(err)) from err

        swarm = info.get('Swarm') or {}
        host = info.get('Name', 'unknown')
        if swarm.get('LocalNodeState') != 'active':
            raise OFAException(f"Host {host} is not part of a Swarm.")
        if not swarm.get('ControlAvailable', False):
            raise OFAException(f"Node {host} is not a Swarm Manager.")

    @staticmethod
    def _section(mapping, key):
        """Return ``mapping[key]``, or ``None`` if ``mapping`` is empty/None or lacks ``key``."""
        if not mapping or key not in mapping:
            return None
        return mapping[key]

    def add_label(self, node_name: str, node_details: Dict[str, Any]) -> None:
        """
        Adds labels to a Docker node based on its IP address.

        Looks for a node in the Docker Swarm cluster whose status address is exactly
        the IP given in `node_details`. If found, the node's labels are replaced by a
        'name' label plus any additional labels specified in `node_details`.
        If no node matches, nothing is changed.

        Args:
            node_name (str): The name to assign as a label to the Docker node.
            node_details (Dict): Dictionary with required 'ip' and optional 'labels' keys.

        Raises:
            docker.errors.APIError: If the node update is rejected by the Docker daemon.

        Example:
            .. code-block:: python

                self.add_label("node-3", {"ip": "10.0.0.5", "labels": {"plant": "Plant-A", "role": "stream-apps"}})
        """
        ip = str(node_details['ip'])
        # Exact comparison: a substring test would let "10.0.0.1" match "10.0.0.10".
        node = next(
            (n for n in dal.docker_client.nodes.list()
             if (n.attrs.get('Status') or {}).get('Addr') == ip),
            None,
        )
        if node is None:
            return

        labels = {'name': node_name}
        extra_labels = node_details['labels'] if 'labels' in node_details else None
        if extra_labels:
            labels.update(extra_labels)

        node_spec = node.attrs['Spec']
        node_spec['Labels'] = labels
        node.update(node_spec)

    def _join_nodes(self, nodes, join_token: str) -> None:
        """
        Join each configured node to the Swarm led by ``dal.leader_ip`` and label it.

        Nodes that already report a Swarm NodeID are skipped. Connection and Docker
        errors are reported per node through ``user_notify.fail`` so that one
        unreachable node does not stop the others from being provisioned.

        Args:
            nodes (Dict): Node names mapped to dicts with required 'ip' and optional 'labels'.
            join_token (str): Swarm join token (manager or worker) to use.
        """
        for name, details in nodes.items():
            ip = details['ip']
            client = None
            try:
                client = docker.DockerClient(base_url=f"ssh://{config.OPENFACTORY_USER}@{ip}")
                swarm_info = client.info().get('Swarm') or {}
                if swarm_info.get('NodeID'):
                    # Already part of a Swarm: leave it untouched.
                    continue
                client.swarm.join([dal.leader_ip], join_token=join_token)
                self.add_label(name, details)
                user_notify.success(f'Node "{name} ({ip})" setup')
            except self._NODE_ERRORS as err:
                user_notify.fail(f'Node "{name}" could not be setup - {err}')
            finally:
                # Release the SSH-backed connection opened for this node.
                if client is not None:
                    client.close()

    def create_managers(self, managers: Managers) -> None:
        """
        Creates Docker Swarm manager nodes from a given configuration.

        For each manager node in the input dictionary, connects via SSH, checks swarm membership,
        joins as a manager if needed, applies labels, and reports success or failure.

        Args:
            managers (Dict): Dictionary with manager names as keys and values as dicts with required 'ip' and optional 'labels'.

        Example:
            .. code-block:: python

                managers = {
                    "manager1": {"ip": "10.0.0.10", "labels": {"role": "primary"}},
                    "manager2": {"ip": "10.0.0.11"}
                }
                self.create_managers(managers)
        """
        if not managers:
            return
        self._join_nodes(managers, dal.manager_token)

    def create_workers(self, workers: Workers) -> None:
        """
        Creates Docker Swarm worker nodes from a given configuration.

        Connects via SSH to each worker node, checks swarm membership, joins if needed,
        applies labels, and reports success or failure.

        Args:
            workers (Dict): Dictionary with worker names as keys and values as dicts with required 'ip' and optional 'labels'.

        Example:
            .. code-block:: python

                workers = {
                    "worker1": {"ip": "10.0.0.20", "labels": {"role": "compute"}},
                    "worker2": {"ip": "10.0.0.21"}
                }
                self.create_workers(workers)
        """
        if not workers:
            return
        self._join_nodes(workers, dal.worker_token)

    def create_infrastack_from_config_file(self, stack_config_file: str) -> None:
        """
        Spins up an infrastructure stack based on the provided configuration file.

        Reads a YAML configuration file that defines the infrastructure,
        including manager and worker nodes, and provisions them using Docker Swarm.
        Missing or empty `nodes`, `managers` or `workers` sections are skipped.

        Args:
            stack_config_file (str): Path to the YAML configuration file describing the infrastructure stack.
        """
        infra = get_infrastructure_from_config_file(stack_config_file)
        nodes = self._section(infra, 'nodes')
        if not nodes:
            return

        managers = self._section(nodes, 'managers')
        if managers:
            self.create_managers(managers)

        workers = self._section(nodes, 'workers')
        if workers:
            self.create_workers(workers)

    def remove_workers(self, workers: Workers, node_ip_map: Dict[str, str]) -> None:
        """
        Removes worker nodes from the Docker Swarm cluster.

        Drains each worker node, connects remotely to make it leave the Swarm,
        and removes the node from the cluster manager. Workers whose IP is not in
        `node_ip_map` are skipped. Connection and Docker errors are reported per
        node so that one failing worker does not abort the removal of the others.

        Args:
            workers (Dict): Dictionary with worker names as keys and values containing node details with required 'ip'.
            node_ip_map (Dict): Mapping from node IP addresses to Docker node IDs.

        Example:
            .. code-block:: python

                workers = {
                    "worker1": {"ip": "10.0.0.20"},
                    "worker2": {"ip": "10.0.0.21"}
                }
                node_ip_map = {
                    "10.0.0.20": "node_id_123",
                    "10.0.0.21": "node_id_456"
                }
                self.remove_workers(workers, node_ip_map)
        """
        if not workers:
            return

        for name, details in workers.items():
            ip = str(details['ip'])
            if ip not in node_ip_map:
                continue
            node_id = node_ip_map[ip]
            node_client = None
            try:
                # drain the node so its tasks are rescheduled elsewhere
                user_notify.info(f'Draining node {name} ...')
                node = dal.docker_client.nodes.get(node_id)
                node_spec = node.attrs['Spec']
                node_spec['Availability'] = 'drain'
                node.update(node_spec)

                # leave swarm on worker node
                user_notify.info(f'Removing node {name} from cluster ...')
                node_client = docker.DockerClient(base_url=f"ssh://{config.OPENFACTORY_USER}@{ip}")
                node_client.swarm.leave()

                # remove node on OpenFactory Manager Node
                dal.docker_client.api.remove_node(node_id, force=True)
                user_notify.success(f'Removed node {name}')
            except self._NODE_ERRORS as err:
                user_notify.fail(f'Node "{name}" could not be removed. Error was:<br>"{err}"')
            finally:
                # Release the SSH-backed connection opened for this node.
                if node_client is not None:
                    node_client.close()

    def remove_infrastack_from_config_file(self, stack_config_file: str) -> None:
        """
        Tears down an infrastructure stack based on a configuration file.

        This function reads a YAML file describing the infrastructure stack, identifies
        existing nodes in the Docker Swarm cluster by their IPs, and removes the worker
        nodes accordingly. Nothing is done if the file declares no workers.

        Args:
            stack_config_file (str): Path to the YAML configuration file
                                     describing the infrastructure stack.
        """
        # Load yaml description file
        stack = get_infrastructure_from_config_file(stack_config_file)
        workers = self._section(self._section(stack, 'nodes'), 'workers')
        if not workers:
            return

        # map nodes by IP (exact Status.Addr, as used by add_label)
        node_ip_map = {
            (node.attrs.get('Status') or {}).get('Addr'): node.id
            for node in dal.docker_client.nodes.list()
        }
        self.remove_workers(workers, node_ip_map)