# Source code for openfactory.apps.ofaapp

""" Generic OpenFactory application. """

import os
import signal
import time
import json
import inspect
from types import FrameType
from typing import Optional
from pydantic import TypeAdapter
from openfactory import __version__
from openfactory.kafka import KSQLDBClient
from openfactory.utils.assets import deregister_asset
from openfactory.assets import Asset, AssetAttribute
from openfactory.setup_logging import configure_prefixed_logger
from openfactory.schemas.filelayer.storage import StorageBackendSchema
from openfactory.schemas.command_header import CommandEnvelope
from openfactory.filelayer.backend import FileBackend
from openfactory.apps.attributefield import OpenFactoryAppMeta, EventAttribute


class OpenFactoryApp(Asset, metaclass=OpenFactoryAppMeta):
    """
    Generic OpenFactory application.

    Inherits from :class:`openfactory.assets.asset_class.Asset` and extends it to represent
    an OpenFactory application with standard metadata, logging, and lifecycle management.

    Attributes:
        application_version (AssetAttribute): Version from the ``APPLICATION_VERSION`` environment variable or ``"latest"``.
        application_manufacturer (AssetAttribute): Manufacturer from the ``APPLICATION_MANUFACTURER`` environment variable or ``"OpenFactoryIO"``.
        application_license (AssetAttribute): License from the ``APPLICATION_LICENSE`` environment variable or ``"BSD-3-Clause license"``.
        openfactory_manufacturer (AssetAttribute): OpenFactory vendor (``"OpenFactoryIO"``)
        openfactory_license (AssetAttribute): OpenFactory license (``"Polyform Noncommercial License 1.0.0"``)
        openfactory_version (AssetAttribute): OpenFactory version
        logger (logging.Logger): Prefixed logger instance configured with the app UUID.
        storage (Optional[FileBackend]): Storage backend instance created from the ``STORAGE`` environment variable,
            or ``None`` if not configured.

    .. admonition:: Usage Example

        .. code-block:: python

            import time
            import os
            from openfactory.apps import OpenFactoryApp, EventAttribute, SampleAttribute, ofa_method
            from openfactory.kafka import KSQLDBClient

            class DemoApp(OpenFactoryApp):

                # Declarative attributes are automatically registered as AssetAttributes
                status = EventAttribute(value="idle", tag="App.Status")
                sample_rate = SampleAttribute(value=42, tag="Sample.Rate")

                @ofa_method(description="Move to a given (x, y) position with speed")
                def move_axis(self, x: float, y: float, speed: int = 100):
                    # This method is callable over OpenFactory due to the @ofa_method decorator
                    self.logger.info(f"Moving axis to x={x}, y={y} at speed={speed}")

                @ofa_method()
                def stop_axis(self):
                    \"""Stops all motion immediately.\"""
                    self.logger.info("Stopping all axis")

                def main_loop(self):
                    # For actual use case, add here your logic of the app
                    self.logger.info("I don't do anything useful in this example.")
                    counter = 1
                    while True:
                        self.logger.info(f"Counter: {counter}")
                        counter += 1
                        time.sleep(2)

                def app_event_loop_stopped(self):
                    # Optional as it is already done by the `KSQLDBClient` class
                    self.ksql.close()

            # When the Application is deployed on the OpenFactory Cluster, the
            # environment variables KSQLDB_URL and KAFKA_BROKER will be set.
            # The default values can be used for local development.
            app = DemoApp(
                ksqlClient=KSQLDBClient(os.getenv("KSQLDB_URL", "http://localhost:8088")),
                bootstrap_servers=os.getenv("KAFKA_BROKER", "localhost:9092")
            )
            app.run()

    Note:
        - When deployed on the OpenFactory Cluster, the environment variables ``KSQLDB_URL``, ``KAFKA_BROKER``
          and ``ASSET_ROUTER_URL`` are set and can be used.
        - The ``UUID`` of the App is set based on the OpenFactory configuration file of the App during the deployment process.
        - Subclasses must implement either :meth:`main_loop` (synchronous) or :meth:`async_main_loop` (asynchronous)
          to define application behavior.
        - Attributes are automatically added to the OpenFactory asset for version, manufacturer, license, and availability.

    .. seealso::
        - The schema of OpenFactory Apps is :class:`openfactory.schemas.apps.OpenFactoryAppSchema`.
        - The decorator `@ofa_method <ofa_method.html>`_ can be used to define OpenFactory callable methods.
    """

    # Declarative attributes; __init__ registers every entry of `_declared_attributes`
    # as an AssetAttribute on the asset.
    avail = EventAttribute(tag="Availability")
    application_version = EventAttribute(value=os.getenv('APPLICATION_VERSION', 'latest'), tag="Application.Version")
    application_manufacturer = EventAttribute(value=os.getenv('APPLICATION_MANUFACTURER', 'OpenFactoryIO'), tag="Application.Manufacturer")
    application_license = EventAttribute(value=os.getenv('APPLICATION_LICENSE', 'BSD-3-Clause license'), tag="Application.License")
    openfactory_manufacturer = EventAttribute(value='OpenFactoryIO', tag="Library.Manufacturer")
    openfactory_license = EventAttribute(value='Polyform Noncommercial License 1.0.0', tag="Library.License")
    openfactory_version = EventAttribute(value=__version__, tag="Library.Version")

    def __init__(self,
                 ksqlClient: KSQLDBClient,
                 bootstrap_servers: str | None = None,
                 asset_router_url: str | None = None,
                 loglevel: str = 'INFO',
                 test_mode: bool = False):
        """
        Initializes the OpenFactory application.

        Sets up the application UUID, storage backend (if configured), standard attributes
        (version, manufacturer, license), a prefixed logger, and termination signal handlers,
        and automatically registers all methods decorated with
        `@ofa_method <ofa_method.html>`_ decorator.

        Args:
            ksqlClient (KSQLDBClient): The KSQL client instance.
            bootstrap_servers (str | None): Kafka bootstrap server address.
            asset_router_url (str | None): Asset Router URL from the OpenFactory Fan-Out-Layer.
            loglevel (str): Logging level for the app (e.g., ``INFO``, ``DEBUG``). Defaults to ``INFO``.
            test_mode (bool): If True, disables live Kafka/ksql interaction (useful for unit tests).

        Note:
            - If ``bootstrap_servers`` is not explicitly provided, the constructor will attempt to read it
              from the ``KAFKA_BROKER`` environment variable.
            - If ``asset_router_url`` is not explicitly provided, the constructor will attempt to read it
              from the ``ASSET_ROUTER_URL`` environment variable.
            - Configures logging with the application UUID as prefix.
            - Mounts a storage backend if the ``STORAGE`` environment variable is set.
            - Registers signal handlers for ``SIGINT`` and ``SIGTERM``.

        .. tip::
            The environment variables ``KSQLDB_URL``, ``KAFKA_BROKER`` and ``ASSET_ROUTER_URL``
            will be set when deployed on the OpenFactory Cluster.
        """
        # get APP-UUID from environment (set during deployment by ofa deployment tool)
        app_uuid = os.getenv('APP_UUID', 'DEV-UUID')
        super().__init__(asset_uuid=app_uuid,
                         ksqlClient=ksqlClient,
                         bootstrap_servers=bootstrap_servers,
                         asset_router_url=asset_router_url,
                         test_mode=test_mode)

        # setup logging
        self.logger = configure_prefixed_logger(
            app_uuid,
            prefix=app_uuid.upper(),
            level=loglevel)
        self.logger.debug(f"Setup OpenFactory App {app_uuid}")

        # attach storage (STORAGE holds the JSON description of the backend)
        storage_env = os.environ.get("STORAGE")
        self.storage: FileBackend | None = None
        if storage_env:
            schema = StorageBackendSchema(storage=json.loads(storage_env))
            self.storage = schema.storage.create_backend_instance()
            self.logger.debug(f"Adding storage of type {self.storage.config.type}")
            self.logger.debug(json.dumps(self.storage.get_mount_spec(), indent=2))

        # attach decorated methods
        self._subscribe_ofa_methods()

        # register all declarative attributes
        for attr_name, attr_field in self._declared_attributes.items():
            asset_attr = AssetAttribute(
                id=attr_name,
                value=attr_field.value,
                type=attr_field.type,
                tag=attr_field.tag
            )
            self.add_attribute(asset_attribute=asset_attr)
            self.logger.debug(f"Adding declarative attribute {asset_attr}")

        # setup signal handlers
        signal.signal(signal.SIGINT, self.signal_handler)
        signal.signal(signal.SIGTERM, self.signal_handler)

    def _iter_ofa_methods(self):
        """
        Yield ``(name, function)`` for every function decorated with ``@ofa_method``.

        The whole MRO is walked, most-derived class first, so decorated methods
        inherited from intermediate base classes are found as well. A name is only
        considered once: a definition in a subclass (decorated or not) shadows the
        definitions of the same name in its bases.
        """
        seen = set()
        for klass in type(self).__mro__:
            for attr_name, attr in vars(klass).items():
                if attr_name in seen:
                    continue
                seen.add(attr_name)
                if hasattr(attr, "_ofa_method_metadata") and inspect.isfunction(attr):
                    yield attr_name, attr

    def _make_command_callback(self, method):
        """
        Build the callback invoked when a command message arrives for ``method``.

        Args:
            method: Bound method decorated with @ofa_method.

        Returns:
            A callable ``on_cmd(msg_key, msg_value)`` that parses ``msg_value['VALUE']``
            as a :class:`CommandEnvelope` and executes ``method`` with its arguments.
            Any exception is logged and not propagated.
        """
        def on_cmd(msg_key, msg_value):
            try:
                envelope = CommandEnvelope.model_validate_json(msg_value['VALUE'])
                # execute method with envelope arguments
                self._execute_ofa_method(method, envelope)
            except Exception as e:
                self.logger.error(
                    f"[{self.asset_uuid}] Failed to execute {method.__name__}: {type(e).__name__}: {e}",
                    exc_info=True
                )
        return on_cmd

    def _subscribe_ofa_methods(self):
        """
        Scan decorated methods and subscribe to their CMD attributes.

        For each method decorated with ``@ofa_method`` the method contract is registered,
        a ``<name>_CMD`` attribute is added to the asset and, unless the app runs in
        test mode, a subscription to that attribute is created.
        """
        for attr_name, func in self._iter_ofa_methods():
            # `func` is the original function (safe to access __name__)
            method = getattr(self, attr_name)    # bind to self for execution
            cmd_attribute = f"{func.__name__}_CMD"

            # Register command as asset attributes
            self._register_ofa_method(method)
            self.add_attribute(
                asset_attribute=AssetAttribute(
                    id=cmd_attribute,
                    value="",
                    type='OpenFactory',
                    tag='Method.Command'
                )
            )
            if not self._test_mode:
                self.subscribe_to_attribute(cmd_attribute, self._make_command_callback(method))

    def _register_ofa_method(self, method) -> None:
        """
        Register an OpenFactory method as a command attribute.

        Builds the method contract from the metadata generated by the
        @ofa_method decorator and registers it on the asset.

        Args:
            method: Bound method decorated with @ofa_method.

        Raises:
            ValueError: If ``method`` carries no ``_ofa_method_metadata``.
        """
        metadata = getattr(method, "_ofa_method_metadata", None)
        if metadata is None:
            raise ValueError("Method is not decorated with @ofa_method")

        self.logger.debug(f"Register OpenFactory method '{metadata['method_name']}'")
        method_contract = {
            "description": metadata.get("description", "") or "",
            "arguments": []
        }

        for param_name, param_meta in metadata["parameters"].items():
            desc = param_meta.get("description", "") or ""
            self.logger.debug(f"  {param_name}: {desc}")
            method_contract["arguments"].append({
                "name": param_name,
                "description": desc
            })

        self.add_attribute(
            asset_attribute=AssetAttribute(
                id=metadata['method_name'],
                value=json.dumps(method_contract),
                type="Method",
                tag="Method"
            )
        )

    def _execute_ofa_method(self, method, envelope: CommandEnvelope):
        """
        Execute a decorated OpenFactory method with arguments from CommandEnvelope.

        Args:
            method: the decorated method to call (bound method)
            envelope: CommandEnvelope instance containing command arguments

        Returns:
            Whatever ``method`` returns.

        Raises:
            ValueError: If an argument cannot be converted to the annotated type
                or a required argument is missing from the envelope.
        """
        metadata = getattr(method, "_ofa_method_metadata")

        # Build argument dict for method call
        kwargs = {}
        for param_name, param_meta in metadata["parameters"].items():
            if param_name in envelope.arguments:
                # Convert argument to the type given by the parameter annotation
                value_str = envelope.arguments[param_name]
                param_type = param_meta["annotation"]
                try:
                    kwargs[param_name] = TypeAdapter(param_type).validate_python(value_str)
                except (ValueError, TypeError) as e:
                    raise ValueError(
                        f"Failed to convert argument '{param_name}'='{value_str}' to type {param_type}"
                    ) from e
                except Exception as e:
                    # Chain explicitly, like the branch above, so the original error
                    # is reported as the direct cause.
                    raise ValueError(
                        f"Failed to convert argument '{param_name}'='{value_str}' "
                        f"to type {param_type}: {e}"
                    ) from e
            else:
                # Parameter missing in envelope
                if param_meta.get("required", True):
                    raise ValueError(f"Missing required argument '{param_name}' for method {method.__name__}")
                # Optional parameter: use default
                kwargs[param_name] = param_meta.get("default")

        # Call the method on self
        return method(**kwargs)

    def welcome_banner(self) -> None:
        """
        Welcome banner printed to stdout.

        Can be redefined by children
        """
        print("==============================================================")
        print(f"OpenFactory App {self.asset_uuid}")
        print("--------------------------------------------------------------")
        print(f"Application version:      {self.application_version.value}")
        print(f"Application manufacturer: {self.application_manufacturer.value}")
        print(f"Application license:      {self.application_license.value}")
        print("==============================================================")

    def app_event_loop_stopped(self) -> None:
        """
        Called when main loop is stopped.

        Hook for subclasses; the default implementation does nothing.
        """
        pass

    def _deregister_and_stop(self) -> None:
        """
        Deregister the asset, then call :meth:`app_event_loop_stopped`.

        The order matters: the hook is allowed to release resources such as
        ``self.ksql`` (the documented examples close it there), and
        ``deregister_asset`` still needs that client.
        """
        deregister_asset(self.asset_uuid, ksqlClient=self.ksql, bootstrap_servers=self.bootstrap_servers)
        self.app_event_loop_stopped()

    def signal_handler(self, signum: int, frame: Optional[FrameType]) -> None:
        """
        Handles ``SIGINT`` and ``SIGTERM`` signals, gracefully stopping the application.

        Deregisters the asset, calls :meth:`app_event_loop_stopped` and terminates
        the process with exit status 0.

        Args:
            signum (int): The signal number that was received (e.g., ``SIGINT``, ``SIGTERM``).
            frame (Optional): The current stack frame when the signal was received.

        Raises:
            SystemExit: Always, with code 0, once the shutdown steps completed.
        """
        signal_name = signal.Signals(signum).name
        self.logger.info(f"Received signal {signal_name}, stopping app gracefully ...")
        self._deregister_and_stop()
        # The builtin `exit()` is injected by the `site` module for interactive use
        # and may be absent (e.g. `python -S`); raise SystemExit directly instead.
        raise SystemExit(0)

    def main_loop(self) -> None:
        """
        Synchronous main loop of the OpenFactory App.

        This method defines the core execution logic of the application
        and is typically implemented as a blocking loop.

        Important:
            Subclasses must implement this method if they intend to use
            :meth:`run` as the application entry point.

        Raises:
            NotImplementedError: If :meth:`run` is used without overriding
                this method in a subclass.
        """
        raise NotImplementedError("Method 'main_loop' must be implemented")

    def run(self) -> None:
        """
        Runs the OpenFactory app.

        Important:
            Subclasses must implement :meth:`main_loop` if they intend to use
            :meth:`run` as the application entry point.

        The following steps are performed:

        1. Display the welcome banner.
        2. Set the ``avail`` attribute to 'AVAILABLE'.
        3. Start the main loop by calling :meth:`main_loop`.
        4. Catch any exception raised by the main loop, log it, deregister the
           asset and call :meth:`app_event_loop_stopped`.

        Note:
            Exceptions raised by :meth:`main_loop` are logged and not re-raised.
        """
        self.welcome_banner()
        self.avail = 'AVAILABLE'
        self.logger.info("Starting main loop")
        try:
            self.main_loop()
        except Exception:
            self.logger.exception("An error occurred in the main_loop of the app.")
            self._deregister_and_stop()

    async def async_main_loop(self) -> None:
        """
        Asynchronous main loop of the OpenFactory App.

        This method defines the core asynchronous execution logic of the
        application and is expected to be awaited by :meth:`async_run`.

        Important:
            Subclasses must implement this method if they intend to use
            :meth:`async_run` as the application entry point.

        Raises:
            NotImplementedError: If :meth:`async_run` is used without overriding
                this method in a subclass.
        """
        raise NotImplementedError("Method 'async_main_loop' must be implemented")

    async def async_run(self) -> None:
        """
        Runs the OpenFactory app asynchronously.

        Important:
            Subclasses must implement :meth:`async_main_loop` when using this
            method as the application entry point.

        The following steps are performed:

        1. Display the welcome banner.
        2. Set the ``avail`` attribute to 'AVAILABLE'.
        3. Start the async main loop by awaiting :meth:`async_main_loop`.
        4. Catch any exception raised by the loop, log it, deregister the
           asset and call :meth:`app_event_loop_stopped`.

        Note:
            Exceptions raised by :meth:`async_main_loop` are logged and not re-raised.
        """
        self.welcome_banner()
        self.avail = 'AVAILABLE'
        self.logger.info("Starting async main loop")
        try:
            await self.async_main_loop()
        except Exception:
            self.logger.exception("An error occurred in the async main loop")
            self._deregister_and_stop()
if __name__ == "__main__":

    # Example usage of the OpenFactoryApp
    class MyApp(OpenFactoryApp):
        """ Example Application. """

        def main_loop(self):
            """ Main loop: logs an incrementing counter every two seconds. """
            # For actual use case, add here your logic of the app
            self.logger.info("I am the main loop of the app.\nI don't do anything useful in this example.")
            counter = 1
            while True:
                self.logger.info(f"Counter: {counter}")
                counter += 1
                time.sleep(2)

        def app_event_loop_stopped(self):
            """
            Close connection to ksqlDB server.

            Optional as it is already done by KSQLDBClient class
            """
            self.ksql.close()

    # KSQLDB_URL and KAFKA_BROKER are set when deployed on the OpenFactory Cluster;
    # the defaults below are meant for local development.
    app = MyApp(
        ksqlClient=KSQLDBClient(os.getenv("KSQLDB_URL", "http://localhost:8088")),
        bootstrap_servers=os.getenv("KAFKA_BROKER", "localhost:9092")
    )
    app.run()