From 36c7018ef67a8c8a661b98cd474e1d137cd62820 Mon Sep 17 00:00:00 2001 From: Khaled Salhab Date: Wed, 6 May 2026 10:46:58 +0300 Subject: [PATCH 1/2] feat: per-endpoint circuit breaker option Add `per_endpoint_circuit_breaker: bool = False` to `HyperpingClient` and `AsyncHyperpingClient`. When enabled, the client maintains an independent `CircuitBreaker` per request path (query string and fragment stripped from the key) so a single flaky endpoint no longer blocks traffic to healthy ones. Default `False` preserves the original single-shared-breaker behaviour and all existing breaker tests. Per-path state is queryable via `client.circuit_breaker_state_for(path)`; the existing `client.circuit_breaker` property still returns the shared breaker. The per-path dict is protected by its own `threading.Lock`; the per-path `CircuitBreaker` instances retain their own internal lock. Tests cover: isolation between paths, state-query API, query-string stripping, default-mode unchanged, async parity, and 50-thread concurrent access. --- BACKLOG.md | 4 +- CHANGELOG.md | 10 + README.md | 23 ++ src/hyperping/_async_client.py | 73 +++++- src/hyperping/client.py | 77 +++++- .../unit/test_per_endpoint_circuit_breaker.py | 223 ++++++++++++++++++ 6 files changed, 390 insertions(+), 20 deletions(-) create mode 100644 tests/unit/test_per_endpoint_circuit_breaker.py diff --git a/BACKLOG.md b/BACKLOG.md index fcc0244..4ec41c7 100644 --- a/BACKLOG.md +++ b/BACKLOG.md @@ -46,7 +46,7 @@ Agents: Security, Architecture, Code Quality, Refactor/Dead Code. - [x] **M1** Async client (`AsyncHyperpingClient`) — shipped in PR #13 (feature/sdk-py-03-async-client). - [x] **M2** Pagination (`page` param, `hasNextPage` auto-pagination via `collect_all_pages` / `collect_all_pages_async`) — shipped in PR #12 (merged) and PR #13. -- [ ] **M3** Per-endpoint circuit breaker (`per_endpoint_circuit_breaker: bool = False` option). **Deferred — moderate scope, add in follow-up.** +- [x] **M3** Per-endpoint circuit breaker (`per_endpoint_circuit_breaker: bool = False` option) on `HyperpingClient` and `AsyncHyperpingClient`. Per-path state via `circuit_breaker_state_for(path)`. Default off; existing single-shared-breaker behaviour preserved. - [x] **M4** `MonitorCreate` now has `@model_validator(mode="after")` that raises `ValueError` if DNS fields are set on non-DNS monitors. - [x] **M5** `MonitorListResponse` is in `__all__` — retained but documented as not returned by any client method. Will be used once pagination lands. - [x] **M6** `APIErrorResponse` removed from `__all__` (documented as intentionally internal in comment). @@ -106,7 +106,7 @@ The following items require either a semver bump, a separate PR, or manual work: - ~~**M1** Async client~~ — shipped - ~~**M2** Pagination~~ — shipped -- **M3** Per-endpoint circuit breaker option +- ~~**M3** Per-endpoint circuit breaker option~~ — shipped - **M11** URL validation for HTTP-protocol monitors (cross-field, needs discriminated union work) - **M12** DateTime coercion (breaking change — v0.2.0) - **H11** Pin all GitHub Actions `uses:` to 40-char commit SHAs (requires per-tag SHA lookup) diff --git a/CHANGELOG.md b/CHANGELOG.md index dfbf692..2f7ecc9 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -5,6 +5,16 @@ All notable changes to this project will be documented in this file. The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.1.0/), and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html). +## [Unreleased] + +### Added + +- Per-endpoint circuit breaker option (`per_endpoint_circuit_breaker: bool = False`) on + `HyperpingClient` and `AsyncHyperpingClient`. When enabled, each request path gets its + own breaker state so a single flaky endpoint no longer blocks traffic to healthy ones. + Default behaviour is unchanged. State for a given path is readable via + `client.circuit_breaker_state_for(path)`. See README for details. + ## [1.5.0] - 2026-04-20 ### Added diff --git a/README.md b/README.md index ae6c7b6..89dbd23 100644 --- a/README.md +++ b/README.md @@ -252,6 +252,29 @@ client = HyperpingClient( ) ``` +### Per-endpoint circuit breaker + +By default a single shared circuit breaker covers every request. If one endpoint flakes, every other endpoint is also blocked. Enable `per_endpoint_circuit_breaker=True` to keep one breaker per request path so a failing endpoint does not punish healthy ones: + +```python +client = HyperpingClient( + api_key="sk_...", + per_endpoint_circuit_breaker=True, +) + +# Inspect state for a specific path: +from hyperping import CircuitState, Endpoint + +state = client.circuit_breaker_state_for(str(Endpoint.MONITORS)) +assert state in {CircuitState.CLOSED, CircuitState.HALF_OPEN, CircuitState.OPEN} +``` + +| Option | Type | Default | Description | +| --- | --- | --- | --- | +| `per_endpoint_circuit_breaker` | `bool` | `False` | When `True`, maintain a separate circuit breaker per request path (query string and fragment ignored). The same `circuit_breaker_config` applies to every per-path breaker. The shared breaker remains accessible via `client.circuit_breaker`; per-path state is read via `client.circuit_breaker_state_for(path)`. | + +The same option is available on `AsyncHyperpingClient`. + ## Type Safety This package ships a `py.typed` marker (PEP 561) and is fully typed. Works out of the box with mypy and pyright. diff --git a/src/hyperping/_async_client.py b/src/hyperping/_async_client.py index 6474f4f..2850303 100644 --- a/src/hyperping/_async_client.py +++ b/src/hyperping/_async_client.py @@ -14,6 +14,7 @@ import asyncio import logging import random +import threading from typing import Any import httpx @@ -28,6 +29,7 @@ from hyperping._circuit_breaker import ( CircuitBreaker, CircuitBreakerConfig, + CircuitState, ) from hyperping._internals import DEFAULT_USER_AGENT, RETRY_AFTER_MAX, sanitize_for_log from hyperping.client import DEFAULT_RETRY_CONFIG, RetryConfig @@ -73,6 +75,7 @@ def __init__( retry_config: RetryConfig | None = None, circuit_breaker_config: CircuitBreakerConfig | None = None, user_agent: str | None = None, + per_endpoint_circuit_breaker: bool = False, ) -> None: """Initialize the async Hyperping API client. @@ -82,8 +85,13 @@ def __init__( base_url: Override the default API base URL. timeout: HTTP request timeout in seconds. retry_config: Retry behaviour configuration. - circuit_breaker_config: Circuit breaker configuration. + circuit_breaker_config: Circuit breaker configuration. When + ``per_endpoint_circuit_breaker`` is ``True`` the same config is + applied to each per-path breaker. user_agent: Custom ``User-Agent`` header value. + per_endpoint_circuit_breaker: When ``True``, maintain an + independent breaker per request path. Default ``False`` + preserves the original single-shared-breaker behaviour. """ raw_key = api_key.get_secret_value() if isinstance(api_key, SecretStr) else api_key if not raw_key or not raw_key.strip(): @@ -92,7 +100,11 @@ def __init__( self.base_url = (base_url or self.DEFAULT_BASE_URL).rstrip("/") self.timeout = timeout self.retry_config = retry_config or DEFAULT_RETRY_CONFIG + self._circuit_breaker_config = circuit_breaker_config self._circuit_breaker = CircuitBreaker(circuit_breaker_config) + self._per_endpoint_circuit_breaker = per_endpoint_circuit_breaker + self._endpoint_breakers: dict[str, CircuitBreaker] = {} + self._endpoint_breakers_lock = threading.Lock() self._client = httpx.AsyncClient( base_url=self.base_url, @@ -120,9 +132,52 @@ async def __aexit__(self, *args: Any) -> None: @property def circuit_breaker(self) -> CircuitBreaker: - """Access the circuit breaker state (for monitoring).""" + """Access the (shared) circuit breaker state (for monitoring). + + In per-endpoint mode this returns the original shared breaker, kept + for backward compatibility; the per-path breakers are exposed via + :meth:`circuit_breaker_state_for`. + """ return self._circuit_breaker + @staticmethod + def _breaker_key(path: str) -> str: + """Return the dict key for a request path: drop query string and fragment.""" + for sep in ("?", "#"): + idx = path.find(sep) + if idx != -1: + path = path[:idx] + return path + + def _breaker_for(self, path: str) -> CircuitBreaker: + """Return the breaker that governs ``path`` (shared, or per-path).""" + if not self._per_endpoint_circuit_breaker: + return self._circuit_breaker + key = self._breaker_key(path) + with self._endpoint_breakers_lock: + breaker = self._endpoint_breakers.get(key) + if breaker is None: + breaker = CircuitBreaker(self._circuit_breaker_config) + self._endpoint_breakers[key] = breaker + return breaker + + def circuit_breaker_state_for(self, path: str) -> CircuitState: + """Return the circuit state of the breaker governing ``path``. + + Only valid when the client was constructed with + ``per_endpoint_circuit_breaker=True``. Untouched paths report + :attr:`CircuitState.CLOSED` without allocating a breaker. + """ + if not self._per_endpoint_circuit_breaker: + raise RuntimeError( + "circuit_breaker_state_for() requires per_endpoint_circuit_breaker=True; " + "use the .circuit_breaker property for the shared breaker.", + ) + key = self._breaker_key(path) + with self._endpoint_breakers_lock: + breaker = self._endpoint_breakers.get(key) + return breaker.state if breaker is not None else CircuitState.CLOSED + # ==================== Error Handling ==================== def _parse_error_body(self, response: httpx.Response) -> dict[str, Any]: @@ -236,7 +291,7 @@ async def _execute_single_attempt( if response.status_code >= 400: return response - self._circuit_breaker.record_success() + self._breaker_for(path).record_success() if response.status_code == 204: return {} return response.json() # type: ignore[no-any-return] @@ -262,12 +317,12 @@ async def _request( Raises: HyperpingAPIError: On API errors after retries exhausted """ - if not self._circuit_breaker.call_allowed(): - cb = self._circuit_breaker + breaker = self._breaker_for(path) + if not breaker.call_allowed(): raise HyperpingAPIError( f"Circuit breaker OPEN - API calls suspended. " - f"Consecutive failures: {cb.failure_count}. " - f"Will recover after {cb.recovery_timeout}s." + f"Consecutive failures: {breaker.failure_count}. " + f"Will recover after {breaker.recovery_timeout}s." ) last_exception: Exception | None = None @@ -299,7 +354,7 @@ async def _request( continue if response.status_code >= 500: - self._circuit_breaker.record_failure() + breaker.record_failure() self._handle_response_error(response) except (httpx.TimeoutException, httpx.RequestError) as e: @@ -320,7 +375,7 @@ async def _request( self.retry_config.max_delay, ) continue - self._circuit_breaker.record_failure() + breaker.record_failure() if isinstance(e, httpx.TimeoutException): raise HyperpingAPIError(f"Request timeout after {max_attempts} attempts") from e raise HyperpingAPIError(f"Request failed: {e}") from e diff --git a/src/hyperping/client.py b/src/hyperping/client.py index dfbcbe9..eebcbff 100644 --- a/src/hyperping/client.py +++ b/src/hyperping/client.py @@ -10,6 +10,7 @@ import logging import random +import threading import time from dataclasses import dataclass from typing import Any @@ -88,6 +89,7 @@ def __init__( retry_config: RetryConfig | None = None, circuit_breaker_config: CircuitBreakerConfig | None = None, user_agent: str | None = None, + per_endpoint_circuit_breaker: bool = False, ) -> None: """Initialize the Hyperping API client. @@ -100,9 +102,15 @@ def __init__( retry_config: Retry behaviour configuration. Pass ``None`` for defaults (3 retries, exponential backoff). circuit_breaker_config: Circuit breaker configuration. Pass ``None`` - for defaults (5-failure threshold, 60 s recovery). + for defaults (5-failure threshold, 60 s recovery). When + ``per_endpoint_circuit_breaker`` is ``True`` this same config + is applied to every per-path breaker. user_agent: Custom ``User-Agent`` header value. Defaults to ``hyperping-python/0.1.0``. + per_endpoint_circuit_breaker: When ``True``, maintain an independent + breaker per request path so a single flaky endpoint does not + block traffic to healthy ones. Default ``False`` preserves the + original single-shared-breaker behaviour. """ raw_key = api_key.get_secret_value() if isinstance(api_key, SecretStr) else api_key if not raw_key or not raw_key.strip(): @@ -111,7 +119,11 @@ def __init__( self.base_url = (base_url or self.DEFAULT_BASE_URL).rstrip("/") self.timeout = timeout self.retry_config = retry_config or DEFAULT_RETRY_CONFIG + self._circuit_breaker_config = circuit_breaker_config self._circuit_breaker = CircuitBreaker(circuit_breaker_config) + self._per_endpoint_circuit_breaker = per_endpoint_circuit_breaker + self._endpoint_breakers: dict[str, CircuitBreaker] = {} + self._endpoint_breakers_lock = threading.Lock() self._client = httpx.Client( base_url=self.base_url, @@ -139,9 +151,56 @@ def __exit__(self, *args: Any) -> None: @property def circuit_breaker(self) -> CircuitBreaker: - """Access the circuit breaker state (for monitoring).""" + """Access the (shared) circuit breaker state (for monitoring). + + In per-endpoint mode this returns the original shared breaker, kept + for backward compatibility; the per-path breakers are exposed via + :meth:`circuit_breaker_state_for`. + """ return self._circuit_breaker + @staticmethod + def _breaker_key(path: str) -> str: + """Return the dict key for a request path: drop query string and fragment.""" + for sep in ("?", "#"): + idx = path.find(sep) + if idx != -1: + path = path[:idx] + return path + + def _breaker_for(self, path: str) -> CircuitBreaker: + """Return the breaker that governs ``path``. + + In default mode this is always the shared breaker; in per-endpoint + mode each path gets its own ``CircuitBreaker`` lazily. + """ + if not self._per_endpoint_circuit_breaker: + return self._circuit_breaker + key = self._breaker_key(path) + with self._endpoint_breakers_lock: + breaker = self._endpoint_breakers.get(key) + if breaker is None: + breaker = CircuitBreaker(self._circuit_breaker_config) + self._endpoint_breakers[key] = breaker + return breaker + + def circuit_breaker_state_for(self, path: str) -> CircuitState: + """Return the circuit state of the breaker governing ``path``. + + Only valid when the client was constructed with + ``per_endpoint_circuit_breaker=True``. Untouched paths report + :attr:`CircuitState.CLOSED` without allocating a breaker. + """ + if not self._per_endpoint_circuit_breaker: + raise RuntimeError( + "circuit_breaker_state_for() requires per_endpoint_circuit_breaker=True; " + "use the .circuit_breaker property for the shared breaker.", + ) + key = self._breaker_key(path) + with self._endpoint_breakers_lock: + breaker = self._endpoint_breakers.get(key) + return breaker.state if breaker is not None else CircuitState.CLOSED + # ==================== Error Handling ==================== def _parse_error_body(self, response: httpx.Response) -> dict[str, Any]: @@ -310,7 +369,7 @@ def _execute_single_attempt( return response # Success - self._circuit_breaker.record_success() + self._breaker_for(path).record_success() if response.status_code == 204: return {} return response.json() # type: ignore[no-any-return] @@ -336,12 +395,12 @@ def _request( Raises: HyperpingAPIError: On API errors after retries exhausted """ - if not self._circuit_breaker.call_allowed(): - cb = self._circuit_breaker + breaker = self._breaker_for(path) + if not breaker.call_allowed(): raise HyperpingAPIError( f"Circuit breaker OPEN - API calls suspended. " - f"Consecutive failures: {cb.failure_count}. " - f"Will recover after {cb.recovery_timeout}s." + f"Consecutive failures: {breaker.failure_count}. " + f"Will recover after {breaker.recovery_timeout}s." ) last_exception: Exception | None = None @@ -374,7 +433,7 @@ def _request( # Only trip circuit breaker on server errors, not client errors if response.status_code >= 500: - self._circuit_breaker.record_failure() + breaker.record_failure() self._handle_response_error(response) except (httpx.TimeoutException, httpx.RequestError) as e: @@ -395,7 +454,7 @@ def _request( self.retry_config.max_delay, ) continue - self._circuit_breaker.record_failure() + breaker.record_failure() if isinstance(e, httpx.TimeoutException): raise HyperpingAPIError(f"Request timeout after {max_attempts} attempts") from e raise HyperpingAPIError(f"Request failed: {e}") from e diff --git a/tests/unit/test_per_endpoint_circuit_breaker.py b/tests/unit/test_per_endpoint_circuit_breaker.py new file mode 100644 index 0000000..4ad4719 --- /dev/null +++ b/tests/unit/test_per_endpoint_circuit_breaker.py @@ -0,0 +1,223 @@ +"""Tests for the per-endpoint circuit breaker option (PY-03). + +The default behaviour (single shared breaker) is exercised by the existing +breaker tests in ``test_sdk_surface.py``, ``test_monitors.py``, and +``test_async_client.py``. These tests cover the new opt-in path: + + HyperpingClient(..., per_endpoint_circuit_breaker=True) + +with isolation between paths, a per-path state accessor, async parity, and +thread safety on the per-path breaker dict. +""" + +from __future__ import annotations + +import threading + +import httpx +import pytest +import pytest_asyncio +import respx + +from hyperping._async_client import AsyncHyperpingClient +from hyperping._circuit_breaker import CircuitBreakerConfig, CircuitState +from hyperping.client import HyperpingClient, RetryConfig +from hyperping.endpoints import API_BASE, Endpoint +from hyperping.exceptions import HyperpingAPIError + + +def _cb_config(threshold: int = 2) -> CircuitBreakerConfig: + """Tight threshold so tests trip the breaker quickly without long timeouts.""" + return CircuitBreakerConfig(failure_threshold=threshold, recovery_timeout=60.0) + + +# ==================== sync ==================== + + +class TestPerEndpointCircuitBreakerSync: + """Per-endpoint isolation on HyperpingClient.""" + + @respx.mock + def test_per_endpoint_isolation(self) -> None: + """A failing endpoint trips its own breaker; a healthy endpoint is unaffected.""" + client = HyperpingClient( + api_key="sk_test", + retry_config=RetryConfig(max_retries=0), + circuit_breaker_config=_cb_config(threshold=2), + per_endpoint_circuit_breaker=True, + ) + + respx.get(f"{API_BASE}{Endpoint.MONITORS}").mock( + return_value=httpx.Response(500, json={"error": "boom"}), + ) + respx.get(f"{API_BASE}{Endpoint.INCIDENTS}").mock( + return_value=httpx.Response(200, json={"incidents": []}), + ) + + # Trip /v1/monitors + for _ in range(2): + with pytest.raises(HyperpingAPIError): + client.list_monitors() + + # /v1/monitors breaker is now OPEN; further calls fail-fast. + with pytest.raises(HyperpingAPIError, match="Circuit breaker OPEN"): + client.list_monitors() + + # /v3/incidents breaker is untouched and the call succeeds. + assert client.list_incidents() == [] + + assert client.circuit_breaker_state_for(str(Endpoint.MONITORS)) == CircuitState.OPEN + assert client.circuit_breaker_state_for(str(Endpoint.INCIDENTS)) == CircuitState.CLOSED + + client.close() + + @respx.mock + def test_per_endpoint_state_query_strips_query_string(self) -> None: + """``circuit_breaker_state_for`` keys on path only, ignoring query/fragment.""" + client = HyperpingClient( + api_key="sk_test", + retry_config=RetryConfig(max_retries=0), + circuit_breaker_config=_cb_config(threshold=1), + per_endpoint_circuit_breaker=True, + ) + + respx.get(f"{API_BASE}{Endpoint.INCIDENTS}").mock( + return_value=httpx.Response(500, json={"error": "boom"}), + ) + + with pytest.raises(HyperpingAPIError): + client.list_incidents(status="investigating") + + # The request used a path with a query string, but the breaker key strips it. + assert client.circuit_breaker_state_for(str(Endpoint.INCIDENTS)) == CircuitState.OPEN + assert ( + client.circuit_breaker_state_for(f"{Endpoint.INCIDENTS}?status=investigating") + == CircuitState.OPEN + ) + + client.close() + + @respx.mock + def test_default_behaviour_unchanged(self) -> None: + """With the flag off (default), a 5xx on one path trips the shared breaker for all paths.""" + client = HyperpingClient( + api_key="sk_test", + retry_config=RetryConfig(max_retries=0), + circuit_breaker_config=_cb_config(threshold=1), + ) + + respx.get(f"{API_BASE}{Endpoint.MONITORS}").mock( + return_value=httpx.Response(500, json={"error": "boom"}), + ) + + with pytest.raises(HyperpingAPIError): + client.list_monitors() + + # Shared breaker is OPEN; even an unrelated path (which has no mock route) + # is rejected without an HTTP call. + with pytest.raises(HyperpingAPIError, match="Circuit breaker OPEN"): + client.list_incidents() + + assert client.circuit_breaker.state == CircuitState.OPEN + client.close() + + def test_state_for_unknown_path_is_closed(self) -> None: + """Querying a path that has not been touched returns CLOSED (no breaker created).""" + client = HyperpingClient( + api_key="sk_test", + per_endpoint_circuit_breaker=True, + ) + assert client.circuit_breaker_state_for("/v1/unused") == CircuitState.CLOSED + client.close() + + def test_state_for_requires_per_endpoint_mode(self) -> None: + """Calling the per-path accessor without the flag is a misuse — surface it.""" + client = HyperpingClient(api_key="sk_test") + with pytest.raises(RuntimeError, match="per_endpoint_circuit_breaker"): + client.circuit_breaker_state_for("/v1/monitors") + client.close() + + @respx.mock + def test_per_endpoint_threadsafe(self) -> None: + """50 concurrent calls across two paths: failing path opens, healthy path stays closed.""" + client = HyperpingClient( + api_key="sk_test", + retry_config=RetryConfig(max_retries=0), + circuit_breaker_config=_cb_config(threshold=3), + per_endpoint_circuit_breaker=True, + ) + + respx.get(f"{API_BASE}{Endpoint.MONITORS}").mock( + return_value=httpx.Response(500, json={"error": "boom"}), + ) + respx.get(f"{API_BASE}{Endpoint.INCIDENTS}").mock( + return_value=httpx.Response(200, json={"incidents": []}), + ) + + def hit_monitors() -> None: + try: + client.list_monitors() + except HyperpingAPIError: + pass + + def hit_incidents() -> None: + client.list_incidents() + + threads: list[threading.Thread] = [] + for i in range(50): + target = hit_monitors if i % 2 == 0 else hit_incidents + threads.append(threading.Thread(target=target)) + for t in threads: + t.start() + for t in threads: + t.join() + + assert client.circuit_breaker_state_for(str(Endpoint.MONITORS)) == CircuitState.OPEN + assert client.circuit_breaker_state_for(str(Endpoint.INCIDENTS)) == CircuitState.CLOSED + + client.close() + + +# ==================== async ==================== + + +@pytest_asyncio.fixture +async def per_endpoint_async_client(): + client = AsyncHyperpingClient( + api_key="sk_test", + retry_config=RetryConfig(max_retries=0), + circuit_breaker_config=_cb_config(threshold=2), + per_endpoint_circuit_breaker=True, + ) + yield client + await client.close() + + +class TestPerEndpointCircuitBreakerAsync: + """Per-endpoint isolation on AsyncHyperpingClient.""" + + @pytest.mark.asyncio + @respx.mock + async def test_per_endpoint_async( + self, per_endpoint_async_client: AsyncHyperpingClient + ) -> None: + client = per_endpoint_async_client + + respx.get(f"{API_BASE}{Endpoint.MONITORS}").mock( + return_value=httpx.Response(500, json={"error": "boom"}), + ) + respx.get(f"{API_BASE}{Endpoint.INCIDENTS}").mock( + return_value=httpx.Response(200, json={"incidents": []}), + ) + + for _ in range(2): + with pytest.raises(HyperpingAPIError): + await client.list_monitors() + + with pytest.raises(HyperpingAPIError, match="Circuit breaker OPEN"): + await client.list_monitors() + + assert await client.list_incidents() == [] + + assert client.circuit_breaker_state_for(str(Endpoint.MONITORS)) == CircuitState.OPEN + assert client.circuit_breaker_state_for(str(Endpoint.INCIDENTS)) == CircuitState.CLOSED From 48730f67058337c320745260b71011923edb296d Mon Sep 17 00:00:00 2001 From: Khaled Salhab Date: Wed, 6 May 2026 11:03:33 +0300 Subject: [PATCH 2/2] refactor: bucket per-endpoint breakers by Endpoint prefix Address review feedback on the initial per-endpoint breaker implementation. The previous version keyed the breaker dict on the literal request path. For paths that embed resource UUIDs (`/v1/monitors/{uuid}`, etc.) this meant every resource ended up with its own breaker, the dict could grow unbounded in long-lived clients, and the README's `circuit_breaker_state_for(Endpoint.MONITORS)` example only saw list-call state, not per-resource calls. Changes: - Default breaker key now collapses sub-resource paths under their matching `Endpoint` prefix (`/v1/monitors`, `/v3/incidents`, ...). The breaker set is bounded by the number of Endpoint values, not by resource cardinality. - New `breaker_key_fn: Callable[[str], str] | None` lets callers opt into a different scheme (per-UUID, per-verb, etc.) and explicitly own bounding. - OPEN-state error message in per-endpoint mode now includes the breaker key ("Circuit breaker OPEN for '/v1/monitors' - ..."), so the failure points at which endpoint tripped. - `circuit_breaker_state_for(path)` now falls back to the shared breaker's state in default mode instead of raising, so the call is always safe to make and toggling the flag at construction time doesn't change the API surface callers see. - `_breaker_key` rewritten with `urllib.parse.urlsplit` and the dict-lock comment explains the `threading.Lock` choice for the async client. Tests added for endpoint canonicalisation, custom `breaker_key_fn`, default- mode `circuit_breaker_state_for` returning shared state, and the OPEN error message including the endpoint key. --- CHANGELOG.md | 12 +- README.md | 27 +++- src/hyperping/_async_client.py | 80 ++++++--- src/hyperping/client.py | 95 +++++++---- .../unit/test_per_endpoint_circuit_breaker.py | 152 +++++++++++++++++- 5 files changed, 300 insertions(+), 66 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 2f7ecc9..5c60503 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -10,10 +10,14 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ### Added - Per-endpoint circuit breaker option (`per_endpoint_circuit_breaker: bool = False`) on - `HyperpingClient` and `AsyncHyperpingClient`. When enabled, each request path gets its - own breaker state so a single flaky endpoint no longer blocks traffic to healthy ones. - Default behaviour is unchanged. State for a given path is readable via - `client.circuit_breaker_state_for(path)`. See README for details. + `HyperpingClient` and `AsyncHyperpingClient`. When enabled, each `Endpoint` gets its own + breaker state so a single flaky endpoint no longer blocks traffic to healthy ones. + Sub-resource paths (e.g. `/v1/monitors/{uuid}`, `/v1/monitors/{uuid}/reports`) are + bucketed under their parent `Endpoint` prefix so the breaker set stays bounded; pass a + custom `breaker_key_fn` to change that. The OPEN-state error message now identifies + which endpoint tripped. State for a given path is readable via + `client.circuit_breaker_state_for(path)` in either mode. Default behaviour is + unchanged. See README for details. ## [1.5.0] - 2026-04-20 diff --git a/README.md b/README.md index 89dbd23..6088078 100644 --- a/README.md +++ b/README.md @@ -254,7 +254,7 @@ client = HyperpingClient( ### Per-endpoint circuit breaker -By default a single shared circuit breaker covers every request. If one endpoint flakes, every other endpoint is also blocked. Enable `per_endpoint_circuit_breaker=True` to keep one breaker per request path so a failing endpoint does not punish healthy ones: +By default a single shared circuit breaker covers every request. If one endpoint flakes, every other endpoint is also blocked. Enable `per_endpoint_circuit_breaker=True` to keep one breaker per *endpoint* so a failing endpoint does not punish healthy ones: ```python client = HyperpingClient( @@ -262,18 +262,37 @@ client = HyperpingClient( per_endpoint_circuit_breaker=True, ) -# Inspect state for a specific path: +# Inspect state for an endpoint. The breaker key is canonicalised to the +# matching `Endpoint` prefix, so all sub-resource paths share a bucket: from hyperping import CircuitState, Endpoint state = client.circuit_breaker_state_for(str(Endpoint.MONITORS)) +# /v1/monitors, /v1/monitors/mon_abc and /v1/monitors/mon_abc/reports all +# report the same state — they share the `/v1/monitors` breaker. +assert client.circuit_breaker_state_for(f"{Endpoint.MONITORS}/mon_abc") == state assert state in {CircuitState.CLOSED, CircuitState.HALF_OPEN, CircuitState.OPEN} ``` +If you need different bucketing (e.g. one breaker per resource UUID, or a single breaker per HTTP verb), pass a `breaker_key_fn`: + +```python +def per_resource(path: str) -> str: + # one breaker per literal request path + return path.split("?", 1)[0] + +client = HyperpingClient( + api_key="sk_...", + per_endpoint_circuit_breaker=True, + breaker_key_fn=per_resource, +) +``` + | Option | Type | Default | Description | | --- | --- | --- | --- | -| `per_endpoint_circuit_breaker` | `bool` | `False` | When `True`, maintain a separate circuit breaker per request path (query string and fragment ignored). The same `circuit_breaker_config` applies to every per-path breaker. The shared breaker remains accessible via `client.circuit_breaker`; per-path state is read via `client.circuit_breaker_state_for(path)`. | +| `per_endpoint_circuit_breaker` | `bool` | `False` | When `True`, maintain a separate circuit breaker keyed by request endpoint instead of using one shared breaker. The same `circuit_breaker_config` applies to every per-endpoint breaker. The shared breaker remains accessible via `client.circuit_breaker`. | +| `breaker_key_fn` | `Callable[[str], str] \| None` | `None` | Override the default endpoint-prefix bucketing. Receives the request path and returns the breaker key. Default behaviour collapses every path under the matching `Endpoint` prefix so the breaker set stays bounded (one per `Endpoint`); a custom function takes responsibility for keeping the key set bounded. Ignored unless `per_endpoint_circuit_breaker=True`. | -The same option is available on `AsyncHyperpingClient`. +State for any path is readable via `client.circuit_breaker_state_for(path)`. In the default (single-breaker) mode this returns the shared breaker's state for any path, so the call is always safe regardless of the flag. The same options and method are available on `AsyncHyperpingClient`. ## Type Safety diff --git a/src/hyperping/_async_client.py b/src/hyperping/_async_client.py index 2850303..bd4e332 100644 --- a/src/hyperping/_async_client.py +++ b/src/hyperping/_async_client.py @@ -15,7 +15,9 @@ import logging import random import threading +from collections.abc import Callable from typing import Any +from urllib.parse import urlsplit import httpx from pydantic import SecretStr @@ -33,7 +35,7 @@ ) from hyperping._internals import DEFAULT_USER_AGENT, RETRY_AFTER_MAX, sanitize_for_log from hyperping.client import DEFAULT_RETRY_CONFIG, RetryConfig -from hyperping.endpoints import API_BASE +from hyperping.endpoints import API_BASE, Endpoint from hyperping.exceptions import ( HyperpingAPIError, HyperpingAuthError, @@ -76,6 +78,7 @@ def __init__( circuit_breaker_config: CircuitBreakerConfig | None = None, user_agent: str | None = None, per_endpoint_circuit_breaker: bool = False, + breaker_key_fn: Callable[[str], str] | None = None, ) -> None: """Initialize the async Hyperping API client. @@ -87,11 +90,17 @@ def __init__( retry_config: Retry behaviour configuration. circuit_breaker_config: Circuit breaker configuration. When ``per_endpoint_circuit_breaker`` is ``True`` the same config is - applied to each per-path breaker. + applied to each per-endpoint breaker. user_agent: Custom ``User-Agent`` header value. per_endpoint_circuit_breaker: When ``True``, maintain an - independent breaker per request path. Default ``False`` + independent breaker per :class:`~hyperping.endpoints.Endpoint` + prefix (sub-resources inherit the parent endpoint's breaker, + so the breaker set stays bounded). Default ``False`` preserves the original single-shared-breaker behaviour. + breaker_key_fn: Override the default endpoint-prefix bucketing. + Receives the request path and must return the breaker key. + Ignored unless ``per_endpoint_circuit_breaker`` is ``True``. + Caller is responsible for keeping the key set bounded. """ raw_key = api_key.get_secret_value() if isinstance(api_key, SecretStr) else api_key if not raw_key or not raw_key.strip(): @@ -103,6 +112,7 @@ def __init__( self._circuit_breaker_config = circuit_breaker_config self._circuit_breaker = CircuitBreaker(circuit_breaker_config) self._per_endpoint_circuit_breaker = per_endpoint_circuit_breaker + self._breaker_key_fn = breaker_key_fn self._endpoint_breakers: dict[str, CircuitBreaker] = {} self._endpoint_breakers_lock = threading.Lock() @@ -140,20 +150,29 @@ def circuit_breaker(self) -> CircuitBreaker: """ return self._circuit_breaker - @staticmethod - def _breaker_key(path: str) -> str: - """Return the dict key for a request path: drop query string and fragment.""" - for sep in ("?", "#"): - idx = path.find(sep) - if idx != -1: - path = path[:idx] - return path + def _resolve_breaker_key(self, path: str) -> str: + """Map a request path to its circuit-breaker key. + + Default bucketing strips query/fragment and collapses the path under + the longest matching :class:`Endpoint` prefix; a custom + ``breaker_key_fn`` wins outright. + """ + if self._breaker_key_fn is not None: + return self._breaker_key_fn(path) + pure = urlsplit(path).path + for ep in Endpoint: + ep_value = ep.value + if pure == ep_value or pure.startswith(ep_value + "/"): + return ep_value + return pure def _breaker_for(self, path: str) -> CircuitBreaker: - """Return the breaker that governs ``path`` (shared, or per-path).""" + """Return the breaker that governs ``path`` (shared, or per-endpoint).""" if not self._per_endpoint_circuit_breaker: return self._circuit_breaker - key = self._breaker_key(path) + key = self._resolve_breaker_key(path) + # threading.Lock here is intentional: see HyperpingClient._breaker_for + # for the rationale (works under both pure-asyncio and mixed-thread use). with self._endpoint_breakers_lock: breaker = self._endpoint_breakers.get(key) if breaker is None: @@ -164,20 +183,33 @@ def _breaker_for(self, path: str) -> CircuitBreaker: def circuit_breaker_state_for(self, path: str) -> CircuitState: """Return the circuit state of the breaker governing ``path``. - Only valid when the client was constructed with - ``per_endpoint_circuit_breaker=True``. Untouched paths report - :attr:`CircuitState.CLOSED` without allocating a breaker. + In per-endpoint mode the path is canonicalised the same way as during + a request; untouched buckets report :attr:`CircuitState.CLOSED` + without allocating a breaker. In default mode the shared breaker's + state is returned for any path. """ if not self._per_endpoint_circuit_breaker: - raise RuntimeError( - "circuit_breaker_state_for() requires per_endpoint_circuit_breaker=True; " - "use the .circuit_breaker property for the shared breaker.", - ) - key = self._breaker_key(path) + return self._circuit_breaker.state + key = self._resolve_breaker_key(path) with self._endpoint_breakers_lock: breaker = self._endpoint_breakers.get(key) return breaker.state if breaker is not None else CircuitState.CLOSED + def _circuit_open_message(self, breaker: CircuitBreaker, path: str) -> str: + """Build the error message raised when a request is rejected by an OPEN breaker.""" + if self._per_endpoint_circuit_breaker: + key = self._resolve_breaker_key(path) + return ( + f"Circuit breaker OPEN for {key!r} - API calls to this endpoint suspended. " + f"Consecutive failures: {breaker.failure_count}. " + f"Will recover after {breaker.recovery_timeout}s." + ) + return ( + f"Circuit breaker OPEN - API calls suspended. " + f"Consecutive failures: {breaker.failure_count}. " + f"Will recover after {breaker.recovery_timeout}s." + ) + # ==================== Error Handling ==================== def _parse_error_body(self, response: httpx.Response) -> dict[str, Any]: @@ -319,11 +351,7 @@ async def _request( """ breaker = self._breaker_for(path) if not breaker.call_allowed(): - raise HyperpingAPIError( - f"Circuit breaker OPEN - API calls suspended. " - f"Consecutive failures: {breaker.failure_count}. " - f"Will recover after {breaker.recovery_timeout}s." - ) + raise HyperpingAPIError(self._circuit_open_message(breaker, path)) last_exception: Exception | None = None delay = self.retry_config.initial_delay diff --git a/src/hyperping/client.py b/src/hyperping/client.py index eebcbff..8763200 100644 --- a/src/hyperping/client.py +++ b/src/hyperping/client.py @@ -12,8 +12,10 @@ import random import threading import time +from collections.abc import Callable from dataclasses import dataclass from typing import Any +from urllib.parse import urlsplit import httpx from pydantic import SecretStr @@ -31,7 +33,7 @@ from hyperping._monitors_mixin import MonitorsMixin from hyperping._outages_mixin import OutagesMixin from hyperping._statuspages_mixin import StatusPagesMixin -from hyperping.endpoints import API_BASE +from hyperping.endpoints import API_BASE, Endpoint from hyperping.exceptions import ( HyperpingAPIError, HyperpingAuthError, @@ -90,6 +92,7 @@ def __init__( circuit_breaker_config: CircuitBreakerConfig | None = None, user_agent: str | None = None, per_endpoint_circuit_breaker: bool = False, + breaker_key_fn: Callable[[str], str] | None = None, ) -> None: """Initialize the Hyperping API client. @@ -108,9 +111,22 @@ def __init__( user_agent: Custom ``User-Agent`` header value. Defaults to ``hyperping-python/0.1.0``. per_endpoint_circuit_breaker: When ``True``, maintain an independent - breaker per request path so a single flaky endpoint does not - block traffic to healthy ones. Default ``False`` preserves the - original single-shared-breaker behaviour. + breaker per *endpoint* so a single flaky endpoint does not + block traffic to healthy ones. By default the breaker key is + the matching :class:`~hyperping.endpoints.Endpoint` prefix: + ``/v1/monitors``, ``/v1/monitors/{uuid}`` and + ``/v1/monitors/{uuid}/anything`` all share one breaker keyed + on ``/v1/monitors``. This keeps the breaker set bounded + (one per Endpoint) instead of growing per resource UUID. + Default ``False`` preserves the original single-shared-breaker + behaviour. + breaker_key_fn: Override the default endpoint-prefix bucketing. + Receives the request path (with query/fragment intact) and + must return the breaker key. Use this if you want different + granularity (e.g. one breaker per resource UUID, or a single + breaker for all monitor sub-paths). Ignored unless + ``per_endpoint_circuit_breaker`` is ``True``. *Caller is + responsible for keeping the key set bounded.* """ raw_key = api_key.get_secret_value() if isinstance(api_key, SecretStr) else api_key if not raw_key or not raw_key.strip(): @@ -122,6 +138,7 @@ def __init__( self._circuit_breaker_config = circuit_breaker_config self._circuit_breaker = CircuitBreaker(circuit_breaker_config) self._per_endpoint_circuit_breaker = per_endpoint_circuit_breaker + self._breaker_key_fn = breaker_key_fn self._endpoint_breakers: dict[str, CircuitBreaker] = {} self._endpoint_breakers_lock = threading.Lock() @@ -159,24 +176,37 @@ def circuit_breaker(self) -> CircuitBreaker: """ return self._circuit_breaker - @staticmethod - def _breaker_key(path: str) -> str: - """Return the dict key for a request path: drop query string and fragment.""" - for sep in ("?", "#"): - idx = path.find(sep) - if idx != -1: - path = path[:idx] - return path + def _resolve_breaker_key(self, path: str) -> str: + """Map a request path to its circuit-breaker key. + + Default bucketing strips query/fragment and collapses the path under + the longest matching :class:`Endpoint` prefix, so every sub-resource + under an endpoint shares the parent's breaker. When the caller passes + a custom ``breaker_key_fn`` it wins outright. + """ + if self._breaker_key_fn is not None: + return self._breaker_key_fn(path) + pure = urlsplit(path).path + for ep in Endpoint: + ep_value = ep.value + if pure == ep_value or pure.startswith(ep_value + "/"): + return ep_value + return pure def _breaker_for(self, path: str) -> CircuitBreaker: """Return the breaker that governs ``path``. In default mode this is always the shared breaker; in per-endpoint - mode each path gets its own ``CircuitBreaker`` lazily. + mode each canonical key gets its own :class:`CircuitBreaker` lazily. """ if not self._per_endpoint_circuit_breaker: return self._circuit_breaker - key = self._breaker_key(path) + key = self._resolve_breaker_key(path) + # threading.Lock here (not asyncio.Lock) is intentional: it lets the + # same per-endpoint logic serve both the sync and async clients + # without forcing an `async` accessor, and it correctly serialises + # access if the async client is driven from multiple OS threads + # (e.g. via run_in_executor). with self._endpoint_breakers_lock: breaker = self._endpoint_breakers.get(key) if breaker is None: @@ -187,20 +217,35 @@ def _breaker_for(self, path: str) -> CircuitBreaker: def circuit_breaker_state_for(self, path: str) -> CircuitState: """Return the circuit state of the breaker governing ``path``. - Only valid when the client was constructed with - ``per_endpoint_circuit_breaker=True``. Untouched paths report - :attr:`CircuitState.CLOSED` without allocating a breaker. + In per-endpoint mode the path is canonicalised the same way as during + a request (default endpoint-prefix bucketing, or ``breaker_key_fn`` + if set); untouched buckets report :attr:`CircuitState.CLOSED` without + allocating a breaker. In the default single-breaker mode the shared + breaker's state is returned for any path, so this method is always + safe to call regardless of the flag. """ if not self._per_endpoint_circuit_breaker: - raise RuntimeError( - "circuit_breaker_state_for() requires per_endpoint_circuit_breaker=True; " - "use the .circuit_breaker property for the shared breaker.", - ) - key = self._breaker_key(path) + return self._circuit_breaker.state + key = self._resolve_breaker_key(path) with self._endpoint_breakers_lock: breaker = self._endpoint_breakers.get(key) return breaker.state if breaker is not None else CircuitState.CLOSED + def _circuit_open_message(self, breaker: CircuitBreaker, path: str) -> str: + """Build the error message raised when a request is rejected by an OPEN breaker.""" + if self._per_endpoint_circuit_breaker: + key = self._resolve_breaker_key(path) + return ( + f"Circuit breaker OPEN for {key!r} - API calls to this endpoint suspended. " + f"Consecutive failures: {breaker.failure_count}. " + f"Will recover after {breaker.recovery_timeout}s." + ) + return ( + f"Circuit breaker OPEN - API calls suspended. " + f"Consecutive failures: {breaker.failure_count}. " + f"Will recover after {breaker.recovery_timeout}s." + ) + # ==================== Error Handling ==================== def _parse_error_body(self, response: httpx.Response) -> dict[str, Any]: @@ -397,11 +442,7 @@ def _request( """ breaker = self._breaker_for(path) if not breaker.call_allowed(): - raise HyperpingAPIError( - f"Circuit breaker OPEN - API calls suspended. " - f"Consecutive failures: {breaker.failure_count}. " - f"Will recover after {breaker.recovery_timeout}s." - ) + raise HyperpingAPIError(self._circuit_open_message(breaker, path)) last_exception: Exception | None = None delay = self.retry_config.initial_delay diff --git a/tests/unit/test_per_endpoint_circuit_breaker.py b/tests/unit/test_per_endpoint_circuit_breaker.py index 4ad4719..f5e8219 100644 --- a/tests/unit/test_per_endpoint_circuit_breaker.py +++ b/tests/unit/test_per_endpoint_circuit_breaker.py @@ -31,6 +31,23 @@ def _cb_config(threshold: int = 2) -> CircuitBreakerConfig: return CircuitBreakerConfig(failure_threshold=threshold, recovery_timeout=60.0) +def _monitor_payload(uuid: str) -> dict: + """Minimal monitor payload that satisfies Monitor.model_validate().""" + return { + "monitorUuid": uuid, + "name": uuid, + "url": "https://example.com", + "method": "GET", + "frequency": 60, + "timeout": 10, + "regions": ["london"], + "headers": {}, + "expectedStatus": 200, + "down": False, + "paused": False, + } + + # ==================== sync ==================== @@ -130,11 +147,136 @@ def test_state_for_unknown_path_is_closed(self) -> None: assert client.circuit_breaker_state_for("/v1/unused") == CircuitState.CLOSED client.close() - def test_state_for_requires_per_endpoint_mode(self) -> None: - """Calling the per-path accessor without the flag is a misuse — surface it.""" - client = HyperpingClient(api_key="sk_test") - with pytest.raises(RuntimeError, match="per_endpoint_circuit_breaker"): - client.circuit_breaker_state_for("/v1/monitors") + @respx.mock + def test_state_for_default_mode_returns_shared_state(self) -> None: + """In default mode, state_for(any_path) reflects the single shared breaker.""" + client = HyperpingClient( + api_key="sk_test", + retry_config=RetryConfig(max_retries=0), + circuit_breaker_config=_cb_config(threshold=1), + ) + # Untripped: any path reports CLOSED, identical to the shared breaker. + assert client.circuit_breaker_state_for("/v1/monitors") == CircuitState.CLOSED + assert client.circuit_breaker_state_for("/anything") == CircuitState.CLOSED + + respx.get(f"{API_BASE}{Endpoint.MONITORS}").mock( + return_value=httpx.Response(500, json={"error": "boom"}), + ) + with pytest.raises(HyperpingAPIError): + client.list_monitors() + + # Shared breaker is now OPEN; state_for reports OPEN regardless of path. + assert client.circuit_breaker.state == CircuitState.OPEN + assert client.circuit_breaker_state_for("/v1/monitors") == CircuitState.OPEN + assert client.circuit_breaker_state_for("/v3/incidents") == CircuitState.OPEN + client.close() + + @respx.mock + def test_default_canonicalization_buckets_by_endpoint(self) -> None: + """`/v1/monitors/{uuid}` and `/v1/monitors` share one breaker; other endpoints don't.""" + client = HyperpingClient( + api_key="sk_test", + retry_config=RetryConfig(max_retries=0), + circuit_breaker_config=_cb_config(threshold=2), + per_endpoint_circuit_breaker=True, + ) + + # Two different monitor UUIDs both fail with 5xx. + respx.get(f"{API_BASE}{Endpoint.MONITORS}/mon_A").mock( + return_value=httpx.Response(500, json={"error": "boom"}), + ) + respx.get(f"{API_BASE}{Endpoint.MONITORS}/mon_B").mock( + return_value=httpx.Response(500, json={"error": "boom"}), + ) + respx.get(f"{API_BASE}{Endpoint.INCIDENTS}").mock( + return_value=httpx.Response(200, json={"incidents": []}), + ) + + # Two failures on mon_A trip the shared `/v1/monitors` breaker. + with pytest.raises(HyperpingAPIError): + client.get_monitor("mon_A") + with pytest.raises(HyperpingAPIError): + client.get_monitor("mon_A") + + # mon_B is now blocked too — it shares the `/v1/monitors` bucket. No HTTP + # request is made (no mock interaction needed beyond the route). + with pytest.raises(HyperpingAPIError, match="Circuit breaker OPEN"): + client.get_monitor("mon_B") + + # The list endpoint also falls under `/v1/monitors` and is blocked. + with pytest.raises(HyperpingAPIError, match="Circuit breaker OPEN"): + client.list_monitors() + + # A different endpoint (`/v3/incidents`) is unaffected. + assert client.list_incidents() == [] + + # State queries: any monitor sub-path resolves to the same key. + assert client.circuit_breaker_state_for(f"{Endpoint.MONITORS}/mon_A") == CircuitState.OPEN + assert client.circuit_breaker_state_for(f"{Endpoint.MONITORS}/mon_B") == CircuitState.OPEN + assert client.circuit_breaker_state_for(str(Endpoint.MONITORS)) == CircuitState.OPEN + assert client.circuit_breaker_state_for(str(Endpoint.INCIDENTS)) == CircuitState.CLOSED + + client.close() + + @respx.mock + def test_custom_breaker_key_fn(self) -> None: + """A caller-supplied key fn overrides the default endpoint bucketing.""" + seen: list[str] = [] + + def per_uuid(path: str) -> str: + seen.append(path) + # Force one breaker per literal path (the pre-canonicalisation behaviour). + return path + + client = HyperpingClient( + api_key="sk_test", + retry_config=RetryConfig(max_retries=0), + circuit_breaker_config=_cb_config(threshold=2), + per_endpoint_circuit_breaker=True, + breaker_key_fn=per_uuid, + ) + + respx.get(f"{API_BASE}{Endpoint.MONITORS}/mon_A").mock( + return_value=httpx.Response(500, json={"error": "boom"}), + ) + respx.get(f"{API_BASE}{Endpoint.MONITORS}/mon_B").mock( + return_value=httpx.Response(200, json=_monitor_payload("mon_B")), + ) + + with pytest.raises(HyperpingAPIError): + client.get_monitor("mon_A") + with pytest.raises(HyperpingAPIError): + client.get_monitor("mon_A") + + # With a per-UUID key fn, mon_B has its own breaker and the call goes through. + result = client.get_monitor("mon_B") + assert result.uuid == "mon_B" + + assert seen, "custom key fn was not invoked" + assert client.circuit_breaker_state_for(f"{Endpoint.MONITORS}/mon_A") == CircuitState.OPEN + assert client.circuit_breaker_state_for(f"{Endpoint.MONITORS}/mon_B") == CircuitState.CLOSED + + client.close() + + @respx.mock + def test_open_error_message_includes_endpoint_key(self) -> None: + """OPEN error message identifies which endpoint was tripped.""" + client = HyperpingClient( + api_key="sk_test", + retry_config=RetryConfig(max_retries=0), + circuit_breaker_config=_cb_config(threshold=1), + per_endpoint_circuit_breaker=True, + ) + respx.get(f"{API_BASE}{Endpoint.MONITORS}").mock( + return_value=httpx.Response(500, json={"error": "boom"}), + ) + + with pytest.raises(HyperpingAPIError): + client.list_monitors() + + with pytest.raises(HyperpingAPIError, match=r"Circuit breaker OPEN for '/v1/monitors'"): + client.list_monitors() + client.close() @respx.mock