BaseAsset#

Warning

The BaseAsset class is an abstract class and is not intended to be used directly.

Two classes (Asset and AssetUNS) are derived from it for actual usage.

class openfactory.assets.asset_base.BaseAsset(ksqlClient, bootstrap_servers)[source]#

Bases: object

Base class for OpenFactory Assets.

Warning

This is an abstract class and is not intended to be used directly. Two classes, Asset and AssetUNS, are derived from it for actual usage.

It can interact with the Kafka topic of the OpenFactory assets or the ksqlDB streams and state tables.

Note

  • All write operations to the asset take place in the assets stream.

  • NATS subscribers allow filtering messages by TYPE (‘Samples’, ‘Events’, ‘Condition’).

KSQL_ASSET_TABLE#

Name of ksqlDB table of asset states (assets or assets_uns).

Type:

str

KSQL_ASSET_ID#

ksqlDB ID used to identify the asset (asset_uuid or uns_id) in the KSQL_ASSET_TABLE.

Type:

str

ASSET_ID#

Value of the identifier of the asset (asset_uuid or uns_id) used in the KSQL_ASSET_TABLE.

Type:

str

ksql#

Client for interacting with ksqlDB.

Type:

KSQLDBClient

bootstrap_servers#

Kafka bootstrap server address.

Type:

str

ASSET_CONSUMER_CLASS#

Kafka consumer class for reading messages from asset stream.

Type:

KafkaAssetConsumer|KafkaAssetUNSConsumer

producer#

Shared Kafka producer instance used to publish asset messages (singleton across all BaseAsset subclasses).

Type:

AssetProducer

loop_thread#

Async event loop thread used for NATS subscriptions.

Type:

AsyncLoopThread

subscribers#

Mapping of subscription keys to NATSSubscriber instances.

Type:

dict

__init__(ksqlClient, bootstrap_servers)[source]#

Initializes the Asset with metadata.

Parameters:
  • ksqlClient (KSQLDBClient) – Client for interacting with ksqlDB.

  • bootstrap_servers (str) – Kafka bootstrap server address.

add_attribute(asset_attribute)[source]#

Adds a new attribute to the asset.

Return type:

None

Parameters:

asset_attribute (AssetAttribute) – The attribute to be added.

add_reference_above(above_asset_reference)[source]#

Adds a reference to an asset above the current asset.

Return type:

None

Parameters:

above_asset_reference (str) – The asset-reference of the asset above the current one to be added.

add_reference_below(below_asset_reference)[source]#

Adds a reference to an asset below the current asset.

Return type:

None

Parameters:

below_asset_reference (str) – The asset-reference of the asset below the current one to be added.

property asset_uuid: str#

Returns the asset UUID.

Important

This property must be implemented by subclasses. It is expected to return the current asset UUID dynamically, based on runtime state.

Returns:

str – The asset’s UUID.

Raises:

NotImplementedError – If the property is not implemented in a subclass.

attributes()[source]#

Returns all non-Method attribute IDs associated with this asset.

Return type:

List[str]

Returns:

List[str] – A list of attribute IDs.

close()[source]#

Gracefully closes the Asset and frees resources.

Steps performed:
  1. Stops all NATS subscribers (unsubscribe + close NATS connection).

  2. Cancels any remaining tasks in the AsyncLoopThread.

  3. Stops the AsyncLoopThread and joins the thread.

Warning

After calling this method, the Asset instance should not be used again.
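Because close() must run even when an error occurs, one option is to wrap the asset in the standard library's contextlib.closing. The sketch below uses a hypothetical StubAsset stand-in that exposes only close(); it is not part of OpenFactory and only illustrates the pattern.

```python
from contextlib import closing


class StubAsset:
    """Hypothetical stand-in exposing only close(), as a real asset would."""

    def __init__(self) -> None:
        self.closed = False

    def close(self) -> None:
        self.closed = True


asset = StubAsset()
with closing(asset):
    pass  # work with the asset here

# close() has been called on exit, even if the body had raised
print(asset.closed)
```

A plain try/finally block with asset.close() in the finally clause achieves the same effect.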

conditions()[source]#

Returns all condition-type attributes for this asset.

Return type:

List[Dict[str, Any]]

Returns:

List[Dict]

A list of dictionaries, each containing:
  • ID (str): The attribute ID.

  • VALUE (Any): The value of the condition.

  • TAG (str): The condition tag (‘Normal’, ‘Warning’, ‘Fault’)
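As an illustration of how the returned list might be consumed, the following sketch extracts the IDs of all faulty conditions. The helper name faulty_conditions and the sample data are hypothetical; only the ID, VALUE and TAG keys come from the description above.

```python
from typing import Any, Dict, List


def faulty_conditions(conditions: List[Dict[str, Any]]) -> List[str]:
    """Return the IDs of all conditions whose TAG is 'Fault'."""
    return [c["ID"] for c in conditions if c["TAG"] == "Fault"]


# Hypothetical data shaped like the return value of conditions()
sample_conditions = [
    {"ID": "temp_cond", "VALUE": "OK", "TAG": "Normal"},
    {"ID": "door_cond", "VALUE": "Door open", "TAG": "Fault"},
]
print(faulty_conditions(sample_conditions))
```

With a real asset, the list passed in would be the result of asset.conditions().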

events()[source]#

Returns all event-type attributes for this asset.

Return type:

List[Dict[str, Any]]

Returns:

List[Dict]

A list of dictionaries, each containing:
  • ID (str): The attribute ID.

  • VALUE (Any): The value of the event.

  • TAG (str): The cleaned tag name with placeholders removed.

method(method, sender_uuid, args=None)[source]#

Requests the execution of a method for the asset.

This function further sets the corresponding callable attribute with the name of the method (e.g. GenerateCode) to trigger the command execution.

Method execution can be requested in two ways:

  1. Using the method() interface:

    asset.method('GenerateCode', sender_uuid='SENDER-ID', args=[('Code', '123')])
    
  2. Or directly via the generated callable attribute:

    asset.GenerateCode(sender_uuid='SENDER-ID', Code='123')
    

In both cases, sender_uuid must be provided in addition to the command’s named arguments.

Note

Named arguments are case sensitive and can be discovered by calling methods().

Parameters:
  • method (str) – Name of the method to be executed.

  • sender_uuid (str) – Asset UUID of the asset sending the request.

  • args (Optional[List[Tuple[str, str]]]) – List of (argument_name, value) pairs. All values must be strings. Defaults to an empty list if not provided.

Returns:

str – The correlation_id of the command, which can be used to track the response.

Return type:

str
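Since all values in args must be strings, a small conversion helper can be convenient when the arguments start out as native Python values. The helper to_method_args below is a hypothetical sketch, not part of the BaseAsset API.

```python
from typing import Any, Dict, List, Tuple


def to_method_args(named_args: Dict[str, Any]) -> List[Tuple[str, str]]:
    """Convert named arguments into (argument_name, value) string pairs."""
    return [(name, str(value)) for name, value in named_args.items()]


args = to_method_args({"Code": 123})
print(args)

# With a real asset, the result could then be passed on as:
# asset.method('GenerateCode', sender_uuid='SENDER-ID', args=args)
```

Argument names are passed through unchanged, which matters because they are case sensitive.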

methods()[source]#

Returns method-type attributes for this asset.

Return type:

Dict[str, dict | None]

Returns:

Dict[str, dict | None] – Dictionary mapping method IDs to their parsed method contract (description + arguments). A method ID maps to None if no value is stored for it.

Returned Dictionary Example

{
   "GenerateCode": {
       "description": "GenerateCode",
       "arguments": [
         {
           "name": "Code",
           "description": "Barcode to generate (empty for random)"
         }
       ]
   },
   "SetAutomaticMode": {
       "description": "SetAutomaticMode",
       "arguments": []
   },
   "SetManualMode": {
       "description": "SetManualMode",
       "arguments": []
   }
}
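To discover the named arguments each method accepts, the contract dictionary can be reduced to a mapping of method ID to argument names. The sketch below assumes the structure shown in the example; the helper argument_names and the "Reset" entry (included to show the None case) are hypothetical.

```python
from typing import Dict, List, Optional


def argument_names(contracts: Dict[str, Optional[dict]]) -> Dict[str, List[str]]:
    """Map each method ID to the names of its arguments.

    Methods without a stored contract (None) map to an empty list.
    """
    names: Dict[str, List[str]] = {}
    for method_id, contract in contracts.items():
        arguments = contract["arguments"] if contract else []
        names[method_id] = [arg["name"] for arg in arguments]
    return names


# Hypothetical data shaped like the return value of methods()
contracts = {
    "GenerateCode": {
        "description": "GenerateCode",
        "arguments": [
            {"name": "Code", "description": "Barcode to generate (empty for random)"}
        ],
    },
    "SetManualMode": {"description": "SetManualMode", "arguments": []},
    "Reset": None,
}
print(argument_names(contracts))
```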

references_above()[source]#

Retrieves a list of assets above the current asset.

Return type:

List[Self]

Returns:

List[Self] – A list of asset objects that are above the current asset.

references_above_uuid()[source]#

Retrieves a list of asset-references of assets above the current asset.

Return type:

List[str]

Returns:

List[str] – A list of asset-references (as strings) that are above the current asset.

references_below()[source]#

Retrieves a list of assets below the current asset.

Return type:

List[Self]

Returns:

List[Self] – A list of asset objects that are below the current asset.
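Because each returned object is itself an asset, the hierarchy can be walked recursively. The following sketch collects the UUIDs of everything below a starting asset. StubAsset is a hypothetical stand-in that offers only asset_uuid and references_below(); the UUID values are made up.

```python
from typing import List, Optional, Set


class StubAsset:
    """Hypothetical stand-in with the two members used below."""

    def __init__(self, asset_uuid: str,
                 below: Optional[List["StubAsset"]] = None) -> None:
        self.asset_uuid = asset_uuid
        self._below = below or []

    def references_below(self) -> List["StubAsset"]:
        return self._below


def descendants(asset) -> List[str]:
    """Collect the UUIDs of all assets reachable below `asset`, depth first."""
    seen: Set[str] = set()
    order: List[str] = []

    def visit(node) -> None:
        for child in node.references_below():
            # Skip assets already visited to guard against cyclic references
            if child.asset_uuid not in seen:
                seen.add(child.asset_uuid)
                order.append(child.asset_uuid)
                visit(child)

    visit(asset)
    return order


sensor = StubAsset("SENSOR-1")
robot = StubAsset("ROBOT-1", [sensor])
cell = StubAsset("CELL-1", [robot])
print(descendants(cell))
```

The same traversal would work upwards by calling references_above() instead.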

references_below_uuid()[source]#

Retrieves a list of asset-references below the current asset.

Return type:

List[str]

Returns:

List[str] – A list of asset-references (as strings) that are below the current asset.

samples()[source]#

Returns all sample-type attributes for this asset.

Return type:

List[Dict[str, Any]]

Returns:

List[Dict]

A list of dictionaries, each containing:
  • ID (str): The attribute ID.

  • VALUE (Any): The value of the sample.

  • TAG (str): The cleaned tag name with placeholders removed.
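When only the current values are of interest, the list of dictionaries can be flattened into a lookup keyed by attribute ID. This is a minimal sketch; the helper values_by_id and the sample data are hypothetical, and only the ID, VALUE and TAG keys are taken from the description above.

```python
from typing import Any, Dict, List


def values_by_id(attributes: List[Dict[str, Any]]) -> Dict[str, Any]:
    """Build an {ID: VALUE} lookup from a list of attribute dictionaries."""
    return {attr["ID"]: attr["VALUE"] for attr in attributes}


# Hypothetical data shaped like the return value of samples()
sample_list = [
    {"ID": "temperature", "VALUE": 21.5, "TAG": "Temperature"},
    {"ID": "humidity", "VALUE": 40, "TAG": "Humidity"},
]
print(values_by_id(sample_list))
```

The same helper applies to the lists returned by events() and conditions(), which share these keys.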

stop_attribute_subscription(attribute_id)[source]#

Stops the NATS consumer and gracefully shuts down the subscription.

Return type:

None

Parameters:

attribute_id (str) – The attribute ID for which to stop the subscription.

stop_conditions_subscription()[source]#

Stops the NATS consumer and gracefully shuts down the subscription for conditions.

Return type:

None

stop_events_subscription()[source]#

Stops the NATS consumer and gracefully shuts down the subscription for events.

Return type:

None

stop_messages_subscription()[source]#

Stops the NATS consumer and gracefully shuts down the subscription for messages.

Return type:

None

stop_samples_subscription()[source]#

Stops the NATS consumer and gracefully shuts down the subscription for samples.

Return type:

None

subscribe_to_attribute(attribute_id, on_message)[source]#

Subscribes to changes of an asset attribute using a NATS consumer.

Return type:

None

Parameters:
  • attribute_id (str) – The attribute ID to monitor.

  • on_message (AssetNATSCallback) – Callable that takes (msg_subject: str, msg_value: dict) and handles messages.
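A callback only needs to accept the two documented parameters. The sketch below defines one that records every message and then invokes it by hand to show the call shape. The subject string and the payload keys are made up for illustration; the actual message content is not specified here.

```python
from typing import List, Tuple

received: List[Tuple[str, dict]] = []


def on_message(msg_subject: str, msg_value: dict) -> None:
    """Record every message; a real handler might log or react instead."""
    received.append((msg_subject, msg_value))


# Simulated delivery with a made-up subject and payload
on_message("example.subject", {"ID": "temperature", "VALUE": "21.5"})
print(received)

# With a real asset, the callback would be registered and later removed as:
# asset.subscribe_to_attribute('temperature', on_message)
# asset.stop_attribute_subscription('temperature')
```

The same callback signature is expected by subscribe_to_messages(), subscribe_to_samples(), subscribe_to_events() and subscribe_to_conditions().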

subscribe_to_conditions(on_condition)[source]#

Subscribes to asset conditions using a NATS consumer.

Return type:

None

Parameters:

on_condition (AssetNATSCallback) – Callable that takes (msg_subject: str, msg_value: dict).

subscribe_to_events(on_event)[source]#

Subscribes to asset events using a NATS consumer.

Return type:

None

Parameters:

on_event (AssetNATSCallback) – Callable that takes (msg_subject: str, msg_value: dict).

subscribe_to_messages(on_message)[source]#

Subscribes to asset messages using a NATS consumer.

Return type:

None

Parameters:

on_message (AssetNATSCallback) – Callable that takes (msg_subject: str, msg_value: dict) and handles messages.

subscribe_to_samples(on_sample)[source]#

Subscribes to asset samples using a NATS consumer.

Return type:

None

Parameters:

on_sample (AssetNATSCallback) – Callable that takes (msg_subject: str, msg_value: dict).

property type: Literal['Samples', 'Condition', 'Events', 'Method', 'OpenFactory', 'UNAVAILABLE']#

Retrieves the type of the asset from ksqlDB.

Executes a SQL query to fetch the asset type. If the query returns no result, the property defaults to UNAVAILABLE.

Returns:

Literal[‘Samples’, ‘Condition’, ‘Events’, ‘Method’, ‘OpenFactory’, ‘UNAVAILABLE’] – The asset type as stored in the assets_type table, or UNAVAILABLE if not found.

wait_until(attribute_id, value, timeout=30, use_ksqlDB=False)[source]#

Waits until the asset attribute has a specific value or times out.

Monitors either the NATS cluster or ksqlDB to check if the attribute value matches the expected value. Returns True if the value is found within the given timeout, False otherwise.

Return type:

bool

Parameters:
  • attribute_id (str) – The attribute ID of the asset to monitor.

  • value (Any) – The value to wait for the attribute to match.

  • timeout (int) – The maximum time to wait, in seconds. Default is 30 seconds.

  • use_ksqlDB (bool) – If True, uses ksqlDB instead of NATS to check the attribute value. Default is False.

Returns:

bool – True if the attribute value matches the expected value within the timeout, False otherwise.
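The return semantics (True on a match, False on timeout) can be illustrated with a generic polling loop. This is not how wait_until() is implemented; poll_until, its interval parameter, and the simulated readings are assumptions made for the sketch.

```python
import time
from typing import Any, Callable


def poll_until(read_value: Callable[[], Any], value: Any,
               timeout: float = 30, interval: float = 0.01) -> bool:
    """Return True once read_value() equals value, False after timeout seconds."""
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        if read_value() == value:
            return True
        time.sleep(interval)
    return False


# Hypothetical attribute that becomes 'READY' on the third read
readings = iter(["IDLE", "BUSY", "READY"])
print(poll_until(lambda: next(readings), "READY", timeout=1))

# An attribute that never changes leads to a timeout
print(poll_until(lambda: "IDLE", "READY", timeout=0.05))
```

With a real asset, the boolean result would typically be checked before proceeding, for example by raising an error when asset.wait_until(...) returns False.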