Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
80 changes: 45 additions & 35 deletions apiforgepy/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,51 +2,56 @@
apiforgepy — API observability & intelligence for FastAPI/Starlette.
Local-first, privacy-first. Dashboard on port 4242.

Usage:
Usage (local):
from apiforgepy import ApiForgeMiddleware
app.add_middleware(ApiForgeMiddleware, mode="local")
app.add_middleware(ApiForgeMiddleware)

Usage (cloud):
app.add_middleware(
ApiForgeMiddleware,
cloud_url="https://api.apiforge.fr",
api_key="af_...",
)
"""

import os

from .aggregator import Aggregator
from .database import ApiForgeDatabase
from .dashboard import start_dashboard
from .middleware import ApiForgeMiddleware as _Base
from .transport import LocalTransport
from .aggregator import Aggregator
from .database import ApiForgeDatabase
from .dashboard import start_dashboard
from .middleware import ApiForgeMiddleware as _Base
from .transport import LocalTransport
from .cloud_transport import CloudTransport

__version__ = "0.1.0"
__version__ = "2.0.0"
__all__ = ["ApiForgeMiddleware"]


class ApiForgeMiddleware(_Base):
"""
Starlette/FastAPI middleware for APIForge local-first observability.
Starlette/FastAPI middleware for APIForge observability.

Parameters
----------
app: The ASGI app to wrap.
mode: Storage mode. Only 'local' (SQLite) in v0.x.
db_path: SQLite file path. Default: '.apiforge.db'
dashboard_port: Port for the built-in dashboard. Set 0 to disable. Default: 4242.
cloud_url: Cloud mode: SaaS API base URL (e.g. 'https://api.apiforge.fr').
api_key: Cloud mode: project API key starting with 'af_'.
db_path: Local mode: SQLite file path. Default: '.apiforge.db'.
dashboard_port: Local mode: dashboard port. 0 = disabled. Default: 4242.
flush_interval: Aggregation flush interval in ms. Default: 60 000.
env: Environment label. Default: NODE_ENV or 'production'.
release: Release tag for deployment correlation. Default: APP_VERSION env var.
service: Service name for multi-service setups. Default: 'default'.
env: Environment label. Default: ENV env var or 'production'.
release: Release tag. Default: APP_VERSION env var.
service: Service name. Default: 'default'.
sampling: Sample rate 0.0–1.0. Default: 1.0.
ignore_paths: Paths to exclude. Default: ['/favicon.ico'].
"""

_instance_db = None
_instance_transport = None
_instance_aggregator = None
_instance_dashboard = None

def __init__(
self,
app,
*,
mode: str = "local",
cloud_url: str | None = None,
api_key: str | None = None,
db_path: str = ".apiforge.db",
dashboard_port: int = 4242,
flush_interval: int = 60_000,
Expand All @@ -56,34 +61,39 @@ def __init__(
sampling: float = 1.0,
ignore_paths: list[str] = None,
):
if mode != "local":
raise ValueError(f"[apiforgepy] mode '{mode}' is not yet supported. Use 'local'.")
is_cloud = bool(cloud_url and api_key)

if (cloud_url and not api_key) or (api_key and not cloud_url):
raise ValueError("[apiforgepy] Cloud mode requires both cloud_url and api_key.")

config = {
"mode": mode,
"db_path": db_path,
"mode": "cloud" if is_cloud else "local",
"env": env or os.environ.get("ENV", "production"),
"release": release or os.environ.get("APP_VERSION"),
"service": service,
"sampling": sampling,
"ignore_paths": ignore_paths or ["/favicon.ico"],
}

db = ApiForgeDatabase(db_path)
transport = LocalTransport(db)
aggregator = Aggregator(transport, flush_interval)
self._db = None

if is_cloud:
transport = CloudTransport(cloud_url, api_key, service)
else:
self._db = ApiForgeDatabase(db_path)
transport = LocalTransport(self._db)

aggregator = Aggregator(transport, flush_interval)
aggregator.start()

if dashboard_port:
start_dashboard(db, dashboard_port)
if not is_cloud and dashboard_port:
start_dashboard(self._db, dashboard_port)

self._db = db
self._transport = transport
self._aggregator_ref = aggregator

super().__init__(app, aggregator=aggregator, config=config)

def shutdown(self):
"""Flush remaining buffer and close the SQLite connection."""
"""Flush remaining buffer and close resources."""
self._aggregator_ref.stop()
self._db.close()
if self._db:
self._db.close()
2 changes: 2 additions & 0 deletions apiforgepy/aggregator.py
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@ def _flush(self):
n = len(sorted_d)
sizes = bucket["response_sizes"]
bytes_avg = sum(sizes) / len(sizes) if sizes else None
lat_avg = sum(bucket["durations"]) / n if n > 0 else None
rows.append({
"bucket_ts": bucket_ts,
"route": bucket["route"],
Expand All @@ -91,6 +92,7 @@ def _flush(self):
"lat_p50": _percentile(sorted_d, 0.50),
"lat_p90": _percentile(sorted_d, 0.90),
"lat_p99": _percentile(sorted_d, 0.99),
"lat_avg": lat_avg,
"lat_min": sorted_d[0] if sorted_d else 0,
"lat_max": sorted_d[-1] if sorted_d else 0,
"bytes_avg": bytes_avg,
Expand Down
71 changes: 71 additions & 0 deletions apiforgepy/cloud_transport.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
import json
import time
import threading
import urllib.request
import urllib.error
from datetime import datetime, timezone

_CIRCUIT_OPEN_S = 60
_FAILURE_THRESHOLD = 5


class CloudTransport:
"""Sends aggregated metrics to the APIForge SaaS ingest endpoint."""

def __init__(self, cloud_url: str, api_key: str, service: str):
self._url = cloud_url.rstrip("/") + "/ingest"
self._api_key = api_key
self._service = service
self._failures = 0
self._open_until = 0.0
self._lock = threading.Lock()

def write(self, rows: list[dict]) -> None:
if not rows:
return
if time.monotonic() < self._open_until:
return

metrics = [
{
"route": r["route"],
"method": r["method"],
"service": self._service,
"env": r["env"],
"release": r.get("release_tag"),
"time": datetime.fromtimestamp(r["bucket_ts"], tz=timezone.utc).strftime('%Y-%m-%dT%H:%M:%S.000Z'),
"calls_total": r["total_calls"],
"calls_2xx": r["status_2xx"],
"calls_4xx": r["status_4xx"],
"calls_5xx": r["status_5xx"],
"lat_p50": r.get("lat_p50"),
"lat_p90": r.get("lat_p90"),
"lat_p99": r.get("lat_p99"),
"lat_avg": r.get("lat_avg"),
"bytes_avg": r.get("bytes_avg"),
}
for r in rows
]

payload = json.dumps({"metrics": metrics}).encode()
req = urllib.request.Request(
self._url,
data=payload,
headers={"Content-Type": "application/json", "X-API-Key": self._api_key},
method="POST",
)

try:
with urllib.request.urlopen(req, timeout=10):
with self._lock:
self._failures = 0
except (urllib.error.URLError, OSError) as exc:
with self._lock:
self._failures += 1
if self._failures >= _FAILURE_THRESHOLD:
self._open_until = time.monotonic() + _CIRCUIT_OPEN_S
self._failures = 0
print(
f"[apiforgepy] Cloud flush failures — pausing for {_CIRCUIT_OPEN_S}s. "
f"Error: {exc}"
)
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ build-backend = "setuptools.build_meta"

[project]
name = "apiforgepy"
version = "1.0.3"
version = "2.0.0"

description = "API observability & intelligence for FastAPI/Starlette — local-first, privacy-first"
readme = "README.md"
Expand Down
1 change: 0 additions & 1 deletion tests/test_middleware.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ def make_app(db_path=":memory:", sampling=1.0, ignore_paths=None):
app = FastAPI()
app.add_middleware(
ApiForgeMiddleware,
mode="local",
db_path=db_path,
dashboard_port=0,
flush_interval=999_999,
Expand Down
7 changes: 3 additions & 4 deletions tests/test_smoke.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ def make_app(db_path=":memory:"):
app = FastAPI()
app.add_middleware(
ApiForgeMiddleware,
mode="local",
db_path=db_path,
dashboard_port=0,
flush_interval=999_999,
Expand Down Expand Up @@ -62,10 +61,10 @@ def test_multiple_methods():
assert client.post("/users").status_code == 200


def test_invalid_mode_raises():
with pytest.raises(ValueError, match="not yet supported"):
def test_partial_cloud_config_raises():
with pytest.raises(ValueError, match="both cloud_url and api_key"):
app = FastAPI()
app.add_middleware(ApiForgeMiddleware, mode="saas", dashboard_port=0)
app.add_middleware(ApiForgeMiddleware, cloud_url="https://api.apiforge.fr", dashboard_port=0)
client = TestClient(app)
client.get("/")

Expand Down
Loading