""" Asset registration and deregistration in OpenFactory. """
import json
from typing import Dict
from datetime import datetime, timezone
from kafka import KafkaProducer
from openfactory.assets.utils import AssetAttribute
from openfactory.kafka import AssetProducer
from openfactory.kafka.ksql import KSQLDBClient
from typing import TYPE_CHECKING
if TYPE_CHECKING:
from openfactory.schemas.devices import Device
[docs]
def now_iso_to_epoch_millis() -> int:
"""
Get the current UTC time as epoch milliseconds, suitable for Kafka TIMESTAMP fields.
Returns:
int: The current time in milliseconds since the Unix epoch.
"""
now = datetime.now(timezone.utc)
return int(now.timestamp() * 1000)
[docs]
def register_asset(asset_uuid: str, uns: Dict, asset_type: str,
ksqlClient: KSQLDBClient, bootstrap_servers: str, docker_service=""):
"""
Register an asset in OpenFactory.
Args:
asset_uuid (str): UUID of the asset.
uns (dict): UNS data with 'levels' (a dict of hierarchy levels) and 'uns_id' (UNS id as a string).
asset_type (str): Type of the asset.
ksqlClient: (KSQLDBClient) KSQL client for executing queries.
bootstrap_servers (str): Kafka bootstrap server address.
docker_service (str): Docker service name associated with the asset.
"""
producer = AssetProducer(ksqlClient, bootstrap_servers)
producer.send_asset_attribute(
asset_uuid,
AssetAttribute(id="AssetType", value=asset_type, type="OpenFactory", tag="AssetType")
)
producer.send_asset_attribute(
asset_uuid,
AssetAttribute(id="DockerService", value=docker_service, type="OpenFactory", tag="DockerService")
)
# Initialize references
for ref_id in ["references_below", "references_above"]:
producer.send_asset_attribute(
asset_uuid,
AssetAttribute(id=ref_id, value="", type="OpenFactory", tag="AssetsReferences")
)
if uns:
# Set UNS levels
for level_name, level_value in uns['levels'].items():
producer.send_asset_attribute(
asset_uuid,
AssetAttribute(id=level_name, value=level_value, type="OpenFactory", tag="UNSLevel")
)
producer.send_asset_attribute(
asset_uuid,
AssetAttribute(id='uns_id', value=uns['uns_id'], type="OpenFactory", tag="UNSId")
)
# Set UNS map
producer.produce(
topic=ksqlClient.get_kafka_topic('asset_to_uns_map_raw'),
key=asset_uuid.encode('utf-8'),
value=json.dumps({
'ASSET_UUID': asset_uuid,
'UNS_ID': uns['uns_id'],
'UNS_LEVELS': uns['levels'],
'UPDATED_AT': now_iso_to_epoch_millis()
})
)
producer.flush()
[docs]
def deregister_asset(asset_uuid: str,
ksqlClient: KSQLDBClient, bootstrap_servers: str):
"""
Deregister an asset from OpenFactory.
Args:
asset_uuid (str): UUID of the asset.
ksqlClient: (KSQLDBClient) KSQL client for executing queries.
bootstrap_servers (str): Kafka bootstrap server address.
"""
producer = AssetProducer(ksqlClient, bootstrap_servers)
# UNAVAILABLE message
producer.send_asset_attribute(
asset_uuid,
AssetAttribute(id="avail", value="UNAVAILABLE", type="Events", tag="Availability")
)
# remove references
for ref_id in ["references_below", "references_above"]:
producer.send_asset_attribute(
asset_uuid,
AssetAttribute(id=ref_id, value="", type="OpenFactory", tag="AssetsReferences")
)
# tombstone message for table ASSETS
producer.produce(topic=ksqlClient.get_kafka_topic('assets_type'),
key=asset_uuid,
value=None)
# tombstone message for table DOCKER_SERVICES
producer.produce(topic=ksqlClient.get_kafka_topic('docker_services'),
key=asset_uuid,
value=None)
# remove UNS map
producer.produce(
topic=ksqlClient.get_kafka_topic('asset_to_uns_map_raw'),
key=asset_uuid,
value=json.dumps({
'asset_uuid': asset_uuid,
'uns_id': None,
'uns_levels': None,
'updated_at': now_iso_to_epoch_millis()
})
)
producer.flush()
[docs]
def register_device_connector(device: "Device",
ksqlClient: KSQLDBClient,
table_name: str = "DEVICE_CONNECTOR_SOURCE") -> None:
"""
Register a device connector configuration in KSQLDB.
Args:
device (Device): OpenFactory device for which the connector is to register
ksqlClient (KSQLDBClient): OpenFactory ksqlDB client.
table_name (str): Name of the ksqlDB table to insert into. Defaults to 'DEVICE_CONNECTOR_SOURCE'.
Raises:
ValueError: If connector_config is invalid or cannot be serialized.
KSQLDBClientException: If the insert fails.
"""
sql = f"""
INSERT INTO {table_name} (ASSET_UUID, CONNECTOR_CONFIG)
VALUES ('{device.uuid}', '{device.model_dump_json(exclude_none=True)}');
"""
ksqlClient.statement_query(sql)
[docs]
def deregister_device_connector(device_uuid: str,
bootstrap_servers: str,
topic: str = "device_connector_topic"):
"""
Deregister a device connector configuration.
Args:
device_uuid (str): UUID of the device to deregister.
bootstrap_servers (str): Kafka bootstrap servers.
topic (str): Kafka topic backing the device connector table. Defaults to 'device_connector_topic'.
Raises:
KafkaError: If sending the tombstone to the Kafka topic fails.
"""
producer = KafkaProducer(
bootstrap_servers=bootstrap_servers,
key_serializer=lambda k: k.encode('utf-8'),
value_serializer=lambda v: json.dumps(v).encode('utf-8') if v is not None else None
)
# Send tombstone
producer.send(topic, key=device_uuid, value=None)
producer.flush()