Source code for openfactory.assets.asset_uns_class
""" OpenFactory AssetUNS class. """
from __future__ import annotations
from typing import List
from openfactory.assets.asset_base import BaseAsset
from openfactory.kafka import KafkaAssetUNSConsumer, KSQLDBClient
[docs]
class AssetUNS(BaseAsset):
"""
Represents an OpenFactory Asset using the UNS identifier.
This class encapsulates Asset metadata and a Kafka producer responsible for sending Asset data.
It uses the ksqlDB topology based on the ``ASSETS_STREAM_UNS`` stream to handle Asset data.
Note:
All write operations to the asset take place in the ``assets`` stream.
Attributes:
KSQL_ASSET_TABLE (str): Name of ksqlDB table of Asset states (``assets_uns``)
KSQL_ASSET_ID (str): ksqlDB ID used to identify the Asset (``uns_id``) in the ``KSQL_ASSET_TABLE``
ASSET_ID (str): value of the identifer of the Asset (``uns_id``) used in the ``KSQL_ASSET_TABLE``
ksql (KSQLDBClient): Client for interacting with ksqlDB.
bootstrap_servers (str): Kafka bootstrap server address.
asset_router_url (str): Asset Router URL from the OpenFactory Fan-Out-Layer.
ASSET_CONSUMER_CLASS (KafkaAssetUNSConsumer): Kafka consumer class for reading messages from Asset strean.
producer (AssetProducer): Kafka producer instance for sending Asset messages.
.. admonition:: Example usage:
.. code-block:: python
import time
from openfactory.assets import AssetUNS
from openfactory.kafka import KSQLDBClient
ksql = KSQLDBClient('http://localhost:8088')
cnc = AssetUNS('cnc-003', ksqlClient=ksql, bootstrap_servers='localhost:9092')
# list samples
print(cnc.samples())
print(cnc.Zact.value)
print(cnc.Zact.type)
print(cnc.Zact.timestamp)
# redefine an attribute value
cnc.Zact = 10.0
print(cnc.Zact.value)
# callbacks for subscriptions
def on_messages(msg_key, msg_value):
print(f"[Message] [{msg_key}] {msg_value}")
def on_sample(msg_key, msg_value):
print(f"[Sample] [{msg_key}] {msg_value}")
def on_event(msg_key, msg_value):
print(f"[Event] [{msg_key}] {msg_value}")
def on_condition(msg_key, msg_value):
print(f"[Condition] [{msg_key}] {msg_value}")
cnc.subscribe_to_messages(on_messages, 'demo_messages_group')
cnc.subscribe_to_samples(on_sample, 'demo_samples_group')
cnc.subscribe_to_events(on_event, 'demo_events_group')
cnc.subscribe_to_conditions(on_condition, 'demo_conditions_group')
# run a main loop while subscriptions remain active
try:
while True:
time.sleep(1)
except KeyboardInterrupt:
print("Stopping consumer threads ...")
cnc.stop_messages_subscription()
cnc.stop_samples_subscription()
cnc.stop_events_subscription()
cnc.stop_conditions_subscription()
print("Consumers stopped")
finally:
ksql.close()
"""
KSQL_ASSET_TABLE = 'assets_uns'
KSQL_ASSET_ID = 'uns_id'
ASSET_CONSUMER_CLASS = KafkaAssetUNSConsumer
[docs]
def __init__(self, uns_id: str,
ksqlClient: KSQLDBClient, bootstrap_servers: str,
asset_router_url: str | None = None) -> None:
"""
Initializes the Asset with metadata and a Kafka producer.
Args:
uns_id (str): UNS identifier of the asset.
ksqlClient (KSQLDBClient): Client for interacting with ksqlDB.
bootstrap_servers (str): Kafka bootstrap server address.
asset_router_url (str | None): Asset Router URL from the OpenFactory Fan-Out-Layer.
Raises:
OFAException: If ``asset_router_url`` is not provided and the
``ASSET_ROUTER_URL`` environment variable is not set.
Note:
- If ``asset_router_url`` is not explicitly provided, the constructor will attempt to read it from the ``ASSET_ROUTER_URL`` environment variable.
- When used in an :class:`OpenFactoryApp <openfactory.apps.ofaapp.OpenFactoryApp>` deployed on the OpenFactory cluster, the environment variable ``ASSET_ROUTER_URL`` will be set.
"""
object.__setattr__(self, 'ASSET_ID', uns_id)
super().__init__(ksqlClient, bootstrap_servers, asset_router_url)
@property
def asset_uuid(self) -> str:
"""
Returns the asset UUID based on runtime state.
Returns:
str: The asset's UUID.
"""
query = f"SELECT asset_uuid FROM asset_to_uns_map WHERE {self.KSQL_ASSET_ID}='{self.ASSET_ID}';"
result = self.ksql.query(query)
if not result:
return None
return result[0]['ASSET_UUID']
def _get_reference_list(self, direction: str, as_assets: bool = False) -> List[str | AssetUNS]:
"""
Retrieves a list of asset references (UUIDs or AssetUNS objects) in the given direction.
Args:
direction (str): Either 'above' or 'below', indicating reference direction.
as_assets (bool): If True, returns AssetUNS instances instead of UUID strings.
Returns:
List: List of asset UUIDs or AssetUNS objects.
"""
key = f"{self.ASSET_ID}|references_{direction}"
query = f"SELECT VALUE FROM {self.KSQL_ASSET_TABLE} WHERE key='{key}';"
results = self.ksql.query(query)
if not results or not results[0].get('VALUE', '').strip():
return []
uns_ids = [uns_id.strip() for uns_id in results[0]['VALUE'].split(",")]
if as_assets:
return [AssetUNS(uns_id, ksqlClient=self.ksql) for uns_id in uns_ids]
return uns_ids