Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion apiforgepy/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
from .transport import LocalTransport
from .cloud_transport import CloudTransport

__version__ = "2.1.1"
__version__ = "2.2.0"
__all__ = ["ApiForgeMiddleware"]


Expand Down
113 changes: 76 additions & 37 deletions apiforgepy/aggregator.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import json
import threading
import time
import math
Expand Down Expand Up @@ -33,28 +34,41 @@ def record(self, event: dict):
with self._lock:
if key not in self._buffer:
self._buffer[key] = {
"method": event["method"],
"route": event["route"],
"env": event["env"],
"release": event.get("release"),
"is_ghost": is_ghost,
"durations": [],
"response_sizes": [],
"status_2xx": 0,
"status_4xx": 0,
"status_5xx": 0,
"method": event["method"],
"route": event["route"],
"env": event["env"],
"release": event.get("release"),
"is_ghost": is_ghost,
"durations": [],
"ttfb_durations": [],
"response_sizes": [],
"request_sizes": [],
"inflight_samples": [],
"status_2xx": 0,
"status_3xx": 0,
"status_4xx": 0,
"status_5xx": 0,
"status_map": {},
}
bucket = self._buffer[key]
bucket["durations"].append(event["duration_ms"])

if event.get("ttfb_ms") is not None:
bucket["ttfb_durations"].append(event["ttfb_ms"])
if event.get("response_size") is not None:
bucket["response_sizes"].append(event["response_size"])
if event.get("request_size") is not None:
bucket["request_sizes"].append(event["request_size"])
if event.get("inflight") is not None:
bucket["inflight_samples"].append(event["inflight"])

s = event["status"]
if 200 <= s < 300:
bucket["status_2xx"] += 1
elif 400 <= s < 500:
bucket["status_4xx"] += 1
elif s >= 500:
bucket["status_5xx"] += 1
if 200 <= s < 300: bucket["status_2xx"] += 1
elif 300 <= s < 400: bucket["status_3xx"] += 1
elif 400 <= s < 500: bucket["status_4xx"] += 1
elif s >= 500: bucket["status_5xx"] += 1

bucket["status_map"][s] = bucket["status_map"].get(s, 0) + 1

def _schedule(self):
self._timer = threading.Timer(self._flush_interval, self._tick)
Expand All @@ -76,29 +90,54 @@ def _flush(self):
rows = []

for bucket in snapshot.values():
sorted_d = sorted(bucket["durations"])
sorted_d = sorted(bucket["durations"])
sorted_ttfb = sorted(bucket["ttfb_durations"])
n = len(sorted_d)
sizes = bucket["response_sizes"]
bytes_avg = sum(sizes) / len(sizes) if sizes else None
lat_avg = sum(bucket["durations"]) / n if n > 0 else None

sizes = bucket["response_sizes"]
req_sizes = bucket["request_sizes"]
inflight = bucket["inflight_samples"]

bytes_avg = sum(sizes) / len(sizes) if sizes else None
request_size_avg = sum(req_sizes) / len(req_sizes) if req_sizes else None
lat_avg = sum(bucket["durations"]) / n if n > 0 else None
inflight_avg = sum(inflight) / len(inflight) if inflight else None
inflight_max = max(inflight) if inflight else None

# Granular distribution — sorted by count desc
status_dist = (
json.dumps(
dict(sorted(bucket["status_map"].items(), key=lambda x: -x[1]))
)
if bucket["status_map"] else None
)

rows.append({
"bucket_ts": bucket_ts,
"route": bucket["route"],
"method": bucket["method"],
"env": bucket["env"],
"release_tag": bucket["release"],
"is_ghost": 1 if bucket["is_ghost"] else 0,
"status_2xx": bucket["status_2xx"],
"status_4xx": bucket["status_4xx"],
"status_5xx": bucket["status_5xx"],
"total_calls": n,
"lat_p50": _percentile(sorted_d, 0.50),
"lat_p90": _percentile(sorted_d, 0.90),
"lat_p99": _percentile(sorted_d, 0.99),
"lat_avg": lat_avg,
"lat_min": sorted_d[0] if sorted_d else 0,
"lat_max": sorted_d[-1] if sorted_d else 0,
"bytes_avg": bytes_avg,
"bucket_ts": bucket_ts,
"route": bucket["route"],
"method": bucket["method"],
"env": bucket["env"],
"release_tag": bucket["release"],
"is_ghost": 1 if bucket["is_ghost"] else 0,
"status_2xx": bucket["status_2xx"],
"status_3xx": bucket["status_3xx"],
"status_4xx": bucket["status_4xx"],
"status_5xx": bucket["status_5xx"],
"status_dist": status_dist,
"total_calls": n,
"lat_p50": _percentile(sorted_d, 0.50),
"lat_p90": _percentile(sorted_d, 0.90),
"lat_p99": _percentile(sorted_d, 0.99),
"lat_avg": lat_avg,
"lat_min": sorted_d[0] if sorted_d else 0,
"lat_max": sorted_d[-1] if sorted_d else 0,
"lat_ttfb_p50": _percentile(sorted_ttfb, 0.50) if sorted_ttfb else None,
"lat_ttfb_p90": _percentile(sorted_ttfb, 0.90) if sorted_ttfb else None,
"lat_ttfb_p99": _percentile(sorted_ttfb, 0.99) if sorted_ttfb else None,
"bytes_avg": bytes_avg,
"request_size_avg": request_size_avg,
"inflight_avg": inflight_avg,
"inflight_max": inflight_max,
})

self._transport.write(rows)
114 changes: 75 additions & 39 deletions apiforgepy/database.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,39 +28,59 @@ def _init(self):
);

CREATE TABLE IF NOT EXISTS api_metrics (
id INTEGER PRIMARY KEY AUTOINCREMENT,
bucket_ts INTEGER NOT NULL,
route TEXT NOT NULL,
method TEXT NOT NULL,
env TEXT NOT NULL DEFAULT 'production',
release_tag TEXT,
status_2xx INTEGER NOT NULL DEFAULT 0,
status_4xx INTEGER NOT NULL DEFAULT 0,
status_5xx INTEGER NOT NULL DEFAULT 0,
total_calls INTEGER NOT NULL DEFAULT 0,
lat_p50 REAL,
lat_p90 REAL,
lat_p99 REAL,
lat_min REAL,
lat_max REAL,
bytes_avg REAL,
is_ghost INTEGER NOT NULL DEFAULT 0
id INTEGER PRIMARY KEY AUTOINCREMENT,
bucket_ts INTEGER NOT NULL,
route TEXT NOT NULL,
method TEXT NOT NULL,
env TEXT NOT NULL DEFAULT 'production',
release_tag TEXT,
status_2xx INTEGER NOT NULL DEFAULT 0,
status_3xx INTEGER NOT NULL DEFAULT 0,
status_4xx INTEGER NOT NULL DEFAULT 0,
status_5xx INTEGER NOT NULL DEFAULT 0,
status_dist TEXT,
total_calls INTEGER NOT NULL DEFAULT 0,
lat_p50 REAL,
lat_p90 REAL,
lat_p99 REAL,
lat_avg REAL,
lat_min REAL,
lat_max REAL,
lat_ttfb_p50 REAL,
lat_ttfb_p90 REAL,
lat_ttfb_p99 REAL,
bytes_avg REAL,
request_size_avg REAL,
inflight_avg REAL,
inflight_max INTEGER,
is_ghost INTEGER NOT NULL DEFAULT 0
);

CREATE INDEX IF NOT EXISTS idx_route_ts ON api_metrics (route, method, bucket_ts);
CREATE INDEX IF NOT EXISTS idx_bucket_ts ON api_metrics (bucket_ts);
CREATE INDEX IF NOT EXISTS idx_release ON api_metrics (release_tag)
WHERE release_tag IS NOT NULL;
""")

# Migrations for databases created before these columns were introduced
try:
c.execute("ALTER TABLE api_metrics ADD COLUMN bytes_avg REAL")
except Exception:
pass
try:
c.execute("ALTER TABLE api_metrics ADD COLUMN is_ghost INTEGER NOT NULL DEFAULT 0")
except Exception:
pass
migrations = [
"ALTER TABLE api_metrics ADD COLUMN bytes_avg REAL",
"ALTER TABLE api_metrics ADD COLUMN is_ghost INTEGER NOT NULL DEFAULT 0",
"ALTER TABLE api_metrics ADD COLUMN status_3xx INTEGER NOT NULL DEFAULT 0",
"ALTER TABLE api_metrics ADD COLUMN status_dist TEXT",
"ALTER TABLE api_metrics ADD COLUMN lat_avg REAL",
"ALTER TABLE api_metrics ADD COLUMN lat_ttfb_p50 REAL",
"ALTER TABLE api_metrics ADD COLUMN lat_ttfb_p90 REAL",
"ALTER TABLE api_metrics ADD COLUMN lat_ttfb_p99 REAL",
"ALTER TABLE api_metrics ADD COLUMN request_size_avg REAL",
"ALTER TABLE api_metrics ADD COLUMN inflight_avg REAL",
"ALTER TABLE api_metrics ADD COLUMN inflight_max INTEGER",
]
for sql in migrations:
try:
c.execute(sql)
except Exception:
pass
c.commit()

def insert_batch(self, rows: list[dict]):
Expand All @@ -70,12 +90,22 @@ def insert_batch(self, rows: list[dict]):
self._conn.executemany("""
INSERT INTO api_metrics
(bucket_ts, route, method, env, release_tag,
status_2xx, status_4xx, status_5xx, total_calls,
lat_p50, lat_p90, lat_p99, lat_min, lat_max, bytes_avg, is_ghost)
status_2xx, status_3xx, status_4xx, status_5xx, status_dist,
total_calls,
lat_p50, lat_p90, lat_p99, lat_avg, lat_min, lat_max,
lat_ttfb_p50, lat_ttfb_p90, lat_ttfb_p99,
bytes_avg, request_size_avg,
inflight_avg, inflight_max,
is_ghost)
VALUES (
:bucket_ts, :route, :method, :env, :release_tag,
:status_2xx, :status_4xx, :status_5xx, :total_calls,
:lat_p50, :lat_p90, :lat_p99, :lat_min, :lat_max, :bytes_avg, :is_ghost
:status_2xx, :status_3xx, :status_4xx, :status_5xx, :status_dist,
:total_calls,
:lat_p50, :lat_p90, :lat_p99, :lat_avg, :lat_min, :lat_max,
:lat_ttfb_p50, :lat_ttfb_p90, :lat_ttfb_p99,
:bytes_avg, :request_size_avg,
:inflight_avg, :inflight_max,
:is_ghost
)
""", rows)
self._conn.commit()
Expand Down Expand Up @@ -108,6 +138,7 @@ def get_summary(self) -> dict:
SELECT
SUM(total_calls) as calls_total,
SUM(status_2xx) as calls_2xx,
SUM(status_3xx) as calls_3xx,
SUM(status_4xx) as calls_4xx,
SUM(status_5xx) as calls_5xx,
AVG(lat_p90) as avg_p90,
Expand Down Expand Up @@ -142,15 +173,19 @@ def get_routes(self, hours: int = 24) -> list[dict]:
rows = self._conn.execute("""
SELECT
route, method, is_ghost,
SUM(total_calls) as calls,
SUM(status_2xx) as calls_2xx,
SUM(status_4xx) as calls_4xx,
SUM(status_5xx) as calls_5xx,
AVG(lat_p50) as p50,
AVG(lat_p90) as p90,
AVG(lat_p99) as p99,
MAX(lat_max) as lat_max,
AVG(bytes_avg) as bytes_avg
SUM(total_calls) as calls,
SUM(status_2xx) as calls_2xx,
SUM(status_3xx) as calls_3xx,
SUM(status_4xx) as calls_4xx,
SUM(status_5xx) as calls_5xx,
AVG(lat_p50) as p50,
AVG(lat_p90) as p90,
AVG(lat_p99) as p99,
MAX(lat_max) as lat_max,
AVG(bytes_avg) as bytes_avg,
AVG(request_size_avg) as request_size_avg,
AVG(inflight_avg) as inflight_avg,
MAX(inflight_max) as inflight_max
FROM api_metrics
WHERE bucket_ts >= ?
GROUP BY route, method, is_ghost
Expand All @@ -164,7 +199,8 @@ def get_time_series(self, route: str, method: str, hours: int = 24) -> list[dict
rows = self._conn.execute("""
SELECT bucket_ts, SUM(total_calls) as calls,
AVG(lat_p50) as p50, AVG(lat_p90) as p90,
AVG(lat_p99) as p99, SUM(status_5xx) as errors
AVG(lat_p99) as p99, SUM(status_5xx) as errors,
SUM(status_3xx) as redirects
FROM api_metrics
WHERE route = ? AND method = ? AND bucket_ts >= ?
GROUP BY bucket_ts ORDER BY bucket_ts ASC
Expand Down
Loading
Loading