Optimize broker concurrency and add rate limiting middleware #23

Merged
artugro merged 2 commits into main from claude/review-open-issues-4GIYy on Apr 1, 2026

@artugro artugro commented Mar 30, 2026

Summary

This PR improves performance and reliability of the agent broker by batching independent async lookups, adds Redis-backed rate limiting middleware, enhances the concurrency limiter with pub/sub notifications, and adds smoke tests for CI.

Key Changes

Broker Performance Optimization (src/services/broker.py)

  • Batch async lookups: Refactored invoke_agent() to use asyncio.gather() for independent database/repository calls (config, agent, conversation) instead of sequential awaits, reducing latency
  • Conditional gathering: Only fetches conversation when conv_id is provided, avoiding unnecessary queries
  • Removed duplicate agent fetch: Agent is now fetched once via the batched gather instead of separately later in the function
  • Code formatting improvements for readability (line wrapping for long expressions)
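
For reviewers, a minimal sketch of the batching pattern described above. The three `get_*` coroutines and `load_invocation_context` are hypothetical stand-ins, not the actual functions in src/services/broker.py; only the use of asyncio.gather() and the conditional conversation fetch come from this PR.

```python
import asyncio
from typing import Optional, Tuple


# Hypothetical stand-ins for the repository calls made by invoke_agent().
async def get_config(agent_id: str) -> dict:
    await asyncio.sleep(0.01)  # simulated I/O
    return {"agent_id": agent_id, "timeout": 30}


async def get_agent(agent_id: str) -> dict:
    await asyncio.sleep(0.01)  # simulated I/O
    return {"id": agent_id}


async def get_conversation(conv_id: str) -> dict:
    await asyncio.sleep(0.01)  # simulated I/O
    return {"id": conv_id}


async def load_invocation_context(
    agent_id: str, conv_id: Optional[str] = None
) -> Tuple[dict, dict, Optional[dict]]:
    """Fetch config, agent and (only if requested) conversation concurrently."""
    lookups = [get_config(agent_id), get_agent(agent_id)]
    if conv_id is not None:
        # The conversation lookup is added only when a conv_id was supplied.
        lookups.append(get_conversation(conv_id))

    # gather() returns results in the same order the awaitables were passed.
    results = await asyncio.gather(*lookups)
    config, agent = results[0], results[1]
    conversation = results[2] if conv_id is not None else None
    return config, agent, conversation
```

With the simulated delays, the three lookups complete in roughly the time of one. Note that gather() propagates the first exception by default, so error handling for a failed lookup would need to be decided separately.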

Rate Limiting Middleware (src/core/rate_limit.py - new)

  • Added RateLimitMiddleware using Redis fixed-window counters per client (by user ID or IP)
  • Graceful degradation: allows all requests when Redis is unavailable
  • Returns 429 with Retry-After and rate limit headers when limit exceeded
  • Skips health check endpoints to avoid false positives
  • Configurable via RATE_LIMIT_ENABLED and RATE_LIMIT_REQUESTS_PER_MINUTE settings
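
A rough illustration of the fixed-window decision described in this list, using an in-memory dictionary in place of Redis so it runs anywhere. The class name, the `X-RateLimit-*` header names, and the choice to start a window at a client's first request are assumptions for the sketch; the PR only states that a 429 is returned with Retry-After and rate limit headers.

```python
import math
import time
from dataclasses import dataclass
from typing import Callable, Dict, Tuple


@dataclass
class Decision:
    allowed: bool
    status: int
    headers: Dict[str, str]


class FixedWindowLimiter:
    """In-memory fixed-window counter (stand-in for the Redis-backed one)."""

    def __init__(
        self,
        limit: int,
        window_seconds: int = 60,
        clock: Callable[[], float] = time.monotonic,
    ) -> None:
        self.limit = limit
        self.window_seconds = window_seconds
        self._clock = clock
        # client id -> (window start time, requests seen in that window)
        self._windows: Dict[str, Tuple[float, int]] = {}

    def hit(self, client_id: str) -> Decision:
        now = self._clock()
        start, count = self._windows.get(client_id, (now, 0))
        if now - start >= self.window_seconds:
            # The previous window has elapsed: start a fresh one.
            start, count = now, 0
        count += 1
        self._windows[client_id] = (start, count)

        # Header names are assumed; the PR does not list them.
        headers = {
            "X-RateLimit-Limit": str(self.limit),
            "X-RateLimit-Remaining": str(max(0, self.limit - count)),
        }
        if count > self.limit:
            retry_after = max(1, math.ceil(start + self.window_seconds - now))
            headers["Retry-After"] = str(retry_after)
            return Decision(False, 429, headers)
        return Decision(True, 200, headers)
```

A client id such as `user:<id>` or `ip:<address>` would be passed to `hit()`, matching the "by user ID or IP" keying above. The graceful-degradation path (allow everything when Redis is down) is not modelled here.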

Concurrency Limiter Enhancement (src/workflow/utils/concurrency.py)

  • Pub/sub notifications: RedisSemaphore now uses Redis pub/sub to notify waiters when slots are released instead of polling
  • Fallback to polling: If pub/sub subscription fails, automatically falls back to polling with configurable interval
  • Improved acquire logic: Tries immediate acquisition first, then waits for notifications with timeout
  • Better docstrings explaining the pub/sub + polling hybrid approach
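
To make the acquire flow above concrete, here is an in-process sketch in which an asyncio.Event plays the role of the Redis pub/sub release message. It is not the RedisSemaphore implementation: `NotifyingSemaphore` and its parameters are hypothetical, and the sketch folds the polling fallback into a bounded wait rather than modelling a failed subscription.

```python
import asyncio


class NotifyingSemaphore:
    """Try immediately, then wait for a release notification or a poll tick."""

    def __init__(self, max_slots: int, poll_interval: float = 0.5) -> None:
        self._max_slots = max_slots
        self._in_use = 0
        self._poll_interval = poll_interval
        # Stand-in for the pub/sub channel that announces released slots.
        self._released = asyncio.Event()

    def _try_acquire(self) -> bool:
        if self._in_use < self._max_slots:
            self._in_use += 1
            return True
        return False

    async def acquire(self, timeout: float) -> bool:
        loop = asyncio.get_running_loop()
        deadline = loop.time() + timeout
        while True:
            # Always re-check after waking: another waiter may have won the slot.
            if self._try_acquire():
                return True
            remaining = deadline - loop.time()
            if remaining <= 0:
                return False
            self._released.clear()
            try:
                # Wake on a notification, or after poll_interval at the latest.
                await asyncio.wait_for(
                    self._released.wait(),
                    timeout=min(remaining, self._poll_interval),
                )
            except asyncio.TimeoutError:
                pass  # no notification arrived; loop around and poll

    def release(self) -> None:
        self._in_use -= 1
        self._released.set()  # notify all current waiters
```

Because a notified waiter still goes through `_try_acquire()`, a missed or spurious notification costs at most one poll interval rather than correctness.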

API Metadata Endpoints (src/main.py)

  • Added /.well-known/agent.json endpoint for A2A (agent-to-agent) discovery with AgentCard format
  • Reformatted existing /.well-known/mcp/server-card.json for readability
  • Added RateLimitMiddleware to middleware stack

Settings (src/core/settings.py)

  • Added RATE_LIMIT_ENABLED (default: True) and RATE_LIMIT_REQUESTS_PER_MINUTE configuration options

Testing (tests/test_smoke.py - new, tests/conftest.py - new)

  • Added smoke tests that verify FastAPI app imports and basic endpoints respond without a live backend
  • Tests cover: health endpoint, OpenAPI schema, A2A agent card, MCP server card
  • Added pytest configuration to auto-mark integration tests so CI can skip them with -m "not integration"
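
One way the auto-marking could look in a conftest.py, sketched under an assumption the PR does not state: every test outside an allowlist of smoke-test files is treated as an integration test. `SMOKE_FILES` and `is_integration` are hypothetical names; `pytest_configure`, `pytest_collection_modifyitems`, `config.addinivalue_line` and `item.add_marker` are standard pytest hooks and methods.

```python
# conftest.py (sketch)

# Assumption: files listed here need no live backend; everything else does.
SMOKE_FILES = ("test_smoke.py",)


def is_integration(nodeid: str) -> bool:
    """Classify a test by the file part of its node id."""
    test_file = nodeid.split("::", 1)[0]
    return not test_file.endswith(SMOKE_FILES)


def pytest_configure(config):
    # Register the marker so that --strict-markers does not reject it.
    config.addinivalue_line(
        "markers", "integration: requires a live backend"
    )


def pytest_collection_modifyitems(config, items):
    # add_marker() accepts a marker name as a plain string.
    for item in items:
        if is_integration(item.nodeid):
            item.add_marker("integration")
```

CI can then run `pytest -m "not integration"` as described above, while a local run without `-m` still executes everything.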

CI (.github/workflows/ci.yml)

  • Re-enabled test job (was temporarily skipped)

Implementation Details

  • The broker optimization reduces sequential I/O by parallelizing independent lookups, improving P99 latency for agent invocations
  • Rate limiting uses a simple fixed-window counter (not sliding window) for Redis efficiency; window resets every 60 seconds
  • Concurrency limiter's pub/sub approach reduces polling overhead in high-concurrency scenarios while maintaining correctness via race condition handling
  • All new features include graceful degradation when Redis is unavailable

https://claude.ai/code/session_015zTWwVFtUWeN5ameM6teEa

… A2A endpoint, rate limiting, CI tests

- #7: Batch independent DB queries in broker invoke_agent() with asyncio.gather()
  to reduce sequential round trips (config + agent + conversation fetched concurrently)
- #8: Replace polling-based Redis semaphore with pub/sub notifications for near-instant
  slot acquisition, with fallback to polling if pub/sub fails
- #15: Add A2A-compatible AgentCard endpoint at /.well-known/agent.json for
  agent-to-agent discovery
- #11: Add Redis-backed rate limiting middleware with Retry-After headers and
  graceful degradation when Redis is unavailable
- #17: Re-enable CI test job (remove if:false gate), add conftest to skip live
  integration tests, add smoke tests for CI

All changes are internal optimizations or additive endpoints — no breaking
changes to the public API surface or intuno-sdk compatibility.

https://claude.ai/code/session_015zTWwVFtUWeN5ameM6teEa

artugro commented Apr 1, 2026

@claude

@artugro artugro self-assigned this Apr 1, 2026
- Use pipeline expire instead of conditional TTL check to prevent
  key living forever if process crashes between INCR and EXPIRE
- Fix docstring: "sliding window" → "fixed-window" to match implementation
- Replace hardcoded agent card URL with settings.BASE_URL for staging/dev
- Add BASE_URL setting (defaults to https://api.intuno.ai)

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
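
A sketch of the first fix in this commit, assuming a redis-py style client whose `pipeline()` queues `incr` and `expire` and sends them together on `execute()`. `FakeRedis`, `FakePipeline` and `count_request` are hypothetical and exist only so the example runs without a server; the actual code in src/core/rate_limit.py may differ.

```python
from typing import Any, Dict, List, Optional, Tuple


class FakePipeline:
    """Queues commands and applies them together, like a redis-py pipeline."""

    def __init__(self, store: "FakeRedis") -> None:
        self._store = store
        self._ops: List[Tuple[str, str, Optional[int]]] = []

    def incr(self, key: str) -> "FakePipeline":
        self._ops.append(("incr", key, None))
        return self

    def expire(self, key: str, seconds: int) -> "FakePipeline":
        self._ops.append(("expire", key, seconds))
        return self

    def execute(self) -> List[Any]:
        results: List[Any] = []
        for op, key, arg in self._ops:
            if op == "incr":
                self._store.values[key] = self._store.values.get(key, 0) + 1
                results.append(self._store.values[key])
            else:
                self._store.ttls[key] = arg
                results.append(True)
        self._ops.clear()
        return results


class FakeRedis:
    """In-memory stand-in exposing only what the sketch needs."""

    def __init__(self) -> None:
        self.values: Dict[str, int] = {}
        self.ttls: Dict[str, Optional[int]] = {}

    def pipeline(self) -> FakePipeline:
        return FakePipeline(self)


def count_request(redis_client: Any, key: str, window_seconds: int = 60) -> int:
    """Increment the counter and set its TTL in one pipeline round trip."""
    pipe = redis_client.pipeline()
    pipe.incr(key)
    pipe.expire(key, window_seconds)  # always sent, never skipped
    count, _ = pipe.execute()
    return count
```

Since EXPIRE is queued on every call, the TTL is refreshed on each request; this sketch therefore assumes the key name already encodes the window (for example a per-minute bucket suffix), so a busy client still moves to a new key each window.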
@artugro artugro merged commit afa9021 into main Apr 1, 2026
0 of 2 checks passed