"""
OPC UA Connector Schemas
This module provides Pydantic models to define and validate configuration
schemas for OPC UA devices within OpenFactory.
Key Models:
-----------
- OPCUAServerConfig:
Configuration for the OPC UA server, including the endpoint URI and
optional default subscription parameters.
- OPCUASubscriptionConfig:
Optional subscription parameters (publishing interval, queue size,
sampling interval). Can be defined at the server level or overridden
per variable. If not provided at the server level, default values are
applied (publishing_interval=100 ms, queue_size=1, sampling_interval=0 ms).
- OPCUAVariableConfig:
Configuration for a single variable. Contains `node_id` or `browse_path`
to identify the OPC UA node and a `tag` used by OpenFactory to label data.
Variables may also declare an `access_level` (`ro` or `rw`) describing
how OpenFactory is allowed to interact with the node.
Optional overrides for queue size and sampling interval, as well as a
`deadband` below which value changes are ignored, can be provided.
- OPCUAEventConfig:
Configuration for an event source. Contains `node_id` or `browse_path`
identifying the OPC UA node that emits events. Events do not use tags
or subscription overrides.
- OPCUAMethodConfig:
Configuration for a method source. Each entry references an OPC UA node
(`node_id` or `browse_path`) from which all available OPC UA methods
will be discovered at runtime. Methods are not individually declared in
the schema; instead, the node acts as a method container.
- OPCUAConnectorSchema:
Wrapper schema that encapsulates the server configuration along with
variables, events, and method sources. During initialization, all
variables, events, and methods are normalized into their respective
config models. Server-level subscription defaults are applied to
variables where necessary.
Validation Features:
--------------------
- Validates `node_id` format and parses it into namespace_index, identifier_type, and identifier fields.
- Enforces that exactly one of `node_id` or `browse_path` is provided for variables, events, and methods.
- Normalizes variables into OPCUAVariableConfig, applying server-level subscription defaults when not overridden. If the server subscription is omitted, default values are applied.
- Ensures uniqueness of `node_id` and `browse_path` within variables, within events, and within methods.
- Prevents local name conflicts between variables, events, and methods.
- Forbids unknown fields to ensure strict schema conformance.
YAML Example:
-------------
.. code-block:: yaml
# ---------------------------------------------------------
# Example 1: Server subscription omitted → defaults applied
# ---------------------------------------------------------
type: opcua
server:
uri: opc.tcp://127.0.0.1:4840/freeopcua/server/
# subscription omitted → defaults will be used:
# publishing_interval: 100
# queue_size: 1
# sampling_interval: 0
variables:
temp:
node_id: ns=3;i=1050
tag: Temperature
deadband: 0.1
hum:
browse_path: 0:Root/0:Objects/2:Sensors/2:Humidity
tag: Humidity
events:
iolinkmaster:
node_id: ns=6;i=43
methods:
tempSensor:
browse_path: 0:Root/0:Objects/2:Sensors/2:TemperatureSensor
humiSensor:
node_id: ns=5;i=12
# ---------------------------------------------------------
# Example 2: Server subscription explicitly provided
# ---------------------------------------------------------
type: opcua
server:
uri: opc.tcp://127.0.0.1:4840/freeopcua/server/
subscription:
publishing_interval: 200
queue_size: 10
sampling_interval: 25
variables:
temp:
node_id: ns=3;i=1050
tag: Temperature
queue_size: 5 # overrides server subscription
sampling_interval: 50 # overrides server subscription
setpoint:
node_id: ns=2;i=10
tag: Setpoint
access_level: rw # read-write variable
.. seealso::
The runtime class of the OPCUAConnectorSchema schema is :class:`openfactory.connectors.opcua.opcua_connector.OPCUAConnector`.
"""
import re
from typing import Optional, Dict, Any, Literal
from pydantic import BaseModel, ConfigDict, model_validator, Field
[docs]
class OPCUASubscriptionConfig(BaseModel):
    """
    Subscription tuning parameters.

    Every field is optional and carries a default
    (publishing_interval=100 ms, queue_size=1, sampling_interval=0 ms).
    Unknown fields are rejected.
    """

    # Reject any key that is not declared below.
    model_config = ConfigDict(extra="forbid")

    publishing_interval: Optional[float] = Field(
        100.0,
        description="Publishing interval in ms for subscription object (server-level default).",
    )
    queue_size: Optional[int] = Field(
        1,
        description="Queue size per monitored item (server or variable-level default).",
    )
    sampling_interval: Optional[float] = Field(
        0.0,
        description="Sampling interval in ms for monitored item; 0 = event-driven if supported.",
    )
[docs]
class OPCUANodeConfig(BaseModel):
    """
    Base class for configs that identify an OPC UA node by `node_id` or `browse_path`.

    Exactly one of `node_id` or `browse_path` must be supplied. When `node_id`
    is given, it is split into `namespace_index`, `identifier_type` and
    `identifier`; these parsed fields are excluded from serialization.
    Instances are immutable (`frozen=True`) and unknown fields are rejected.
    """

    node_id: Optional[str] = Field(
        default=None,
        description="NodeId of the object, e.g., 'ns=3;i=1050'.",
        pattern=r'^ns=\d+;(i|s)=.+$'
    )
    browse_path: Optional[str] = Field(
        default=None,
        description="Optional hierarchical BrowsePath instead of node_id. Format: 0:Root/nsIndex:Identifier/nsIndex:Identifier/…"
    )

    # Parsed fields (not in input YAML)
    namespace_index: Optional[int] = Field(default=None, exclude=True)
    identifier_type: Optional[str] = Field(default=None, exclude=True)
    identifier: Optional[str] = Field(default=None, exclude=True)

    model_config = ConfigDict(extra="forbid", frozen=True)

    def model_dump(self, *args, **kwargs):
        """ Dump the model, omitting `None` fields unless `exclude_none` is passed explicitly. """
        kwargs.setdefault("exclude_none", True)
        return super().model_dump(*args, **kwargs)

    @model_validator(mode="before")
    @classmethod
    def validate_node_id_or_browse_path(cls, values: Any) -> Any:
        """
        Ensure that exactly one of `node_id` or `browse_path` is present.

        Args:
            values: Raw input handed to the model before field validation.

        Returns:
            The input, unchanged.

        Raises:
            ValueError: If neither or both of `node_id` and `browse_path` are given.
        """
        # A "before" validator may receive non-dict input (e.g. an already
        # built instance); leave such input to Pydantic's own validation.
        if not isinstance(values, dict):
            return values
        node_id, browse_path = values.get("node_id"), values.get("browse_path")
        if not (node_id or browse_path):
            raise ValueError("Either 'node_id' or 'browse_path' must be provided.")
        if node_id and browse_path:
            raise ValueError("Provide only one of 'node_id' or 'browse_path', not both.")
        return values

    @model_validator(mode="before")
    @classmethod
    def validate_and_parse_node_id(cls, values: Any) -> Any:
        """
        Validate the `node_id` format and derive the parsed fields from it.

        Args:
            values: Raw input handed to the model before field validation.

        Returns:
            The input unchanged when there is no string `node_id` to parse,
            otherwise a copy of it with `namespace_index`, `identifier_type`
            and `identifier` filled in.

        Raises:
            ValueError: If `node_id` does not match `ns=<int>;(i|s)=<identifier>`.
        """
        if not isinstance(values, dict):
            return values
        node_id = values.get("node_id")
        if not node_id:
            return values  # skip if using path
        if not isinstance(node_id, str):
            # Let field validation report the wrong type instead of
            # failing inside the regex call.
            return values
        pattern = r"^ns=\d+;(i|s)=.+$"
        if not re.match(pattern, node_id):
            raise ValueError(f"Invalid node_id format: {node_id}")
        # Split only on the first ';' and first '=': the pattern accepts
        # identifiers that themselves contain ';' or '='.
        ns_part, id_part = node_id.split(";", 1)
        ns_index = int(ns_part.replace("ns=", ""))
        id_type, identifier = id_part.split("=", 1)
        # Return a copy so the caller's input mapping is not modified.
        return {
            **values,
            "namespace_index": ns_index,
            "identifier_type": id_type,
            "identifier": identifier,
        }
# Access levels a variable may declare: "ro" (read-only) or "rw" (read-write).
AccessLevel = Literal["ro", "rw"]
[docs]
class OPCUAVariableConfig(OPCUANodeConfig):
    """
    Settings for one OPC UA variable.

    Adds to the node identification of the base class a mandatory `tag`,
    a declared `access_level` (default `ro`), optional per-variable
    `queue_size` / `sampling_interval` overrides, and a `deadband`.
    """

    tag: str = Field(
        default=...,  # required
        description="Tag used by OpenFactory to label the variable's data.",
    )
    access_level: AccessLevel = Field(
        description="Access level declared by the connector: 'ro' = read-only, 'rw' = read-write.",
        default="ro",
    )
    # None means "no override given for this variable".
    queue_size: Optional[int] = Field(
        description="Override server-level queue size for this variable.",
        default=None,
    )
    sampling_interval: Optional[float] = Field(
        description="Override server-level sampling interval for this variable.",
        default=None,
    )
    deadband: float = Field(
        description="Deadband for the variable; values changes smaller than this are ignored.",
        default=0.0,
    )
[docs]
class OPCUAEventConfig(OPCUANodeConfig):
    """
    Settings for an OPC UA event source.

    Adds no fields of its own: an event source is described solely by the
    node identification (`node_id` or `browse_path`) of the base class.
    """
[docs]
class OPCUAMethodConfig(OPCUANodeConfig):
    """
    Settings for an OPC UA method source.

    Adds no fields of its own: a method source is described solely by the
    node identification (`node_id` or `browse_path`) of the base class.
    """
[docs]
class OPCUAServerConfig(BaseModel):
    """
    OPC UA server settings.

    Holds the endpoint `uri` and the server-level `subscription` parameters.
    When `subscription` is omitted, an `OPCUASubscriptionConfig` with its
    default values is created. Unknown fields are rejected.
    """

    # Reject any key that is not declared below.
    model_config = ConfigDict(extra="forbid")

    uri: str = Field(
        default=...,  # required
        description="OPC UA server endpoint URI.",
    )
    subscription: Optional[OPCUASubscriptionConfig] = Field(
        description="Server-level default subscription parameters.",
        default_factory=OPCUASubscriptionConfig,
    )
[docs]
class OPCUAConnectorSchema(BaseModel):
    """
    OPC UA Connector schema wrapping the server configuration.

    During initialization, all variables are normalized into `OPCUAVariableConfig` instances,
    inheriting server-level subscription defaults where no overrides are given.
    Node references must be unique within variables, within events and within
    methods, and a local name may be used in only one of the three sections.

    The `type` field is a discriminator for Pydantic to select this schema.

    .. seealso::

       The runtime class of the OPCUAConnectorSchema schema is :class:`openfactory.connectors.opcua.opcua_connector.OPCUAConnector`.
    """

    type: Literal['opcua'] = Field(
        ...,  # no default, means required
        description="Discriminator field to identify OPC UA connector type."
    )
    server: OPCUAServerConfig = Field(..., description="OPC UA server configuration.")
    variables: Optional[Dict[str, OPCUAVariableConfig]] = Field(
        default=None,
        description="Mapping of local variable names to their configurations."
    )
    events: Optional[Dict[str, OPCUAEventConfig]] = Field(
        default=None,
        description="Mapping of named event sources."
    )
    methods: Optional[Dict[str, OPCUAMethodConfig]] = Field(
        default=None,
        description="Mapping of named OPC UA nodes from which methods will be discovered."
    )

    model_config = ConfigDict(extra="forbid")

    @staticmethod
    def _check_section(section: str, configs: Dict[str, Any], earlier_sections: tuple) -> None:
        """
        Validate one section (variables, events or methods).

        Entries are checked in mapping order; for each entry the local-name
        conflict check runs before the node-uniqueness check.

        Args:
            section: Section name used in error messages.
            configs: Mapping of local names to node configs of this section.
            earlier_sections: `(name, mapping)` pairs of the sections whose
                local names must not be reused here; a mapping may be None.

        Raises:
            ValueError: If a local name also exists in an earlier section, or
                if two entries of this section reference the same `node_id`
                or `browse_path`.
        """
        seen_nodes = set()
        for local_name, cfg in configs.items():
            for other_name, other_configs in earlier_sections:
                if other_configs and local_name in other_configs:
                    raise ValueError(
                        f"Local name conflict: '{local_name}' exists in both {other_name} and {section}"
                    )
            # Exactly one of node_id / browse_path is set on a validated config.
            unique_key = cfg.node_id or cfg.browse_path
            if unique_key in seen_nodes:
                if cfg.node_id:
                    raise ValueError(f"Duplicate node_id within {section}: {cfg.node_id}")
                raise ValueError(f"Duplicate path within {section}: {cfg.browse_path}")
            seen_nodes.add(unique_key)

    def model_post_init(self, __context: Any) -> None:
        """
        Normalize variables with subscription defaults and enforce uniqueness rules.

        Local names are mapping keys and therefore already unique within a
        section; only node references and cross-section name clashes need
        to be checked here.

        Raises:
            ValueError: On duplicate node references within a section or on a
                local name used in more than one section.
        """
        if self.variables:
            self._check_section("variables", self.variables, ())
            server_sub = self.server.subscription or OPCUASubscriptionConfig()
            # Rebuild each variable so unset overrides inherit the server values.
            self.variables = {
                local_name: OPCUAVariableConfig(
                    node_id=var_cfg.node_id,
                    browse_path=var_cfg.browse_path,
                    tag=var_cfg.tag,
                    access_level=var_cfg.access_level,
                    queue_size=(
                        var_cfg.queue_size
                        if var_cfg.queue_size is not None else
                        server_sub.queue_size
                    ),
                    sampling_interval=(
                        var_cfg.sampling_interval
                        if var_cfg.sampling_interval is not None else
                        server_sub.sampling_interval
                    ),
                    deadband=var_cfg.deadband,
                )
                for local_name, var_cfg in self.variables.items()
            }

        if self.events:
            self._check_section(
                "events", self.events, (("variables", self.variables),)
            )

        if self.methods:
            self._check_section(
                "methods",
                self.methods,
                (("variables", self.variables), ("events", self.events)),
            )