BaseSupervisor

class openfactory.apps.supervisor.BaseSupervisor(supervisor_uuid, device_uuid, ksqlClient, bootstrap_servers='${KAFKA_BROKER}', loglevel='INFO')

Bases: OpenFactoryApp

Generic OpenFactory Supervisor application.

Extends OpenFactoryApp to represent a supervisor that monitors or manages a specific device. It registers the device UUID it supervises as an asset attribute.

_device_uuid

The UUID of the device being supervised.

Type:

str

__init__(supervisor_uuid, device_uuid, ksqlClient, bootstrap_servers='${KAFKA_BROKER}', loglevel='INFO')

Initializes the BaseSupervisor.

Parameters:
  • supervisor_uuid (str) – UUID of the supervisor application instance.

  • device_uuid (str) – UUID of the device that this supervisor monitors or controls.

  • ksqlClient (KSQLDBClient) – Instance of the KSQLDB client for streaming interaction.

  • bootstrap_servers (str) – Kafka broker address(es). Defaults to the KAFKA_BROKER value from the configuration.

  • loglevel (str) – Logging level for the supervisor (e.g., 'INFO', 'DEBUG'). Defaults to 'INFO'.

available_commands()

Returns the list of commands handled by the supervisor.

Must be implemented by subclasses to specify which commands they are capable of handling.

Return type:

list[str]

Returns:

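List[str] – A list of command entries. Note that the example return value below shows each entry as a dictionary with "command" and "description" keys rather than a bare string.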

Example return value:

[
    {"command": "start_device", "description": "Starts the device"},
    {"command": "stop_device", "description": "Stops the device"}
]

Raises:

NotImplementedError – If the method is not implemented by the subclass.
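A subclass override might look like the following minimal sketch. SupervisorStub is a hypothetical stand-in for BaseSupervisor, used only so the snippet runs without OpenFactory installed; it is not the library class. The dictionary shape follows the example return value above.

```python
class SupervisorStub:
    """Hypothetical stand-in for BaseSupervisor (not the OpenFactory class)."""

    def available_commands(self):
        # Mirrors the documented contract: subclasses must override this method.
        raise NotImplementedError("Subclasses must implement available_commands()")


class DemoSupervisor(SupervisorStub):
    """Illustrative subclass; the entries are copied from the example return value."""

    def available_commands(self):
        return [
            {"command": "start_device", "description": "Starts the device"},
            {"command": "stop_device", "description": "Stops the device"},
        ]


commands = DemoSupervisor().available_commands()
# Collect just the command names from the returned entries.
command_names = [entry["command"] for entry in commands]
```

In a real application the subclass would derive from BaseSupervisor instead of the stub.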

main_loop()

Main loop of the Supervisor.

Performs the following:

  1. Sends the list of available commands to the supervised device.

  2. Initializes a Kafka command consumer subscribed to the command stream for the device.

  3. Starts consuming command messages and delegates handling to the on_command method.

Return type:

None
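The three steps can be imitated in memory to show the control flow. This is only a sketch under stated assumptions: LoopSketch, its announced and handled lists, and the key-based filtering are all hypothetical. The real method talks to Kafka and ksqlDB, and its internals are not shown in this reference.

```python
class LoopSketch:
    """Hypothetical in-memory imitation of the documented main_loop steps."""

    def __init__(self, device_uuid, messages):
        self.device_uuid = device_uuid
        self._messages = list(messages)  # stands in for the Kafka command stream
        self.announced = []              # stands in for what is sent to the device
        self.handled = []                # records calls delegated to on_command

    def available_commands(self):
        # Illustrative entry; a real supervisor defines its own commands.
        return [{"command": "reset", "description": "Resets the device"}]

    def on_command(self, msg_key, msg_value):
        self.handled.append((msg_key, msg_value["CMD"]))

    def main_loop(self):
        # Step 1: send the list of available commands to the supervised device.
        for entry in self.available_commands():
            self.announced.append((self.device_uuid, entry["command"]))
        # Step 2: set up a "consumer" limited to this device's commands
        # (assumption: messages are selected by their key).
        consumer = (
            (key, value) for key, value in self._messages
            if key == self.device_uuid
        )
        # Step 3: consume messages and delegate each one to on_command.
        for msg_key, msg_value in consumer:
            self.on_command(msg_key, msg_value)


sketch = LoopSketch(
    "DEVICE-123",
    [
        ("DEVICE-123", {"CMD": "reset", "ARGS": "--force"}),
        ("DEVICE-999", {"CMD": "reset", "ARGS": ""}),
    ],
)
sketch.main_loop()
```

Unlike this sketch, which stops once its message list is exhausted, a consumer attached to a live stream would normally keep running until the application shuts down.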

on_command(msg_key, msg_value)

Callback method to process received commands.

This method must be implemented by subclasses. It is called whenever a command message is received for the device associated with this supervisor.

Return type:

None

Parameters:
  • msg_key (str) – The key of the Kafka message (asset_uuid of the target Asset).

  • msg_value (dict) – Dictionary with the required keys 'CMD' (the command string) and 'ARGS' (a single string of space-separated arguments).

Raises:

NotImplementedError – If the method is not overridden in a subclass.

Example

self.on_command(
    "DEVICE-123",
    {"CMD": "reset", "ARGS": "--force --timeout 10"}
)
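One possible shape for an override is sketched below: it splits 'ARGS' on whitespace and dispatches on 'CMD'. CommandHandlerSketch, its log list, and the _reset helper are hypothetical names chosen for illustration. A real implementation would subclass BaseSupervisor and act on the device rather than append to a list.

```python
class CommandHandlerSketch:
    """Hypothetical on_command implementation; not part of OpenFactory."""

    def __init__(self):
        self.log = []
        # Map command strings to handler methods.
        self._handlers = {"reset": self._reset}

    def on_command(self, msg_key, msg_value):
        cmd = msg_value["CMD"]
        # 'ARGS' is documented as space-separated, so a plain split suffices here.
        args = msg_value["ARGS"].split()
        handler = self._handlers.get(cmd)
        if handler is None:
            self.log.append(f"unknown command {cmd!r} for {msg_key}")
            return
        handler(msg_key, args)

    def _reset(self, msg_key, args):
        # Placeholder action: record what would be done.
        self.log.append((msg_key, "reset", args))


handler = CommandHandlerSketch()
handler.on_command("DEVICE-123", {"CMD": "reset", "ARGS": "--force --timeout 10"})
handler.on_command("DEVICE-123", {"CMD": "calibrate", "ARGS": ""})
```

Keeping the dispatch table in one dictionary makes it easy to keep the handled commands in step with what available_commands() reports.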