OpenFactoryApp#

class openfactory.apps.ofaapp.OpenFactoryApp(ksqlClient, bootstrap_servers, loglevel='INFO')[source]#

Bases: Asset

Generic OpenFactory application.

Inherits from openfactory.assets.Asset and extends it to represent an OpenFactory application with standard metadata, logging, and lifecycle management.

APPLICATION_VERSION#

Version string from APPLICATION_VERSION environment variable or ‘latest’.

Type:

str

APPLICATION_MANUFACTURER#

Manufacturer from APPLICATION_MANUFACTURER environment variable or ‘OpenFactory’.

Type:

str

APPLICATION_LICENSE#

License string from APPLICATION_LICENSE environment variable or ‘BSD-3-Clause license’.

Type:

str

logger#

Prefixed logger instance configured with the app UUID.

Type:

logging.Logger

storage#

Storage backend instance created from the STORAGE environment variable, or ‘None’ if not configured.

Type:

Optional [FileBackend]

Usage Example

import time
import os
from openfactory.apps import OpenFactoryApp
from openfactory.kafka import KSQLDBClient

class DemoApp(OpenFactoryApp):

    def main_loop(self):
        # For actual use case, add here your logic of the app
        self.logger.info("I don't do anything useful in this example.")
        counter = 1
        while True:
            self.logger.info(f"Counter: {counter}")
            counter += 1
            time.sleep(2)

    def app_event_loop_stopped(self):
        # Optional as it is already done by the `KSQLDBClient` class
        self.ksql.close()

# When the Application is deployed on the OpenFactory Cluster, the
# environment variables KSQLDB_URL and KAFKA_BROKER will be set.
# The default values can be used for local development.
app = DemoApp(
    ksqlClient=KSQLDBClient(os.getenv("KSQLDB_URL", "http://localhost:8088")),
    bootstrap_servers=os.getenv("KAFKA_BROKER", "localhost:9092")
)
app.run()

Note

  • When deployed on the OpenFactory Cluster, the environment variables KSQLDB_URL and KAFKA_BROKER are set and can be used.

  • The UUID of the App is set based on the OpenFactory configuration file of the App during the deployment process.

  • Subclasses must implement either main_loop() (synchronous) or async_main_loop() (asynchronous) to define application behavior.

  • Attributes are automatically added to the OpenFactory asset for version, manufacturer, license, and availability.

See also

The schema of OpenFactory Apps is openfactory.schemas.apps.OpenFactoryAppSchema.

__init__(ksqlClient, bootstrap_servers, loglevel='INFO')[source]#

Initializes the OpenFactory application.

Sets up the application UUID, storage backend (if configured), standard attributes (version, manufacturer, license), a prefixed logger, and termination signal handlers.

Parameters:
  • ksqlClient (KSQLDBClient) – The KSQL client instance.

  • bootstrap_servers (str) – Kafka bootstrap servers URL.

  • loglevel (str) – Logging level for the app (e.g., ‘INFO’, ‘DEBUG’). Defaults to ‘INFO’.

Note

  • Configures logging with the application UUID as prefix.

  • Mounts a storage backend if the STORAGE environment variable is set.

  • Registers signal handlers for SIGINT and SIGTERM.

Tip

The environment variables KSQLDB_URL and KAFKA_BROKER will be set when deployed on the OpenFactory Cluster.

app_event_loop_stopped()[source]#

Called when main loop is stopped.

Return type:

None

async async_main_loop()[source]#

Asynchronous main loop of the OpenFactory App.

This method defines the core asynchronous execution logic of the application and is expected to be awaited by async_run().

Return type:

None

Important

Subclasses must implement this method if they intend to use async_run() as the application entry point.

Raises:

NotImplementedError – If async_run() is used without overriding this method in a subclass.

async async_run()[source]#

Runs the OpenFactory app asynchronously.

Return type:

None

Important

Subclasses must implement async_main_loop() when using this method as the application entry point.

This method initializes the app by displaying a welcome banner, adding an availability attribute, and then starts the asynchronous application loop by awaiting async_main_loop().

The following steps are performed:

  1. Display the welcome banner.

  2. Add the avail attribute with value ‘AVAILABLE’.

  3. Start the async main loop.

  4. Catch any exceptions and stop the app gracefully.

Raises:

Exception – Any exception raised by async_main_loop() is caught, logged, and triggers a graceful shutdown.

main_loop()[source]#

Synchronous main loop of the OpenFactory App.

This method defines the core execution logic of the application and is typically implemented as a blocking loop.

Return type:

None

Important

Subclasses must implement this method if they intend to use run() as the application entry point.

Raises:

NotImplementedError – If run() is used without overriding this method in a subclass.

run()[source]#

Runs the OpenFactory app.

Return type:

None

Important

Subclasses must implement this method if they intend to use run() as the application entry point.

This method initializes the app by displaying a welcome banner, adding an availability attribute, and then starts the main application loop by calling main_loop(). If an exception occurs during the execution of the main loop, the error is caught, and the app is gracefully stopped.

The following steps are performed:

  1. Display the welcome banner.

  2. Add the avail attribute with ‘AVAILABLE’ value.

  3. Start the main loop.

  4. Catch any exceptions that occur and stop the app gracefully.

Raises:

Exception – If any exception occurs during the execution of the main loop, it is caught and logged, and the app is stopped.

signal_handler(signum, frame)[source]#

Handles SIGINT and SIGTERM signals, gracefully stopping the application.

This method listens for termination signals, deregisters the asset from the system, and then stops the application’s event loop. It is typically used to handle clean shutdowns when the app receives signals like SIGINT or SIGTERM.

Return type:

None

Parameters:
  • signum (int) – The signal number that was received (e.g., SIGINT, SIGTERM).

  • frame (Optional) – The current stack frame when the signal was received.

welcome_banner()[source]#

Welcome banner printed to stdout.

Can be redefined by children

Return type:

None