Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion apiforgepy/aggregator.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,14 +28,16 @@ def stop(self):
self._flush()

def record(self, event: dict):
key = f"{event['method']}|{event['route']}|{event['env']}|{event.get('release') or ''}"
is_ghost = event.get("is_ghost", False)
key = f"{event['method']}|{event['route']}|{event['env']}|{event.get('release') or ''}|{'1' if is_ghost else '0'}"
with self._lock:
if key not in self._buffer:
self._buffer[key] = {
"method": event["method"],
"route": event["route"],
"env": event["env"],
"release": event.get("release"),
"is_ghost": is_ghost,
"durations": [],
"response_sizes": [],
"status_2xx": 0,
Expand Down Expand Up @@ -85,6 +87,7 @@ def _flush(self):
"method": bucket["method"],
"env": bucket["env"],
"release_tag": bucket["release"],
"is_ghost": 1 if bucket["is_ghost"] else 0,
"status_2xx": bucket["status_2xx"],
"status_4xx": bucket["status_4xx"],
"status_5xx": bucket["status_5xx"],
Expand Down
40 changes: 23 additions & 17 deletions apiforgepy/database.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,19 +43,24 @@ def _init(self):
lat_p99 REAL,
lat_min REAL,
lat_max REAL,
bytes_avg REAL
bytes_avg REAL,
is_ghost INTEGER NOT NULL DEFAULT 0
);

CREATE INDEX IF NOT EXISTS idx_route_ts ON api_metrics (route, method, bucket_ts);
CREATE INDEX IF NOT EXISTS idx_bucket_ts ON api_metrics (bucket_ts);
CREATE INDEX IF NOT EXISTS idx_release ON api_metrics (release_tag)
WHERE release_tag IS NOT NULL;
""")
# Migration for databases created before bytes_avg was introduced
# Migrations for databases created before these columns were introduced
try:
c.execute("ALTER TABLE api_metrics ADD COLUMN bytes_avg REAL")
except Exception:
pass # column already exists
pass
try:
c.execute("ALTER TABLE api_metrics ADD COLUMN is_ghost INTEGER NOT NULL DEFAULT 0")
except Exception:
pass
c.commit()

def insert_batch(self, rows: list[dict]):
Expand All @@ -66,11 +71,11 @@ def insert_batch(self, rows: list[dict]):
INSERT INTO api_metrics
(bucket_ts, route, method, env, release_tag,
status_2xx, status_4xx, status_5xx, total_calls,
lat_p50, lat_p90, lat_p99, lat_min, lat_max, bytes_avg)
lat_p50, lat_p90, lat_p99, lat_min, lat_max, bytes_avg, is_ghost)
VALUES (
:bucket_ts, :route, :method, :env, :release_tag,
:status_2xx, :status_4xx, :status_5xx, :total_calls,
:lat_p50, :lat_p90, :lat_p99, :lat_min, :lat_max, :bytes_avg
:lat_p50, :lat_p90, :lat_p99, :lat_min, :lat_max, :bytes_avg, :is_ghost
)
""", rows)
self._conn.commit()
Expand Down Expand Up @@ -107,22 +112,22 @@ def get_summary(self) -> dict:
SUM(status_5xx) as calls_5xx,
AVG(lat_p90) as avg_p90,
AVG(lat_p99) as avg_p99
FROM api_metrics WHERE bucket_ts >= ?
FROM api_metrics WHERE bucket_ts >= ? AND is_ghost = 0
""", (since_24h,)).fetchone()

baseline = c.execute("""
SELECT AVG(lat_p90) as baseline_p90
FROM api_metrics WHERE bucket_ts >= ? AND bucket_ts < ?
FROM api_metrics WHERE bucket_ts >= ? AND bucket_ts < ? AND is_ghost = 0
""", (since_7d, since_24h)).fetchone()

active = c.execute("""
SELECT COUNT(DISTINCT route || '|' || method) as n
FROM api_metrics WHERE bucket_ts >= ?
FROM api_metrics WHERE bucket_ts >= ? AND is_ghost = 0
""", (since_24h,)).fetchone()

total = c.execute("""
SELECT COUNT(DISTINCT route || '|' || method) as n
FROM api_metrics
FROM api_metrics WHERE is_ghost = 0
""").fetchone()

return {
Expand All @@ -136,7 +141,7 @@ def get_routes(self, hours: int = 24) -> list[dict]:
since = _now_sec() - hours * 3600
rows = self._conn.execute("""
SELECT
route, method,
route, method, is_ghost,
SUM(total_calls) as calls,
SUM(status_2xx) as calls_2xx,
SUM(status_4xx) as calls_4xx,
Expand All @@ -148,9 +153,9 @@ def get_routes(self, hours: int = 24) -> list[dict]:
AVG(bytes_avg) as bytes_avg
FROM api_metrics
WHERE bucket_ts >= ?
GROUP BY route, method
ORDER BY calls DESC
LIMIT 50
GROUP BY route, method, is_ghost
ORDER BY is_ghost ASC, calls DESC
LIMIT 100
""", (since,)).fetchall()
return [dict(r) for r in rows]

Expand All @@ -171,6 +176,7 @@ def get_dead_candidates(self, inactive_days: int = 21) -> list[dict]:
rows = self._conn.execute("""
SELECT route, method, MAX(bucket_ts) as last_seen
FROM api_metrics
WHERE is_ghost = 0
GROUP BY route, method
HAVING last_seen < ?
ORDER BY last_seen ASC
Expand Down Expand Up @@ -217,14 +223,14 @@ def get_latency_anomaly_data(self) -> dict:

recent = self._conn.execute("""
SELECT route, method, AVG(lat_p99) as avg_p99
FROM api_metrics WHERE bucket_ts >= ?
FROM api_metrics WHERE bucket_ts >= ? AND is_ghost = 0
GROUP BY route, method
""", (since_1h,)).fetchall()

baseline = self._conn.execute("""
SELECT route, method, lat_p99
FROM api_metrics
WHERE bucket_ts >= ? AND bucket_ts < ? AND lat_p99 IS NOT NULL
WHERE bucket_ts >= ? AND bucket_ts < ? AND lat_p99 IS NOT NULL AND is_ghost = 0
""", (since_7d, since_1h)).fetchall()

return {
Expand Down Expand Up @@ -270,7 +276,7 @@ def get_drift_data(self) -> list[dict]:
CAST(bucket_ts / 86400 AS INTEGER) as day_bucket,
AVG(lat_p90) as p90
FROM api_metrics
WHERE bucket_ts >= ? AND lat_p90 IS NOT NULL
WHERE bucket_ts >= ? AND lat_p90 IS NOT NULL AND is_ghost = 0
GROUP BY route, method, day_bucket
ORDER BY route, method, day_bucket
""", (since_30d,)).fetchall()
Expand All @@ -282,7 +288,7 @@ def get_global_time_series(self, hours: int = 24) -> list[dict]:
SELECT bucket_ts, SUM(total_calls) as calls,
AVG(lat_p50) as p50, AVG(lat_p90) as p90,
AVG(lat_p99) as p99, SUM(status_5xx) as errors
FROM api_metrics WHERE bucket_ts >= ?
FROM api_metrics WHERE bucket_ts >= ? AND is_ghost = 0
GROUP BY bucket_ts ORDER BY bucket_ts ASC
""", (since,)).fetchall()
return [dict(r) for r in rows]
Expand Down
1 change: 1 addition & 0 deletions apiforgepy/middleware.py
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,7 @@ async def dispatch(self, request: Request, call_next):
"release": self._release,
"service": self._service,
"response_size": int(content_length) if content_length else None,
"is_ghost": route_obj is None,
})
except Exception:
pass # never crash the host application
Expand Down
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ build-backend = "setuptools.build_meta"

[project]
name = "apiforgepy"
version = "2.1.1"
version = "2.1.2"

description = "API observability & intelligence for FastAPI/Starlette — local-first, privacy-first"
readme = "README.md"
Expand Down
1 change: 1 addition & 0 deletions tests/test_database.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ def insert_row(db, **overrides):
lat_min=10.0,
lat_max=150.0,
bytes_avg=None,
is_ghost=0,
)
db.insert_batch([{**defaults, **overrides}])

Expand Down
Loading