# Source code for openfactory.monitoring.registry.src.metrics_registry
import json
import os
from typing import Annotated
from openfactory.kafka import KSQLDBClient
from openfactory.apps import OpenFactoryFastAPIApp, ofa_method
# [docs] -- documentation link marker left over from the HTML source listing (not code)
class MetricsRegistry(OpenFactoryFastAPIApp):
    """
    OpenFactory Metrics Registry.

    This service acts as a bridge between OpenFactory applications and
    Prometheus HTTP Service Discovery.

    Applications register and deregister Prometheus metrics endpoints
    through OpenFactory methods. Registered endpoints are persisted in
    Kafka and materialized into a KSQLDB table.

    Prometheus periodically queries the HTTP discovery endpoint to obtain
    the list of metrics targets to scrape.

    By default, the discovery endpoint is exposed at::

        /prometheus/targets

    This can be overridden using the environment variable ``PROMETHEUS_SD_ENDPOINT``.

    The registry itself does not store any state in memory. Kafka and
    KSQLDB are the source of truth for all registered metrics endpoints.
    """

    def __init__(self, *args, **kwargs) -> None:
        """
        Initialize the MetricsRegistry.

        This constructor forwards all parameters to
        :class:`OpenFactoryFastAPIApp <openfactory.apps.ofa_fastapi_app.OpenFactoryFastAPIApp>`
        and then registers :meth:`prometheus_targets` as a ``GET`` route on
        ``self.api``.

        Environment Variables:
            PROMETHEUS_SD_ENDPOINT:
                HTTP endpoint exposing the Prometheus HTTP Service Discovery target list.
                A blank value falls back to the default, and a value without a
                leading ``/`` gets one prepended.

                Default:
                    ``/prometheus/targets``

        Args:
            ksqlClient: KSQL client instance.
            bootstrap_servers: Kafka bootstrap server address.
            asset_router_url: Asset Router URL.
            loglevel: Logging level (e.g., ``INFO``, ``DEBUG``).
            test_mode: Enables test mode (disables live Kafka/ksql interaction).

        See also:
            :class:`OpenFactoryFastAPIApp <openfactory.apps.ofa_fastapi_app.OpenFactoryFastAPIApp>`
            for full initialization details and environment variable handling.
        """
        super().__init__(*args, **kwargs)

        # Make this instance reachable from the HTTP application state.
        self.api.state.ofa_app = self

        # Redefine the Asset type once the base class value is in place.
        # NOTE(review): the nesting of the assignment under this `if` was
        # reconstructed from a listing that had lost its indentation --
        # confirm whether AssetType should also be redefined in test mode.
        if not getattr(self, "_test_mode", False):
            self.wait_until(attribute_id='AssetType', value='OpenFactoryApp')
            self.AssetType = 'Prometheus.Registry'

        # Prometheus discovery endpoint.
        # os.getenv() returns "" for a variable that is set but empty, so the
        # default is applied with `or` instead of getenv's own fallback.
        endpoint = os.getenv("PROMETHEUS_SD_ENDPOINT", "").strip() or "/prometheus/targets"
        # HTTP request paths always start with "/"; a route registered without
        # it could never be matched by a request.
        if not endpoint.startswith("/"):
            endpoint = f"/{endpoint}"
        self.api.get(endpoint)(self.prometheus_targets)

    @property
    def metrics_topic(self) -> str:
        """ Name of the Kafka topic that target registrations are produced to """
        return "metrics_targets"

    @property
    def metrics_table(self) -> str:
        """ Name of the KSQLDB table queried for the registered targets """
        return "METRICS_TARGETS"

    @ofa_method()
    def register_target(
        self,
        application_uuid: Annotated[str, "OpenFactory Application UUID"],
        host: Annotated[str, "Hostname or service name exposing metrics"],
        port: Annotated[int, "TCP port exposing metrics"],
        path: Annotated[str, "HTTP path exposing metrics"] = "/metrics",
    ):
        """
        Register a Prometheus metrics endpoint.

        Produces a JSON record with the keys ``HOST``, ``PORT`` and ``PATH``
        to :attr:`metrics_topic`, keyed by the application UUID.

        Args:
            application_uuid: OpenFactory Application UUID (used as record key).
            host: Hostname or service name exposing metrics.
            port: TCP port exposing metrics.
            path: HTTP path exposing metrics.
        """
        self.logger.info(f"Registering metrics endpoint {host}:{port}{path}")
        self.producer.produce(
            topic=self.metrics_topic,
            key=application_uuid,
            value=json.dumps({
                "HOST": host,
                "PORT": port,
                "PATH": path,
            }),
        )

    @ofa_method()
    def deregister_target(
        self,
        application_uuid: Annotated[str, "OpenFactory Application UUID"],
    ):
        """
        Deregister a Prometheus metrics endpoint.

        Produces a record with a ``None`` value for the application UUID to
        :attr:`metrics_topic` (presumably acting as a tombstone that removes
        the row from the table -- confirm against the table definition).

        Args:
            application_uuid: OpenFactory Application UUID (used as record key).
        """
        self.logger.info(f"Deregistering metrics endpoint for {application_uuid}")
        self.producer.produce(
            topic=self.metrics_topic,
            key=application_uuid,
            value=None,
        )

    def prometheus_targets(self):
        """
        Return metrics targets using the Prometheus HTTP Service Discovery format.

        Prometheus periodically invokes this endpoint to discover metrics
        endpoints exposed by OpenFactory applications.

        Rows whose ``HOST`` or ``PORT`` is ``None`` are skipped, since they
        cannot form a ``host:port`` address.

        Returns:
            list[dict]: List of Prometheus target definitions.

        Example:
            .. code-block:: json

                [
                    {
                        "targets": ["shdr-gateway:4000"],
                        "labels": {
                            "__metrics_path__": "/metrics"
                        }
                    }
                ]
        """
        # The table name comes from the `metrics_table` property of this
        # class, not from request input.
        rows = self.ksql.query(f"""
            SELECT
                APPLICATION_UUID,
                HOST,
                PORT,
                PATH
            FROM {self.metrics_table};
        """)

        # Assumes each row supports lookup by column name (row['HOST']) --
        # TODO confirm against the KSQLDBClient.query return type.
        return [
            {
                "targets": [f"{row['HOST']}:{row['PORT']}"],
                "labels": {"__metrics_path__": row["PATH"]},
            }
            for row in rows
            # Never advertise a "None:None" target to Prometheus.
            if row["HOST"] is not None and row["PORT"] is not None
        ]
if __name__ == "__main__":
    # Script entry point: all connection settings are read from the
    # environment (KSQLDB_URL, KAFKA_BROKER, LOG_LEVEL).
    ksql_client = KSQLDBClient(os.getenv("KSQLDB_URL"))
    app = MetricsRegistry(
        ksqlClient=ksql_client,
        bootstrap_servers=os.getenv("KAFKA_BROKER"),
        loglevel=os.getenv("LOG_LEVEL", "DEBUG"),
    )
    app.run()