Source code for openfactory.monitoring.utils

import json
from openfactory.kafka.ksql import KSQLDBClient
from openfactory.kafka import AssetProducer
from openfactory.schemas.apps import OpenFactoryAppSchema
from openfactory.exceptions import OFAException


[docs] def discover_prometheus_registry(ksqlClient: KSQLDBClient): """ Discover deployed Prometheus registry """ query = "select ASSET_UUID from ASSETS_TYPE where TYPE='Prometheus.Registry';" registry = ksqlClient.query(query) if not registry: raise OFAException("No Prometheus Registry deployed") return registry[0]['ASSET_UUID']
[docs] def register_prometheus_target(target: OpenFactoryAppSchema, ksqlClient: KSQLDBClient, bootstrap_servers: str): """ Register a new Prometheus target Args: target (OpenFactoryAppSchema): target to register. ksqlClient: (KSQLDBClient) KSQL client for executing queries. bootstrap_servers (str): Kafka bootstrap server address. """ topic = ksqlClient.get_kafka_topic('METRICS_TARGETS_SOURCE') producer = AssetProducer(ksqlClient=ksqlClient, bootstrap_servers=bootstrap_servers) producer.produce( topic=topic, key=target.uuid, value=json.dumps({ "HOST": target.uuid.lower(), "PORT": str(target.metrics.port), "PATH": target.metrics.path }) ) producer.flush()
[docs] def deregister_prometheus_target(target_uuid: str, ksqlClient: KSQLDBClient, bootstrap_servers: str): """ Deregister a new Prometheus target Args: target_uuid (str): OpenFactory UUID of target to deregister. ksqlClient: (KSQLDBClient) KSQL client for executing queries. bootstrap_servers (str): Kafka bootstrap server address. """ topic = ksqlClient.get_kafka_topic('METRICS_TARGETS_SOURCE') producer = AssetProducer(ksqlClient=ksqlClient, bootstrap_servers=bootstrap_servers) producer.produce( topic=topic, key=target_uuid, value=None ) producer.flush()