class MetricUnit(str, enum.Enum):
    """UCUM-style unit strings accepted by the OTel SDK.

    The ``str`` mix-in lets members be used anywhere a plain unit string is
    accepted.  ``.value`` is still passed explicitly at the call sites below
    because a plain ``str``-mix-in enum stringifies as
    ``"MetricUnit.SECONDS"`` (not ``"s"``) on Python < 3.12, which would leak
    a wrong unit into any exporter that calls ``str(unit)``.
    """

    # Base SI time unit.
    SECONDS = "s"
    # UCUM "annotation" syntax for a dimensionless count of errors.
    ERRORS = "{error}"


# ---------------------------------------------------------------------------
# tangle.orchestrator
# ---------------------------------------------------------------------------

# Counts execution nodes that terminated in SYSTEM_ERROR status.
execution_system_errors = orchestrator_meter.create_counter(
    name="execution.system_errors",
    description="Number of execution nodes that ended in SYSTEM_ERROR status",
    unit=MetricUnit.ERRORS.value,
)

# Histogram of how long an execution node sat in one status before moving to
# the next; populated by the before_commit session listener in this module.
execution_status_transition_duration = orchestrator_meter.create_histogram(
    name="execution.status_transition.duration",
    description="Duration an execution spent in a status before transitioning to the next status",
    unit=MetricUnit.SECONDS.value,
)
@sql_event.listens_for(orm.Session, "before_commit")
def _handle_before_commit(session: orm.Session) -> None:
    """Record a status-transition duration metric for pending execution nodes.

    Scans the session's pending objects (``new`` + ``dirty``) for
    ``ExecutionNode`` instances flagged by the attribute ``set`` listener via
    ``_status_changed``.  For each, measures the wall-clock time between the
    last two entries of the status history kept in ``extra_data`` and records
    it on ``execution_status_transition_duration``.  Metric emission is
    best-effort: any failure is logged, never raised, so a telemetry problem
    cannot abort the commit.
    """
    for node in list(session.new) + list(session.dirty):
        if not isinstance(node, backend_types_sql.ExecutionNode):
            continue
        if not node._status_changed:
            continue
        history: list = (node.extra_data or {}).get(_HISTORY_KEY, [])
        # A duration needs both a previous and a current entry; the very
        # first status observed for a node yields no measurement.
        if len(history) >= 2:
            previous_entry = history[-2]
            current_entry = history[-1]
            try:
                # Timestamp parsing happens INSIDE the try block: a malformed
                # value stored in extra_data must not abort the commit either.
                previous_time = datetime.datetime.fromisoformat(
                    previous_entry["first_observed_at"]
                )
                current_time = datetime.datetime.fromisoformat(
                    current_entry["first_observed_at"]
                )
                execution_status_transition_duration.record(
                    (current_time - previous_time).total_seconds(),
                    attributes={
                        "execution.status.from": previous_entry["status"],
                        "execution.status.to": current_entry["status"],
                    },
                )
            except Exception:
                # Lazy %-formatting: the message is only built if the
                # warning is actually emitted.
                _logger.warning(
                    "Failed to record status transition metric for execution %r",
                    node.id,
                    exc_info=True,
                )
        # Reset the flag so an unchanged node is not measured again on a
        # later commit of the same session.
        node._status_changed = False
@sql_event.listens_for(bts.ExecutionNode.container_execution_status, "set")
def _handle_container_execution_status_set(
    execution: bts.ExecutionNode,
    value: bts.ContainerExecutionStatus | None,
    _old_value: object,
    _initiator: object,
) -> None:
    """Append a status-history entry whenever the execution status changes.

    Maintains a list of ``{"status", "first_observed_at"}`` records under
    ``extra_data[EXECUTION_NODE_EXTRA_DATA_STATUS_HISTORY_KEY]`` so the
    ``before_commit`` listener can measure transition durations without any
    additional database reads.  Consecutive assignments of the same status
    collapse into a single entry.
    """
    if value is None:
        return
    # Read through a local instead of first assigning `extra_data = {}`:
    # the original intermediate assignment fired a redundant ORM change
    # event before the real update below.
    current_extra_data = execution.extra_data or {}
    history: list = current_extra_data.get(_HISTORY_KEY, [])
    # Repeated assignment of the current status is a no-op.
    if history and history[-1]["status"] == value.value:
        return
    entry = {
        "status": value.value,
        "first_observed_at": datetime.datetime.now(datetime.timezone.utc).isoformat(),
    }
    # Reassign a brand-new dict (rather than mutating in place) so
    # SQLAlchemy's JSON-column change tracking sees extra_data as modified.
    execution.extra_data = {
        **current_extra_data,
        _HISTORY_KEY: history + [entry],
    }
    execution._status_changed = True
@pytest.fixture()
def session() -> orm.Session:
    """Provide a Session bound to a fresh in-memory SQLite database."""
    db_engine = database_ops.create_db_engine(database_uri="sqlite://")
    bts._TableBase.metadata.create_all(db_engine)
    with orm.Session(db_engine) as s:
        yield s


class TestStatusHistoryListeners:
    """End-to-end tests for the status-history SQLAlchemy event listeners."""

    def test_status_change_appends_history_to_extra_data(
        self, session: orm.Session
    ) -> None:
        node = bts.ExecutionNode(task_spec={})
        session.add(node)
        node.container_execution_status = bts.ContainerExecutionStatus.QUEUED
        session.commit()

        history = node.extra_data[bts.EXECUTION_NODE_EXTRA_DATA_STATUS_HISTORY_KEY]
        assert len(history) == 1
        # The listener stores the enum's raw value (a string), not the
        # member itself — compare against .value so this holds even for a
        # non-str enum.
        assert history[0]["status"] == bts.ContainerExecutionStatus.QUEUED.value

    def test_duplicate_status_is_not_appended_to_history(
        self, session: orm.Session
    ) -> None:
        node = bts.ExecutionNode(task_spec={})
        session.add(node)
        node.container_execution_status = bts.ContainerExecutionStatus.QUEUED
        node.container_execution_status = bts.ContainerExecutionStatus.QUEUED
        session.commit()

        history = node.extra_data[bts.EXECUTION_NODE_EXTRA_DATA_STATUS_HISTORY_KEY]
        assert len(history) == 1

    def test_second_status_change_records_duration_metric(
        self, session: orm.Session
    ) -> None:
        node = bts.ExecutionNode(task_spec={})
        session.add(node)
        node.container_execution_status = bts.ContainerExecutionStatus.QUEUED
        session.commit()

        node.container_execution_status = bts.ContainerExecutionStatus.RUNNING
        with unittest.mock.patch.object(
            metrics.execution_status_transition_duration, "record"
        ) as mock_record:
            session.commit()

        mock_record.assert_called_once()
        # Duration is recorded positionally and must be non-negative.
        assert mock_record.call_args.args[0] >= 0
        # Attributes carry the raw string values stored in the history.
        assert mock_record.call_args.kwargs["attributes"] == {
            "execution.status.from": bts.ContainerExecutionStatus.QUEUED.value,
            "execution.status.to": bts.ContainerExecutionStatus.RUNNING.value,
        }