Source code for openfactory.schemas.connectors.opcua

"""
OPC UA Connector Schemas

This module provides Pydantic models to define and validate configuration
schemas for OPC UA devices within OpenFactory.

Key Models:
-----------
- OPCUASubscriptionConfig:
  Optional subscription parameters (publishing interval, queue size,
  sampling interval). Can be defined at the server level or overridden
  per variable. If not provided at the server level, default values are
  applied (publishing_interval=100 ms, queue_size=1, sampling_interval=0 ms).

- OPCUAServerConfig:
  Configuration for the OPC UA server, including the endpoint URI,
  namespace URI, and optional default subscription parameters.
  If `subscription` is omitted, default subscription values are used.

- OPCUAVariableConfig:
  Configuration for a single variable. Contains a `browse_name` plus
  optional overrides for queue size and sampling interval. If overrides
  are not provided, server-level defaults (or the automatic defaults
  if server subscription is omitted) are applied.

- OPCUADeviceConfig:
  Configuration for a device on the OPC UA server. Devices may be
  specified either by a hierarchical `path` or a `node_id`. Variables
  can be defined as simple strings (which are normalized to
  OPCUAVariableConfig using server defaults) or as full
  OPCUAVariableConfig objects with overrides. Methods are mapped
  by local name to OPC UA BrowseNames. If `node_id` is given, its
  namespace index, identifier type, and identifier are parsed out.

- OPCUAConnectorSchema:
  Wrapper schema that encapsulates the server and device
  configurations. During initialization, all device variables are
  normalized into OPCUAVariableConfig instances, with server-level
  subscription defaults applied where necessary.

Validation Features:
--------------------
- Ensures exactly one of `path` or `node_id` is provided for a device.
- Validates `node_id` format and parses it into namespace_index,
  identifier_type, and identifier fields.
- Normalizes all variables into OPCUAVariableConfig, applying
  server-level subscription defaults when not overridden. If the server
  subscription is omitted, default values are applied.
- Variables and methods are optional, providing flexibility for
  different server setups.
- Forbids unknown fields to ensure strict schema conformance.

YAML Example:
-------------
.. code-block:: yaml

    type: opcua

    # ---------------------------------------------------------
    # Example 1: Server subscription omitted → defaults applied
    # ---------------------------------------------------------
    server:
      uri: opc.tcp://127.0.0.1:4840/freeopcua/server/
      namespace_uri: http://examples.openfactory.local/opcua
      # subscription omitted → defaults will be used:
      #   publishing_interval: 100
      #   queue_size: 1
      #   sampling_interval: 0

    device:
      path: Sensors/TemperatureSensor_1
      variables:
        temp: Temperature         # simple string → inherits server defaults
        hum:                      # explicit variable config → overrides defaults
          browse_name: Humidity
          queue_size: 5
          sampling_interval: 50
      methods:
        calibrate: Calibrate

    # ---------------------------------------------------------
    # Example 2: Server subscription explicitly provided
    # ---------------------------------------------------------
    server:
      uri: opc.tcp://127.0.0.1:4840/freeopcua/server/
      namespace_uri: http://examples.openfactory.local/opcua
      subscription:
        publishing_interval: 200
        queue_size: 10
        sampling_interval: 25

    device:
      path: Sensors/TemperatureSensor_2
      variables:
        temp: Temperature          # inherits server subscription values
        hum:
          browse_name: Humidity    # overrides server subscription values
          queue_size: 5
          sampling_interval: 50
      methods:
        calibrate: Calibrate

.. seealso::

   The runtime class of the OPCUAConnectorSchema schema is :class:`openfactory.connectors.opcua.opcua_connector.OPCUAConnector`.
"""

import re
from typing import Optional, Dict, Literal, Union, Any
from pydantic import BaseModel, ConfigDict, model_validator, Field


[docs] class OPCUASubscriptionConfig(BaseModel): """ Optional subscription parameters for server or individual variables. """ publishing_interval: Optional[float] = Field( default=100.0, description="Publishing interval in ms for subscription object (server-level default)." ) queue_size: Optional[int] = Field( default=1, description="Queue size per monitored item (server or variable-level default)." ) sampling_interval: Optional[float] = Field( default=0.0, description="Sampling interval in ms for monitored item; 0 = event-driven if supported." ) model_config = ConfigDict(extra="forbid")
[docs] class OPCUAServerConfig(BaseModel): """ OPC UA Server configuration. """ uri: str = Field(..., description="OPC UA server endpoint URI.") namespace_uri: str = Field(..., description="Namespace URI of the OPC UA server.") subscription: Optional[OPCUASubscriptionConfig] = Field( default_factory=OPCUASubscriptionConfig, description="Server-level default subscription parameters." ) model_config = ConfigDict(extra="forbid")
[docs] class OPCUAVariableConfig(BaseModel): """ Configuration for a single OPC UA variable. All device variables are normalized into this model during initialization. If the YAML contains only a string (BrowseName), it is automatically expanded into an OPCUAVariableConfig using server-level subscription defaults. If overrides are provided here, they take precedence over server defaults. """ browse_name: str = Field(..., description="OPC UA BrowseName of the variable.") queue_size: Optional[int] = Field( default=None, description="Override server-level queue size for this variable." ) sampling_interval: Optional[float] = Field( default=None, description="Override server-level sampling interval for this variable." ) model_config = ConfigDict(extra="forbid")
[docs] class OPCUADeviceConfig(BaseModel): """ OPC UA Device configuration. """ path: Optional[str] = Field( default=None, description="Hierarchical path to the device (e.g., 'Sensors/TemperatureSensor_1')." ) node_id: Optional[str] = Field( default=None, description=( "NodeId of the device, used if 'path' is not defined. " "Must follow the format 'ns=<namespace_index>;(i|s)=<identifier>'" ), pattern=r'^ns=\d+;(i|s)=.+$' ) variables: Optional[Dict[str, Union[str, OPCUAVariableConfig]]] = Field( default=None, description=( "Mapping of local names to variables. " "Accepts either simple BrowseName strings or full OPCUAVariableConfig objects. " "After validation, all variables are normalized into OPCUAVariableConfig." ) ) methods: Optional[Dict[str, str]] = Field( default=None, description="Mapping of local names to OPC UA method BrowseNames." ) # Parsed fields (not in input YAML) namespace_index: Optional[int] = Field(default=None, exclude=True, allow_mutation=False) identifier_type: Optional[str] = Field(default=None, exclude=True, allow_mutation=False) identifier: Optional[str] = Field(default=None, exclude=True, allow_mutation=False) model_config = ConfigDict(extra="forbid")
[docs] @model_validator(mode="before") def validate_and_parse_node_id(cls, values: dict) -> dict: """ Ensure that exactly one of 'path' or 'node_id' is provided. If node_id is provided, validate its format and parse namespace_index, identifier_type, and identifier. """ path = values.get('path') node_id = values.get('node_id') # XOR check: exactly one must be provided if bool(path) == bool(node_id): raise ValueError("Exactly one of 'path' or 'node_id' must be specified for the device") if node_id: # Validate format pattern = r"^ns=\d+;(i|s)=.+$" if not re.match(pattern, node_id): raise ValueError("Invalid node_id format") # Parse node_id into fields ns_part, id_part = node_id.split(";") ns_index = int(ns_part.replace("ns=", "")) id_type, identifier = id_part.split("=", 1) values["namespace_index"] = ns_index values["identifier_type"] = id_type values["identifier"] = identifier return values
[docs] class OPCUAConnectorSchema(BaseModel): """ OPC UA Connector schema wrapping the server and device configuration. During initialization, all device variables are normalized into `OPCUAVariableConfig` instances, inheriting server-level subscription defaults where no overrides are given. The `type` field is a discriminator for Pydantic to select this schema. .. seealso:: The runtime class of the class OPCUAConnectorSchema schema is :class:`openfactory.connectors.opcua.opcua_connector.OPCUAConnector`. """ type: Literal['opcua'] = Field( ..., # no default, means required description="Discriminator field to identify OPC UA connector type." ) server: OPCUAServerConfig = Field(..., description="OPC UA server configuration.") device: OPCUADeviceConfig = Field(..., description="Device configuration on the server.") model_config = ConfigDict(extra="forbid")
[docs] def model_post_init(self, __context: Any) -> None: """ Normalize all device variables to OPCUAVariableConfig with defaults applied. """ if not self.device.variables: return server_sub = self.server.subscription or OPCUASubscriptionConfig() normalized_vars = {} for local_name, var_cfg in self.device.variables.items(): # Start with server defaults base_cfg = dict( queue_size=server_sub.queue_size, sampling_interval=server_sub.sampling_interval, ) if isinstance(var_cfg, str): # Simple string → set browse_name normalized_vars[local_name] = OPCUAVariableConfig( browse_name=var_cfg, **base_cfg ) elif isinstance(var_cfg, OPCUAVariableConfig): # Full config → override server defaults where provided normalized_vars[local_name] = OPCUAVariableConfig( browse_name=var_cfg.browse_name, queue_size=var_cfg.queue_size if var_cfg.queue_size is not None else base_cfg["queue_size"], sampling_interval=var_cfg.sampling_interval if var_cfg.sampling_interval is not None else base_cfg["sampling_interval"], ) else: raise ValueError(f"Invalid variable config for {local_name}") self.device.variables = normalized_vars