OpenFactoryApp
- class openfactory.apps.ofaapp.OpenFactoryApp(ksqlClient, bootstrap_servers, loglevel='INFO')
Bases: Asset
Generic OpenFactory application.
Inherits from openfactory.assets.Asset and extends it to represent an OpenFactory application with standard metadata, logging, and lifecycle management.
- APPLICATION_VERSION
Version string from the APPLICATION_VERSION environment variable or ‘latest’.
- APPLICATION_MANUFACTURER
Manufacturer from the APPLICATION_MANUFACTURER environment variable or ‘OpenFactory’.
- APPLICATION_LICENSE
License string from the APPLICATION_LICENSE environment variable or ‘BSD-3-Clause license’.
- logger
Prefixed logger instance configured with the app UUID.
- storage
Storage backend instance created from the STORAGE environment variable, or None if not configured.
- Type:
Optional[FileBackend]
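The environment-variable fallbacks listed above can be sketched with the standard library alone. This only illustrates the documented defaults and is not the actual OpenFactory code; the helper name read_app_metadata and the dictionary keys are hypothetical.

```python
import os


def read_app_metadata(environ=None):
    """Return app metadata, falling back to the documented defaults.

    Hypothetical helper for illustration; not part of OpenFactory.
    """
    # Use the real process environment unless a mapping is passed in
    env = os.environ if environ is None else environ
    return {
        "version": env.get("APPLICATION_VERSION", "latest"),
        "manufacturer": env.get("APPLICATION_MANUFACTURER", "OpenFactory"),
        "license": env.get("APPLICATION_LICENSE", "BSD-3-Clause license"),
    }
```

Passing a mapping instead of reading os.environ directly keeps the fallback logic easy to check in isolation, for example read_app_metadata({"APPLICATION_VERSION": "1.2.0"}).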
Usage Example

    import time
    import os
    from openfactory.apps import OpenFactoryApp
    from openfactory.kafka import KSQLDBClient

    class DemoApp(OpenFactoryApp):

        def main_loop(self):
            # For actual use case, add here your logic of the app
            self.logger.info("I don't do anything useful in this example.")
            counter = 1
            while True:
                self.logger.info(f"Counter: {counter}")
                counter += 1
                time.sleep(2)

        def app_event_loop_stopped(self):
            # Optional as it is already done by the `KSQLDBClient` class
            self.ksql.close()

    # When the Application is deployed on the OpenFactory Cluster, the
    # environment variables KSQLDB_URL and KAFKA_BROKER will be set.
    # The default values can be used for local development.
    app = DemoApp(
        ksqlClient=KSQLDBClient(os.getenv("KSQLDB_URL", "http://localhost:8088")),
        bootstrap_servers=os.getenv("KAFKA_BROKER", "localhost:9092")
    )
    app.run()

Note
When deployed on the OpenFactory Cluster, the environment variables KSQLDB_URL and KAFKA_BROKER are set and can be used.
The UUID of the App is set based on the OpenFactory configuration file of the App during the deployment process.
Subclasses must implement either main_loop() (synchronous) or async_main_loop() (asynchronous) to define application behavior.
Attributes are automatically added to the OpenFactory asset for version, manufacturer, license, and availability.
See also
The schema of OpenFactory Apps is openfactory.schemas.apps.OpenFactoryAppSchema.
- __init__(ksqlClient, bootstrap_servers, loglevel='INFO')
Initializes the OpenFactory application.
Sets up the application UUID, storage backend (if configured), standard attributes (version, manufacturer, license), a prefixed logger, and termination signal handlers.
- Parameters:
ksqlClient (KSQLDBClient) – The KSQL client instance.
bootstrap_servers (str) – Kafka bootstrap servers URL.
loglevel (str) – Logging level for the app (e.g., ‘INFO’, ‘DEBUG’). Defaults to ‘INFO’.
Note
Configures logging with the application UUID as prefix.
Mounts a storage backend if the STORAGE environment variable is set.
Registers signal handlers for SIGINT and SIGTERM.
Tip
The environment variables KSQLDB_URL and KAFKA_BROKER will be set when deployed on the OpenFactory Cluster.
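One possible way to obtain a logger that prefixes every message with the application UUID is a logging.LoggerAdapter from the standard library. This is a sketch under that assumption; how OpenFactory actually builds its prefixed logger is not stated here, and the names PrefixAdapter, make_prefixed_logger, and the UUID "DEMO-APP" are hypothetical.

```python
import logging


class PrefixAdapter(logging.LoggerAdapter):
    """Prepend a fixed prefix (here: the app UUID) to every message."""

    def process(self, msg, kwargs):
        # self.extra is the dict handed to the adapter's constructor
        return f"[{self.extra['prefix']}] {msg}", kwargs


def make_prefixed_logger(app_uuid, loglevel="INFO"):
    """Hypothetical helper: build a logger whose messages carry the UUID."""
    logger = logging.getLogger(f"demo.{app_uuid}")
    logger.setLevel(loglevel)  # accepts level names such as 'INFO' or 'DEBUG'
    return PrefixAdapter(logger, {"prefix": app_uuid})


log = make_prefixed_logger("DEMO-APP", "DEBUG")
log.info("application initialized")
```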
- async async_main_loop()
Asynchronous main loop of the OpenFactory App.
This method defines the core asynchronous execution logic of the application and is expected to be awaited by async_run().
Important
Subclasses must implement this method if they intend to use async_run() as the application entry point.
- Raises:
NotImplementedError – If async_run() is used without overriding this method in a subclass.
- async async_run()
Runs the OpenFactory app asynchronously.
Important
Subclasses must implement async_main_loop() when using this method as the application entry point.
This method initializes the app by displaying a welcome banner, adding an availability attribute, and then starts the asynchronous application loop by awaiting async_main_loop().
The following steps are performed:
Display the welcome banner.
Add the avail attribute with value ‘AVAILABLE’.
Start the async main loop.
Catch any exceptions and stop the app gracefully.
- Raises:
Exception – Any exception raised by async_main_loop() is caught, logged, and triggers a graceful shutdown.
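The four steps above can be sketched with a stand-in class that uses only asyncio. It mirrors the documented sequence but is not the OpenFactory implementation: SketchAsyncApp, Ticker, the attributes dictionary, and stop() are hypothetical placeholders, and the class deliberately does not inherit from OpenFactoryApp.

```python
import asyncio


class SketchAsyncApp:
    """Stand-in for illustration; it does not inherit from OpenFactoryApp."""

    def __init__(self):
        self.attributes = {}
        self.stopped = False

    async def async_main_loop(self):
        # Mirrors the documented contract: subclasses must override this
        raise NotImplementedError("Override async_main_loop() in a subclass.")

    def stop(self):
        # Placeholder for the graceful shutdown
        self.stopped = True

    async def async_run(self):
        print("Welcome banner")                  # step 1: banner
        self.attributes["avail"] = "AVAILABLE"   # step 2: availability
        try:
            await self.async_main_loop()         # step 3: main loop
        except Exception as exc:                 # step 4: log and stop
            print(f"Error in main loop: {exc!r}")
            self.stop()


class Ticker(SketchAsyncApp):
    async def async_main_loop(self):
        for _ in range(3):
            await asyncio.sleep(0)  # yield control to the event loop
        raise RuntimeError("simulated failure")


app = Ticker()
asyncio.run(app.async_run())
```

Because NotImplementedError is itself an Exception, forgetting to override async_main_loop() in this sketch takes the same log-and-stop path as any other failure.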
- main_loop()
Synchronous main loop of the OpenFactory App.
This method defines the core execution logic of the application and is typically implemented as a blocking loop.
Important
Subclasses must implement this method if they intend to use run() as the application entry point.
- Raises:
NotImplementedError – If run() is used without overriding this method in a subclass.
- run()
Runs the OpenFactory app.
Important
Subclasses must implement main_loop() when using this method as the application entry point.
This method initializes the app by displaying a welcome banner, adding an availability attribute, and then starts the main application loop by calling main_loop(). If an exception occurs during the execution of the main loop, the error is caught, and the app is gracefully stopped.
The following steps are performed:
Display the welcome banner.
Add the avail attribute with value ‘AVAILABLE’.
Start the main loop.
Catch any exceptions that occur and stop the app gracefully.
- Raises:
Exception – Any exception raised by main_loop() is caught and logged, and the app is stopped.
- signal_handler(signum, frame)
Handles SIGINT and SIGTERM signals, gracefully stopping the application.
When one of these termination signals arrives, this method deregisters the asset from the system and then stops the application’s event loop. It provides a clean shutdown path for signals such as SIGINT or SIGTERM.
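The general pattern of routing SIGINT and SIGTERM to one handler can be sketched with the standard signal module. This is an assumption-laden illustration rather than the OpenFactory code: SketchApp, install_handlers(), and the two boolean flags are hypothetical stand-ins for deregistering the asset and stopping the event loop.

```python
import signal


class SketchApp:
    """Stand-in for illustration; not the OpenFactory implementation."""

    def __init__(self):
        self.running = True
        self.deregistered = False

    def install_handlers(self):
        # Route both termination signals to the same handler.
        # signal.signal() must be called from the main thread.
        signal.signal(signal.SIGINT, self.signal_handler)
        signal.signal(signal.SIGTERM, self.signal_handler)

    def signal_handler(self, signum, frame):
        name = signal.Signals(signum).name
        print(f"Received {name}, shutting down")
        self.deregistered = True   # placeholder: deregister the asset
        self.running = False       # placeholder: stop the event loop
```

A main loop written as while app.running: would then exit on its next iteration after either signal is delivered.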