Skip to content

feat(signals): register Centaur callbacks on centaur_review watch#399

Open
dimakis wants to merge 4 commits into
mainfrom
session/2026-06-20-7b0ba3eb145b
Open

feat(signals): register Centaur callbacks on centaur_review watch#399
dimakis wants to merge 4 commits into
mainfrom
session/2026-06-20-7b0ba3eb145b

Conversation

@dimakis

@dimakis dimakis commented Jun 25, 2026

Copy link
Copy Markdown
Owner

Summary

  • SignalProcessor now registers a callback URL with Centaur's signal bridge when watching a centaur_review gate
  • Deregisters on unwatch (best-effort)
  • Polling fallback continues alongside push — belt and suspenders
  • Gracefully handles Centaur being unavailable (fire-and-forget registration)

Part of multi-agent orchestration Phase 2 (Centaur signal bridge). Companion PR: centaur side.

Test plan

  • vitest run server/__tests__/signal-processor.test.ts — 11 tests pass
  • Manual integration test with Centaur

🤖 Generated with Claude Code

dimakis and others added 3 commits June 25, 2026 13:31
Three fixes for the SSE transport regression causing 8+ minute message
delivery delays and silent message drops:

1. Periodic sync replay storm prevention (connection-registry):
   - Replace isSessionActive with shouldSync that checks session STATE
     (ACTIVE/STARTING/CREATED only), not just the isActive boolean.
     SUSPENDED/DETACHED/ENDED sessions no longer trigger sync.
   - Add MAX_REPLAY_GAP (200 events) cap — when cursor is far behind
     head (e.g. after server restart), skip ahead instead of replaying
     thousands of events at 50/5s. Client lazy-loads history via API.
   - Add getHeadSeq to EventStore for gap detection.

2. POST retry for failed message sends (sse-connection):
   - Re-queue send POSTs that get HTTP errors (e.g. 404 from stale
     connectionId after reconnect) or network errors. Messages flush
     on next successful reconnect instead of being silently dropped.
   - Only send endpoints retry — stop/interrupt are fire-and-forget.

3. Guard ProcessTransport.interrupt() crash (chat.ts):
   - Wrap queryInstance.interrupt() in try/catch to prevent unhandled
     errors from killing the server process. This was the trigger that
     caused server restart → in-memory state loss → replay storms.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
- Add MAX_PENDING_SENDS cap to re-queue path in doPost via enqueuePending()
  helper, preventing unbounded queue growth on repeated flush failures
- Add EventStore.getHeadSeq unit tests against real SQLite (0 events, single
  event, max of multiple events)
- Add interrupt() crash survival test — verifies ProcessTransport error
  doesn't propagate and message still reaches inputQueue
- Use mockImplementationOnce in retry tests instead of manual call counters

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
SignalProcessor now registers a callback URL with Centaur when watching
a centaur_review gate, enabling push-based resolution alongside the
existing 30s polling fallback. Gracefully handles Centaur being down.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

@dimakis dimakis left a comment

Copy link
Copy Markdown
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Centaur Review

Found 7 issue(s) (3 warning).

packages/client/src/sse-connection.ts

Solid defensive hardening (POST retry, replay-gap cap, interrupt crash guard) with good test coverage, but the send retry path lacks a retry limit and the Centaur callback registration has a race between async register and sync unwatch.

  • 🟡 bugs (L336): No retry limit on re-queued sends. When doPost fails for a 'send' endpoint, it calls enqueuePending, and flushPendingSends calls doPost again. If the server persistently rejects a message (e.g. 400/422 validation error), it cycles through flush → fail → re-queue indefinitely across reconnects. Consider adding a retry counter to break the loop after N attempts. [fixable]

server/signal-processor.ts

Solid defensive hardening (POST retry, replay-gap cap, interrupt crash guard) with good test coverage, but the send retry path lacks a retry limit and the Centaur callback registration has a race between async register and sync unwatch.

  • 🟡 bugs (L71): Race condition: registerCentaurCallback is async but not awaited in watch(). If unwatch(taskId) is called before the registration POST completes, the DELETE arrives first and the POST creates a dangling registration in Centaur. Consider tracking the registration promise and awaiting it in unwatch/deregisterCentaurCallback. [fixable]
  • 🔵 style (L202): Uses (err as Error).message which violates codebase convention of err instanceof Error ? err.message : String(err) (see chat.ts:1271, etc.). If a non-Error is thrown, .message will be undefined instead of a useful string. [fixable]
  • 🔵 unsafe_assumptions (L43): Constructor defaults centaurBaseUrl='http://localhost:8642' and mitzoBaseUrl='http://localhost:3100', but the production instantiation at server/index.ts:177 passes neither. These hardcoded localhost URLs will break if Centaur or Mitzo run on different hosts/ports. Consider wiring these from env vars (e.g. CENTAUR_BASE_URL, PORT) at the call site. [fixable]

packages/harness/src/connection-registry.ts

Solid defensive hardening (POST retry, replay-gap cap, interrupt crash guard) with good test coverage, but the send retry path lacks a retry limit and the Centaur callback registration has a race between async register and sync unwatch.

  • 🟡 unsafe_assumptions (L242): When the cursor is skipped ahead via MAX_REPLAY_GAP, the client is never notified that events were dropped. The comment says 'Client can lazy-load older events via the /api/sessions/:id/events API' but the client has no way to know it needs to. Consider sending a gap_detected event to the client so it can trigger a full restore. [fixable]

packages/client/src/__tests__/sse-connection.test.ts

Solid defensive hardening (POST retry, replay-gap cap, interrupt crash guard) with good test coverage, but the send retry path lacks a retry limit and the Centaur callback registration has a race between async register and sync unwatch.

  • 🔵 missing_tests: No test for a persistently failing send (e.g. server returns 400 on every retry). The existing tests only cover transient failures that succeed on the next reconnect. A test showing the message is eventually dropped or capped would document the intended behavior. [fixable]

packages/harness/__tests__/connection-registry.test.ts

Solid defensive hardening (POST retry, replay-gap cap, interrupt crash guard) with good test coverage, but the send retry path lacks a retry limit and the Centaur callback registration has a race between async register and sync unwatch.

  • 🔵 missing_tests: No test for getHeadSeq returning 0 (no events for session). The arithmetic head - cursor > MAX_REPLAY_GAP evaluates to 0 - 0 > 200 which is false, so behavior is correct but the edge case is undocumented by tests. [fixable]

},
body: JSON.stringify(body),
});
if (!res.ok && endpoint === 'send') {

Copy link
Copy Markdown
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🟡 bugs: No retry limit on re-queued sends. When doPost fails for a 'send' endpoint, it calls enqueuePending, and flushPendingSends calls doPost again. If the server persistently rejects a message (e.g. 400/422 validation error), it cycles through flush → fail → re-queue indefinitely across reconnects. Consider adding a retry counter to break the loop after N attempts. [fixable]

this.watches.set(taskId, { taskId, gateConfig, intervalId });

// Register callback with Centaur for push-based resolution
if (gateConfig.type === 'centaur_review') {

Copy link
Copy Markdown
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🟡 bugs: Race condition: registerCentaurCallback is async but not awaited in watch(). If unwatch(taskId) is called before the registration POST completes, the DELETE arrives first and the POST creates a dangling registration in Centaur. Consider tracking the registration promise and awaiting it in unwatch/deregisterCentaurCallback. [fixable]

Comment thread server/signal-processor.ts Outdated
// Centaur might not be running — polling fallback covers this case
log.warn('centaur callback registration error', {
taskId,
error: (err as Error).message,

Copy link
Copy Markdown
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🔵 style: Uses (err as Error).message which violates codebase convention of err instanceof Error ? err.message : String(err) (see chat.ts:1271, etc.). If a non-Error is thrown, .message will be undefined instead of a useful string. [fixable]

constructor(
store: TaskStore,
onSignalResolved: (taskId: string) => void,
centaurBaseUrl = 'http://localhost:8642',

Copy link
Copy Markdown
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🔵 unsafe_assumptions: Constructor defaults centaurBaseUrl='http://localhost:8642' and mitzoBaseUrl='http://localhost:3100', but the production instantiation at server/index.ts:177 passes neither. These hardcoded localhost URLs will break if Centaur or Mitzo run on different hosts/ports. Consider wiring these from env vars (e.g. CENTAUR_BASE_URL, PORT) at the call site. [fixable]

// Prevents replay storms after server restart or long disconnects.
if (this.eventStore.getHeadSeq) {
const head = this.eventStore.getHeadSeq(sessionId);
if (head - cursor > MAX_REPLAY_GAP) {

Copy link
Copy Markdown
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🟡 unsafe_assumptions: When the cursor is skipped ahead via MAX_REPLAY_GAP, the client is never notified that events were dropped. The comment says 'Client can lazy-load older events via the /api/sessions/:id/events API' but the client has no way to know it needs to. Consider sending a gap_detected event to the client so it can trigger a full restore. [fixable]

…g, env vars

- Fix race condition: track pending registrations, await before deregister
- Safe error coercion: err instanceof Error check instead of blind cast
- Wire CENTAUR_URL and MITZO_URL env vars in SignalProcessor constructor

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
@dimakis

dimakis commented Jun 26, 2026

Copy link
Copy Markdown
Owner Author

Centaur Review

Found 7 issue(s) (3 warning).

server/signal-processor.ts

Solid PR with good test coverage for SSE retry, replay caps, and interrupt crash resilience. Main gap: unwatchAll() doesn't deregister Centaur callbacks (bug), and checkCentaurReview still hardcodes the Centaur URL instead of using the newly injected base URL.

  • 🟡 bugs (L96): unwatchAll() does not call deregisterCentaurCallback() for centaur_review entries. When the orchestrator stops (or process shuts down), Centaur retains stale registered callbacks that will POST to dead endpoints. unwatchAll() should iterate entries and call deregisterCentaurCallback for centaur_review tasks, matching the logic in unwatch(). [fixable]
  • 🟡 bugs (L89): unwatch() is synchronous but calls async deregisterCentaurCallback() without awaiting. deregisterCentaurCallback internally awaits pendingRegistrations (line 214-217), but since unwatch() doesn't await, that await is lost. If unwatch() is called right after watch(), the DELETE can race ahead of the POST, and the registration becomes a dangling entry in Centaur. Consider making unwatch() async, or at minimum documenting the fire-and-forget semantics. [fixable]
  • 🟡 unsafe_assumptions (L337): checkCentaurReview() (standalone function at line 337) hardcodes 'http://localhost:8642' instead of using the injected centaurBaseUrl. The constructor now accepts centaurBaseUrl for register/deregister, but polling still bypasses it. If CENTAUR_URL is set to a non-localhost value, polling will hit the wrong endpoint while register/deregister hit the correct one. [fixable]
  • 🔵 unsafe_assumptions (L187): registerCentaurCallback calls fetch() without an AbortSignal timeout. If Centaur hangs (DNS, network), the promise never resolves and the pendingRegistrations entry leaks indefinitely. Consider adding AbortSignal.timeout(5000) to the fetch options for both register and deregister calls. [fixable]

server/__tests__/signal-processor.test.ts

Solid PR with good test coverage for SSE retry, replay caps, and interrupt crash resilience. Main gap: unwatchAll() doesn't deregister Centaur callbacks (bug), and checkCentaurReview still hardcodes the Centaur URL instead of using the newly injected base URL.

  • 🔵 missing_tests: No test verifies that unwatchAll() deregisters centaur_review callbacks (which it currently doesn't — see related bug finding). Once the bug is fixed, a test should confirm that unwatchAll() sends DELETE requests for all centaur_review entries. [fixable]

packages/harness/__tests__/connection-registry.test.ts

Solid PR with good test coverage for SSE retry, replay caps, and interrupt crash resilience. Main gap: unwatchAll() doesn't deregister Centaur callbacks (bug), and checkCentaurReview still hardcodes the Centaur URL instead of using the newly injected base URL.

  • 🔵 missing_tests (L511): The MAX_REPLAY_GAP test 'caps replay depth when cursor is far behind head' is missing a call to registry.stopPeriodicSync() at the end, unlike the companion test on line 557. This leaks the periodic timer. Cosmetic since tests use fake timers, but inconsistent. [fixable]

server/index.ts

Solid PR with good test coverage for SSE retry, replay caps, and interrupt crash resilience. Main gap: unwatchAll() doesn't deregister Centaur callbacks (bug), and checkCentaurReview still hardcodes the Centaur URL instead of using the newly injected base URL.

  • 🔵 style (L190): MITZO_URL default uses string interpolation with PORT: http://localhost:${PORT}. If PORT parsing fails (NaN from invalid env var), this silently creates 'http://localhost:NaN'. Low risk since PORT is validated by the HTTP server bind, but the fallback is fragile. [fixable]

@dimakis

dimakis commented Jun 26, 2026

Copy link
Copy Markdown
Owner Author

Centaur Review

Found 7 issue(s) (2 warning).

server/signal-processor.ts

Solid reliability improvements (SSE retry, replay cap, interrupt guard). The main actionable issue is that unwatchAll() doesn't deregister Centaur callbacks, inconsistent with unwatch() — a straightforward fix.

  • 🟡 bugs (L96): unwatchAll() does not call deregisterCentaurCallback() for centaur_review watches, leaving dangling callback registrations in Centaur. This is inconsistent with unwatch() (line 88-90) which does deregister. Called during graceful shutdown (server/index.ts:962), so registered Centaur callbacks will persist across server restarts. Fix: iterate entries and call deregisterCentaurCallback(taskId) for centaur_review gates before clearing the map. [fixable]
  • 🔵 unsafe_assumptions (L89): unwatch() is synchronous (void) but fire-and-forgets the async deregisterCentaurCallback(). If unwatch is followed by a watch for the same taskId before deregister completes, the DELETE could arrive after the new registration, creating a dangling registration. This is unlikely in practice (orchestrator tick is single-threaded), but worth noting. Consider returning the promise or documenting the constraint. [fixable]
  • 🔵 style (L75): The type assertion gateConfig as GateConfig & { pr_url: string } is used in both watch() (line 75) and checkCentaurReview() (line ~260). Consider adding a CentaurReviewGateConfig discriminated type to avoid repeated unsafe casts. [fixable]

server/__tests__/signal-processor.test.ts

Solid reliability improvements (SSE retry, replay cap, interrupt guard). The main actionable issue is that unwatchAll() doesn't deregister Centaur callbacks, inconsistent with unwatch() — a straightforward fix.

  • 🟡 missing_tests: No test verifies that unwatchAll() deregisters Centaur callbacks — the existing unwatchAll test (line 146) only uses human_approval gates. This gap masks the bug where unwatchAll doesn't call deregisterCentaurCallback. [fixable]

server/index.ts

Solid reliability improvements (SSE retry, replay cap, interrupt guard). The main actionable issue is that unwatchAll() doesn't deregister Centaur callbacks, inconsistent with unwatch() — a straightforward fix.

  • 🔵 unsafe_assumptions (L188): process.env.CENTAUR_URL || 'http://localhost:8642' — an empty string env var (CENTAUR_URL='') passes the || check and falls through to the default. This is probably fine, but ?? 'http://localhost:8642' would be more precise if you want to distinguish empty from unset. [fixable]

packages/harness/src/connection-registry.ts

Solid reliability improvements (SSE retry, replay cap, interrupt guard). The main actionable issue is that unwatchAll() doesn't deregister Centaur callbacks, inconsistent with unwatch() — a straightforward fix.

  • 🔵 regressions (L240): When replay is capped (cursor skipped ahead), the client receives no notification that events were dropped. The log.warn is server-side only. The client may have stale state with no way to know it needs a full refresh. Consider emitting a replay_gap event to the client so it can trigger a full session restore via the REST API. [fixable]

packages/client/src/sse-connection.ts

Solid reliability improvements (SSE retry, replay cap, interrupt guard). The main actionable issue is that unwatchAll() doesn't deregister Centaur callbacks, inconsistent with unwatch() — a straightforward fix.

  • 🔵 bugs (L335): When doPost re-queues a failed send, the send() method already returned true to the caller. If the re-queued message also fails on the next reconnect, it will be re-queued again indefinitely (no retry limit). This is bounded by MAX_PENDING_SENDS eviction, but a message could cycle between queue and failure forever if the server consistently rejects it (e.g. invalid payload). Consider adding a retry counter. [fixable]

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant