OPCUASupervisor#

class openfactory.apps.supervisor.OPCUASupervisor(supervisor_uuid, device_uuid, adapter_ip, adapter_port, ksqlClient, bootstrap_servers='${KAFKA_BROKER}', loglevel='INFO')[source]#

Bases: BaseSupervisor

OpenFactory Supervisor using OPC UA to communicate with the device command adapter.

This supervisor extends the BaseSupervisor class and provides functionality to interact with an OPC UA adapter. It connects to the device command adapter via OPC UA, monitors the connection, and processes incoming commands for the associated device.

namespace_uri#

The namespace URI for the OPC UA server.

Type:

str

browseName#

The browse name used to access the device in the OPC UA server.

Type:

str

RECONNECT_INTERVAL#

The interval (in seconds) to wait before attempting to reconnect to the adapter in case of a failure.

Type:

int

adapter_ip#

Instance attribute - The IP address of the OPC UA adapter.

Type:

str

adapter_port#

Instance attribute - The port of the OPC UA adapter.

Type:

int

opcua_client#

Instance attribute - The OPC UA client instance used for connecting to the adapter.

Type:

asyncua.Client

idx#

Instance attribute - The index of the namespace in the OPC UA server.

Type:

int or None

opcua_adapter#

Instance attribute - A reference to the OPC UA adapter (for internal use).

Type:

str

_stop_reconnect#

Instance attribute - A flag to stop reconnecting to the adapter once the supervisor is stopped.

Type:

bool

_event_loop#

Instance attribute - The event loop used to run asynchronous operations related to the adapter connection.

Type:

asyncio.EventLoop

Example usage:
from openfactory.apps.supervisor import OPCUASupervisor
from openfactory.kafka import KSQLDBClient

supervisor = OPCUASupervisor(
    supervisor_uuid='DEMO-SUPERVISOR',
    device_uuid='PROVER3018',
    adapter_ip='192.168.0.201',
    adapter_port=4840,
    ksqlClient=KSQLDBClient("http://localhost:8088"),
    bootstrap_servers='localhost:9092'
)

supervisor.run()
__init__(supervisor_uuid, device_uuid, adapter_ip, adapter_port, ksqlClient, bootstrap_servers='${KAFKA_BROKER}', loglevel='INFO')[source]#

Initialize the OPCUASupervisor.

Initializes the supervisor by setting up the OPC UA client and connecting to the OPC UA adapter using the provided IP address and port. It also initializes necessary attributes, including the connection URI, connection status, and other OPC UA-related settings.

Parameters:
  • supervisor_uuid (str) – UUID of the supervisor.

  • device_uuid (str) – UUID of the device to listen for commands.

  • adapter_ip (str) – The IP address of the OPC UA adapter.

  • adapter_port (int) – The port on which the OPC UA adapter is accessible.

  • ksqlClient (KSQLDBClient) – The KSQL client instance used for querying.

  • bootstrap_servers (str) – Kafka bootstrap server URL.

  • loglevel (str) – Logging level for the supervisor (e.g., ‘INFO’, ‘DEBUG’). Defaults to ‘INFO’.

app_event_loop_stopped()[source]#

Gracefully shut down the supervisor and its associated connections.

Responsible for stopping the event loop, gracefully disconnecting from the OPC UA server, and performing any necessary cleanup when the application is shutting down. It also sets the _stop_reconnect flag to True to stop the connection monitoring loop.

Return type:

None

Raises:

Exception – If any error occurs during the disconnection from the OPC UA server, it is caught and logged.

available_commands()[source]#

Return the list of commands handled by the supervisor.

Return type:

list[str]

Returns:

list – A list of dictionaries, each containing ‘command’ (str) and ‘description’ (str).

on_command(msg_key, msg_value)[source]#

Callback method to process received commands.

Is invoked when a command message is received for the device associated with this supervisor. It extracts the command and its arguments from the message and schedules the execution of the corresponding command on the OPC UA server in the event loop.

Return type:

None

Parameters:
  • msg_key (str) – The key of the Kafka message, typically identifying the source of the command.

  • msg_value (dict) – Dictionary with required keys ‘CMD’ (command string) and ‘ARGS’ (space-separated args).

Example

self.on_command(
    "DEVICE-456",
    {"CMD": "start", "ARGS": "--verbose --timeout 5"}
)
welcome_banner()[source]#

Welcome banner.

Return type:

None