Pipeline error handler for Python. Captures exceptions, deduplicates failure signatures, optionally diagnoses root causes with LLMs, routes alerts (Telegram / Slack / email / GitHub / S3 / custom), and can generate fix PRs.
Typed, IDE-discoverable configuration. Pydantic v2 models + a fluent FlowDoctor.builder() mean you don't need a yaml file — and when you have one, the schema is enforced at load time.
Fail-loud by default. Configuration errors — missing tokens, unresolved ${VAR} references, misconfigured notifiers — raise ConfigError at construction time instead of silently degrading. A silently-degraded error monitor defeats the purpose.
from flow_doctor import FlowDoctor, TelegramNotifierConfig
fd = (
FlowDoctor.builder("morning-signal")
.add_notifier(TelegramNotifierConfig()) # creds from FLOW_DOCTOR_TELEGRAM_*
.with_dedup(cooldown_minutes=60)
.build()
)
with fd.guard():
run_pipeline() # exceptions captured, deduplicated, routed, re-raisedException → Capture → Dedup → Diagnose (LLM, opt) → Notify (Telegram/...) → Fix PR (opt)
- Capture — exception, traceback, logs, contextvars (
flow_name,stage) - Dedup — same error signature within cooldown window is suppressed (normalized to ignore reqIds, UUIDs, contract symbols, etc.)
- Cascade — if a declared upstream dependency also failed, tag it and skip diagnosis
- Diagnose (opt) — check the knowledge base (free), then call Claude if rate limit allows
- Notify — Telegram / Slack / email / GitHub issue / S3 changelog (rate-limited with daily digest fallback)
- Fix (opt) — human adds
flow-doctor:fixlabel on a filed issue, triggering automated fix PR generation
pip install flow-doctor # core only
pip install "flow-doctor[diagnosis]" # + LLM diagnosis (anthropic SDK)
pip install "flow-doctor[diagnosis,remediation]" # + auto-remediation (boto3)
pip install "flow-doctor[all]" # everythingPython 3.9+. Core install is dependency-light (pyyaml, pydantic v2 + pydantic-settings, python-dotenv, typing_extensions); each extra pulls only what that capability needs.
0.6.0 is in its rc cycle.
pip install flow-doctorresolves to the latest stable (0.5.0); add--pre(e.g.pip install --pre flow-doctor) to pull0.6.0rc1, or pinflow-doctor==0.6.0rc1. 0.6.0 removes the deprecatedflow_doctor.init()— see Migrating.
The builder is typed, IDE-discoverable, and works without a yaml file. Notifier credentials fall through to FLOW_DOCTOR_* env vars when not passed inline.
from flow_doctor import FlowDoctor, TelegramNotifierConfig
fd = (
FlowDoctor.builder("morning-signal")
.add_notifier(TelegramNotifierConfig()) # bot_token + chat_id from env
.with_dedup(cooldown_minutes=60)
.build()
)Three idiomatic ways to use the resulting FlowDoctor in your pipeline:
# 1. Context manager — exception is captured + re-raised
with fd.guard():
run_pipeline()
# 2. Decorator
@fd.monitor
def lambda_handler(event, context):
run_pipeline()
# 3. Direct reporting — never crashes the caller
try:
run_pipeline()
except Exception as e:
fd.report(e, context={"date": "2026-05-13"})Async pipelines:
async def run():
try:
await pipeline()
except Exception as exc:
await fd.report_async(exc)Contextvars propagate automatically. Stamp flow_name / stage once and any fd.report() inside picks them up — no need to thread context=... through every layer:
import flow_doctor
with flow_doctor.context(flow_name="morning-signal", stage="rank"):
run_rank() # any fd.report() inside auto-records flow_name + stagefd.notify_success(...) sends a success ping at Severity.INFO — for the "pipeline finished OK" signal, not just failures. It's persisted like any report but never triggers diagnosis or remediation, and reaches only notifiers that opt into info via notify_on:
fd = (
FlowDoctor.builder("morning-signal")
# failures + success pings both go to this channel
.add_notifier(TelegramNotifierConfig(notify_on=["critical", "error", "info"]))
.build()
)
with fd.guard():
run_pipeline()
fd.notify_success("morning-signal done", body="42 stories, 3 searches")notify_on is per-notifier. When unset it defaults to {critical, error} (warnings and info are skipped), so existing failure-only channels are unaffected. Use it to fan failures to one channel and success/warnings to another. An async notify_success_async(...) mirrors report_async.
Five first-class notifiers ship today, each with its own Pydantic config exposed via the discriminated union NotifierConfig:
| Config | Channel | Setup |
|---|---|---|
TelegramNotifierConfig |
Telegram Bot API | @BotFather → /newbot → bot token + chat_id. Recommended default. |
SlackNotifierConfig |
Slack incoming webhook | Slack app → incoming webhook URL |
EmailNotifierConfig |
SMTP (Gmail / any) | sender + recipients + SMTP password (Gmail App Password works) |
GitHubNotifierConfig |
GitHub issues | PAT with Issues: write on the target repo |
S3NotifierConfig |
System-wide changelog corpus | Bucket + subsystem; IAM allows s3:PutObject on the prefix |
Mix freely:
from flow_doctor import (
EmailNotifierConfig,
FlowDoctor,
GitHubNotifierConfig,
TelegramNotifierConfig,
)
fd = (
FlowDoctor.builder("alpha-engine-predictor")
.add_notifier(TelegramNotifierConfig(message_thread_id=42)) # forum topic
.add_notifier(GitHubNotifierConfig(repo="me/alpha-engine")) # token from env
.add_notifier(EmailNotifierConfig(sender="me@x.com",
recipients=["me@x.com"]))
.build()
)Why Telegram leads the examples:
- Two-minute setup. Message
@BotFather→/newbot→ save the token. Then DM your bot, thenGET https://api.telegram.org/bot<TOKEN>/getUpdatesand grabresult[0].message.chat.id. - Per-flow routing for free. One bot, N channels via
chat_id, or N forum-topic threads viamessage_thread_idin a supergroup. - Mobile push is automatic. No "did the email go to spam" mystery.
- Token rotation is one
@BotFathercall. No app password / SES verified identity / Slack workspace admin.
export FLOW_DOCTOR_TELEGRAM_BOT_TOKEN=1234567890:ABC...
export FLOW_DOCTOR_TELEGRAM_CHAT_ID=-1001234567890 # negative for supergroups/channelsFlowDoctor.builder("pipeline").add_notifier(TelegramNotifierConfig()).build()The plugin is auto-discovered (registered via [project.entry-points.pytest11]). Downstream tests get a flow_doctor_recorder fixture with no imports required.
def test_pipeline_reports_db_errors(flow_doctor_recorder):
run_pipeline_that_should_fail(flow_doctor_recorder)
assert len(flow_doctor_recorder.reports) == 1
assert flow_doctor_recorder.last.exc_type == "DBError"
assert flow_doctor_recorder.last.ambient_context["stage"] == "ingest"flow_doctor_recorder is a RecordingFlowDoctor — it implements FlowDoctorProtocol, so wherever production code expects a FlowDoctorProtocol you can swap it in directly. It also snapshots any active flow_doctor.context() scope onto each captured incident's ambient_context field.
Helpers on the recorder: .clear(), .last, .of_type(exc_name), plus full async support via await recorder.report_async(...).
from flow_doctor import FlowDoctorProtocol
def make_pipeline(fd: FlowDoctorProtocol):
with fd.guard():
...@runtime_checkable, so isinstance(fd, FlowDoctorProtocol) works at runtime as well as at type-check time. Combined with the shipped py.typed marker, mypy --strict and pyright treat flow-doctor's annotations as authoritative.
Pure-Python adapter that serializes a Report into an OTel SpanEvent-shaped dict — ready to ship to a collector via your existing OTLP exporter today. No opentelemetry-* dependency in this release; the bundled OTLP exporter notifier is on the 0.6.0 roadmap.
from flow_doctor.otel import report_to_otel_span_event
span_event = report_to_otel_span_event(report)
# {
# "resource": {"service.name": "<flow_name>"},
# "name": "<stage>",
# "time_unix_nano": ...,
# "severity_text": "ERROR", "severity_number": 17,
# "attributes": {
# "exception.type": "ValueError",
# "exception.message": "...",
# "exception.stacktrace": "...",
# "flow_doctor.error_signature": "...",
# "context.run_id": "...",
# },
# }See the Quick Start. No yaml required.
flow_name: my-pipeline
repo: owner/repo
notify:
- type: telegram
bot_token: ${FLOW_DOCTOR_TELEGRAM_BOT_TOKEN}
chat_id: -1001234567890
message_thread_id: 42 # optional: forum-topic routing
- type: github
repo: owner/repo
- type: s3
bucket: my-changelog-bucket
subsystem: predictor # one of the documented vocab values
store:
type: sqlite
path: flow_doctor.db
diagnosis:
enabled: true
model: claude-sonnet-4-6-20250514
api_key: ${ANTHROPIC_API_KEY}
timeout_seconds: 30
max_daily_cost_usd: 1.00
github:
token: ${GITHUB_TOKEN}
labels: [flow-doctor]
rate_limits:
max_diagnosed_per_day: 3
max_issues_per_day: 3
dedup_cooldown_minutes: 60
dependencies:
- upstream-pipeline
remediation:
enabled: true
dry_run: true
auto_remediate_min_confidence: 0.9
auto_fix:
enabled: true
confidence_threshold: 0.90
test_command: "python -m pytest tests/ -x -q"
scope:
allow: ["src/", "lib/"]
deny: ["*.yaml", "*.yml"]from flow_doctor import FlowDoctor
fd = FlowDoctor.from_config(config_path="flow-doctor.yaml")${VAR} references resolve from the process environment at load time. Unresolved references raise ConfigError — no silent passthrough.
flow-doctor reads credentials from environment variables as its primary configuration mechanism. Every notifier has a documented fallback chain: explicit value → FLOW_DOCTOR_* canonical name → common conventions.
The contract is a typed pydantic-settings model (flow_doctor.core.settings.FlowDoctorSettings), so each field resolves — in precedence order — from:
- the process environment (
FLOW_DOCTOR_*canonical name, then the legacy aliases below), - a
.envfile (path viaFLOW_DOCTOR_ENV_FILE, default.env), - a secrets directory (
FLOW_DOCTOR_SECRETS_DIR— one file per env-var name; Docker / Kubernetes file-mounted secrets).
So a self-hosted / compose deploy can drop a .env next to the app or mount file secrets — no code change. Example .env:
FLOW_DOCTOR_TELEGRAM_BOT_TOKEN=1234567890:ABC...
FLOW_DOCTOR_TELEGRAM_CHAT_ID=-1001234567890| Variable | Used by | Fallback chain | Required when |
|---|---|---|---|
FLOW_DOCTOR_TELEGRAM_BOT_TOKEN |
Telegram notifier | FLOW_DOCTOR_TELEGRAM_BOT_TOKEN → TELEGRAM_BOT_TOKEN |
Telegram notifier config has no explicit bot_token field |
FLOW_DOCTOR_TELEGRAM_CHAT_ID |
Telegram notifier | FLOW_DOCTOR_TELEGRAM_CHAT_ID → TELEGRAM_CHAT_ID |
Telegram notifier config has no explicit chat_id field |
FLOW_DOCTOR_GITHUB_TOKEN |
GitHub notifier, auto-fix PR creator | FLOW_DOCTOR_GITHUB_TOKEN → GH_TOKEN → GITHUB_TOKEN |
Any GitHub notifier or auto-fix is configured |
FLOW_DOCTOR_GITHUB_REPO |
GitHub notifier | FLOW_DOCTOR_GITHUB_REPO |
GitHub notifier config has no explicit repo field |
FLOW_DOCTOR_SMTP_PASSWORD |
Email notifier | FLOW_DOCTOR_SMTP_PASSWORD → GMAIL_APP_PASSWORD |
SMTP requires auth |
FLOW_DOCTOR_SMTP_SENDER |
Email notifier | FLOW_DOCTOR_SMTP_SENDER → EMAIL_SENDER |
Email notifier config has no explicit sender field |
FLOW_DOCTOR_SMTP_RECIPIENTS |
Email notifier | FLOW_DOCTOR_SMTP_RECIPIENTS → EMAIL_RECIPIENTS |
Email notifier config has no explicit recipients field |
FLOW_DOCTOR_SLACK_WEBHOOK |
Slack notifier | FLOW_DOCTOR_SLACK_WEBHOOK → SLACK_WEBHOOK_URL |
Slack notifier config has no explicit webhook_url field |
FLOW_DOCTOR_S3_BUCKET |
S3 notifier | FLOW_DOCTOR_S3_BUCKET → CHANGELOG_BUCKET |
S3 notifier config has no explicit bucket field |
FLOW_DOCTOR_ANTHROPIC_API_KEY |
LLM diagnosis, auto-fix generator | FLOW_DOCTOR_ANTHROPIC_API_KEY → ANTHROPIC_API_KEY |
diagnosis.enabled: true or auto-fix is on |
FLOW_DOCTOR_SKIP_PREFLIGHT |
All notifiers' validate() |
(literal) | Set to 1 in tests / offline boot to bypass token/preflight network calls |
Precedence for every field is: explicit value in kwargs/yaml → canonical FLOW_DOCTOR_* env var → convention fallbacks in the order listed. The first non-empty value wins. Missing values raise ConfigError at construction time naming the specific field and the env vars that would satisfy it.
Two env vars, two lines of Python, working alerts on the next exception:
export FLOW_DOCTOR_TELEGRAM_BOT_TOKEN=1234567890:ABC...
export FLOW_DOCTOR_TELEGRAM_CHAT_ID=-1001234567890from flow_doctor import FlowDoctor, TelegramNotifierConfig
fd = FlowDoctor.builder("pipeline").add_notifier(TelegramNotifierConfig()).build()FlowDoctor.builder().build() and FlowDoctor.from_config() both default to strict=True. Any configuration error (missing required field, unresolved ${VAR}, unknown notifier type) raises ConfigError and prevents startup. This is the recommended default — a non-running flow-doctor is a loud failure; a silently-degraded flow-doctor is a silent one.
If you genuinely want best-effort init that logs errors but keeps running with no notifiers, opt in explicitly:
fd = FlowDoctor.builder("pipeline").build(strict=False)Attach to Python's logging system if you want every WARNING+ log to flow through dedup + diagnosis + notify without touching call sites:
import logging
import flow_doctor
fd = flow_doctor.FlowDoctor.builder("pipeline").add_notifier(
flow_doctor.TelegramNotifierConfig()
).build()
handler = fd.get_handler(level=logging.WARNING)
logging.getLogger().addHandler(handler)
logger.warning("Upstream data is 48h stale") # → captured + routed
logger.error("S3 backup failed: AccessDenied")
logger.exception("Pipeline crashed")The handler is non-blocking — emit() enqueues work and returns immediately; a background thread calls fd.report() asynchronously.
Attach recent logs to the next error report for richer diagnosis context:
with fd.capture_logs(level=logging.INFO):
logger.info("Starting scan with 900 tickers...")
run_pipeline()
# All captured logs are attached to the next fd.report() call- Traceback extraction with frame-based signature hashing
- Configurable cooldown window (default 60 min) — same error captured once, not spammed
- Variable-token normalization: reqIds, conIds, contract symbols, UUIDs, AWS request IDs are stripped before hashing, so a library logging the same error against N objects collapses to one signature
- Cascade detection tags downstream failures caused by upstream dependency outages
- Automatic secret scrubbing (AWS keys, Bearer tokens, passwords in URLs)
- Structured root cause analysis via Claude: category, confidence, affected files, remediation
- Six categories:
TRANSIENT,DATA,CODE,CONFIG,EXTERNAL,INFRA - Knowledge base caching — known patterns matched for free before calling the LLM
- Git context assembly (recent commits, changed files) for better diagnosis accuracy
- Daily cost cap (default $1.00) and rate limiting (default 3 diagnoses/day)
- Telegram — Bot API, per-chat / per-thread routing, mobile push (recommended default)
- Slack — webhook-based alerts with severity emoji + diagnosis snippet
- Email — SMTP (Gmail/any) with detailed body
- GitHub issues — auto-filed with diagnosis, traceback, captured logs, machine-readable metadata
- S3 — writes schema-1.0.0 entries to a system-wide changelog corpus
- Daily digest — summarizes rate-limited / suppressed errors at end of day
- Custom notifiers — subclass
flow_doctor.notify.base.Notifier; the abstract base is a public extension point
Both halves are independently toggleable on the GitHub notifier:
GitHubNotifierConfig(
repo="me/app",
auto_create_issue=True, # toggle 1: file an issue on failure (default True)
auto_fix_pr=False, # toggle 2: auto-generate a fix PR for it (default False)
)auto_create_issue(defaultTrue) — file a GitHub issue on failure. SetFalseto silence issue creation without removing the notifier block.auto_fix_pr(defaultFalse) — whenTrue, Flow Doctor applies thefix_label(flow-doctor:fix) to each filed issue, which fires the fix workflow automatically with no human label step. LeaveFalsefor the human-in-the-loop default below.
The fix-generation pipeline itself is the same either way:
- An error occurs and Flow Doctor creates a GitHub issue with structured diagnosis
- The
flow-doctor:fixlabel is applied — by a human (default) or automatically (auto_fix_pr=True) - GitHub Actions triggers
flow-doctor generate-fix - The CLI generates a diff via LLM, validates against scope rules, runs tests
- If tests pass, a PR is opened. If tests fail, a comment explains what went wrong.
Safety gates — fix generation is skipped when:
- Confidence below threshold (default 90%)
- Category is
EXTERNALorINFRA(nothing to fix in code) - Config issue involves credentials/secrets
- Generated diff touches files outside configured scope
- Tests fail after applying the fix
Define patterns that map failure signatures to automated actions:
from flow_doctor.remediation.playbook import (
Playbook, PlaybookPattern, RemediationAction, RemediationType,
)
my_playbook = Playbook(patterns=[
PlaybookPattern(
name="service_down",
description="App service not responding",
category="INFRA",
message_pattern=r"(connection refused|service unavailable)",
action=RemediationAction(
action_type=RemediationType.RESTART_SERVICE,
description="Restart the app service",
commands=["sudo systemctl restart myapp"],
ssm_target="app-server",
),
),
])flow-doctor generate-fix \
--issue-number 42 \
--repo owner/repo \
--token $GITHUB_TOKEN \
--config flow-doctor.yaml \
--dry-runGitHub Actions workflow (copy to your repo at .github/workflows/flow-doctor-fix.yml):
name: Flow Doctor Fix
on:
issues:
types: [labeled]
jobs:
generate-fix:
if: github.event.label.name == 'flow-doctor:fix'
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v4
- uses: actions/setup-python@v5
with:
python-version: '3.12'
- run: pip install "flow-doctor[diagnosis]"
- run: |
python -m flow_doctor.fix.cli generate-fix \
--issue-number ${{ github.event.issue.number }} \
--repo ${{ github.repository }} \
--token $GITHUB_TOKEN
env:
GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
ANTHROPIC_API_KEY: ${{ secrets.ANTHROPIC_API_KEY }}flow_doctor.init() was removed in 0.6.0 (it was @deprecated through 0.5.x). Two drop-in replacements:
# was: fd = flow_doctor.init(config_path="flow-doctor.yaml")
# now, same yaml + kwargs contract:
fd = FlowDoctor.from_config(config_path="flow-doctor.yaml")
# or the typed, IDE-discoverable builder (no yaml):
fd = (
FlowDoctor.builder("pipeline")
.add_notifier(TelegramNotifierConfig())
.build()
)Existing flow-doctor.yaml files keep working unchanged via from_config. The omnibus NotifyChannelConfig is no longer a public/deprecated surface — construct typed *NotifierConfig objects from flow_doctor.notify instead.
flow_doctor/
core/ # Client, builder, config (Pydantic v2), models, dedup,
# rate limiting, scrubber, logging handler, contextvars
_protocol.py # FlowDoctorProtocol public contract
notify/ # Telegram, Slack, Email, GitHub, S3 — concrete notifiers
# + typed Pydantic config models (discriminated union)
diagnosis/ # LLM provider, context assembly, knowledge base, git context
digest/ # Daily digest generator
fix/ # Auto-fix: LLM generator, scope guard, test validator, PR creator, CLI
remediation/ # Decision gate, executor, playbook patterns
storage/ # SQLite backend (thread-safe, per-thread connections)
testing/ # RecordingFlowDoctor + pytest plugin (auto-discovered)
otel.py # Report → OTel SpanEvent serialization adapter
py.typed # PEP 561 marker — annotations are authoritative for mypy/pyright
git clone https://github.com/cipher813/flow-doctor.git
cd flow-doctor
python -m venv .venv && source .venv/bin/activate
pip install -e ".[dev,diagnosis]"
python -m pytest tests/ -x -q # 414 tests
python -m coverage run -m pytest && python -m coverage report # coverage
python examples/smoke_test.py # end-to-end smoke test