diff --git a/BACKLOG.md b/BACKLOG.md index fcc0244..4ec41c7 100644 --- a/BACKLOG.md +++ b/BACKLOG.md @@ -46,7 +46,7 @@ Agents: Security, Architecture, Code Quality, Refactor/Dead Code. - [x] **M1** Async client (`AsyncHyperpingClient`) — shipped in PR #13 (feature/sdk-py-03-async-client). - [x] **M2** Pagination (`page` param, `hasNextPage` auto-pagination via `collect_all_pages` / `collect_all_pages_async`) — shipped in PR #12 (merged) and PR #13. -- [ ] **M3** Per-endpoint circuit breaker (`per_endpoint_circuit_breaker: bool = False` option). **Deferred — moderate scope, add in follow-up.** +- [x] **M3** Per-endpoint circuit breaker (`per_endpoint_circuit_breaker: bool = False` option) on `HyperpingClient` and `AsyncHyperpingClient`. Per-path state via `circuit_breaker_state_for(path)`. Default off; existing single-shared-breaker behaviour preserved. - [x] **M4** `MonitorCreate` now has `@model_validator(mode="after")` that raises `ValueError` if DNS fields are set on non-DNS monitors. - [x] **M5** `MonitorListResponse` is in `__all__` — retained but documented as not returned by any client method. Will be used once pagination lands. - [x] **M6** `APIErrorResponse` removed from `__all__` (documented as intentionally internal in comment). @@ -106,7 +106,7 @@ The following items require either a semver bump, a separate PR, or manual work: - ~~**M1** Async client~~ — shipped - ~~**M2** Pagination~~ — shipped -- **M3** Per-endpoint circuit breaker option +- ~~**M3** Per-endpoint circuit breaker option~~ — shipped - **M11** URL validation for HTTP-protocol monitors (cross-field, needs discriminated union work) - **M12** DateTime coercion (breaking change — v0.2.0) - **H11** Pin all GitHub Actions `uses:` to 40-char commit SHAs (requires per-tag SHA lookup) diff --git a/CHANGELOG.md b/CHANGELOG.md index dfbf692..5c60503 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -5,6 +5,20 @@ All notable changes to this project will be documented in this file. 
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.1.0/), and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html). +## [Unreleased] + +### Added + +- Per-endpoint circuit breaker option (`per_endpoint_circuit_breaker: bool = False`) on + `HyperpingClient` and `AsyncHyperpingClient`. When enabled, each `Endpoint` gets its own + breaker state so a single flaky endpoint no longer blocks traffic to healthy ones. + Sub-resource paths (e.g. `/v1/monitors/{uuid}`, `/v1/monitors/{uuid}/reports`) are + bucketed under their parent `Endpoint` prefix so the breaker set stays bounded; pass a + custom `breaker_key_fn` to change that. The OPEN-state error message now identifies + which endpoint tripped. State for a given path is readable via + `client.circuit_breaker_state_for(path)` in either mode. Default behaviour is + unchanged. See README for details. + ## [1.5.0] - 2026-04-20 ### Added diff --git a/README.md b/README.md index ae6c7b6..6088078 100644 --- a/README.md +++ b/README.md @@ -252,6 +252,48 @@ client = HyperpingClient( ) ``` +### Per-endpoint circuit breaker + +By default a single shared circuit breaker covers every request. If one endpoint flakes, every other endpoint is also blocked. Enable `per_endpoint_circuit_breaker=True` to keep one breaker per *endpoint* so a failing endpoint does not punish healthy ones: + +```python +client = HyperpingClient( + api_key="sk_...", + per_endpoint_circuit_breaker=True, +) + +# Inspect state for an endpoint. The breaker key is canonicalised to the +# matching `Endpoint` prefix, so all sub-resource paths share a bucket: +from hyperping import CircuitState, Endpoint + +state = client.circuit_breaker_state_for(str(Endpoint.MONITORS)) +# /v1/monitors, /v1/monitors/mon_abc and /v1/monitors/mon_abc/reports all +# report the same state — they share the `/v1/monitors` breaker. 
+assert client.circuit_breaker_state_for(f"{Endpoint.MONITORS}/mon_abc") == state +assert state in {CircuitState.CLOSED, CircuitState.HALF_OPEN, CircuitState.OPEN} +``` + +If you need different bucketing (e.g. one breaker per resource UUID, or one breaker per API version prefix), pass a `breaker_key_fn`. The function receives only the request path, so keys can only be derived from the path: + +```python +def per_resource(path: str) -> str: + # one breaker per literal request path + return path.split("?", 1)[0] + +client = HyperpingClient( + api_key="sk_...", + per_endpoint_circuit_breaker=True, + breaker_key_fn=per_resource, +) +``` + +| Option | Type | Default | Description | +| --- | --- | --- | --- | +| `per_endpoint_circuit_breaker` | `bool` | `False` | When `True`, maintain a separate circuit breaker keyed by request endpoint instead of using one shared breaker. The same `circuit_breaker_config` applies to every per-endpoint breaker. The shared breaker remains accessible via `client.circuit_breaker`. | +| `breaker_key_fn` | `Callable[[str], str] \| None` | `None` | Override the default endpoint-prefix bucketing. Receives the request path and returns the breaker key. Default behaviour collapses every path under the matching `Endpoint` prefix so the breaker set stays bounded (one per `Endpoint`); a custom function takes responsibility for keeping the key set bounded. Ignored unless `per_endpoint_circuit_breaker=True`. | + +State for any path is readable via `client.circuit_breaker_state_for(path)`. In the default (single-breaker) mode this returns the shared breaker's state for any path, so the call is always safe regardless of the flag. The same options and method are available on `AsyncHyperpingClient`. + ## Type Safety This package ships a `py.typed` marker (PEP 561) and is fully typed. Works out of the box with mypy and pyright. 
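Reviewer note on the README hunk above: the default endpoint-prefix bucketing it describes can be sketched in isolation as follows. This is a minimal illustration, not the package code. The `Endpoint` enum here is a hypothetical stand-in with assumed values, and `resolve_breaker_key` is an illustrative free function rather than a public API.

```python
from enum import Enum
from urllib.parse import urlsplit


class Endpoint(str, Enum):
    """Hypothetical stand-in for hyperping.endpoints.Endpoint (values assumed)."""

    MONITORS = "/v1/monitors"
    INCIDENTS = "/v3/incidents"


def resolve_breaker_key(path: str) -> str:
    """Collapse a request path onto its Endpoint prefix, ignoring query/fragment."""
    pure = urlsplit(path).path
    for ep in Endpoint:
        # Match the endpoint itself or any sub-resource below it. The trailing
        # "/" check keeps "/v1/monitorsX" from matching "/v1/monitors".
        if pure == ep.value or pure.startswith(ep.value + "/"):
            return ep.value
    # Paths outside every known endpoint keep their own key.
    return pure


print(resolve_breaker_key("/v1/monitors/mon_abc/reports?page=2"))
```

Because every sub-resource maps to one of a fixed set of prefixes, the number of breakers stays at most one per `Endpoint` member plus any unmatched paths.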
diff --git a/src/hyperping/_async_client.py b/src/hyperping/_async_client.py index 6474f4f..bd4e332 100644 --- a/src/hyperping/_async_client.py +++ b/src/hyperping/_async_client.py @@ -14,7 +14,10 @@ import asyncio import logging import random +import threading +from collections.abc import Callable from typing import Any +from urllib.parse import urlsplit import httpx from pydantic import SecretStr @@ -28,10 +31,11 @@ from hyperping._circuit_breaker import ( CircuitBreaker, CircuitBreakerConfig, + CircuitState, ) from hyperping._internals import DEFAULT_USER_AGENT, RETRY_AFTER_MAX, sanitize_for_log from hyperping.client import DEFAULT_RETRY_CONFIG, RetryConfig -from hyperping.endpoints import API_BASE +from hyperping.endpoints import API_BASE, Endpoint from hyperping.exceptions import ( HyperpingAPIError, HyperpingAuthError, @@ -73,6 +77,8 @@ def __init__( retry_config: RetryConfig | None = None, circuit_breaker_config: CircuitBreakerConfig | None = None, user_agent: str | None = None, + per_endpoint_circuit_breaker: bool = False, + breaker_key_fn: Callable[[str], str] | None = None, ) -> None: """Initialize the async Hyperping API client. @@ -82,8 +88,19 @@ def __init__( base_url: Override the default API base URL. timeout: HTTP request timeout in seconds. retry_config: Retry behaviour configuration. - circuit_breaker_config: Circuit breaker configuration. + circuit_breaker_config: Circuit breaker configuration. When + ``per_endpoint_circuit_breaker`` is ``True`` the same config is + applied to each per-endpoint breaker. user_agent: Custom ``User-Agent`` header value. + per_endpoint_circuit_breaker: When ``True``, maintain an + independent breaker per :class:`~hyperping.endpoints.Endpoint` + prefix (sub-resources inherit the parent endpoint's breaker, + so the breaker set stays bounded). Default ``False`` + preserves the original single-shared-breaker behaviour. + breaker_key_fn: Override the default endpoint-prefix bucketing. 
+ Receives the request path and must return the breaker key. + Ignored unless ``per_endpoint_circuit_breaker`` is ``True``. + Caller is responsible for keeping the key set bounded. """ raw_key = api_key.get_secret_value() if isinstance(api_key, SecretStr) else api_key if not raw_key or not raw_key.strip(): @@ -92,7 +109,12 @@ def __init__( self.base_url = (base_url or self.DEFAULT_BASE_URL).rstrip("/") self.timeout = timeout self.retry_config = retry_config or DEFAULT_RETRY_CONFIG + self._circuit_breaker_config = circuit_breaker_config self._circuit_breaker = CircuitBreaker(circuit_breaker_config) + self._per_endpoint_circuit_breaker = per_endpoint_circuit_breaker + self._breaker_key_fn = breaker_key_fn + self._endpoint_breakers: dict[str, CircuitBreaker] = {} + self._endpoint_breakers_lock = threading.Lock() self._client = httpx.AsyncClient( base_url=self.base_url, @@ -120,9 +142,74 @@ async def __aexit__(self, *args: Any) -> None: @property def circuit_breaker(self) -> CircuitBreaker: - """Access the circuit breaker state (for monitoring).""" + """Access the (shared) circuit breaker state (for monitoring). + + In per-endpoint mode this returns the original shared breaker, kept + for backward compatibility; the per-path breakers are exposed via + :meth:`circuit_breaker_state_for`. + """ return self._circuit_breaker + def _resolve_breaker_key(self, path: str) -> str: + """Map a request path to its circuit-breaker key. + + Default bucketing strips query/fragment and collapses the path under + the first matching :class:`Endpoint` prefix (in enum order); a custom + ``breaker_key_fn`` wins outright. 
+ """ + if self._breaker_key_fn is not None: + return self._breaker_key_fn(path) + pure = urlsplit(path).path + for ep in Endpoint: + ep_value = ep.value + if pure == ep_value or pure.startswith(ep_value + "/"): + return ep_value + return pure + + def _breaker_for(self, path: str) -> CircuitBreaker: + """Return the breaker that governs ``path`` (shared, or per-endpoint).""" + if not self._per_endpoint_circuit_breaker: + return self._circuit_breaker + key = self._resolve_breaker_key(path) + # threading.Lock here is intentional: see HyperpingClient._breaker_for + # for the rationale (works under both pure-asyncio and mixed-thread use). + with self._endpoint_breakers_lock: + breaker = self._endpoint_breakers.get(key) + if breaker is None: + breaker = CircuitBreaker(self._circuit_breaker_config) + self._endpoint_breakers[key] = breaker + return breaker + + def circuit_breaker_state_for(self, path: str) -> CircuitState: + """Return the circuit state of the breaker governing ``path``. + + In per-endpoint mode the path is canonicalised the same way as during + a request; untouched buckets report :attr:`CircuitState.CLOSED` + without allocating a breaker. In default mode the shared breaker's + state is returned for any path. + """ + if not self._per_endpoint_circuit_breaker: + return self._circuit_breaker.state + key = self._resolve_breaker_key(path) + with self._endpoint_breakers_lock: + breaker = self._endpoint_breakers.get(key) + return breaker.state if breaker is not None else CircuitState.CLOSED + + def _circuit_open_message(self, breaker: CircuitBreaker, path: str) -> str: + """Build the error message raised when a request is rejected by an OPEN breaker.""" + if self._per_endpoint_circuit_breaker: + key = self._resolve_breaker_key(path) + return ( + f"Circuit breaker OPEN for {key!r} - API calls to this endpoint suspended. " + f"Consecutive failures: {breaker.failure_count}. " + f"Will recover after {breaker.recovery_timeout}s." 
+ ) + return ( + f"Circuit breaker OPEN - API calls suspended. " + f"Consecutive failures: {breaker.failure_count}. " + f"Will recover after {breaker.recovery_timeout}s." + ) + # ==================== Error Handling ==================== def _parse_error_body(self, response: httpx.Response) -> dict[str, Any]: @@ -236,7 +323,7 @@ async def _execute_single_attempt( if response.status_code >= 400: return response - self._circuit_breaker.record_success() + self._breaker_for(path).record_success() if response.status_code == 204: return {} return response.json() # type: ignore[no-any-return] @@ -262,13 +349,9 @@ async def _request( Raises: HyperpingAPIError: On API errors after retries exhausted """ - if not self._circuit_breaker.call_allowed(): - cb = self._circuit_breaker - raise HyperpingAPIError( - f"Circuit breaker OPEN - API calls suspended. " - f"Consecutive failures: {cb.failure_count}. " - f"Will recover after {cb.recovery_timeout}s." - ) + breaker = self._breaker_for(path) + if not breaker.call_allowed(): + raise HyperpingAPIError(self._circuit_open_message(breaker, path)) last_exception: Exception | None = None delay = self.retry_config.initial_delay @@ -299,7 +382,7 @@ async def _request( continue if response.status_code >= 500: - self._circuit_breaker.record_failure() + breaker.record_failure() self._handle_response_error(response) except (httpx.TimeoutException, httpx.RequestError) as e: @@ -320,7 +403,7 @@ async def _request( self.retry_config.max_delay, ) continue - self._circuit_breaker.record_failure() + breaker.record_failure() if isinstance(e, httpx.TimeoutException): raise HyperpingAPIError(f"Request timeout after {max_attempts} attempts") from e raise HyperpingAPIError(f"Request failed: {e}") from e diff --git a/src/hyperping/client.py b/src/hyperping/client.py index dfbcbe9..8763200 100644 --- a/src/hyperping/client.py +++ b/src/hyperping/client.py @@ -10,9 +10,12 @@ import logging import random +import threading import time +from collections.abc 
import Callable from dataclasses import dataclass from typing import Any +from urllib.parse import urlsplit import httpx from pydantic import SecretStr @@ -30,7 +33,7 @@ from hyperping._monitors_mixin import MonitorsMixin from hyperping._outages_mixin import OutagesMixin from hyperping._statuspages_mixin import StatusPagesMixin -from hyperping.endpoints import API_BASE +from hyperping.endpoints import API_BASE, Endpoint from hyperping.exceptions import ( HyperpingAPIError, HyperpingAuthError, @@ -88,6 +91,8 @@ def __init__( retry_config: RetryConfig | None = None, circuit_breaker_config: CircuitBreakerConfig | None = None, user_agent: str | None = None, + per_endpoint_circuit_breaker: bool = False, + breaker_key_fn: Callable[[str], str] | None = None, ) -> None: """Initialize the Hyperping API client. @@ -100,9 +105,28 @@ def __init__( retry_config: Retry behaviour configuration. Pass ``None`` for defaults (3 retries, exponential backoff). circuit_breaker_config: Circuit breaker configuration. Pass ``None`` - for defaults (5-failure threshold, 60 s recovery). + for defaults (5-failure threshold, 60 s recovery). When + ``per_endpoint_circuit_breaker`` is ``True`` this same config + is applied to every per-path breaker. user_agent: Custom ``User-Agent`` header value. Defaults to ``hyperping-python/0.1.0``. + per_endpoint_circuit_breaker: When ``True``, maintain an independent + breaker per *endpoint* so a single flaky endpoint does not + block traffic to healthy ones. By default the breaker key is + the matching :class:`~hyperping.endpoints.Endpoint` prefix: + ``/v1/monitors``, ``/v1/monitors/{uuid}`` and + ``/v1/monitors/{uuid}/anything`` all share one breaker keyed + on ``/v1/monitors``. This keeps the breaker set bounded + (one per Endpoint) instead of growing per resource UUID. + Default ``False`` preserves the original single-shared-breaker + behaviour. + breaker_key_fn: Override the default endpoint-prefix bucketing. 
+ Receives the request path (with query/fragment intact) and + must return the breaker key. Use this if you want different + granularity (e.g. one breaker per resource UUID, or one + breaker per API version prefix). Ignored unless + ``per_endpoint_circuit_breaker`` is ``True``. *Caller is + responsible for keeping the key set bounded.* """ raw_key = api_key.get_secret_value() if isinstance(api_key, SecretStr) else api_key if not raw_key or not raw_key.strip(): @@ -111,7 +135,12 @@ def __init__( self.base_url = (base_url or self.DEFAULT_BASE_URL).rstrip("/") self.timeout = timeout self.retry_config = retry_config or DEFAULT_RETRY_CONFIG + self._circuit_breaker_config = circuit_breaker_config self._circuit_breaker = CircuitBreaker(circuit_breaker_config) + self._per_endpoint_circuit_breaker = per_endpoint_circuit_breaker + self._breaker_key_fn = breaker_key_fn + self._endpoint_breakers: dict[str, CircuitBreaker] = {} + self._endpoint_breakers_lock = threading.Lock() self._client = httpx.Client( base_url=self.base_url, @@ -139,9 +168,84 @@ def __exit__(self, *args: Any) -> None: @property def circuit_breaker(self) -> CircuitBreaker: - """Access the circuit breaker state (for monitoring).""" + """Access the (shared) circuit breaker state (for monitoring). + + In per-endpoint mode this returns the original shared breaker, kept + for backward compatibility; the per-path breakers are exposed via + :meth:`circuit_breaker_state_for`. + """ return self._circuit_breaker + def _resolve_breaker_key(self, path: str) -> str: + """Map a request path to its circuit-breaker key. + + Default bucketing strips query/fragment and collapses the path under + the first matching :class:`Endpoint` prefix (in enum order), so every + sub-resource under an endpoint shares the parent's breaker. When the + caller passes a custom ``breaker_key_fn`` it wins outright. 
+ """ + if self._breaker_key_fn is not None: + return self._breaker_key_fn(path) + pure = urlsplit(path).path + for ep in Endpoint: + ep_value = ep.value + if pure == ep_value or pure.startswith(ep_value + "/"): + return ep_value + return pure + + def _breaker_for(self, path: str) -> CircuitBreaker: + """Return the breaker that governs ``path``. + + In default mode this is always the shared breaker; in per-endpoint + mode each canonical key gets its own :class:`CircuitBreaker` lazily. + """ + if not self._per_endpoint_circuit_breaker: + return self._circuit_breaker + key = self._resolve_breaker_key(path) + # threading.Lock here (not asyncio.Lock) is intentional: it lets the + # same per-endpoint logic serve both the sync and async clients + # without forcing an `async` accessor, and it correctly serialises + # access if the async client is driven from multiple OS threads + # (e.g. via run_in_executor). + with self._endpoint_breakers_lock: + breaker = self._endpoint_breakers.get(key) + if breaker is None: + breaker = CircuitBreaker(self._circuit_breaker_config) + self._endpoint_breakers[key] = breaker + return breaker + + def circuit_breaker_state_for(self, path: str) -> CircuitState: + """Return the circuit state of the breaker governing ``path``. + + In per-endpoint mode the path is canonicalised the same way as during + a request (default endpoint-prefix bucketing, or ``breaker_key_fn`` + if set); untouched buckets report :attr:`CircuitState.CLOSED` without + allocating a breaker. In the default single-breaker mode the shared + breaker's state is returned for any path, so this method is always + safe to call regardless of the flag. 
+ """ + if not self._per_endpoint_circuit_breaker: + return self._circuit_breaker.state + key = self._resolve_breaker_key(path) + with self._endpoint_breakers_lock: + breaker = self._endpoint_breakers.get(key) + return breaker.state if breaker is not None else CircuitState.CLOSED + + def _circuit_open_message(self, breaker: CircuitBreaker, path: str) -> str: + """Build the error message raised when a request is rejected by an OPEN breaker.""" + if self._per_endpoint_circuit_breaker: + key = self._resolve_breaker_key(path) + return ( + f"Circuit breaker OPEN for {key!r} - API calls to this endpoint suspended. " + f"Consecutive failures: {breaker.failure_count}. " + f"Will recover after {breaker.recovery_timeout}s." + ) + return ( + f"Circuit breaker OPEN - API calls suspended. " + f"Consecutive failures: {breaker.failure_count}. " + f"Will recover after {breaker.recovery_timeout}s." + ) + # ==================== Error Handling ==================== def _parse_error_body(self, response: httpx.Response) -> dict[str, Any]: @@ -310,7 +414,7 @@ def _execute_single_attempt( return response # Success - self._circuit_breaker.record_success() + self._breaker_for(path).record_success() if response.status_code == 204: return {} return response.json() # type: ignore[no-any-return] @@ -336,13 +440,9 @@ def _request( Raises: HyperpingAPIError: On API errors after retries exhausted """ - if not self._circuit_breaker.call_allowed(): - cb = self._circuit_breaker - raise HyperpingAPIError( - f"Circuit breaker OPEN - API calls suspended. " - f"Consecutive failures: {cb.failure_count}. " - f"Will recover after {cb.recovery_timeout}s." 
- ) + breaker = self._breaker_for(path) + if not breaker.call_allowed(): + raise HyperpingAPIError(self._circuit_open_message(breaker, path)) last_exception: Exception | None = None delay = self.retry_config.initial_delay @@ -374,7 +474,7 @@ def _request( # Only trip circuit breaker on server errors, not client errors if response.status_code >= 500: - self._circuit_breaker.record_failure() + breaker.record_failure() self._handle_response_error(response) except (httpx.TimeoutException, httpx.RequestError) as e: @@ -395,7 +495,7 @@ def _request( self.retry_config.max_delay, ) continue - self._circuit_breaker.record_failure() + breaker.record_failure() if isinstance(e, httpx.TimeoutException): raise HyperpingAPIError(f"Request timeout after {max_attempts} attempts") from e raise HyperpingAPIError(f"Request failed: {e}") from e diff --git a/tests/unit/test_per_endpoint_circuit_breaker.py b/tests/unit/test_per_endpoint_circuit_breaker.py new file mode 100644 index 0000000..f5e8219 --- /dev/null +++ b/tests/unit/test_per_endpoint_circuit_breaker.py @@ -0,0 +1,365 @@ +"""Tests for the per-endpoint circuit breaker option (PY-03). + +The default behaviour (single shared breaker) is exercised by the existing +breaker tests in ``test_sdk_surface.py``, ``test_monitors.py``, and +``test_async_client.py``. These tests cover the new opt-in path: + + HyperpingClient(..., per_endpoint_circuit_breaker=True) + +with isolation between paths, a per-path state accessor, async parity, and +thread safety on the per-path breaker dict. 
+""" + +from __future__ import annotations + +import threading + +import httpx +import pytest +import pytest_asyncio +import respx + +from hyperping._async_client import AsyncHyperpingClient +from hyperping._circuit_breaker import CircuitBreakerConfig, CircuitState +from hyperping.client import HyperpingClient, RetryConfig +from hyperping.endpoints import API_BASE, Endpoint +from hyperping.exceptions import HyperpingAPIError + + +def _cb_config(threshold: int = 2) -> CircuitBreakerConfig: + """Tight threshold so tests trip the breaker quickly without long timeouts.""" + return CircuitBreakerConfig(failure_threshold=threshold, recovery_timeout=60.0) + + +def _monitor_payload(uuid: str) -> dict: + """Minimal monitor payload that satisfies Monitor.model_validate().""" + return { + "monitorUuid": uuid, + "name": uuid, + "url": "https://example.com", + "method": "GET", + "frequency": 60, + "timeout": 10, + "regions": ["london"], + "headers": {}, + "expectedStatus": 200, + "down": False, + "paused": False, + } + + +# ==================== sync ==================== + + +class TestPerEndpointCircuitBreakerSync: + """Per-endpoint isolation on HyperpingClient.""" + + @respx.mock + def test_per_endpoint_isolation(self) -> None: + """A failing endpoint trips its own breaker; a healthy endpoint is unaffected.""" + client = HyperpingClient( + api_key="sk_test", + retry_config=RetryConfig(max_retries=0), + circuit_breaker_config=_cb_config(threshold=2), + per_endpoint_circuit_breaker=True, + ) + + respx.get(f"{API_BASE}{Endpoint.MONITORS}").mock( + return_value=httpx.Response(500, json={"error": "boom"}), + ) + respx.get(f"{API_BASE}{Endpoint.INCIDENTS}").mock( + return_value=httpx.Response(200, json={"incidents": []}), + ) + + # Trip /v1/monitors + for _ in range(2): + with pytest.raises(HyperpingAPIError): + client.list_monitors() + + # /v1/monitors breaker is now OPEN; further calls fail-fast. 
+ with pytest.raises(HyperpingAPIError, match="Circuit breaker OPEN"): + client.list_monitors() + + # /v3/incidents breaker is untouched and the call succeeds. + assert client.list_incidents() == [] + + assert client.circuit_breaker_state_for(str(Endpoint.MONITORS)) == CircuitState.OPEN + assert client.circuit_breaker_state_for(str(Endpoint.INCIDENTS)) == CircuitState.CLOSED + + client.close() + + @respx.mock + def test_per_endpoint_state_query_strips_query_string(self) -> None: + """``circuit_breaker_state_for`` keys on path only, ignoring query/fragment.""" + client = HyperpingClient( + api_key="sk_test", + retry_config=RetryConfig(max_retries=0), + circuit_breaker_config=_cb_config(threshold=1), + per_endpoint_circuit_breaker=True, + ) + + respx.get(f"{API_BASE}{Endpoint.INCIDENTS}").mock( + return_value=httpx.Response(500, json={"error": "boom"}), + ) + + with pytest.raises(HyperpingAPIError): + client.list_incidents(status="investigating") + + # The request used a path with a query string, but the breaker key strips it. + assert client.circuit_breaker_state_for(str(Endpoint.INCIDENTS)) == CircuitState.OPEN + assert ( + client.circuit_breaker_state_for(f"{Endpoint.INCIDENTS}?status=investigating") + == CircuitState.OPEN + ) + + client.close() + + @respx.mock + def test_default_behaviour_unchanged(self) -> None: + """With the flag off (default), a 5xx on one path trips the shared breaker for all paths.""" + client = HyperpingClient( + api_key="sk_test", + retry_config=RetryConfig(max_retries=0), + circuit_breaker_config=_cb_config(threshold=1), + ) + + respx.get(f"{API_BASE}{Endpoint.MONITORS}").mock( + return_value=httpx.Response(500, json={"error": "boom"}), + ) + + with pytest.raises(HyperpingAPIError): + client.list_monitors() + + # Shared breaker is OPEN; even an unrelated path (which has no mock route) + # is rejected without an HTTP call. 
+ with pytest.raises(HyperpingAPIError, match="Circuit breaker OPEN"): + client.list_incidents() + + assert client.circuit_breaker.state == CircuitState.OPEN + client.close() + + def test_state_for_unknown_path_is_closed(self) -> None: + """Querying a path that has not been touched returns CLOSED (no breaker created).""" + client = HyperpingClient( + api_key="sk_test", + per_endpoint_circuit_breaker=True, + ) + assert client.circuit_breaker_state_for("/v1/unused") == CircuitState.CLOSED + client.close() + + @respx.mock + def test_state_for_default_mode_returns_shared_state(self) -> None: + """In default mode, state_for(any_path) reflects the single shared breaker.""" + client = HyperpingClient( + api_key="sk_test", + retry_config=RetryConfig(max_retries=0), + circuit_breaker_config=_cb_config(threshold=1), + ) + # Untripped: any path reports CLOSED, identical to the shared breaker. + assert client.circuit_breaker_state_for("/v1/monitors") == CircuitState.CLOSED + assert client.circuit_breaker_state_for("/anything") == CircuitState.CLOSED + + respx.get(f"{API_BASE}{Endpoint.MONITORS}").mock( + return_value=httpx.Response(500, json={"error": "boom"}), + ) + with pytest.raises(HyperpingAPIError): + client.list_monitors() + + # Shared breaker is now OPEN; state_for reports OPEN regardless of path. + assert client.circuit_breaker.state == CircuitState.OPEN + assert client.circuit_breaker_state_for("/v1/monitors") == CircuitState.OPEN + assert client.circuit_breaker_state_for("/v3/incidents") == CircuitState.OPEN + client.close() + + @respx.mock + def test_default_canonicalization_buckets_by_endpoint(self) -> None: + """`/v1/monitors/{uuid}` and `/v1/monitors` share one breaker; other endpoints don't.""" + client = HyperpingClient( + api_key="sk_test", + retry_config=RetryConfig(max_retries=0), + circuit_breaker_config=_cb_config(threshold=2), + per_endpoint_circuit_breaker=True, + ) + + # Two different monitor UUIDs both fail with 5xx. 
+ respx.get(f"{API_BASE}{Endpoint.MONITORS}/mon_A").mock( + return_value=httpx.Response(500, json={"error": "boom"}), + ) + respx.get(f"{API_BASE}{Endpoint.MONITORS}/mon_B").mock( + return_value=httpx.Response(500, json={"error": "boom"}), + ) + respx.get(f"{API_BASE}{Endpoint.INCIDENTS}").mock( + return_value=httpx.Response(200, json={"incidents": []}), + ) + + # Two failures on mon_A trip the shared `/v1/monitors` breaker. + with pytest.raises(HyperpingAPIError): + client.get_monitor("mon_A") + with pytest.raises(HyperpingAPIError): + client.get_monitor("mon_A") + + # mon_B is now blocked too — it shares the `/v1/monitors` bucket. No HTTP + # request is made (no mock interaction needed beyond the route). + with pytest.raises(HyperpingAPIError, match="Circuit breaker OPEN"): + client.get_monitor("mon_B") + + # The list endpoint also falls under `/v1/monitors` and is blocked. + with pytest.raises(HyperpingAPIError, match="Circuit breaker OPEN"): + client.list_monitors() + + # A different endpoint (`/v3/incidents`) is unaffected. + assert client.list_incidents() == [] + + # State queries: any monitor sub-path resolves to the same key. + assert client.circuit_breaker_state_for(f"{Endpoint.MONITORS}/mon_A") == CircuitState.OPEN + assert client.circuit_breaker_state_for(f"{Endpoint.MONITORS}/mon_B") == CircuitState.OPEN + assert client.circuit_breaker_state_for(str(Endpoint.MONITORS)) == CircuitState.OPEN + assert client.circuit_breaker_state_for(str(Endpoint.INCIDENTS)) == CircuitState.CLOSED + + client.close() + + @respx.mock + def test_custom_breaker_key_fn(self) -> None: + """A caller-supplied key fn overrides the default endpoint bucketing.""" + seen: list[str] = [] + + def per_uuid(path: str) -> str: + seen.append(path) + # Force one breaker per literal path (the pre-canonicalisation behaviour). 
+ return path + + client = HyperpingClient( + api_key="sk_test", + retry_config=RetryConfig(max_retries=0), + circuit_breaker_config=_cb_config(threshold=2), + per_endpoint_circuit_breaker=True, + breaker_key_fn=per_uuid, + ) + + respx.get(f"{API_BASE}{Endpoint.MONITORS}/mon_A").mock( + return_value=httpx.Response(500, json={"error": "boom"}), + ) + respx.get(f"{API_BASE}{Endpoint.MONITORS}/mon_B").mock( + return_value=httpx.Response(200, json=_monitor_payload("mon_B")), + ) + + with pytest.raises(HyperpingAPIError): + client.get_monitor("mon_A") + with pytest.raises(HyperpingAPIError): + client.get_monitor("mon_A") + + # With a per-UUID key fn, mon_B has its own breaker and the call goes through. + result = client.get_monitor("mon_B") + assert result.uuid == "mon_B" + + assert seen, "custom key fn was not invoked" + assert client.circuit_breaker_state_for(f"{Endpoint.MONITORS}/mon_A") == CircuitState.OPEN + assert client.circuit_breaker_state_for(f"{Endpoint.MONITORS}/mon_B") == CircuitState.CLOSED + + client.close() + + @respx.mock + def test_open_error_message_includes_endpoint_key(self) -> None: + """OPEN error message identifies which endpoint was tripped.""" + client = HyperpingClient( + api_key="sk_test", + retry_config=RetryConfig(max_retries=0), + circuit_breaker_config=_cb_config(threshold=1), + per_endpoint_circuit_breaker=True, + ) + respx.get(f"{API_BASE}{Endpoint.MONITORS}").mock( + return_value=httpx.Response(500, json={"error": "boom"}), + ) + + with pytest.raises(HyperpingAPIError): + client.list_monitors() + + with pytest.raises(HyperpingAPIError, match=r"Circuit breaker OPEN for '/v1/monitors'"): + client.list_monitors() + + client.close() + + @respx.mock + def test_per_endpoint_threadsafe(self) -> None: + """50 concurrent calls across two paths: failing path opens, healthy path stays closed.""" + client = HyperpingClient( + api_key="sk_test", + retry_config=RetryConfig(max_retries=0), + circuit_breaker_config=_cb_config(threshold=3), + 
per_endpoint_circuit_breaker=True, + ) + + respx.get(f"{API_BASE}{Endpoint.MONITORS}").mock( + return_value=httpx.Response(500, json={"error": "boom"}), + ) + respx.get(f"{API_BASE}{Endpoint.INCIDENTS}").mock( + return_value=httpx.Response(200, json={"incidents": []}), + ) + + def hit_monitors() -> None: + try: + client.list_monitors() + except HyperpingAPIError: + pass + + def hit_incidents() -> None: + client.list_incidents() + + threads: list[threading.Thread] = [] + for i in range(50): + target = hit_monitors if i % 2 == 0 else hit_incidents + threads.append(threading.Thread(target=target)) + for t in threads: + t.start() + for t in threads: + t.join() + + assert client.circuit_breaker_state_for(str(Endpoint.MONITORS)) == CircuitState.OPEN + assert client.circuit_breaker_state_for(str(Endpoint.INCIDENTS)) == CircuitState.CLOSED + + client.close() + + +# ==================== async ==================== + + +@pytest_asyncio.fixture +async def per_endpoint_async_client(): + client = AsyncHyperpingClient( + api_key="sk_test", + retry_config=RetryConfig(max_retries=0), + circuit_breaker_config=_cb_config(threshold=2), + per_endpoint_circuit_breaker=True, + ) + yield client + await client.close() + + +class TestPerEndpointCircuitBreakerAsync: + """Per-endpoint isolation on AsyncHyperpingClient.""" + + @pytest.mark.asyncio + @respx.mock + async def test_per_endpoint_async( + self, per_endpoint_async_client: AsyncHyperpingClient + ) -> None: + client = per_endpoint_async_client + + respx.get(f"{API_BASE}{Endpoint.MONITORS}").mock( + return_value=httpx.Response(500, json={"error": "boom"}), + ) + respx.get(f"{API_BASE}{Endpoint.INCIDENTS}").mock( + return_value=httpx.Response(200, json={"incidents": []}), + ) + + for _ in range(2): + with pytest.raises(HyperpingAPIError): + await client.list_monitors() + + with pytest.raises(HyperpingAPIError, match="Circuit breaker OPEN"): + await client.list_monitors() + + assert await client.list_incidents() == [] + + assert 
client.circuit_breaker_state_for(str(Endpoint.MONITORS)) == CircuitState.OPEN + assert client.circuit_breaker_state_for(str(Endpoint.INCIDENTS)) == CircuitState.CLOSED
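Reviewer note on the thread-safety test above: it relies on breakers being created lazily, one per key, under a `threading.Lock`. The sketch below reproduces that pattern in isolation so it can be run without the package. `TinyBreaker` and `BreakerRegistry` are hypothetical stand-ins invented for this illustration and are not part of hyperping; the real `CircuitBreaker` has more states and a recovery timeout.

```python
import threading


class TinyBreaker:
    """Hypothetical stand-in for CircuitBreaker: opens after `threshold` failures."""

    def __init__(self, threshold: int = 3) -> None:
        self._threshold = threshold
        self._failures = 0
        self._lock = threading.Lock()

    def record_failure(self) -> None:
        with self._lock:
            self._failures += 1

    @property
    def is_open(self) -> bool:
        with self._lock:
            return self._failures >= self._threshold


class BreakerRegistry:
    """Lazily creates one breaker per key; the lock guards the dict."""

    def __init__(self, threshold: int = 3) -> None:
        self._threshold = threshold
        self._breakers: dict[str, TinyBreaker] = {}
        self._lock = threading.Lock()

    def breaker_for(self, key: str) -> TinyBreaker:
        # Get-or-create must be atomic, otherwise two threads could each
        # create a breaker for the same key and split the failure count.
        with self._lock:
            breaker = self._breakers.get(key)
            if breaker is None:
                breaker = TinyBreaker(self._threshold)
                self._breakers[key] = breaker
            return breaker

    def is_open(self, key: str) -> bool:
        # Read-only lookup: untouched keys report closed without allocating.
        with self._lock:
            breaker = self._breakers.get(key)
        return breaker.is_open if breaker is not None else False


registry = BreakerRegistry(threshold=3)


def fail_monitors() -> None:
    registry.breaker_for("/v1/monitors").record_failure()


def touch_incidents() -> None:
    registry.breaker_for("/v3/incidents")


# 50 threads alternate between a failing key and a healthy key.
threads = [
    threading.Thread(target=fail_monitors if i % 2 == 0 else touch_incidents)
    for i in range(50)
]
for t in threads:
    t.start()
for t in threads:
    t.join()
```

The read-only `is_open` lookup mirrors the diff's choice to report `CLOSED` for untouched buckets without allocating a breaker, which keeps state queries from growing the dict.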