Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions BACKLOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ Agents: Security, Architecture, Code Quality, Refactor/Dead Code.

- [x] **M1** Async client (`AsyncHyperpingClient`) — shipped in PR #13 (feature/sdk-py-03-async-client).
- [x] **M2** Pagination (`page` param, `hasNextPage` auto-pagination via `collect_all_pages` / `collect_all_pages_async`) — shipped in PR #12 (merged) and PR #13.
- [ ] **M3** Per-endpoint circuit breaker (`per_endpoint_circuit_breaker: bool = False` option). **Deferred — moderate scope, add in follow-up.**
- [x] **M3** Per-endpoint circuit breaker (`per_endpoint_circuit_breaker: bool = False` option) on `HyperpingClient` and `AsyncHyperpingClient`. Per-path state via `circuit_breaker_state_for(path)`. Default off; existing single-shared-breaker behaviour preserved.
- [x] **M4** `MonitorCreate` now has `@model_validator(mode="after")` that raises `ValueError` if DNS fields are set on non-DNS monitors.
- [x] **M5** `MonitorListResponse` is in `__all__` — retained but documented as not returned by any client method. Will be used once pagination lands.
- [x] **M6** `APIErrorResponse` removed from `__all__` (documented as intentionally internal in comment).
Expand Down Expand Up @@ -106,7 +106,7 @@ The following items require either a semver bump, a separate PR, or manual work:

- ~~**M1** Async client~~ — shipped
- ~~**M2** Pagination~~ — shipped
- **M3** Per-endpoint circuit breaker option
- ~~**M3** Per-endpoint circuit breaker option~~ — shipped
- **M11** URL validation for HTTP-protocol monitors (cross-field, needs discriminated union work)
- **M12** DateTime coercion (breaking change — v0.2.0)
- **H11** Pin all GitHub Actions `uses:` to 40-char commit SHAs (requires per-tag SHA lookup)
Expand Down
14 changes: 14 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,20 @@ All notable changes to this project will be documented in this file.
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.1.0/),
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).

## [Unreleased]

### Added

- Per-endpoint circuit breaker option (`per_endpoint_circuit_breaker: bool = False`) on
`HyperpingClient` and `AsyncHyperpingClient`. When enabled, each `Endpoint` gets its own
breaker state so a single flaky endpoint no longer blocks traffic to healthy ones.
Sub-resource paths (e.g. `/v1/monitors/{uuid}`, `/v1/monitors/{uuid}/reports`) are
bucketed under their parent `Endpoint` prefix so the breaker set stays bounded; pass a
custom `breaker_key_fn` to change that. The OPEN-state error message now identifies
which endpoint tripped. State for a given path is readable via
`client.circuit_breaker_state_for(path)` in either mode. Default behaviour is
unchanged. See README for details.

## [1.5.0] - 2026-04-20

### Added
Expand Down
42 changes: 42 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -252,6 +252,48 @@ client = HyperpingClient(
)
```

### Per-endpoint circuit breaker

By default a single shared circuit breaker covers every request. If one endpoint flakes, every other endpoint is also blocked. Enable `per_endpoint_circuit_breaker=True` to keep one breaker per *endpoint* so a failing endpoint does not punish healthy ones:

```python
client = HyperpingClient(
api_key="sk_...",
per_endpoint_circuit_breaker=True,
)

# Inspect state for an endpoint. The breaker key is canonicalised to the
# matching `Endpoint` prefix, so all sub-resource paths share a bucket:
from hyperping import CircuitState, Endpoint

state = client.circuit_breaker_state_for(str(Endpoint.MONITORS))
# /v1/monitors, /v1/monitors/mon_abc and /v1/monitors/mon_abc/reports all
# report the same state — they share the `/v1/monitors` breaker.
assert client.circuit_breaker_state_for(f"{Endpoint.MONITORS}/mon_abc") == state
assert state in {CircuitState.CLOSED, CircuitState.HALF_OPEN, CircuitState.OPEN}
```

If you need different bucketing (e.g. one breaker per resource UUID, or a single breaker per HTTP verb), pass a `breaker_key_fn`:

```python
def per_resource(path: str) -> str:
# one breaker per literal request path
return path.split("?", 1)[0]

client = HyperpingClient(
api_key="sk_...",
per_endpoint_circuit_breaker=True,
breaker_key_fn=per_resource,
)
```

| Option | Type | Default | Description |
| --- | --- | --- | --- |
| `per_endpoint_circuit_breaker` | `bool` | `False` | When `True`, maintain a separate circuit breaker keyed by request endpoint instead of using one shared breaker. The same `circuit_breaker_config` applies to every per-endpoint breaker. The shared breaker remains accessible via `client.circuit_breaker`. |
| `breaker_key_fn` | `Callable[[str], str] \| None` | `None` | Override the default endpoint-prefix bucketing. Receives the request path and returns the breaker key. Default behaviour collapses every path under the matching `Endpoint` prefix so the breaker set stays bounded (one per `Endpoint`); a custom function takes responsibility for keeping the key set bounded. Ignored unless `per_endpoint_circuit_breaker=True`. |

State for any path is readable via `client.circuit_breaker_state_for(path)`. In the default (single-breaker) mode this returns the shared breaker's state for any path, so the call is always safe regardless of the flag. The same options and method are available on `AsyncHyperpingClient`.

## Type Safety

This package ships a `py.typed` marker (PEP 561) and is fully typed. Works out of the box with mypy and pyright.
Expand Down
109 changes: 96 additions & 13 deletions src/hyperping/_async_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,10 @@
import asyncio
import logging
import random
import threading
from collections.abc import Callable
from typing import Any
from urllib.parse import urlsplit

import httpx
from pydantic import SecretStr
Expand All @@ -28,10 +31,11 @@
from hyperping._circuit_breaker import (
CircuitBreaker,
CircuitBreakerConfig,
CircuitState,
)
from hyperping._internals import DEFAULT_USER_AGENT, RETRY_AFTER_MAX, sanitize_for_log
from hyperping.client import DEFAULT_RETRY_CONFIG, RetryConfig
from hyperping.endpoints import API_BASE
from hyperping.endpoints import API_BASE, Endpoint
from hyperping.exceptions import (
HyperpingAPIError,
HyperpingAuthError,
Expand Down Expand Up @@ -73,6 +77,8 @@ def __init__(
retry_config: RetryConfig | None = None,
circuit_breaker_config: CircuitBreakerConfig | None = None,
user_agent: str | None = None,
per_endpoint_circuit_breaker: bool = False,
breaker_key_fn: Callable[[str], str] | None = None,
) -> None:
"""Initialize the async Hyperping API client.

Expand All @@ -82,8 +88,19 @@ def __init__(
base_url: Override the default API base URL.
timeout: HTTP request timeout in seconds.
retry_config: Retry behaviour configuration.
circuit_breaker_config: Circuit breaker configuration.
circuit_breaker_config: Circuit breaker configuration. When
``per_endpoint_circuit_breaker`` is ``True`` the same config is
applied to each per-endpoint breaker.
user_agent: Custom ``User-Agent`` header value.
per_endpoint_circuit_breaker: When ``True``, maintain an
independent breaker per :class:`~hyperping.endpoints.Endpoint`
prefix (sub-resources inherit the parent endpoint's breaker,
so the breaker set stays bounded). Default ``False``
preserves the original single-shared-breaker behaviour.
breaker_key_fn: Override the default endpoint-prefix bucketing.
Receives the request path and must return the breaker key.
Ignored unless ``per_endpoint_circuit_breaker`` is ``True``.
Caller is responsible for keeping the key set bounded.
"""
raw_key = api_key.get_secret_value() if isinstance(api_key, SecretStr) else api_key
if not raw_key or not raw_key.strip():
Expand All @@ -92,7 +109,12 @@ def __init__(
self.base_url = (base_url or self.DEFAULT_BASE_URL).rstrip("/")
self.timeout = timeout
self.retry_config = retry_config or DEFAULT_RETRY_CONFIG
self._circuit_breaker_config = circuit_breaker_config
self._circuit_breaker = CircuitBreaker(circuit_breaker_config)
self._per_endpoint_circuit_breaker = per_endpoint_circuit_breaker
self._breaker_key_fn = breaker_key_fn
self._endpoint_breakers: dict[str, CircuitBreaker] = {}
self._endpoint_breakers_lock = threading.Lock()

self._client = httpx.AsyncClient(
base_url=self.base_url,
Expand Down Expand Up @@ -120,9 +142,74 @@ async def __aexit__(self, *args: Any) -> None:

@property
def circuit_breaker(self) -> CircuitBreaker:
"""Access the circuit breaker state (for monitoring)."""
"""Access the (shared) circuit breaker state (for monitoring).

In per-endpoint mode this returns the original shared breaker, kept
for backward compatibility; the per-path breakers are exposed via
:meth:`circuit_breaker_state_for`.
"""
return self._circuit_breaker

def _resolve_breaker_key(self, path: str) -> str:
"""Map a request path to its circuit-breaker key.

Default bucketing strips query/fragment and collapses the path under
the longest matching :class:`Endpoint` prefix; a custom
``breaker_key_fn`` wins outright.
"""
if self._breaker_key_fn is not None:
return self._breaker_key_fn(path)
pure = urlsplit(path).path
for ep in Endpoint:
ep_value = ep.value
if pure == ep_value or pure.startswith(ep_value + "/"):
return ep_value
return pure

def _breaker_for(self, path: str) -> CircuitBreaker:
"""Return the breaker that governs ``path`` (shared, or per-endpoint)."""
if not self._per_endpoint_circuit_breaker:
return self._circuit_breaker
key = self._resolve_breaker_key(path)
# threading.Lock here is intentional: see HyperpingClient._breaker_for
# for the rationale (works under both pure-asyncio and mixed-thread use).
with self._endpoint_breakers_lock:
breaker = self._endpoint_breakers.get(key)
if breaker is None:
breaker = CircuitBreaker(self._circuit_breaker_config)
self._endpoint_breakers[key] = breaker
return breaker

def circuit_breaker_state_for(self, path: str) -> CircuitState:
"""Return the circuit state of the breaker governing ``path``.

In per-endpoint mode the path is canonicalised the same way as during
a request; untouched buckets report :attr:`CircuitState.CLOSED`
without allocating a breaker. In default mode the shared breaker's
state is returned for any path.
"""
if not self._per_endpoint_circuit_breaker:
return self._circuit_breaker.state
key = self._resolve_breaker_key(path)
with self._endpoint_breakers_lock:
breaker = self._endpoint_breakers.get(key)
return breaker.state if breaker is not None else CircuitState.CLOSED

def _circuit_open_message(self, breaker: CircuitBreaker, path: str) -> str:
"""Build the error message raised when a request is rejected by an OPEN breaker."""
if self._per_endpoint_circuit_breaker:
key = self._resolve_breaker_key(path)
return (
f"Circuit breaker OPEN for {key!r} - API calls to this endpoint suspended. "
f"Consecutive failures: {breaker.failure_count}. "
f"Will recover after {breaker.recovery_timeout}s."
)
return (
f"Circuit breaker OPEN - API calls suspended. "
f"Consecutive failures: {breaker.failure_count}. "
f"Will recover after {breaker.recovery_timeout}s."
)

# ==================== Error Handling ====================

def _parse_error_body(self, response: httpx.Response) -> dict[str, Any]:
Expand Down Expand Up @@ -236,7 +323,7 @@ async def _execute_single_attempt(
if response.status_code >= 400:
return response

self._circuit_breaker.record_success()
self._breaker_for(path).record_success()
if response.status_code == 204:
return {}
return response.json() # type: ignore[no-any-return]
Expand All @@ -262,13 +349,9 @@ async def _request(
Raises:
HyperpingAPIError: On API errors after retries exhausted
"""
if not self._circuit_breaker.call_allowed():
cb = self._circuit_breaker
raise HyperpingAPIError(
f"Circuit breaker OPEN - API calls suspended. "
f"Consecutive failures: {cb.failure_count}. "
f"Will recover after {cb.recovery_timeout}s."
)
breaker = self._breaker_for(path)
if not breaker.call_allowed():
raise HyperpingAPIError(self._circuit_open_message(breaker, path))

last_exception: Exception | None = None
delay = self.retry_config.initial_delay
Expand Down Expand Up @@ -299,7 +382,7 @@ async def _request(
continue

if response.status_code >= 500:
self._circuit_breaker.record_failure()
breaker.record_failure()
self._handle_response_error(response)

except (httpx.TimeoutException, httpx.RequestError) as e:
Expand All @@ -320,7 +403,7 @@ async def _request(
self.retry_config.max_delay,
)
continue
self._circuit_breaker.record_failure()
breaker.record_failure()
if isinstance(e, httpx.TimeoutException):
raise HyperpingAPIError(f"Request timeout after {max_attempts} attempts") from e
raise HyperpingAPIError(f"Request failed: {e}") from e
Expand Down
Loading
Loading