BaseAsset#
Warning
The BaseAsset class is an abstract class not intented to be used.
Two classes (Asset and AssetUNS) are derived from it for actual usage.
- class openfactory.assets.asset_base.BaseAsset(ksqlClient, bootstrap_servers=None)[source]#
Bases:
objectBase class for OpenFactory Assets.
Warning
This is an abstract class not intented to be used. From this class, two classes are derived (Asset and AssetUNS) for actual usage.
It can interact with the Kafka topic of the OpenFactory assets or the ksqlDB streams and state tables.
Note
All write operations to the asset take place in the assets stream.
NATS subscribers allow filtering messages by TYPE (‘Samples’, ‘Events’, ‘Condition’).
- KSQL_ASSET_ID#
ksqlDB ID used to identify the asset (asset_uuid or uns_id) in the KSQL_ASSET_TABLE.
- Type:
- ASSET_ID#
Value of the identifier of the asset (asset_uuid or uns_id) used in the KSQL_ASSET_TABLE.
- Type:
- ksql#
Client for interacting with ksqlDB.
- Type:
- ASSET_CONSUMER_CLASS#
Kafka consumer class for reading messages from asset stream.
- producer#
Shared Kafka producer instance used to publish asset messages (singleton across all BaseAsset subclasses).
- Type:
- loop_thread#
Async event loop thread used for NATS subscriptions.
- Type:
AsyncLoopThread
- __init__(ksqlClient, bootstrap_servers=None)[source]#
Initializes the Asset with metadata.
- Parameters:
ksqlClient (KSQLDBClient) – Client for interacting with ksqlDB.
bootstrap_servers (str) – Kafka bootstrap server address. Defaults to config setting.
- add_attribute(asset_attribute)[source]#
Adds a new attribute to the asset.
- Return type:
- Parameters:
asset_attribute (AssetAttribute) – The attribute to be added.
- add_reference_above(above_asset_reference)[source]#
Adds a reference to an asset above the current asset.
- add_reference_below(below_asset_reference)[source]#
Adds a reference to an asset below the current asset.
- property asset_uuid: str#
Returns the asset UUID.
Important
This property must be implemented by subclasses. It is expected to return the current asset UUID dynamically, based on runtime state.
- Returns:
str – The asset’s UUID.
- Raises:
NotImplementedError – If the property is not implemented in a subclass.
- attributes()[source]#
Returns all non-‘Method’ attribute IDs associated with this asset.
Queries KSQL_ASSET_TABLE for all attribute IDs of the asset where the type is not ‘Method’.
- method(method, args='')[source]#
Requests the execution of a method for the asset by sending a command to the Kafka stream.
Constructs a message with the provided method name and optional arguments, and sends it to the CMDS_STREAM Kafka topic for processing.
- methods()[source]#
Returns method-type attributes for this asset.
Queries KSQL_ASSET_TABLE for entries where TYPE = ‘Method’ for the asset.
- references_above()[source]#
Retrieves a list of assets above the current asset.
- Return type:
List[Self]- Returns:
List[Self] – A list of asset objects that are above the current asset.
- references_above_uuid()[source]#
Retrieves a list of asset-references of assets above the current asset.
- references_below()[source]#
Retrieves a list of assets below the current asset.
- Return type:
List[Self]- Returns:
List[Self] – A list of asset objects that are below the current asset.
- stop_conditions_subscription()[source]#
Stops the NATS consumer and gracefully shuts down the subscription for conditions.
- Return type:
- stop_events_subscription()[source]#
Stops the NATS consumer and gracefully shuts down the subscription for events.
- Return type:
- stop_messages_subscription()[source]#
Stops the NATS consumer and gracefully shuts down the subscription.
- Return type:
- stop_samples_subscription()[source]#
Stops the NATS consumer and gracefully shuts down the subscription for samples.
- Return type:
- subscribe_to_conditions(on_condition)[source]#
Subscribes to asset conditions using a NATS consumer. Only messages with TYPE == ‘Condition’ are forwarded to the callback.
- Return type:
- Parameters:
on_condition (AssetNATSCallback) – Callable that takes (msg_subject: str, msg_value: dict).
- subscribe_to_events(on_event)[source]#
Subscribes to asset events using a NATS consumer. Only messages with TYPE == ‘Events’ are forwarded to the callback.
- Return type:
- Parameters:
on_event (AssetNATSCallback) – Callable that takes (msg_subject: str, msg_value: dict).
- subscribe_to_messages(on_message)[source]#
Subscribes to asset messages using a NATS consumer.
- Return type:
- Parameters:
on_message (AssetNATSCallback) – Callable that takes (msg_subject: str, msg_value: dict) and handles messages.
- subscribe_to_samples(on_sample)[source]#
Subscribes to asset samples using a NATS consumer. Only messages with TYPE == ‘Samples’ are forwarded to the callback.
- Return type:
- Parameters:
on_meon_samplessage (AssetNATSCallback) – Callable that takes (msg_subject: str, msg_value: dict).
- property type: Literal['Samples', 'Condition', 'Events', 'Method', 'OpenFactory', 'UNAVAILABLE']#
Retrieves the type of the asset from ksqlDB.
Executes a SQL query to fetch the asset type. If the query returns no result, the method defaults to ‘UNAVAILABLE’.
- Returns:
Literal[‘Samples’, ‘Condition’, ‘Events’, ‘Method’, ‘OpenFactory’, ‘UNAVAILABLE’] – The asset type as stored in the assets_type table, or ‘UNAVAILABLE’ if not found.
- wait_until(attribute_id, value, timeout=30, use_ksqlDB=False)[source]#
Waits until the asset attribute has a specific value or times out.
Monitors either the NATS cluster or ksqlDB to check if the attribute value matches the expected value. Returns True if the value is found within the given timeout, False otherwise.
- Return type:
- Parameters:
attribute_id (str) – The attribute ID of the asset to monitor.
value (Any) – The value to wait for the attribute to match.
timeout (int) – The maximum time to wait, in seconds. Default is 30 seconds.
use_ksqlDB (bool) – If True, uses ksqlDB instead of NATS to check the attribute value. Default is False.
- Returns:
bool – True if the attribute value matches the expected value within the timeout, False otherwise.