AssetUNS#

class openfactory.assets.AssetUNS(uns_id, ksqlClient, bootstrap_servers='${KAFKA_BROKER}')[source]#

Bases: BaseAsset

Represents an OpenFactory Asset using the UNS identifier.

This class encapsulates Asset metadata and a Kafka producer responsible for sending Asset data. It uses the ksqlDB topology based on the ASSETS_STREAM_UNS stream to handle Asset data.

Note

All write operations to the asset take place in the assets stream.

KSQL_ASSET_TABLE#

Name of ksqlDB table of Asset states (assets_uns)

Type:

str

KSQL_ASSET_ID#

ksqlDB ID used to identify the Asset (uns_id) in the KSQL_ASSET_TABLE

Type:

str

ASSET_ID#

Value of the identifier of the Asset (uns_id) used in the KSQL_ASSET_TABLE

Type:

str

ksql#

Client for interacting with ksqlDB.

Type:

KSQLDBClient

bootstrap_servers#

Kafka bootstrap server address.

Type:

str

ASSET_CONSUMER_CLASS#

Kafka consumer class for reading messages from the Asset stream.

Type:

KafkaAssetUNSConsumer

producer#

Kafka producer instance for sending Asset messages.

Type:

AssetProducer

Example usage:

import time
from openfactory.assets import AssetUNS
from openfactory.kafka import KSQLDBClient

ksql = KSQLDBClient('http://localhost:8088')
cnc = AssetUNS('cnc-003', ksqlClient=ksql)

# list samples
print(cnc.samples())
print(cnc.Zact.value)
print(cnc.Zact.type)
print(cnc.Zact.timestamp)

# redefine an attribute value
cnc.Zact = 10.0
print(cnc.Zact.value)

# callbacks for subscriptions
def on_messages(msg_key, msg_value):
    print(f"[Message] [{msg_key}] {msg_value}")

def on_sample(msg_key, msg_value):
    print(f"[Sample] [{msg_key}] {msg_value}")

def on_event(msg_key, msg_value):
    print(f"[Event] [{msg_key}] {msg_value}")

def on_condition(msg_key, msg_value):
    print(f"[Condition] [{msg_key}] {msg_value}")

cnc.subscribe_to_messages(on_messages, 'demo_messages_group')
cnc.subscribe_to_samples(on_sample, 'demo_samples_group')
cnc.subscribe_to_events(on_event, 'demo_events_group')
cnc.subscribe_to_conditions(on_condition, 'demo_conditions_group')

# run a main loop while subscriptions remain active
try:
    while True:
        time.sleep(1)
except KeyboardInterrupt:
    print("Stopping consumer threads ...")
    cnc.stop_messages_subscription()
    cnc.stop_samples_subscription()
    cnc.stop_events_subscription()
    cnc.stop_conditions_subscription()
    print("Consumers stopped")
finally:
    ksql.close()

__init__(uns_id, ksqlClient, bootstrap_servers='${KAFKA_BROKER}')[source]#

Initializes the Asset with metadata and a Kafka producer.

Parameters:
  • uns_id (str) – UNS identifier of the asset.

  • ksqlClient (KSQLDBClient) – Client for interacting with ksqlDB.

  • bootstrap_servers (str) – Kafka bootstrap server address. Defaults to config setting.

add_attribute(attribute_id, asset_attribute)#

Adds a new attribute to the asset.

Return type:

None

Parameters:
  • attribute_id (str) – The unique identifier for the asset attribute.

  • asset_attribute (AssetAttribute) – The attribute to be added.

add_reference_above(above_asset_reference)#

Adds a reference to an asset above the current asset.

Return type:

None

Parameters:

above_asset_reference (str) – The asset-reference of the asset above the current one to be added.

add_reference_below(below_asset_reference)#

Adds a reference to an asset below the current asset.

Return type:

None

Parameters:

below_asset_reference (str) – The asset-reference of the asset below the current one to be added.

property asset_uuid: str#

Returns the asset UUID based on runtime state.

Returns:

str – The asset’s UUID.

attributes()#

Returns all non-‘Method’ attribute IDs associated with this asset.

Queries KSQL_ASSET_TABLE for all attribute IDs of the asset where the type is not ‘Method’.

Return type:

List[str]

Returns:

List[str] – A list of attribute IDs.

conditions()#

Returns all condition-type attributes for this asset.

Return type:

List[Dict[str, Any]]

Returns:

List[Dict]

A list of dictionaries, each containing:
  • "ID" (str): The attribute ID.

  • "VALUE" (Any): The value of the condition.

  • "TAG" (str): The condition tag (‘Normal’, ‘Warning’, ‘Fault’).
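
A minimal sketch of how this return value might be consumed: the helper below groups condition IDs by their tag. The name group_by_tag and the sample entries are illustrative assumptions, not part of OpenFactory; only the "ID", "VALUE" and "TAG" keys come from the description above.

```python
from typing import Any, Dict, List


def group_by_tag(conditions: List[Dict[str, Any]]) -> Dict[str, List[str]]:
    """Group condition attribute IDs by their TAG value."""
    grouped: Dict[str, List[str]] = {}
    for entry in conditions:
        grouped.setdefault(entry["TAG"], []).append(entry["ID"])
    return grouped


# Made-up entries shaped like the documented return value of conditions()
example_conditions = [
    {"ID": "spindle_temp", "VALUE": "OK", "TAG": "Normal"},
    {"ID": "coolant_level", "VALUE": "LOW", "TAG": "Warning"},
    {"ID": "servo_x", "VALUE": "OVERLOAD", "TAG": "Fault"},
    {"ID": "servo_y", "VALUE": "OK", "TAG": "Normal"},
]

by_tag = group_by_tag(example_conditions)
print(by_tag.get("Fault", []))
```

With a real asset, the same helper would presumably be called as group_by_tag(cnc.conditions()).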

events()#

Returns all event-type attributes for this asset.

Return type:

List[Dict[str, Any]]

Returns:

List[Dict]

A list of dictionaries, each containing:
  • "ID" (str): The attribute ID.

  • "VALUE" (Any): The value of the event.

  • "TAG" (str): The cleaned tag name with placeholders removed.

method(method, args='')#

Requests the execution of a method for the asset by sending a command to the Kafka stream.

Constructs a message with the provided method name and optional arguments, and sends it to the CMDS_STREAM Kafka topic for processing.

Return type:

None

Parameters:
  • method (str) – The name of the method to be executed.

  • args (str) – Arguments for the method, if any. Defaults to an empty string.

methods()#

Returns method-type attributes for this asset.

Queries KSQL_ASSET_TABLE for entries where TYPE = ‘Method’ for the asset.

Return type:

Dict[str, Any]

Returns:

Dict – A dictionary where keys are method attribute IDs and values are the corresponding method values.
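
As a hedged sketch, methods() and method() can be combined so that a command is only sent when the asset actually lists the method. The helper call_if_available, the FakeAsset stand-in, the method name and the argument string are all hypothetical; only the methods() and method(method, args='') signatures come from this page.

```python
from typing import Any


def call_if_available(asset: Any, name: str, args: str = "") -> bool:
    """Send the method request only if `name` is listed by asset.methods()."""
    if name not in asset.methods():
        return False
    asset.method(name, args)
    return True


class FakeAsset:
    """Hypothetical stand-in so the sketch runs without Kafka or ksqlDB."""

    def __init__(self) -> None:
        self.sent = []

    def methods(self) -> dict:
        # Made-up entry; a real asset returns what is stored in ksqlDB
        return {"move_axis": "UNAVAILABLE"}

    def method(self, method: str, args: str = "") -> None:
        # Record the request instead of sending it to Kafka
        self.sent.append((method, args))


fake = FakeAsset()
accepted = call_if_available(fake, "move_axis", "X 10")
rejected = call_if_available(fake, "not_a_method")
print(accepted, rejected, fake.sent)
```

The check costs one extra query per call, so it suits occasional commands better than tight loops.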

references_above()#

Retrieves a list of assets above the current asset.

Return type:

List[Self]

Returns:

List[Self] – A list of asset objects that are above the current asset.

references_above_uuid()#

Retrieves a list of asset-references of assets above the current asset.

Return type:

List[str]

Returns:

List[str] – A list of asset-references (as strings) that are above the current asset.

references_below()#

Retrieves a list of assets below the current asset.

Return type:

List[Self]

Returns:

List[Self] – A list of asset objects that are below the current asset.
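
The hierarchy below an asset can be walked recursively with references_below(). The sketch below is an assumption about how one might do this, not an OpenFactory utility: walk_below and FakeAsset are hypothetical names, and asset_uuid is used to avoid visiting the same asset twice.

```python
from typing import Any, Iterator, Optional, Set, Tuple


def walk_below(asset: Any, depth: int = 0,
               seen: Optional[Set[str]] = None) -> Iterator[Tuple[int, Any]]:
    """Yield (depth, asset) pairs depth-first, visiting each asset_uuid once."""
    seen = set() if seen is None else seen
    if asset.asset_uuid in seen:
        return
    seen.add(asset.asset_uuid)
    yield depth, asset
    for child in asset.references_below():
        yield from walk_below(child, depth + 1, seen)


class FakeAsset:
    """Hypothetical stand-in exposing only what the traversal needs."""

    def __init__(self, uuid: str, children=()) -> None:
        self.asset_uuid = uuid
        self._children = list(children)

    def references_below(self):
        return self._children


# Made-up hierarchy: the spindle is referenced by both the line and the CNC
spindle = FakeAsset("spindle-01")
cnc = FakeAsset("cnc-003", [spindle])
line = FakeAsset("line-1", [cnc, spindle])

visited = [(depth, node.asset_uuid) for depth, node in walk_below(line)]
print(visited)
```

The shared `seen` set also guards against reference cycles, which would otherwise recurse without end.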

references_below_uuid()#

Retrieves a list of asset-references below the current asset.

Return type:

List[str]

Returns:

List[str] – A list of asset-references (as strings) that are below the current asset.

samples()#

Returns all sample-type attributes for this asset.

Return type:

List[Dict[str, Any]]

Returns:

List[Dict]

A list of dictionaries, each containing:
  • "ID" (str): The attribute ID.

  • "VALUE" (Any): The value of the sample.

  • "TAG" (str): The cleaned tag name with placeholders removed.
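
For lookups by attribute ID, the list of dictionaries can be turned into a mapping. This is a small sketch on made-up data; values_by_id is a hypothetical helper and the sample entries (including the "Position" tag) are illustrative only.

```python
from typing import Any, Dict, List


def values_by_id(entries: List[Dict[str, Any]]) -> Dict[str, Any]:
    """Map each attribute ID to its VALUE; a repeated ID keeps the last value."""
    return {entry["ID"]: entry["VALUE"] for entry in entries}


# Made-up entries shaped like the documented return value of samples()
example_samples = [
    {"ID": "Zact", "VALUE": 10.0, "TAG": "Position"},
    {"ID": "Xact", "VALUE": -3.5, "TAG": "Position"},
]

lookup = values_by_id(example_samples)
print(lookup["Zact"])
```

The same helper applies to the lists returned by events() and conditions(), since they share the "ID" and "VALUE" keys.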

stop_conditions_subscription()#

Stops the Kafka consumer and gracefully shuts down the subscription to ‘Condition’ messages.

If a consumer instance exists, it is stopped and the associated consumer thread is joined. The consumer group is deleted from Kafka to clean up the subscription.

Return type:

None

stop_events_subscription()#

Stops the Kafka consumer and gracefully shuts down the subscription to ‘Events’.

If a consumer instance exists, it is stopped, and the associated consumer thread is joined. The consumer group is deleted from Kafka to clean up the subscription.

Return type:

None

stop_messages_subscription()#

Stops the Kafka consumer and gracefully shuts down the subscription to asset messages.

If a consumer instance exists, it is stopped and the associated consumer thread is joined. The consumer group is deleted from Kafka to clean up the subscription.

Return type:

None

stop_samples_subscription()#

Stops the Kafka consumer and gracefully shuts down the subscription to ‘Samples’.

If a consumer instance exists, it is stopped, and the associated consumer thread is joined. The consumer group is deleted from Kafka to clean up the subscription.

Return type:

None
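
Since a stopped subscription also removes its consumer group, it can be useful to guarantee that the stop call runs even when the surrounding code fails. The context manager below is one possible sketch; samples_subscription and FakeAsset are hypothetical names, and only subscribe_to_samples and stop_samples_subscription are taken from this page.

```python
from contextlib import contextmanager
from typing import Any, Callable, Iterator


@contextmanager
def samples_subscription(asset: Any, callback: Callable[[str, dict], None],
                         group_id: str) -> Iterator[Any]:
    """Subscribe on entry; always call stop_samples_subscription() on exit."""
    thread = asset.subscribe_to_samples(callback, group_id)
    try:
        yield thread
    finally:
        asset.stop_samples_subscription()


class FakeAsset:
    """Hypothetical stand-in that records calls instead of using Kafka."""

    def __init__(self) -> None:
        self.calls = []

    def subscribe_to_samples(self, on_sample, kafka_group_id):
        self.calls.append(("subscribe", kafka_group_id))
        return None  # the real method returns a threading.Thread

    def stop_samples_subscription(self) -> None:
        self.calls.append(("stop",))


fake = FakeAsset()
try:
    with samples_subscription(fake, lambda key, value: None, "demo_samples_group"):
        raise RuntimeError("simulated failure inside the block")
except RuntimeError:
    pass

print(fake.calls)
```

The same pattern would apply to the messages, events and conditions subscriptions by swapping the two method names.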

subscribe_to_conditions(on_condition, kafka_group_id)#

Subscribes to ‘Condition’ messages and starts a consumer in a separate thread.

This method creates and starts a Kafka consumer thread that listens for ‘Condition’ messages related to the asset. The provided callback is invoked for each received condition.

Return type:

Thread

Warning

A Kafka consumer is used to subscribe to the Kafka Assets topic. Direct Kafka topic consumption offers lower latency but requires reading and filtering all messages from all Assets deployed on the cluster, which increases load on the brokers and duplicates work across consumers.

Whenever the design allows for it, prefer a loop that reads from asset attributes (which queries a ksqlDB table). Alternatively, consider deploying a stream processing topology.

Parameters:
  • on_condition (AssetKafkaMessagesCallback) – Callable that takes (msg_key: str, msg_value: dict) and handles condition messages.

  • kafka_group_id (str) – The Kafka consumer group ID used for the subscription.

Returns:

threading.Thread – The consumer thread that is now running.
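
The polling alternative mentioned in the warning above could look roughly like the following. This is a sketch under assumptions: poll_attribute, FakeAsset, the door_state attribute and its readings are hypothetical, and the only API relied on is attribute access of the form asset.<attribute>.value, as in the class usage example.

```python
import threading
from types import SimpleNamespace
from typing import Any, Callable


def poll_attribute(asset: Any, attribute_id: str,
                   on_change: Callable[[Any], None],
                   stop_event: threading.Event,
                   interval: float = 1.0) -> None:
    """Invoke on_change each time the attribute's value differs from the last read."""
    unset = object()
    last: Any = unset
    while not stop_event.is_set():
        value = getattr(asset, attribute_id).value
        if last is unset or value != last:
            on_change(value)
            last = value
        # Sleeps for `interval`, but returns early once stop_event is set
        stop_event.wait(interval)


class FakeAsset:
    """Hypothetical stand-in whose attribute returns scripted readings."""

    def __init__(self, readings) -> None:
        self._readings = iter(readings)

    @property
    def door_state(self):
        return SimpleNamespace(value=next(self._readings))


stop = threading.Event()
seen = []


def record(value: Any) -> None:
    seen.append(value)
    if value == "LOCKED":
        stop.set()  # end the demo once the final state is observed


poll_attribute(FakeAsset(["CLOSED", "CLOSED", "OPEN", "OPEN", "LOCKED"]),
               "door_state", record, stop, interval=0)
print(seen)
```

Polling trades latency for lower broker load: a change is noticed at most one interval late, and values that change and revert between two reads are missed.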

subscribe_to_events(on_event, kafka_group_id)#

Subscribes to ‘Events’ messages and starts a consumer thread.

This method creates and starts a Kafka consumer thread that listens for ‘Events’ messages related to the asset. The provided callback is invoked for each received event.

Return type:

Thread

Warning

A Kafka consumer is used to subscribe to the Kafka Assets topic. Direct Kafka topic consumption offers lower latency but requires reading and filtering all messages from all Assets deployed on the cluster, which increases load on the brokers and duplicates work across consumers.

Whenever the design allows for it, prefer a loop that reads from asset attributes (which queries a ksqlDB table). Alternatively, consider deploying a stream processing topology.

Parameters:
  • on_event (AssetKafkaMessagesCallback) – Callable that takes (msg_key: str, msg_value: dict) and handles event messages.

  • kafka_group_id (str) – The Kafka consumer group ID used for the subscription.

Returns:

threading.Thread – The thread object running the Kafka consumer.

subscribe_to_messages(on_message, kafka_group_id)#

Subscribes to asset messages and starts a consumer thread.

This method creates and starts a Kafka consumer thread that listens for messages related to the asset. The provided callback is invoked for each received message.

Return type:

Thread

Warning

A Kafka consumer is used to subscribe to the Kafka Assets topic. Direct Kafka topic consumption offers lower latency but requires reading and filtering all messages from all Assets deployed on the cluster, which increases load on the brokers and duplicates work across consumers.

Whenever the design allows for it, prefer a loop that reads from asset attributes (which queries a ksqlDB table). Alternatively, consider deploying a stream processing topology.

Parameters:
  • on_message (AssetKafkaMessagesCallback) – Callable that takes (msg_key: str, msg_value: dict) and handles messages.

  • kafka_group_id (str) – The Kafka consumer group ID used for the subscription.

Returns:

threading.Thread – The thread object running the Kafka consumer.
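
Assuming the callback is invoked from the consumer thread (which the description above suggests but does not state outright), a common pattern is to keep the callback short and hand messages to the main thread through a queue. The sketch below simulates the consumer with a plain thread; make_queue_callback and the message payload are hypothetical.

```python
import queue
import threading
from typing import Callable


def make_queue_callback(inbox: queue.Queue) -> Callable[[str, dict], None]:
    """Return a callback that only enqueues the message, so it returns quickly."""
    def on_message(msg_key: str, msg_value: dict) -> None:
        inbox.put((msg_key, msg_value))
    return on_message


inbox: queue.Queue = queue.Queue()
on_message = make_queue_callback(inbox)

# Stand-in for the consumer thread. With a real asset this would be
# cnc.subscribe_to_messages(on_message, 'demo_messages_group').
worker = threading.Thread(target=on_message,
                          args=("cnc-003", {"ID": "Zact", "VALUE": 10.0}))
worker.start()
worker.join()

# The main thread drains the queue at its own pace
received = inbox.get(timeout=1)
print(received)
```

queue.Queue is thread-safe, so no extra locking is needed between the callback and the reader.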

subscribe_to_samples(on_sample, kafka_group_id)#

Subscribes to ‘Samples’ messages and starts a consumer thread.

This method creates and starts a Kafka consumer thread that listens for ‘Samples’ messages related to the asset. The provided callback is invoked for each received sample.

Return type:

Thread

Warning

A Kafka consumer is used to subscribe to the Kafka Assets topic. Direct Kafka topic consumption offers lower latency but requires reading and filtering all messages from all Assets deployed on the cluster, which increases load on the brokers and duplicates work across consumers.

Whenever the design allows for it, prefer a loop that reads from asset attributes (which queries a ksqlDB table). Alternatively, consider deploying a stream processing topology.

Parameters:
  • on_sample (AssetKafkaMessagesCallback) – Callable that takes (msg_key: str, msg_value: dict) and handles sample messages.

  • kafka_group_id (str) – The Kafka consumer group ID used for the subscription.

Returns:

threading.Thread – The thread object running the Kafka consumer.

property type: Literal['Samples', 'Condition', 'Events', 'Method', 'OpenFactory', 'UNAVAILABLE']#

Retrieves the type of the asset from ksqlDB.

Executes a SQL query to fetch the asset type. If the query returns no result, the property defaults to ‘UNAVAILABLE’.

Returns:

Literal[‘Samples’, ‘Condition’, ‘Events’, ‘Method’, ‘OpenFactory’, ‘UNAVAILABLE’] – The asset type as stored in the assets_type table, or ‘UNAVAILABLE’ if not found.

wait_until(attribute, value, timeout=30, use_ksqlDB=False)#

Waits until the asset attribute has a specific value or times out.

Monitors either the Kafka topic or ksqlDB to check if the attribute value matches the expected value. The method will return True if the value is found within the given timeout, and False if the timeout is reached.

Return type:

bool

Attention

Using ksqlDB introduces slightly higher latency due to internal stream processing and state materialization, but it is significantly more efficient for the Kafka cluster, especially when multiple consumers are involved.

Direct Kafka topic consumption offers lower latency but requires reading and filtering all messages, which increases load on the brokers and duplicates work across consumers.

Whenever possible, prefer use_ksqlDB=True to reduce resource usage and improve scalability.

Parameters:
  • attribute (str) – The attribute of the asset to monitor.

  • value (Any) – The value to wait for the attribute to match.

  • timeout (int) – The maximum time to wait, in seconds. Default is 30 seconds.

  • use_ksqlDB (bool) – If True, uses ksqlDB instead of Kafka topic to check the attribute value. Default is False.

Returns:

bool – True if the attribute value matches the expected value within the timeout, False otherwise.
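
The return contract described above (True on a match within the timeout, False otherwise) can be illustrated with a generic polling helper. This is a sketch of the semantics only, not OpenFactory's implementation; wait_for_value, read_value and poll_interval are hypothetical names.

```python
import time
from typing import Any, Callable


def wait_for_value(read_value: Callable[[], Any], expected: Any,
                   timeout: float = 30, poll_interval: float = 0.1) -> bool:
    """Poll read_value() until it equals `expected` or `timeout` seconds elapse."""
    deadline = time.monotonic() + timeout
    while True:
        if read_value() == expected:
            return True
        if time.monotonic() >= deadline:
            return False
        time.sleep(poll_interval)


# Scripted readings stand in for an attribute that reaches the target value
readings = iter([1, 2, 3])
matched = wait_for_value(lambda: next(readings), 3, timeout=1, poll_interval=0)

# A reading that never matches makes the helper give up after the timeout
missed = wait_for_value(lambda: 0, 1, timeout=0.05, poll_interval=0.01)
print(matched, missed)
```

With a real asset, the documented call would take the form cnc.wait_until('Zact', 10.0, timeout=30, use_ksqlDB=True), where the attribute name and target value are taken from the usage example and are illustrative.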