OpenFactoryFastAPIApp
- class openfactory.apps.ofa_fastapi_app.OpenFactoryFastAPIApp(ksqlClient, bootstrap_servers=None, asset_router_url=None, loglevel='INFO', test_mode=False)
Bases: OpenFactoryApp

OpenFactory application with an embedded FastAPI web interface.

Extends OpenFactoryApp by attaching a fastapi.FastAPI application to the OpenFactory runtime. This allows exposing HTTP endpoints alongside the standard OpenFactory asset, attribute, and method mechanisms.

The FastAPI application is available via the api attribute and can be used exactly as in a standard FastAPI project.

- Runtime behavior:
  - Calling run() starts both the OpenFactory application and an embedded HTTP server.
  - The HTTP server is powered by Uvicorn and serves the attached fastapi.FastAPI application.
  - The server listens on all network interfaces (0.0.0.0).
  - The listening port is defined by the PORT environment variable, or defaults to 4000 if unset.
  - The server log level is aligned with the OpenFactory application logger.
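The host, port, and log-level rules listed above can be sketched in a few lines. This is only an illustration of the documented behavior, not the class's actual implementation: `server_settings` is a hypothetical helper name, and reusing the lowercased log-level name for the server is an assumption about how the alignment could be done.

```python
import os


def server_settings(loglevel: str = "INFO", default_port: int = 4000) -> dict:
    """Build keyword arguments one might pass to uvicorn.run() (hypothetical helper)."""
    return {
        # Listen on all network interfaces, as documented.
        "host": "0.0.0.0",
        # Use the PORT environment variable if set, otherwise fall back to 4000.
        "port": int(os.environ.get("PORT", default_port)),
        # Assumption: the server reuses the application's log level name.
        "log_level": loglevel.lower(),
    }
```

With `PORT` unset, `server_settings()["port"]` is `4000`; exporting `PORT=8080` before start-up changes it to `8080`.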
- api
FastAPI application instance attached to the OpenFactory app.
- Type: fastapi.FastAPI
Usage Example (inline routes)
    import os

    from openfactory.apps import OpenFactoryFastAPIApp, EventAttribute, ofa_method
    from openfactory.kafka import KSQLDBClient


    class DemoFastAPIApp(OpenFactoryFastAPIApp):

        status = EventAttribute(value="idle", tag="App.Status")

        def __init__(self, *args, **kwargs):
            super().__init__(*args, **kwargs)

            @self.api.get("/")
            async def root():
                return {"status": self.status.value}

        @ofa_method(description="Move axis")
        def move_axis(self, x: float, y: float):
            self.logger.info(f"Move to {x},{y}")


    app = DemoFastAPIApp(
        ksqlClient=KSQLDBClient(os.getenv("KSQLDB_URL", "http://localhost:8088")),
        bootstrap_servers=os.getenv("KAFKA_BROKER", "localhost:9092"),
    )
    app.run()
For larger applications, routes can be split into modules and included using routers:
Using routers (recommended for larger applications)
    # main.py
    import os

    from openfactory.apps import OpenFactoryFastAPIApp, EventAttribute, ofa_method
    from openfactory.kafka import KSQLDBClient
    from routes import root, move


    class DemoFastAPIApp(OpenFactoryFastAPIApp):

        status = EventAttribute(value="idle", tag="App.Status")

        def __init__(self, *args, **kwargs):
            super().__init__(*args, **kwargs)

            # expose OpenFactory App inside FastAPI
            self.api.state.ofa_app = self

            # include routers
            self.api.include_router(root.router)
            self.api.include_router(move.router)

        @ofa_method(description="Move axis")
        def move_axis(self, x: float, y: float):
            self.logger.info(f"Move to {x},{y}")


    app = DemoFastAPIApp(
        ksqlClient=KSQLDBClient(os.getenv("KSQLDB_URL", "http://localhost:8088")),
        bootstrap_servers=os.getenv("KAFKA_BROKER", "localhost:9092"),
    )
    app.run()

    # routes/root.py
    from fastapi import APIRouter, Request

    router = APIRouter()


    @router.get("/")
    async def root(request: Request):
        ofa_app = request.app.state.ofa_app
        return {"status": ofa_app.status.value}

    # routes/move.py
    from fastapi import APIRouter, Request

    router = APIRouter()


    @router.post("/move")
    async def move(x: float, y: float, request: Request):
        ofa_app = request.app.state.ofa_app
        ofa_app.move_axis(x, y)
        return {"message": "moving"}
Note
- The FastAPI application is accessible via api and behaves like a standard FastAPI instance.
- Route definitions can be added either directly using api or via api.include_router().
- For small applications, routes can be defined inline; for larger applications, using routers is recommended.
- Only asynchronous execution is supported. Subclasses may optionally implement async_main_loop() for background tasks.
- The synchronous OpenFactoryApp.main_loop() is not supported in this class.
- OpenFactory features such as attributes, methods, and asset communication remain unchanged.
- When deployed on the OpenFactory platform, the PORT environment variable is set automatically by the deployment tool.
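The asynchronous-only model described in the note can be illustrated without OpenFactory itself. The sketch below uses a hypothetical stand-in class (`SketchApp` and `run_briefly` are not part of OpenFactory) whose `async_main_loop()` does periodic work; in a real subclass of OpenFactoryFastAPIApp, a method body of the same shape would run alongside the HTTP server.

```python
import asyncio


class SketchApp:
    """Hypothetical stand-in for an OpenFactoryFastAPIApp subclass."""

    def __init__(self) -> None:
        self.ticks = 0

    async def async_main_loop(self) -> None:
        # Periodic background work. Awaiting sleep yields control so that
        # other tasks (for example an HTTP server) keep running.
        while True:
            await asyncio.sleep(0.01)
            self.ticks += 1


async def run_briefly(app: SketchApp, seconds: float) -> int:
    """Run the loop for a short time, then cancel it (demonstration only)."""
    task = asyncio.create_task(app.async_main_loop())
    await asyncio.sleep(seconds)
    task.cancel()
    try:
        await task
    except asyncio.CancelledError:
        pass  # expected: the loop never returns on its own
    return app.ticks
```

Blocking calls such as `time.sleep()` inside the loop would stall every other task on the event loop, so background logic should only wait via `await`.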
- __init__(ksqlClient, bootstrap_servers=None, asset_router_url=None, loglevel='INFO', test_mode=False)
Initialize the OpenFactory FastAPI application.

This constructor forwards all parameters to OpenFactoryApp and additionally creates a fastapi.FastAPI instance accessible via api.

- Parameters:
  - ksqlClient – KSQL client instance.
  - bootstrap_servers – Kafka bootstrap server address.
  - asset_router_url – Asset Router URL.
  - loglevel – Logging level (e.g., INFO, DEBUG).
  - test_mode – Enables test mode (disables live Kafka/ksql interaction).

See also
OpenFactoryApp for full initialization details and environment variable handling.
- async async_main_loop()
Default asynchronous main loop.
Keeps the application alive when no custom loop is provided.
Subclasses can override this method to implement background logic.
Example
    import asyncio

    async def async_main_loop(self):
        while True:
            await asyncio.sleep(5)
            self.logger.info("Background task running")
- Return type:
- async async_run()
Asynchronous entry point of the application.
Starts both the FastAPI server and the OpenFactory logic concurrently.
- Return type:
- This method:
  - Displays the welcome banner
  - Sets the application availability
  - Runs FastAPI and OpenFactory tasks concurrently
- Raises:
  Exception – Any unhandled exception during execution is logged and triggers OpenFactoryApp.app_event_loop_stopped().
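The concurrency and error handling described for async_run() can be sketched with plain asyncio. This is a minimal illustration under stated assumptions, not the method's actual source: `run_concurrently`, `demo`, and the `on_stopped` callback are hypothetical names, with `on_stopped` standing in for OpenFactoryApp.app_event_loop_stopped(). Whether the real method re-raises after logging is not stated here; the sketch does not.

```python
import asyncio
import logging

logger = logging.getLogger("sketch")


async def run_concurrently(server_task, app_task, on_stopped) -> None:
    """Run two coroutines together; log any failure and call a stop hook."""
    try:
        await asyncio.gather(server_task(), app_task())
    except Exception:
        logger.exception("Unhandled exception in event loop")
        on_stopped()  # stands in for OpenFactoryApp.app_event_loop_stopped()


async def demo() -> list:
    events = []

    async def server():
        # Pretend to serve HTTP requests for a moment.
        await asyncio.sleep(0.01)
        events.append("server done")

    async def failing_app():
        # Simulate a failure in the application logic.
        raise RuntimeError("boom")

    await run_concurrently(server, failing_app, lambda: events.append("stopped"))
    return events
```

A synchronous entry point can drive such a coroutine with `asyncio.run(demo())`, which is one way to run an asynchronous entry point inside an asyncio event loop.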
- configure_routes()
Configure HTTP routes for the FastAPI application.

This method can be overridden by subclasses to define routes directly using api.

Example

    def configure_routes(self):

        @self.api.get("/")
        async def root():
            return {"status": "ok"}

Note
For larger applications, prefer using api.include_router() instead of overriding this method.

- Return type:
- run()
Start the application using a synchronous entry point.

This method runs async_run() inside an asyncio event loop.

- Return type: