OpenFactoryApp
- class openfactory.apps.ofaapp.OpenFactoryApp(ksqlClient, bootstrap_servers=None, asset_router_url=None, loglevel='INFO', test_mode=False)
Bases: Asset
Generic OpenFactory application.
Inherits from openfactory.assets.asset_class.Asset and extends it to represent an OpenFactory application with standard metadata, logging, and lifecycle management.
- application_version
Version from the APPLICATION_VERSION environment variable or "latest".
- Type:
- application_manufacturer
Manufacturer from the APPLICATION_MANUFACTURER environment variable or "OpenFactoryIO".
- Type:
- application_license
License from the APPLICATION_LICENSE environment variable or "BSD-3-Clause license".
- Type:
- openfactory_manufacturer
OpenFactory vendor ("OpenFactoryIO").
- Type:
- openfactory_license
OpenFactory license ("Polyform Noncommercial License 1.0.0").
- Type:
- openfactory_version
OpenFactory version.
- Type:
- logger
Prefixed logger instance configured with the app UUID.
- Type:
- storage
Storage backend instance created from the STORAGE environment variable, or None if not configured.
- Type: Optional[FileBackend]
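The environment-variable fallbacks listed above can be illustrated with a small standard-library sketch. The helper `read_app_metadata` is hypothetical and not part of OpenFactory; it only mirrors the documented variable names and default values, and it accepts a plain mapping so it can be tried without touching the real environment.

```python
import os

# Documented environment variables and their fallback values.
_DEFAULTS = {
    "application_version": ("APPLICATION_VERSION", "latest"),
    "application_manufacturer": ("APPLICATION_MANUFACTURER", "OpenFactoryIO"),
    "application_license": ("APPLICATION_LICENSE", "BSD-3-Clause license"),
}


def read_app_metadata(env=None):
    """Hypothetical helper: resolve app metadata from a mapping of environment variables."""
    env = os.environ if env is None else env
    return {attr: env.get(var, default) for attr, (var, default) in _DEFAULTS.items()}


metadata = read_app_metadata({"APPLICATION_VERSION": "1.2.0"})
print(metadata["application_version"])       # value taken from the supplied mapping
print(metadata["application_manufacturer"])  # falls back to the documented default
```

How the real class reads these variables internally is not shown in this reference, so treat the helper as an illustration of the documented behavior only.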
Usage Example
    import time
    import os
    from openfactory.apps import OpenFactoryApp, EventAttribute, SampleAttribute, ofa_method
    from openfactory.kafka import KSQLDBClient


    class DemoApp(OpenFactoryApp):

        # Declarative attributes are automatically registered as AssetAttributes
        status = EventAttribute(value="idle", tag="App.Status")
        sample_rate = SampleAttribute(value=42, tag="Sample.Rate")

        @ofa_method(description="Move to a given (x, y) position with speed")
        def move_axis(self, x: float, y: float, speed: int = 100):
            # This method is callable over OpenFactory due to the @ofa_method decorator
            self.logger.info(f"Moving axis to x={x}, y={y} at speed={speed}")

        @ofa_method()
        def stop_axis(self):
            """Stops all motion immediately."""
            self.logger.info("Stopping all axis")

        def main_loop(self):
            # For actual use case, add here your logic of the app
            self.logger.info("I don't do anything useful in this example.")
            counter = 1
            while True:
                self.logger.info(f"Counter: {counter}")
                counter += 1
                time.sleep(2)

        def app_event_loop_stopped(self):
            # Optional as it is already done by the `KSQLDBClient` class
            self.ksql.close()


    # When the Application is deployed on the OpenFactory Cluster, the
    # environment variables KSQLDB_URL and KAFKA_BROKER will be set.
    # The default values can be used for local development.
    app = DemoApp(
        ksqlClient=KSQLDBClient(os.getenv("KSQLDB_URL", "http://localhost:8088")),
        bootstrap_servers=os.getenv("KAFKA_BROKER", "localhost:9092")
    )
    app.run()
Note
When deployed on the OpenFactory Cluster, the environment variables KSQLDB_URL, KAFKA_BROKER and ASSET_ROUTER_URL are set and can be used.
The UUID of the App is set based on the OpenFactory configuration file of the App during the deployment process.
Subclasses must implement either main_loop() (synchronous) or async_main_loop() (asynchronous) to define application behavior.
Attributes are automatically added to the OpenFactory asset for version, manufacturer, license, and availability.
See also
The schema of OpenFactory Apps is openfactory.schemas.apps.OpenFactoryAppSchema.
The decorator @ofa_method can be used to define OpenFactory callable methods.
- __init__(ksqlClient, bootstrap_servers=None, asset_router_url=None, loglevel='INFO', test_mode=False)
Initializes the OpenFactory application.
Sets up the application UUID, storage backend (if configured), standard attributes (version, manufacturer, license), a prefixed logger, and termination signal handlers, and automatically registers all methods decorated with @ofa_method.
- Parameters:
ksqlClient (KSQLDBClient) – The KSQL client instance.
bootstrap_servers (str | None) – Kafka bootstrap server address.
asset_router_url (str | None) – Asset Router URL from the OpenFactory Fan-Out-Layer.
loglevel (str) – Logging level for the app (e.g., INFO, DEBUG). Defaults to INFO.
test_mode (bool) – If True, disables live Kafka/ksql interaction (useful for unit tests).
Note
If bootstrap_servers is not explicitly provided, the constructor will attempt to read it from the KAFKA_BROKER environment variable.
If asset_router_url is not explicitly provided, the constructor will attempt to read it from the ASSET_ROUTER_URL environment variable.
The constructor also configures logging with the application UUID as prefix, mounts a storage backend if the STORAGE environment variable is set, and registers signal handlers for SIGINT and SIGTERM.
Tip
The environment variables KSQLDB_URL, KAFKA_BROKER and ASSET_ROUTER_URL will be set when deployed on the OpenFactory Cluster.
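The argument-or-environment fallback described in the note can be sketched as follows. `resolve_setting` is a hypothetical helper, not an OpenFactory function, and returning None when neither source is available is an assumption; the reference does not state what the constructor does in that case.

```python
import os


def resolve_setting(explicit, env_var, env=None):
    """Hypothetical helper: prefer the explicit argument, else the environment variable, else None."""
    env = os.environ if env is None else env
    return explicit if explicit is not None else env.get(env_var)


fake_env = {"KAFKA_BROKER": "broker:9092"}  # stand-in for os.environ
print(resolve_setting(None, "KAFKA_BROKER", fake_env))              # read from the environment
print(resolve_setting("localhost:9092", "KAFKA_BROKER", fake_env))  # explicit argument is used
print(resolve_setting(None, "ASSET_ROUTER_URL", fake_env))          # neither source is set
```

Passing the mapping explicitly keeps the sketch easy to exercise in unit tests without modifying the process environment.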
- async async_main_loop()
Asynchronous main loop of the OpenFactory App.
This method defines the core asynchronous execution logic of the application and is expected to be awaited by async_run().
- Return type:
Important
Subclasses must implement this method if they intend to use async_run() as the application entry point.
- Raises:
NotImplementedError – If async_run() is used without overriding this method in a subclass.
- async async_run()
Runs the OpenFactory app asynchronously.
- Return type:
Important
Subclasses must implement async_main_loop() when using this method as the application entry point.
This method initializes the app by displaying a welcome banner, adding an availability attribute, and then starts the asynchronous application loop by awaiting async_main_loop().
The following steps are performed:
Display the welcome banner.
Add the avail attribute with value "AVAILABLE".
Start the async main loop.
Catch any exceptions and stop the app gracefully.
- Raises:
Exception – Any exception raised by async_main_loop() is caught, logged, and triggers a graceful shutdown.
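The four steps above can be mimicked with a standard-library stand-in. `AppSketch`, `CountingApp`, and `stop()` are hypothetical names used only for illustration; this is a sketch of the documented sequence, not OpenFactory's implementation.

```python
import asyncio
import logging

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("app-sketch")


class AppSketch:
    """Stand-in class (not OpenFactoryApp) that mimics the documented run sequence."""

    def __init__(self):
        self.attributes = {}
        self.stopped = False

    async def async_main_loop(self):
        raise NotImplementedError("Subclasses must override async_main_loop()")

    def stop(self):
        # Hypothetical placeholder for the graceful shutdown of the real app.
        self.stopped = True

    async def async_run(self):
        logger.info("Welcome banner")             # step 1: display the welcome banner
        self.attributes["avail"] = "AVAILABLE"    # step 2: add the avail attribute
        try:
            await self.async_main_loop()          # step 3: start the async main loop
        except Exception:
            logger.exception("Main loop failed")  # step 4: log the error ...
            self.stop()                           # ... and stop gracefully


class CountingApp(AppSketch):
    async def async_main_loop(self):
        for counter in range(3):
            logger.info("Counter: %d", counter)
            await asyncio.sleep(0)
        raise RuntimeError("simulated failure")


app = CountingApp()
asyncio.run(app.async_run())
```

Because NotImplementedError is a subclass of Exception, running the base class without an override follows the same error path in this sketch. According to the reference, the synchronous run() performs the same steps around main_loop().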
- main_loop()
Synchronous main loop of the OpenFactory App.
This method defines the core execution logic of the application and is typically implemented as a blocking loop.
- Return type:
Important
Subclasses must implement this method if they intend to use run() as the application entry point.
- Raises:
NotImplementedError – If run() is used without overriding this method in a subclass.
- run()
Runs the OpenFactory app.
- Return type:
Important
Subclasses must implement main_loop() when using this method as the application entry point.
This method initializes the app by displaying a welcome banner, adding an availability attribute, and then starts the main application loop by calling main_loop(). If an exception occurs during the execution of the main loop, the error is caught, and the app is gracefully stopped.
The following steps are performed:
Display the welcome banner.
Add the avail attribute with value "AVAILABLE".
Start the main loop.
Catch any exceptions that occur and stop the app gracefully.
- Raises:
Exception – Any exception raised by main_loop() is caught and logged, and the app is stopped.
- signal_handler(signum, frame)
Handles SIGINT and SIGTERM signals, gracefully stopping the application.
When a termination signal such as SIGINT or SIGTERM is received, this method deregisters the asset from the system and then stops the application's event loop, so that the app shuts down cleanly.
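The shutdown order described above (deregister the asset, then stop the event loop) can be sketched with Python's signal module. `ShutdownSketch`, `deregister_asset`, `stop_event_loop`, and `install` are hypothetical names; the real class wires up its handlers in the constructor, and its internals are not shown in this reference.

```python
import signal


class ShutdownSketch:
    """Stand-in class (not OpenFactoryApp) illustrating the documented shutdown order."""

    def __init__(self):
        self.events = []

    def deregister_asset(self):
        # Hypothetical placeholder for removing the asset from the system.
        self.events.append("deregistered")

    def stop_event_loop(self):
        # Hypothetical placeholder for stopping the application's event loop.
        self.events.append("event loop stopped")

    def signal_handler(self, signum, frame):
        # Same (signum, frame) signature that Python passes to signal handlers.
        self.events.append(f"received {signal.Signals(signum).name}")
        self.deregister_asset()
        self.stop_event_loop()

    def install(self):
        # signal.signal() may only be called from the main thread.
        signal.signal(signal.SIGINT, self.signal_handler)
        signal.signal(signal.SIGTERM, self.signal_handler)


app = ShutdownSketch()
app.signal_handler(signal.SIGTERM, None)  # direct call, simulating signal delivery
print(app.events)
```

The demo calls the handler directly rather than installing it, which keeps the sketch safe to run anywhere; calling `install()` from the main thread would route real SIGINT and SIGTERM deliveries to the same method.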