Source code for openfactory.openfactory
"""
OpenFactory API.
This module provides the `OpenFactory` class, which serves as the main interface
to deployed assets, devices, and services in an OpenFactory environment through
ksqlDB queries.
Core responsibilities:
- Retrieve deployed assets and their UUIDs
- Access availability information for assets
- Query Docker services associated with assets
- Classify assets by type (devices, MTConnect agents, Kafka producers, applications)
- Provide high-level `Asset` objects for interacting with deployed components
Key integrations:
- ksqlDB for querying asset metadata and status
- Kafka for underlying streaming infrastructure
- Asset abstraction for modeling and interacting with deployed entities
Usage Example:
    .. code-block:: python

        from openfactory import OpenFactory
        from openfactory.kafka.ksql import KSQLDBClient
        import openfactory.config as config

        ofa = OpenFactory(ksqlClient=KSQLDBClient(config.KSQLDB_URL))

        # List the UUIDs of all deployed assets
        print(ofa.assets_uuid())
Error handling:
- Returns empty lists when no results are available
- Relies on the KSQLDBClient to report query execution errors
"""
from typing import List
import openfactory.config as config
from openfactory.assets import Asset
from openfactory.kafka.ksql import KSQLDBClient
from openfactory.exceptions import OFAException
[docs]
class OpenFactory:
    """
    Main API to OpenFactory.

    Provides access to deployed assets, their availability, Docker services,
    and their classification by type (devices, agents, etc.).

    Every query is executed through the ksqlDB client supplied at construction
    time. Methods returning lists return an empty list when the query yields
    no rows.
    """

    def __init__(self, ksqlClient: KSQLDBClient,
                 bootstrap_servers: str | None = None,
                 asset_url: str | None = None):
        """
        Initialize the OpenFactory API.

        Args:
            ksqlClient (KSQLDBClient): A client capable of executing KSQL queries.
            bootstrap_servers (str | None):
                Kafka bootstrap server address.
                If not provided, the value from
                ``openfactory.config.KAFKA_BROKER`` is used.
            asset_url (str | None):
                URL of the Asset Router.
                If not provided, the value from
                ``openfactory.config.ASSET_ROUTER_URL`` is used.

        Raises:
            OFAException:
                If no Asset Router URL is available (neither explicitly provided nor configured via ``.ofaenv``).
        """
        # Explicit arguments win; the configuration is only a fallback.
        self.asset_url = asset_url if asset_url is not None else config.ASSET_ROUTER_URL
        self.bootstrap_servers = bootstrap_servers if bootstrap_servers is not None else config.KAFKA_BROKER
        if self.asset_url is None:
            raise OFAException(
                "OpenFactory requires 'asset_router_url' to be provided "
                "either explicitly or via the '.ofaenv' file"
            )
        self.ksql = ksqlClient

    def _query_rows(self, query: str) -> list[dict]:
        """
        Run a ksqlDB query and normalise an empty result to an empty list.

        Args:
            query (str): The KSQL statement to execute.

        Returns:
            list[dict]: The rows returned by the client, or ``[]`` when the
            client returns a falsy result (e.g. ``None`` or an empty list).
        """
        rows = self.ksql.query(query)
        return rows if rows else []

    def _query_uuids(self, asset_type: str | None = None) -> List[str]:
        """
        Fetch asset UUIDs from the ``assets_type`` table.

        Args:
            asset_type (str | None): Value of the ``TYPE`` column to filter on.
                When ``None``, no filter is applied.

        Returns:
            List[str]: The ``ASSET_UUID`` value of every returned row.
        """
        # asset_type is only ever one of the constants hard-coded in this
        # class, so interpolating it into the statement is safe here.
        condition = "" if asset_type is None else f" WHERE TYPE = '{asset_type}'"
        query = f"SELECT ASSET_UUID FROM assets_type{condition};"
        return [row['ASSET_UUID'] for row in self._query_rows(query)]

    def _make_assets(self, uuids: List[str]) -> List[Asset]:
        """
        Build one ``Asset`` per UUID using this instance's connection settings.

        All public asset accessors go through this helper so that the Kafka
        bootstrap servers and the Asset Router URL configured on this instance
        are passed to every ``Asset`` in the same way.

        Args:
            uuids (List[str]): UUIDs of the assets to wrap.

        Returns:
            List[Asset]: One ``Asset`` per UUID, in the same order.
        """
        return [
            Asset(uuid, self.ksql, self.bootstrap_servers, self.asset_url)
            for uuid in uuids
        ]

    def assets_uuid(self) -> List[str]:
        """
        Get list of asset UUIDs deployed on OpenFactory.

        Returns:
            List[str]: UUIDs of all deployed assets.
        """
        return self._query_uuids()

    def assets(self) -> List[Asset]:
        """
        Get list of Asset objects deployed on OpenFactory.

        Returns:
            List[Asset]: Deployed Asset instances.
        """
        return self._make_assets(self.assets_uuid())

    def assets_availability(self) -> list[dict]:
        """
        Get availability data for all deployed assets.

        Returns:
            list[dict]: Rows of the ``assets_avail`` table, or an empty list
            when the query yields no result.
        """
        return self._query_rows("SELECT * FROM assets_avail;")

    def assets_docker_services(self) -> list[dict]:
        """
        Get Docker services associated with all deployed assets.

        Returns:
            list[dict]: Rows of the ``docker_services`` table, or an empty
            list when the query yields no result.
        """
        return self._query_rows("SELECT * FROM docker_services;")

    def devices_uuid(self) -> List[str]:
        """
        Get UUIDs of all devices deployed on OpenFactory.

        Returns:
            List[str]: UUIDs of deployed device-type assets.
        """
        return self._query_uuids('Device')

    def devices(self) -> List[Asset]:
        """
        Get Asset objects corresponding to deployed devices.

        Returns:
            List[Asset]: Deployed device-type assets.
        """
        return self._make_assets(self.devices_uuid())

    def agents_uuid(self) -> List[str]:
        """
        Get UUIDs of deployed MTConnect agents.

        Returns:
            List[str]: UUIDs of deployed MTConnect agents.
        """
        return self._query_uuids('MTConnectAgent')

    def agents(self) -> List[Asset]:
        """
        Get `Asset` objects corresponding to deployed MTConnect agents.

        Returns:
            List[Asset]: Deployed MTConnect agent assets.
        """
        return self._make_assets(self.agents_uuid())

    def producers_uuid(self) -> List[str]:
        """
        Get UUIDs of deployed Kafka producers.

        Returns:
            List[str]: UUIDs of deployed Kafka producer assets.
        """
        return self._query_uuids('KafkaProducer')

    def producers(self) -> List[Asset]:
        """
        Get Asset objects corresponding to deployed Kafka producers.

        Returns:
            List[Asset]: Kafka producer assets.
        """
        return self._make_assets(self.producers_uuid())

    def applications_uuid(self) -> List[str]:
        """
        Get UUIDs of deployed OpenFactory applications.

        Returns:
            List[str]: UUIDs of deployed OpenFactory application-type assets.
        """
        return self._query_uuids('OpenFactoryApp')

    def applications(self) -> List[Asset]:
        """
        Get Asset objects corresponding to deployed OpenFactory applications.

        Returns:
            List[Asset]: Deployed OpenFactory application-type assets.
        """
        return self._make_assets(self.applications_uuid())