feat(signals): register Centaur callbacks on centaur_review watch#399
feat(signals): register Centaur callbacks on centaur_review watch#399dimakis wants to merge 4 commits into
Conversation
Three fixes for the SSE transport regression causing 8+ minute message
delivery delays and silent message drops:
1. Periodic sync replay storm prevention (connection-registry):
- Replace isSessionActive with shouldSync that checks session STATE
(ACTIVE/STARTING/CREATED only), not just the isActive boolean.
SUSPENDED/DETACHED/ENDED sessions no longer trigger sync.
- Add MAX_REPLAY_GAP (200 events) cap — when cursor is far behind
head (e.g. after server restart), skip ahead instead of replaying
thousands of events at 50/5s. Client lazy-loads history via API.
- Add getHeadSeq to EventStore for gap detection.
2. POST retry for failed message sends (sse-connection):
- Re-queue send POSTs that get HTTP errors (e.g. 404 from stale
connectionId after reconnect) or network errors. Messages flush
on next successful reconnect instead of being silently dropped.
- Only send endpoints retry — stop/interrupt are fire-and-forget.
3. Guard ProcessTransport.interrupt() crash (chat.ts):
- Wrap queryInstance.interrupt() in try/catch to prevent unhandled
errors from killing the server process. This was the trigger that
caused server restart → in-memory state loss → replay storms.
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
- Add MAX_PENDING_SENDS cap to re-queue path in doPost via enqueuePending() helper, preventing unbounded queue growth on repeated flush failures - Add EventStore.getHeadSeq unit tests against real SQLite (0 events, single event, max of multiple events) - Add interrupt() crash survival test — verifies ProcessTransport error doesn't propagate and message still reaches inputQueue - Use mockImplementationOnce in retry tests instead of manual call counters Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
SignalProcessor now registers a callback URL with Centaur when watching a centaur_review gate, enabling push-based resolution alongside the existing 30s polling fallback. Gracefully handles Centaur being down. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
dimakis
left a comment
There was a problem hiding this comment.
Centaur Review
Found 7 issue(s) (3 warning).
packages/client/src/sse-connection.ts
Solid defensive hardening (POST retry, replay-gap cap, interrupt crash guard) with good test coverage, but the send retry path lacks a retry limit and the Centaur callback registration has a race between async register and sync unwatch.
- 🟡 bugs (L336): No retry limit on re-queued sends. When
doPostfails for a 'send' endpoint, it callsenqueuePending, andflushPendingSendscallsdoPostagain. If the server persistently rejects a message (e.g. 400/422 validation error), it cycles through flush → fail → re-queue indefinitely across reconnects. Consider adding a retry counter to break the loop after N attempts.[fixable]
server/signal-processor.ts
Solid defensive hardening (POST retry, replay-gap cap, interrupt crash guard) with good test coverage, but the send retry path lacks a retry limit and the Centaur callback registration has a race between async register and sync unwatch.
- 🟡 bugs (L71): Race condition:
registerCentaurCallbackis async but not awaited inwatch(). Ifunwatch(taskId)is called before the registration POST completes, the DELETE arrives first and the POST creates a dangling registration in Centaur. Consider tracking the registration promise and awaiting it inunwatch/deregisterCentaurCallback.[fixable] - 🔵 style (L202): Uses
(err as Error).messagewhich violates codebase convention oferr instanceof Error ? err.message : String(err)(see chat.ts:1271, etc.). If a non-Error is thrown,.messagewill beundefinedinstead of a useful string.[fixable] - 🔵 unsafe_assumptions (L43): Constructor defaults
centaurBaseUrl='http://localhost:8642'andmitzoBaseUrl='http://localhost:3100', but the production instantiation atserver/index.ts:177passes neither. These hardcoded localhost URLs will break if Centaur or Mitzo run on different hosts/ports. Consider wiring these from env vars (e.g.CENTAUR_BASE_URL,PORT) at the call site.[fixable]
packages/harness/src/connection-registry.ts
Solid defensive hardening (POST retry, replay-gap cap, interrupt crash guard) with good test coverage, but the send retry path lacks a retry limit and the Centaur callback registration has a race between async register and sync unwatch.
- 🟡 unsafe_assumptions (L242): When the cursor is skipped ahead via
MAX_REPLAY_GAP, the client is never notified that events were dropped. The comment says 'Client can lazy-load older events via the /api/sessions/:id/events API' but the client has no way to know it needs to. Consider sending agap_detectedevent to the client so it can trigger a full restore.[fixable]
packages/client/src/__tests__/sse-connection.test.ts
Solid defensive hardening (POST retry, replay-gap cap, interrupt crash guard) with good test coverage, but the send retry path lacks a retry limit and the Centaur callback registration has a race between async register and sync unwatch.
- 🔵 missing_tests: No test for a persistently failing send (e.g. server returns 400 on every retry). The existing tests only cover transient failures that succeed on the next reconnect. A test showing the message is eventually dropped or capped would document the intended behavior.
[fixable]
packages/harness/__tests__/connection-registry.test.ts
Solid defensive hardening (POST retry, replay-gap cap, interrupt crash guard) with good test coverage, but the send retry path lacks a retry limit and the Centaur callback registration has a race between async register and sync unwatch.
- 🔵 missing_tests: No test for
getHeadSeqreturning 0 (no events for session). The arithmetichead - cursor > MAX_REPLAY_GAPevaluates to0 - 0 > 200which is false, so behavior is correct but the edge case is undocumented by tests.[fixable]
| }, | ||
| body: JSON.stringify(body), | ||
| }); | ||
| if (!res.ok && endpoint === 'send') { |
There was a problem hiding this comment.
🟡 bugs: No retry limit on re-queued sends. When doPost fails for a 'send' endpoint, it calls enqueuePending, and flushPendingSends calls doPost again. If the server persistently rejects a message (e.g. 400/422 validation error), it cycles through flush → fail → re-queue indefinitely across reconnects. Consider adding a retry counter to break the loop after N attempts. [fixable]
| this.watches.set(taskId, { taskId, gateConfig, intervalId }); | ||
|
|
||
| // Register callback with Centaur for push-based resolution | ||
| if (gateConfig.type === 'centaur_review') { |
There was a problem hiding this comment.
🟡 bugs: Race condition: registerCentaurCallback is async but not awaited in watch(). If unwatch(taskId) is called before the registration POST completes, the DELETE arrives first and the POST creates a dangling registration in Centaur. Consider tracking the registration promise and awaiting it in unwatch/deregisterCentaurCallback. [fixable]
| // Centaur might not be running — polling fallback covers this case | ||
| log.warn('centaur callback registration error', { | ||
| taskId, | ||
| error: (err as Error).message, |
There was a problem hiding this comment.
🔵 style: Uses (err as Error).message which violates codebase convention of err instanceof Error ? err.message : String(err) (see chat.ts:1271, etc.). If a non-Error is thrown, .message will be undefined instead of a useful string. [fixable]
| constructor( | ||
| store: TaskStore, | ||
| onSignalResolved: (taskId: string) => void, | ||
| centaurBaseUrl = 'http://localhost:8642', |
There was a problem hiding this comment.
🔵 unsafe_assumptions: Constructor defaults centaurBaseUrl='http://localhost:8642' and mitzoBaseUrl='http://localhost:3100', but the production instantiation at server/index.ts:177 passes neither. These hardcoded localhost URLs will break if Centaur or Mitzo run on different hosts/ports. Consider wiring these from env vars (e.g. CENTAUR_BASE_URL, PORT) at the call site. [fixable]
| // Prevents replay storms after server restart or long disconnects. | ||
| if (this.eventStore.getHeadSeq) { | ||
| const head = this.eventStore.getHeadSeq(sessionId); | ||
| if (head - cursor > MAX_REPLAY_GAP) { |
There was a problem hiding this comment.
🟡 unsafe_assumptions: When the cursor is skipped ahead via MAX_REPLAY_GAP, the client is never notified that events were dropped. The comment says 'Client can lazy-load older events via the /api/sessions/:id/events API' but the client has no way to know it needs to. Consider sending a gap_detected event to the client so it can trigger a full restore. [fixable]
…g, env vars - Fix race condition: track pending registrations, await before deregister - Safe error coercion: err instanceof Error check instead of blind cast - Wire CENTAUR_URL and MITZO_URL env vars in SignalProcessor constructor Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Centaur ReviewFound 7 issue(s) (3 warning).
|
Centaur ReviewFound 7 issue(s) (2 warning).
|
Summary
SignalProcessornow registers a callback URL with Centaur's signal bridge when watching acentaur_reviewgatePart of multi-agent orchestration Phase 2 (Centaur signal bridge). Companion PR: centaur side.
Test plan
vitest run server/__tests__/signal-processor.test.ts— 11 tests pass🤖 Generated with Claude Code