From 7fddd5b4e37befe40f626210b483e636a41ee226 Mon Sep 17 00:00:00 2001
From: Morgan Wowk
Date: Mon, 23 Mar 2026 17:06:18 -0700
Subject: [PATCH] feat(instrumentation): Add metrics polling service and execution status count gauge

Add a PollingService daemon thread that runs COUNT GROUP BY status
every 30s and emits an ObservableGauge per active (non-terminal)
execution status.

- Add cloud_pipelines_backend/instrumentation/metrics_polling.py with
  PollingService, run(), and _ACTIVE_STATUSES
- Add execution_status_count ObservableGauge to metrics.py
- Add metrics poller region to start_local.py with OTel guard
---
 .../instrumentation/metrics.py                |   8 ++
 .../instrumentation/metrics_polling.py        | 107 ++++++++++++++++++
 start_local.py                                |  11 +-
 3 files changed, 124 insertions(+), 2 deletions(-)
 create mode 100644 cloud_pipelines_backend/instrumentation/metrics_polling.py

diff --git a/cloud_pipelines_backend/instrumentation/metrics.py b/cloud_pipelines_backend/instrumentation/metrics.py
index b67bbd7..abba410 100644
--- a/cloud_pipelines_backend/instrumentation/metrics.py
+++ b/cloud_pipelines_backend/instrumentation/metrics.py
@@ -40,6 +40,7 @@ class MetricUnit(str, enum.Enum):
     SECONDS = "s"
     ERRORS = "{error}"
+    EXECUTIONS = "{execution}"
 
 
 # ---------------------------------------------------------------------------
 
@@ -59,6 +60,13 @@
     unit=MetricUnit.SECONDS,
 )
 
+execution_status_count = orchestrator_meter.create_observable_gauge(
+    name="execution.status.count",
+    callbacks=[],
+    description="Number of execution nodes in each active (non-terminal) status",
+    unit=MetricUnit.EXECUTIONS,
+)
+
 # ---------------------------------------------------------------------------
 # SQLAlchemy event listeners
 
diff --git a/cloud_pipelines_backend/instrumentation/metrics_polling.py b/cloud_pipelines_backend/instrumentation/metrics_polling.py
new file mode 100644
index 0000000..e01c7b5
--- /dev/null
+++ b/cloud_pipelines_backend/instrumentation/metrics_polling.py
@@ -0,0 +1,107 @@
+"""Metrics polling.
+
+Periodically queries the DB and updates ObservableGauges. Currently emits
+execution status counts; add new DB-backed metrics here as needed.
+
+Only fluctuating (non-terminal) statuses are emitted as status count gauges —
+terminal statuses like SUCCEEDED and FAILED only ever climb and are not useful
+as gauges.
+"""
+
+import logging
+import time
+import typing
+
+import sqlalchemy as sql
+from opentelemetry import metrics as otel_metrics
+from sqlalchemy import orm
+
+from .. import backend_types_sql as bts
+from . import metrics as app_metrics
+from .opentelemetry._internal import configuration as otel_configuration
+
+_logger = logging.getLogger(__name__)
+
+
+# All statuses minus terminal (ended) ones — these fluctuate up and down
+_ACTIVE_STATUSES: frozenset[bts.ContainerExecutionStatus] = (
+    frozenset(bts.ContainerExecutionStatus) - bts.CONTAINER_STATUSES_ENDED
+)
+
+
+def _empty_status_counts() -> dict[str, int]:
+    return {s.value: 0 for s in _ACTIVE_STATUSES}
+
+
+class PollingService:
+    """Polls the DB periodically and emits execution status count gauges."""
+
+    def __init__(
+        self,
+        *,
+        session_factory: typing.Callable[[], orm.Session],
+        poll_interval_seconds: float = 30.0,
+    ) -> None:
+        self._session_factory = session_factory
+        self._poll_interval_seconds = poll_interval_seconds
+        # Initialize all active statuses to 0
+        self._counts: dict[str, int] = _empty_status_counts()
+        # Register our observe method as the gauge callback.
+        # The OTel SDK stores callbacks in _callbacks; we append after creation
+        # since create_observable_gauge is called at module load time in metrics.py.
+        app_metrics.execution_status_count._callbacks.append(self._observe)
+
+    def run_loop(self) -> None:
+        while True:
+            try:
+                self._poll()
+            except Exception:
+                _logger.exception("Metrics PollingService: error polling DB")
+            time.sleep(self._poll_interval_seconds)
+
+    def _poll(self) -> None:
+        with self._session_factory() as session:
+            rows = session.execute(
+                sql.select(
+                    bts.ExecutionNode.container_execution_status,
+                    sql.func.count().label("count"),
+                )
+                .where(
+                    bts.ExecutionNode.container_execution_status.in_(_ACTIVE_STATUSES)
+                )
+                .group_by(bts.ExecutionNode.container_execution_status)
+            ).all()
+        new_counts = _empty_status_counts()
+        for status, count in rows:
+            if status is not None:
+                new_counts[status.value] = count
+        # CPython: attribute assignment is atomic under the GIL; no lock needed.
+        # If GIL-free Python is ever adopted, revisit this.
+        self._counts = new_counts
+        _logger.debug("Metrics PollingService: polled status counts: %s", new_counts)
+
+    def _observe(
+        self, _options: otel_metrics.CallbackOptions
+    ) -> typing.Iterable[otel_metrics.Observation]:
+        counts = self._counts.copy()
+        for status_value, count in counts.items():
+            yield otel_metrics.Observation(count, {"execution.status": status_value})
+
+
+def run(*, db_engine: sql.Engine) -> None:
+    """Check OTel config and run the metrics polling loop (blocking).
+
+    Logs and returns immediately if no metrics endpoint is configured.
+    """
+    otel_config = otel_configuration.resolve()
+    if otel_config is None or otel_config.metrics is None:
+        _logger.info(
+            "No OTel metrics endpoint configured"
+            f" (set {otel_configuration.EnvVar.METRIC_EXPORTER_ENDPOINT})"
+            " — metrics poller not starting"
+        )
+        return
+    session_factory = orm.sessionmaker(
+        autocommit=False, autoflush=False, bind=db_engine
+    )
+    PollingService(session_factory=session_factory).run_loop()
diff --git a/start_local.py b/start_local.py
index 4ea8921..840932f 100644
--- a/start_local.py
+++ b/start_local.py
@@ -211,6 +211,13 @@ def run_orchestrator(
 
 
 # endregion
+# region: Metrics poller initialization
+
+from cloud_pipelines_backend.instrumentation import metrics_polling
+
+# endregion
+
+
 # region: API Server initialization
 import contextlib
 import threading
@@ -228,9 +235,9 @@ def run_orchestrator(
 @contextlib.asynccontextmanager
 async def lifespan(app: fastapi.FastAPI):
     database_ops.initialize_and_migrate_db(db_engine=db_engine)
+    threading.Thread(target=run_configured_orchestrator, daemon=True).start()
     threading.Thread(
-        target=run_configured_orchestrator,
-        daemon=True,
+        target=metrics_polling.run, kwargs={"db_engine": db_engine}, daemon=True
     ).start()
     if os.environ.get("GOOGLE_CLOUD_SHELL") == "true":
         # TODO: Find a way to get fastapi/starlette/uvicorn port