Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions cloud_pipelines_backend/backend_types_sql.py
Original file line number Diff line number Diff line change
Expand Up @@ -424,7 +424,10 @@ class ExecutionNode(_TableBase):
repr=False,
)

_status_changed: bool = dataclasses.field(default=False, init=False, repr=False)


# extra_data key holding the list of status-history entries
# (dicts with "status" and "first_observed_at" fields).
EXECUTION_NODE_EXTRA_DATA_STATUS_HISTORY_KEY = "container_execution_status_history"
# extra_data key — presumably stores the exception message for executions
# that ended in SYSTEM_ERROR status; verify against writers of this key.
EXECUTION_NODE_EXTRA_DATA_SYSTEM_ERROR_EXCEPTION_MESSAGE_KEY = (
    "system_error_exception_message"
)
Expand Down
62 changes: 61 additions & 1 deletion cloud_pipelines_backend/instrumentation/metrics.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,25 @@
- Instrument: orchestrator_execution_system_errors
"""

import datetime
import enum
import logging

from opentelemetry import metrics as otel_metrics
from sqlalchemy import event as sql_event
from sqlalchemy import orm

from .. import backend_types_sql

_logger = logging.getLogger(__name__)


class MetricUnit(str, enum.Enum):
    """UCUM-style unit strings accepted by the OTel SDK.

    The ``str`` mixin makes each member compare equal to its plain-string
    value, so members can be passed wherever the SDK expects a unit string.
    Note: string-FORMATTING a ``(str, Enum)`` member differs across Python
    versions — use ``.value`` when interpolating into strings.
    """

    # Plain seconds.
    SECONDS = "s"
    # UCUM annotation syntax for a dimensionless count of errors.
    ERRORS = "{error}"


# ---------------------------------------------------------------------------
# tangle.orchestrator
Expand All @@ -32,5 +50,47 @@
# Counter: incremented once per execution node that terminates in
# SYSTEM_ERROR status.
execution_system_errors = orchestrator_meter.create_counter(
    name="execution.system_errors",
    description="Number of execution nodes that ended in SYSTEM_ERROR status",
    # Pass the plain string rather than the enum member: formatting a
    # (str, Enum) member is version-dependent (pre-3.12 f-strings render
    # "MetricUnit.ERRORS"), and exporters may stringify the unit.
    unit=MetricUnit.ERRORS.value,
)

# Histogram: seconds spent in a status before transitioning to the next
# status, recorded by the before_commit listener below.
execution_status_transition_duration = orchestrator_meter.create_histogram(
    name="execution.status_transition.duration",
    description="Duration an execution spent in a status before transitioning to the next status",
    unit=MetricUnit.SECONDS.value,
)


# ---------------------------------------------------------------------------
# SQLAlchemy event listeners
# ---------------------------------------------------------------------------

_HISTORY_KEY = backend_types_sql.EXECUTION_NODE_EXTRA_DATA_STATUS_HISTORY_KEY


@sql_event.listens_for(orm.Session, "before_commit")
def _handle_before_commit(session: orm.Session) -> None:
    """Record status-transition duration metrics for pending ExecutionNodes.

    For every new or dirty ``ExecutionNode`` whose status changed since the
    last commit (flagged by ``_status_changed``), computes the time spent in
    the previous status from the history stored in ``extra_data`` and records
    it on ``execution_status_transition_duration``.

    Metric recording is strictly best-effort: any failure (missing keys,
    malformed timestamps, SDK errors) is logged and must never abort the
    commit. The original code parsed timestamps and indexed the history dicts
    OUTSIDE the try block, so a malformed entry would have raised inside
    ``before_commit`` — all fallible work now lives inside the try.
    """
    for obj in list(session.new) + list(session.dirty):
        if not isinstance(obj, backend_types_sql.ExecutionNode):
            continue
        if not obj._status_changed:
            continue
        # Reset the flag up front so a metric failure cannot leave it stuck.
        obj._status_changed = False
        history: list = (obj.extra_data or {}).get(_HISTORY_KEY, [])
        if len(history) < 2:
            # First observed status — no previous status to measure against.
            continue
        try:
            prev = history[-2]
            curr = history[-1]
            # Timestamps are ISO-8601 strings written by the attribute "set"
            # listener in orchestrator_sql.
            prev_time = datetime.datetime.fromisoformat(prev["first_observed_at"])
            curr_time = datetime.datetime.fromisoformat(curr["first_observed_at"])
            execution_status_transition_duration.record(
                (curr_time - prev_time).total_seconds(),
                attributes={
                    "execution.status.from": prev["status"],
                    "execution.status.to": curr["status"],
                },
            )
        except Exception:
            # Best-effort telemetry: log with traceback and keep committing.
            _logger.warning(
                f"Failed to record status transition metric for execution {obj.id!r}",
                exc_info=True,
            )
33 changes: 33 additions & 0 deletions cloud_pipelines_backend/orchestrator_sql.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@


import sqlalchemy as sql
from sqlalchemy import event as sql_event
from sqlalchemy import orm

from cloud_pipelines.orchestration.storage_providers import (
Expand Down Expand Up @@ -1101,3 +1102,35 @@ def _maybe_get_small_artifact_value(
return text
except Exception:
pass


# ---------------------------------------------------------------------------
# SQLAlchemy event listeners
# ---------------------------------------------------------------------------

_HISTORY_KEY = bts.EXECUTION_NODE_EXTRA_DATA_STATUS_HISTORY_KEY


@sql_event.listens_for(bts.ExecutionNode.container_execution_status, "set")
def _handle_container_execution_status_set(
    execution: bts.ExecutionNode,
    value: bts.ContainerExecutionStatus | None,
    _old_value: object,
    _initiator: object,
) -> None:
    """Append a status-history entry whenever the execution status is set.

    Each entry records the status value and a UTC ISO-8601 timestamp of when
    it was first observed. Re-assignments of the current status are ignored,
    so the history contains only transitions. ``extra_data`` is replaced with
    a fresh dict rather than mutated in place — presumably so SQLAlchemy's
    change tracking notices the JSON column update; confirm against the
    column type's mutation-tracking setup.
    """
    if value is None:
        # Status cleared rather than transitioned — nothing to record.
        return
    if execution.extra_data is None:
        execution.extra_data = {}
    recorded: list = execution.extra_data.get(_HISTORY_KEY, [])
    latest_status = recorded[-1]["status"] if recorded else None
    if latest_status == value.value:
        # Same status assigned again — keep the history duplicate-free.
        return
    observed_at = datetime.datetime.now(datetime.timezone.utc)
    new_entry = {
        "status": value.value,
        "first_observed_at": observed_at.isoformat(),
    }
    updated_extra_data = dict(execution.extra_data)
    updated_extra_data[_HISTORY_KEY] = [*recorded, new_entry]
    execution.extra_data = updated_extra_data
    # Signal the before_commit metrics listener that a transition happened.
    execution._status_changed = True
69 changes: 69 additions & 0 deletions tests/test_sql_event_listeners.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
"""Tests for SQLAlchemy event listeners in orchestrator_sql and instrumentation.metrics."""

import unittest.mock

import pytest
from sqlalchemy import orm

from cloud_pipelines_backend import backend_types_sql as bts
from cloud_pipelines_backend import database_ops
from cloud_pipelines_backend import (
orchestrator_sql,
) # noqa: F401 — registers set listener
from cloud_pipelines_backend.instrumentation import (
metrics,
) # noqa: F401 — registers before_commit listener


@pytest.fixture()
def session() -> orm.Session:
    """Yield an ORM session bound to a fresh in-memory SQLite database."""
    engine = database_ops.create_db_engine(database_uri="sqlite://")
    # Create every table declared on the shared declarative base.
    bts._TableBase.metadata.create_all(engine)
    with orm.Session(engine) as db_session:
        yield db_session


class TestStatusHistoryListeners:
    """End-to-end tests for the status-history "set" listener and the
    before_commit metrics listener, run against an in-memory database.

    NOTE(review): the assertions compare status strings stored in
    ``extra_data`` against enum members — this only passes if
    ``ContainerExecutionStatus`` mixes in ``str``; confirm.
    """

    def test_status_change_appends_history_to_extra_data(
        self, session: orm.Session
    ) -> None:
        execution = bts.ExecutionNode(task_spec={})
        session.add(execution)
        execution.container_execution_status = bts.ContainerExecutionStatus.QUEUED
        session.commit()

        recorded = execution.extra_data[
            bts.EXECUTION_NODE_EXTRA_DATA_STATUS_HISTORY_KEY
        ]
        assert len(recorded) == 1
        assert recorded[0]["status"] == bts.ContainerExecutionStatus.QUEUED

    def test_duplicate_status_is_not_appended_to_history(
        self, session: orm.Session
    ) -> None:
        execution = bts.ExecutionNode(task_spec={})
        session.add(execution)
        # Assign the same status twice; only one history entry should appear.
        execution.container_execution_status = bts.ContainerExecutionStatus.QUEUED
        execution.container_execution_status = bts.ContainerExecutionStatus.QUEUED
        session.commit()

        recorded = execution.extra_data[
            bts.EXECUTION_NODE_EXTRA_DATA_STATUS_HISTORY_KEY
        ]
        assert len(recorded) == 1

    def test_second_status_change_records_duration_metric(
        self, session: orm.Session
    ) -> None:
        execution = bts.ExecutionNode(task_spec={})
        session.add(execution)
        execution.container_execution_status = bts.ContainerExecutionStatus.QUEUED
        session.commit()

        # The duration metric is emitted on the commit that persists the
        # SECOND status, so patch `record` around that commit only.
        execution.container_execution_status = bts.ContainerExecutionStatus.RUNNING
        patcher = unittest.mock.patch.object(
            metrics.execution_status_transition_duration, "record"
        )
        with patcher as record_mock:
            session.commit()

        record_mock.assert_called_once()
        expected_attributes = {
            "execution.status.from": bts.ContainerExecutionStatus.QUEUED,
            "execution.status.to": bts.ContainerExecutionStatus.RUNNING,
        }
        assert record_mock.call_args.kwargs["attributes"] == expected_attributes
Loading