OpenFactoryApp

class openfactory.apps.ofaapp.OpenFactoryApp(ksqlClient, bootstrap_servers=None, asset_router_url=None, loglevel='INFO', test_mode=False)

Bases: Asset

Generic OpenFactory application.

Inherits from openfactory.assets.asset_class.Asset and extends it to represent an OpenFactory application with standard metadata, logging, and lifecycle management.

application_version

Version from the APPLICATION_VERSION environment variable or "latest".

Type:

AssetAttribute

application_manufacturer

Manufacturer from the APPLICATION_MANUFACTURER environment variable or "OpenFactoryIO".

Type:

AssetAttribute

application_license

License from the APPLICATION_LICENSE environment variable or "BSD-3-Clause license".

Type:

AssetAttribute

openfactory_manufacturer

OpenFactory vendor ("OpenFactoryIO").

Type:

AssetAttribute

openfactory_license

OpenFactory license ("Polyform Noncommercial License 1.0.0").

Type:

AssetAttribute

openfactory_version

OpenFactory version.

Type:

AssetAttribute

logger

Prefixed logger instance configured with the app UUID.

Type:

logging.Logger

storage

Storage backend instance created from the STORAGE environment variable, or None if not configured.

Type:

Optional[FileBackend]
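
The application_version, application_manufacturer, and application_license attributes above each fall back to a documented default when their environment variable is unset. A minimal standard-library sketch of that lookup follows; the helper name read_app_metadata and the plain dict it returns are illustrative assumptions, not part of the OpenFactory API, which exposes these values as AssetAttribute objects.

```python
import os

# Defaults as documented for OpenFactoryApp; the mapping itself is illustrative.
_DEFAULTS = {
    "APPLICATION_VERSION": "latest",
    "APPLICATION_MANUFACTURER": "OpenFactoryIO",
    "APPLICATION_LICENSE": "BSD-3-Clause license",
}


def read_app_metadata(environ=None):
    """Return each metadata value from the environment, or its documented default."""
    env = os.environ if environ is None else environ
    return {name: env.get(name, default) for name, default in _DEFAULTS.items()}
```

Passing a plain dict as environ makes the lookup easy to exercise without touching the real process environment.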

Usage Example

import time
import os
from openfactory.apps import OpenFactoryApp, EventAttribute, SampleAttribute, ofa_method
from openfactory.kafka import KSQLDBClient

class DemoApp(OpenFactoryApp):

    # Declarative attributes are automatically registered as AssetAttributes
    status = EventAttribute(value="idle", tag="App.Status")
    sample_rate = SampleAttribute(value=42, tag="Sample.Rate")

    @ofa_method(description="Move to a given (x, y) position with speed")
    def move_axis(self, x: float, y: float, speed: int = 100):
        # This method is callable over OpenFactory due to the @ofa_method decorator
        self.logger.info(f"Moving axis to x={x}, y={y} at speed={speed}")

    @ofa_method()
    def stop_axis(self):
        """Stops all motion immediately."""
        self.logger.info("Stopping all axes")

    def main_loop(self):
        # In a real application, put the app logic here
        self.logger.info("I don't do anything useful in this example.")
        counter = 1
        while True:
            self.logger.info(f"Counter: {counter}")
            counter += 1
            time.sleep(2)

    def app_event_loop_stopped(self):
        # Optional as it is already done by the `KSQLDBClient` class
        self.ksql.close()

# When the Application is deployed on the OpenFactory Cluster, the
# environment variables KSQLDB_URL and KAFKA_BROKER will be set.
# The default values can be used for local development.
app = DemoApp(
    ksqlClient=KSQLDBClient(os.getenv("KSQLDB_URL", "http://localhost:8088")),
    bootstrap_servers=os.getenv("KAFKA_BROKER", "localhost:9092")
)
app.run()

Note

  • When deployed on the OpenFactory Cluster, the environment variables KSQLDB_URL, KAFKA_BROKER and ASSET_ROUTER_URL are set and can be used.

  • The UUID of the App is set based on the OpenFactory configuration file of the App during the deployment process.

  • Subclasses must implement either main_loop() (synchronous) or async_main_loop() (asynchronous) to define application behavior.

  • Attributes are automatically added to the OpenFactory asset for version, manufacturer, license, and availability.

__init__(ksqlClient, bootstrap_servers=None, asset_router_url=None, loglevel='INFO', test_mode=False)

Initializes the OpenFactory application.

Sets up the application UUID, storage backend (if configured), standard attributes (version, manufacturer, license), a prefixed logger, and termination signal handlers, and automatically registers all methods decorated with @ofa_method.
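
The prefixed logger mentioned above can be pictured with the standard library's logging.LoggerAdapter. This is only a sketch of the idea: the names PrefixAdapter and make_prefixed_logger, the bracketed prefix format, and the "demo." logger name are assumptions, and the actual OpenFactory implementation may differ.

```python
import logging


class PrefixAdapter(logging.LoggerAdapter):
    """Prepend a fixed prefix (here, a hypothetical app UUID) to every message."""

    def process(self, msg, kwargs):
        return f"[{self.extra['prefix']}] {msg}", kwargs


def make_prefixed_logger(app_uuid, loglevel="INFO"):
    """Build a logger whose messages start with the app UUID."""
    logger = logging.getLogger(f"demo.{app_uuid}")
    logger.setLevel(loglevel)  # accepts level names such as "INFO" or "DEBUG"
    return PrefixAdapter(logger, {"prefix": app_uuid})
```

With this adapter, a call such as make_prefixed_logger("DEMO-APP").info("started") hands the message "[DEMO-APP] started" to the underlying logger.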

Parameters:
  • ksqlClient (KSQLDBClient) – The KSQL client instance.

  • bootstrap_servers (str | None) – Kafka bootstrap server address.

  • asset_router_url (str | None) – Asset Router URL from the OpenFactory Fan-Out-Layer.

  • loglevel (str) – Logging level for the app (e.g., INFO, DEBUG). Defaults to INFO.

  • test_mode (bool) – If True, disables live Kafka/ksql interaction (useful for unit tests).

Note

  • If bootstrap_servers is not explicitly provided, the constructor will attempt to read it from the KAFKA_BROKER environment variable.

  • If asset_router_url is not explicitly provided, the constructor will attempt to read it from the ASSET_ROUTER_URL environment variable.

  • Configures logging with the application UUID as prefix.

  • Mounts a storage backend if the STORAGE environment variable is set.

  • Registers signal handlers for SIGINT and SIGTERM.
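
The fallback rule for bootstrap_servers and asset_router_url described in the notes above (explicit argument first, then the environment variable) can be sketched as follows. The helper resolve_setting is hypothetical and only illustrates the precedence; it is not a function of the OpenFactory library.

```python
import os


def resolve_setting(explicit, env_name, environ=None):
    """Prefer the explicit argument; otherwise fall back to the environment variable."""
    if explicit is not None:
        return explicit
    env = os.environ if environ is None else environ
    return env.get(env_name)  # None when the variable is unset
```

For example, resolve_setting(None, "KAFKA_BROKER") reads the broker address from the environment, while resolve_setting("localhost:9092", "KAFKA_BROKER") ignores the environment entirely.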

Tip

The environment variables KSQLDB_URL, KAFKA_BROKER and ASSET_ROUTER_URL will be set when deployed on the OpenFactory Cluster.

app_event_loop_stopped()

Called when the main loop is stopped.

Return type:

None

async async_main_loop()

Asynchronous main loop of the OpenFactory App.

This method defines the core asynchronous execution logic of the application and is expected to be awaited by async_run().

Return type:

None

Important

Subclasses must implement this method if they intend to use async_run() as the application entry point.

Raises:

NotImplementedError – If async_run() is used without overriding this method in a subclass.

async async_run()

Runs the OpenFactory app asynchronously.

Return type:

None

Important

Subclasses must implement async_main_loop() when using this method as the application entry point.

This method initializes the app by displaying a welcome banner, adding an availability attribute, and then starts the asynchronous application loop by awaiting async_main_loop().

The following steps are performed:

  1. Display the welcome banner.

  2. Add the avail attribute with value ‘AVAILABLE’.

  3. Start the async main loop.

  4. Catch any exceptions and stop the app gracefully.

Raises:

Exception – Any exception raised by async_main_loop() is caught, logged, and triggers a graceful shutdown.
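
The four steps listed for async_run() can be illustrated with a small stand-in class. SketchAsyncApp and FailingApp are hypothetical and do not import OpenFactory; the events list replaces real attribute updates and logging, and calling app_event_loop_stopped() during shutdown is an assumption about what "graceful" involves.

```python
import asyncio


class SketchAsyncApp:
    """Stand-in that mimics the documented async_run() steps; not the real class."""

    def __init__(self):
        self.events = []  # records the order of lifecycle steps

    def welcome_banner(self):
        self.events.append("banner")

    async def async_main_loop(self):
        raise NotImplementedError("override async_main_loop() in a subclass")

    def app_event_loop_stopped(self):
        self.events.append("stopped")

    async def async_run(self):
        self.welcome_banner()                  # 1. display the banner
        self.events.append("avail=AVAILABLE")  # 2. add the avail attribute
        try:
            await self.async_main_loop()       # 3. start the async main loop
        except Exception as exc:               # 4. catch, record, stop gracefully
            self.events.append(f"error: {exc}")
            self.app_event_loop_stopped()


class FailingApp(SketchAsyncApp):
    """Subclass whose loop fails, to show the error path."""

    async def async_main_loop(self):
        await asyncio.sleep(0)
        raise RuntimeError("sensor offline")
```

Running asyncio.run(FailingApp().async_run()) walks through all four steps: the error raised inside the loop is recorded and the stop hook runs instead of the exception propagating to the caller.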

main_loop()

Synchronous main loop of the OpenFactory App.

This method defines the core execution logic of the application and is typically implemented as a blocking loop.

Return type:

None

Important

Subclasses must implement this method if they intend to use run() as the application entry point.

Raises:

NotImplementedError – If run() is used without overriding this method in a subclass.
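
The override contract for main_loop() can be sketched with a stand-in base class. SketchApp and PollingApp are hypothetical names used only for illustration; the loop is bounded by max_ticks so the example terminates, whereas a deployed app would typically loop until it is stopped.

```python
import time


class SketchApp:
    """Stand-in base class; only mirrors the documented main_loop() contract."""

    def main_loop(self):
        raise NotImplementedError("override main_loop() in a subclass")


class PollingApp(SketchApp):
    """Subclass that supplies a blocking loop."""

    def __init__(self, max_ticks, interval=0.0):
        self.max_ticks = max_ticks
        self.interval = interval
        self.ticks = 0

    def main_loop(self):
        # Blocking loop; bounded here so the sketch terminates.
        while self.ticks < self.max_ticks:
            self.ticks += 1
            time.sleep(self.interval)
```

Calling SketchApp().main_loop() raises NotImplementedError, while PollingApp(max_ticks=3).main_loop() returns after three iterations.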

run()

Runs the OpenFactory app.

Return type:

None

Important

Subclasses must implement main_loop() when using this method as the application entry point.

This method initializes the app by displaying a welcome banner, adding an availability attribute, and then starts the main application loop by calling main_loop(). If an exception occurs during the execution of the main loop, the error is caught, and the app is gracefully stopped.

The following steps are performed:

  1. Display the welcome banner.

  2. Add the avail attribute with value ‘AVAILABLE’.

  3. Start the main loop.

  4. Catch any exceptions that occur and stop the app gracefully.

Raises:

Exception – If any exception occurs during the execution of the main loop, it is caught and logged, and the app is stopped.

signal_handler(signum, frame)

Handles SIGINT and SIGTERM signals, gracefully stopping the application.

When a termination signal is received, this handler deregisters the asset from the system and then stops the application’s event loop, so that the app shuts down cleanly on SIGINT or SIGTERM.

Return type:

None

Parameters:
  • signum (int) – The signal number that was received (e.g., SIGINT, SIGTERM).

  • frame (Optional) – The current stack frame when the signal was received.
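
The handler signature and the shutdown order described above can be sketched with the standard signal module. SketchApp is a hypothetical stand-in: the "deregister asset" entry and the running flag are placeholders for the real deregistration and event-loop stop, whose internals are not shown in this reference.

```python
import signal


class SketchApp:
    """Stand-in showing the shutdown order described above; not the real class."""

    def __init__(self):
        self.running = True
        self.actions = []

    def signal_handler(self, signum, frame):
        name = signal.Signals(signum).name       # e.g. "SIGTERM"
        self.actions.append(f"received {name}")
        self.actions.append("deregister asset")  # placeholder for deregistration
        self.running = False                     # placeholder for stopping the loop

    def install_signal_handlers(self):
        # signal.signal() may only be called from the main thread.
        signal.signal(signal.SIGINT, self.signal_handler)
        signal.signal(signal.SIGTERM, self.signal_handler)
```

The handler can also be called directly, for example app.signal_handler(signal.SIGTERM, None), which is convenient in unit tests because no real signal has to be delivered.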

welcome_banner()

Welcome banner printed to stdout.

Can be overridden by subclasses.

Return type:

None