AssetUNS#
OpenFactory AssetUNS class.
- class openfactory.assets.asset_uns_class.AssetUNS(uns_id, ksqlClient, bootstrap_servers, asset_router_url=None)#
Bases: BaseAsset
Represents an OpenFactory Asset using the UNS identifier.
This class encapsulates Asset metadata and a Kafka producer responsible for sending Asset data. It uses the ksqlDB topology based on the ASSETS_STREAM_UNS stream to handle Asset data.
Note
All write operations to the asset take place in the assets stream.
- ksql#
Client for interacting with ksqlDB.
- Type:
- ASSET_CONSUMER_CLASS#
Kafka consumer class for reading messages from the Asset stream.
- Type:
- producer#
Kafka producer instance for sending Asset messages.
- Type:
Example usage:
    import time
    from openfactory.assets import AssetUNS
    from openfactory.kafka import KSQLDBClient

    ksql = KSQLDBClient('http://localhost:8088')
    cnc = AssetUNS('cnc-003', ksqlClient=ksql, bootstrap_servers='localhost:9092')

    # list samples
    print(cnc.samples())
    print(cnc.Zact.value)
    print(cnc.Zact.type)
    print(cnc.Zact.timestamp)

    # redefine an attribute value
    cnc.Zact = 10.0
    print(cnc.Zact.value)

    # callbacks for subscriptions
    def on_messages(msg_subject, msg_value):
        print(f"[Message] [{msg_subject}] {msg_value}")

    def on_sample(msg_subject, msg_value):
        print(f"[Sample] [{msg_subject}] {msg_value}")

    def on_event(msg_subject, msg_value):
        print(f"[Event] [{msg_subject}] {msg_value}")

    def on_condition(msg_subject, msg_value):
        print(f"[Condition] [{msg_subject}] {msg_value}")

    # each subscribe_to_* method takes a single callback
    cnc.subscribe_to_messages(on_messages)
    cnc.subscribe_to_samples(on_sample)
    cnc.subscribe_to_events(on_event)
    cnc.subscribe_to_conditions(on_condition)

    # run a main loop while subscriptions remain active
    try:
        while True:
            time.sleep(1)
    except KeyboardInterrupt:
        print("Stopping consumer threads ...")
        cnc.stop_messages_subscription()
        cnc.stop_samples_subscription()
        cnc.stop_events_subscription()
        cnc.stop_conditions_subscription()
        print("Consumers stopped")
    finally:
        ksql.close()
- __init__(uns_id, ksqlClient, bootstrap_servers, asset_router_url=None)#
Initializes the Asset with metadata and a Kafka producer.
- Parameters:
uns_id (str) – UNS identifier of the asset.
ksqlClient (KSQLDBClient) – Client for interacting with ksqlDB.
bootstrap_servers (str) – Kafka bootstrap server address.
asset_router_url (str | None) – Asset Router URL from the OpenFactory Fan-Out-Layer.
- Raises:
OFAException – If asset_router_url is not provided and the ASSET_ROUTER_URL environment variable is not set.
Note
If asset_router_url is not explicitly provided, the constructor will attempt to read it from the ASSET_ROUTER_URL environment variable.
When used in an OpenFactoryApp deployed on the OpenFactory cluster, the environment variable ASSET_ROUTER_URL will be set.
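The fallback described in the note can be illustrated with a small stand-alone sketch. It is not the OpenFactory implementation: `resolve_asset_router_url` and `MissingRouterURL` are hypothetical names (the latter stands in for OFAException), and the URLs are invented placeholders.

```python
import os


class MissingRouterURL(Exception):
    """Stand-in for OFAException, used only in this sketch."""


def resolve_asset_router_url(asset_router_url=None, env=None):
    """Return the explicit URL if given, else the ASSET_ROUTER_URL value from env."""
    env = os.environ if env is None else env
    url = asset_router_url or env.get("ASSET_ROUTER_URL")
    if not url:
        raise MissingRouterURL(
            "asset_router_url not provided and ASSET_ROUTER_URL is not set"
        )
    return url


# An explicit argument is used as-is (placeholder URL).
print(resolve_asset_router_url("nats://example-router:4222", env={}))
# Without an argument, the environment mapping is consulted.
print(resolve_asset_router_url(env={"ASSET_ROUTER_URL": "nats://example-cluster:4222"}))
```

Passing `env` explicitly keeps the sketch testable without modifying the real process environment.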
- add_attribute(asset_attribute)#
Adds a new attribute to the asset.
- Return type:
- Parameters:
asset_attribute (AssetAttribute) – The attribute to be added.
- add_reference_above(above_asset_reference)#
Adds a reference to an asset above the current asset.
- add_reference_below(below_asset_reference)#
Adds a reference to an asset below the current asset.
- property asset_uuid: str#
Returns the asset UUID based on runtime state.
- Returns:
str – The asset’s UUID.
- attributes()#
Returns all non-Method attribute IDs associated with this asset.
- close()#
Gracefully closes the Asset and frees resources.
- Steps performed:
Stops all NATS subscribers (unsubscribe + close NATS connection).
Cancels any remaining tasks in the AsyncLoopThread.
Stops the AsyncLoopThread and joins the thread.
Warning
After calling this method, the Asset instance should not be used again.
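Because the instance must not be reused after close(), one way to scope its lifetime is `contextlib.closing` from the standard library, which calls close() when the block exits. The sketch below uses a stand-in class (`FakeAsset`, hypothetical) so that it runs without an OpenFactory cluster; whether AssetUNS itself supports the `with` statement directly is not stated here.

```python
from contextlib import closing


class FakeAsset:
    """Stand-in exposing close(); not the real AssetUNS."""

    def __init__(self):
        self.closed = False

    def samples(self):
        return {}

    def close(self):
        self.closed = True


asset = FakeAsset()
with closing(asset) as a:
    a.samples()  # use the asset only inside the block

# close() has been called on exit, even if the block had raised.
print(asset.closed)
```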
- conditions()#
Returns all condition-type attributes for this asset.
- events()#
Returns all event-type attributes for this asset.
- method(method, sender_uuid, args=None)#
Requests the execution of a method for the asset.
This function further sets the corresponding callable attribute with the name of the method (e.g. GenerateCode) to trigger the command execution.
Method execution can be requested in two ways:
Using the method() interface:
    asset.method('GenerateCode', sender_uuid='SENDER-ID', args=[('Code', '123')])
Or directly via the generated callable attribute:
    asset.GenerateCode(sender_uuid='SENDER-ID', Code='123')
In both cases, sender_uuid must be provided in addition to the command’s named arguments.
Note
Named arguments are case sensitive and can be discovered by calling methods().
- Parameters:
- Returns:
str – The correlation_id of the command, which can be used to track the response.
- Return type:
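The two call forms shown for method() appear to carry the same information: keyword arguments in one, a list of (name, value) tuples in the other. A minimal sketch of that correspondence, using a hypothetical helper `kwargs_to_args` that is not part of OpenFactory:

```python
def kwargs_to_args(**named_args):
    """Convert keyword arguments into a list of (name, value) tuples."""
    return [(name, value) for name, value in named_args.items()]


# Keyword form of the arguments from the example above.
args = kwargs_to_args(Code="123")
print(args)
```

Argument names keep their case, which matters because named arguments are case sensitive.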
- methods()#
Returns method-type attributes for this asset.
- Return type:
- Returns:
Dict[str, dict | None] – Dictionary mapping method IDs to their parsed method contract (description + arguments). The entry for a method is None if no value is stored for it.
Returned Dictionary Example
    {
        "GenerateCode": {
            "description": "GenerateCode",
            "arguments": [
                {
                    "name": "Code",
                    "description": "Barcode to generate (empty for random)"
                }
            ]
        },
        "SetAutomaticMode": {
            "description": "SetAutomaticMode",
            "arguments": []
        },
        "SetManualMode": {
            "description": "SetManualMode",
            "arguments": []
        }
    }
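As an illustration of how the returned dictionary might be consumed, the sketch below extracts the argument names for each method and tolerates None entries. The helper `argument_names` is hypothetical, and the data is a shortened copy of the example, with one None entry added for illustration.

```python
from typing import Dict, List, Optional


def argument_names(contracts: Dict[str, Optional[dict]]) -> Dict[str, List[str]]:
    """Map each method ID to the list of its argument names."""
    names: Dict[str, List[str]] = {}
    for method_id, contract in contracts.items():
        if contract is None:
            # No contract stored for this method.
            names[method_id] = []
            continue
        names[method_id] = [arg["name"] for arg in contract.get("arguments", [])]
    return names


contracts = {
    "GenerateCode": {
        "description": "GenerateCode",
        "arguments": [
            {"name": "Code", "description": "Barcode to generate (empty for random)"}
        ],
    },
    "SetManualMode": {"description": "SetManualMode", "arguments": []},
    "Unknown": None,  # illustrative entry with no stored value
}

print(argument_names(contracts))
```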
- references_above()#
Retrieves a list of assets above the current asset.
- Return type:
List[Self]
- Returns:
List[Self] – A list of asset objects that are above the current asset.
- references_above_uuid()#
Retrieves a list of asset-references of assets above the current asset.
- references_below()#
Retrieves a list of assets below the current asset.
- Return type:
List[Self]
- Returns:
List[Self] – A list of asset objects that are below the current asset.
- references_below_uuid()#
Retrieves a list of asset-references below the current asset.
- samples()#
Returns all sample-type attributes for this asset.
- stop_attribute_subscription(attribute_id)#
Stops the NATS consumer and gracefully shuts down the subscription.
- stop_conditions_subscription()#
Stops the NATS consumer and gracefully shuts down the subscription for conditions.
- Return type:
- stop_events_subscription()#
Stops the NATS consumer and gracefully shuts down the subscription for events.
- Return type:
- stop_messages_subscription()#
Stops the NATS consumer and gracefully shuts down the subscription.
- Return type:
- stop_samples_subscription()#
Stops the NATS consumer and gracefully shuts down the subscription for samples.
- Return type:
- subscribe_to_attribute(attribute_id, on_message)#
Subscribes to changes of an asset attribute using a NATS consumer.
- Return type:
- Parameters:
attribute_id (str) – The attribute ID to monitor.
on_message (AssetNATSCallback) – Callable that takes (msg_subject: str, msg_value: dict) and handles messages.
- subscribe_to_conditions(on_condition)#
Subscribes to asset conditions using a NATS consumer.
- Return type:
- Parameters:
on_condition (AssetNATSCallback) – Callable that takes (msg_subject: str, msg_value: dict).
- subscribe_to_events(on_event)#
Subscribes to asset events using a NATS consumer.
- Return type:
- Parameters:
on_event (AssetNATSCallback) – Callable that takes (msg_subject: str, msg_value: dict).
- subscribe_to_messages(on_message)#
Subscribes to asset messages using a NATS consumer.
- Return type:
- Parameters:
on_message (AssetNATSCallback) – Callable that takes (msg_subject: str, msg_value: dict) and handles messages.
- subscribe_to_samples(on_sample)#
Subscribes to asset samples using a NATS consumer.
- Return type:
- Parameters:
on_sample (AssetNATSCallback) – Callable that takes (msg_subject: str, msg_value: dict).
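The subscribe_to_* methods all take a callback with the shape (msg_subject: str, msg_value: dict). The sketch below shows one way to write such a callback so that it only hands messages to a thread-safe queue, on the assumption that callbacks may run on a background thread. The subject string and payload are invented for illustration, and delivery is simulated by calling the callback directly.

```python
import queue

# Thread-safe buffer between the callback and the main program.
received = queue.Queue()


def on_sample(msg_subject: str, msg_value: dict) -> None:
    """Callback with the (msg_subject, msg_value) shape; only enqueues the message."""
    received.put((msg_subject, msg_value))


# Simulated delivery; with a real asset this call would be made by the subscription.
on_sample("cnc-003.Zact", {"value": 10.0})

subject, value = received.get_nowait()
print(subject, value)
```

Keeping the callback short and moving processing to the consumer side of the queue avoids blocking whatever thread delivers the messages.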
- property type: Literal['Samples', 'Condition', 'Events', 'Method', 'OpenFactory', 'UNAVAILABLE']#
Retrieves the type of the asset from ksqlDB.
Executes a SQL query to fetch the asset type. If the query returns no result, the method defaults to UNAVAILABLE.
- Returns:
Literal['Samples', 'Condition', 'Events', 'Method', 'OpenFactory', 'UNAVAILABLE'] – The asset type as stored in the assets_type table, or UNAVAILABLE if not found.
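Since the property can return UNAVAILABLE, callers may want to check the value before relying on it. A minimal sketch using the same literal values; the `AssetType` alias and the `is_known_type` helper are hypothetical names introduced here.

```python
from typing import Literal, get_args

AssetType = Literal['Samples', 'Condition', 'Events', 'Method', 'OpenFactory', 'UNAVAILABLE']


def is_known_type(asset_type: str) -> bool:
    """True if asset_type is one of the documented values other than UNAVAILABLE."""
    return asset_type in get_args(AssetType) and asset_type != 'UNAVAILABLE'


print(is_known_type('Samples'))
print(is_known_type('UNAVAILABLE'))
```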
- wait_until(attribute_id, value, timeout=30, use_ksqlDB=False)#
Waits until the asset attribute has a specific value or times out.
Monitors either the NATS cluster or ksqlDB to check if the attribute value matches the expected value. Returns True if the value is found within the given timeout, False otherwise.
- Return type:
- Parameters:
attribute_id (str) – The attribute ID of the asset to monitor.
value (Any) – The value to wait for the attribute to match.
timeout (int) – The maximum time to wait, in seconds. Default is 30 seconds.
use_ksqlDB (bool) – If True, uses ksqlDB instead of NATS to check the attribute value. Default is False.
- Returns:
bool – True if the attribute value matches the expected value within the timeout, False otherwise.
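The return semantics of wait_until (True on a match within the timeout, False otherwise) can be illustrated with a generic polling loop. This is a sketch of the behaviour only, not how OpenFactory monitors NATS or ksqlDB; `wait_for_value`, `read_value` and `poll_interval` are hypothetical names.

```python
import time


def wait_for_value(read_value, expected, timeout=30.0, poll_interval=0.05):
    """Poll read_value() until it equals expected or timeout seconds elapse."""
    deadline = time.monotonic() + timeout
    while True:
        if read_value() == expected:
            return True
        if time.monotonic() >= deadline:
            return False
        time.sleep(poll_interval)


# Simulated attribute whose value changes on each read.
readings = iter([1, 2, 3])
print(wait_for_value(lambda: next(readings), 3, timeout=1.0, poll_interval=0.01))
```

`time.monotonic()` is used for the deadline so that changes to the system clock do not shorten or extend the wait.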