Skip to content

feat: add LangfuseEventHandler telemetry package v0.2.1#3

Open
mrkeithelliott wants to merge 3 commits into
mainfrom
pr-c-langfuse-telemetry
Open

feat: add LangfuseEventHandler telemetry package v0.2.1#3
mrkeithelliott wants to merge 3 commits into
mainfrom
pr-c-langfuse-telemetry

Conversation

@mrkeithelliott

Copy link
Copy Markdown
Contributor

Summary

  • Adds agentflow/telemetry/ package with LangfuseEventHandler that maps EventBus events to the Langfuse trace/span/generation hierarchy
  • WORKFLOW_STARTEDlangfuse.trace(), NODE_STARTEDtrace.span(), LLM_CALL_COMPLETEDspan.generation() (with model, token counts, latency), TOOL_CALLED → child span, NODE_COMPLETED/WORKFLOW_COMPLETED close their objects, ERROR marks spans as failed
  • Lazy import in top-level __init__.py via __getattr__ — importing agentflow never fails even when langfuse is not installed
  • Bumps version to 0.2.1; adds telemetry = ["langfuse>=2.0"] optional dependency group

Test plan

  • pip install -e ".[dev]" passes (no langfuse installed) — lazy import does not raise
  • pip install -e ".[dev,telemetry]" then from agentflow import LangfuseEventHandler works
  • Existing test suite: pytest tests/ -v — 150 pass, 1 pre-existing Qdrant mock failure unrelated to this PR
  • End-to-end: set LANGFUSE_* env vars, send a message through openclaw, verify trace tree appears in Langfuse cloud

🤖 Generated with Claude Code

mrkeithelliott and others added 3 commits March 11, 2026 13:43
…abs-agentflow v0.1.1

- Rename package to gittielabs-agentflow in pyproject.toml (import stays agentflow)
- Add LLM_CALL_STARTED / LLM_CALL_COMPLETED constants to events.py
- Emit LLM call events from AgentExecutor.run() with node_id, session_id, timing, and token usage
- Fix google_genai.py: thinking metadata was computed but dropped; now passed to AgentResponse
- Export new event constants from agentflow.__init__
- Add orchestration and telemetry optional dependency groups

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
- session/multi_user.py: MultiUserHistory with HistoryPersistence protocol
  - Per-user in-memory history with configurable max and optional persistence backend
  - load()/save() for pluggable DB persistence (e.g. PostgreSQL adapter)
  - Lazy loading: each user's history is loaded from persistence exactly once per session
- orchestration/types.py: Plan and PlanStep TypedDicts
- orchestration/executor.py: DAGExecutor with topological round-batching
  - asyncio.gather() for independent steps, dependency resolution via {{key.result}} refs
  - Graceful fallback to sequential execution on circular deps
- orchestration/classifier.py: ComplexityClassifier
  - Fast bypass for short messages (< 20 words, no multi-step markers)
  - Haiku LLM call for ambiguous messages, defaults to SIMPLE on failure
- Export all new symbols from agentflow.__init__

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
- telemetry/langfuse_handler.py: LangfuseEventHandler
  - Maps EventBus events to Langfuse trace/span/generation hierarchy
  - WORKFLOW_STARTED -> trace, NODE_STARTED -> span, LLM_CALL_COMPLETED -> generation
  - TOOL_CALLED -> child span under node span, NODE_COMPLETED/ERROR -> close span
  - LLM generations nested under correct node span via node_id thread-through
  - flush() method for graceful shutdown
- Lazy import via __getattr__ to avoid hard failure when langfuse not installed
- env var is LANGFUSE_BASE_URL (not LANGFUSE_HOST) for US cloud

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant