feat(orchestrator): multi-agent goal execution via session_policy spawn #377
dimakis
left a comment
Centaur Review
Found 8 issue(s) (5 warning).
server/task-orchestrator.ts
Solid feature addition with clean separation between spawn/reuse paths; main concerns are race conditions in the async spawn-failure callback (stale this.goalId/this.pinnedClientId after stop()) and a missing human_approval case in the signal resolve endpoint.
- 🟡 bugs (L355): In the spawn failure callback (`.then` handler), `this.goalId!` is used with a non-null assertion, but `stop()` could have been called between spawn initiation and callback resolution, setting `goalId` to `null`. The `!` assertion would then pass a null to `setTaskContext`. Guard with `if (!this.goalId) return;` before the fallback logic. [fixable]
- 🟡 bugs (L356): The spawn-failure fallback sets task context and sends the prompt to the pinned session, but does NOT set `this.activeTaskId = next.id`. This means the orchestrator's status reports no active task, yet a task is being worked on by the pinned session. If another `tick()` fires (e.g., from a concurrent task completing), `getNextExecutable` won't return this task (it's already `active`), but the missing `activeTaskId` breaks the status reporting contract. Additionally, `this.pinnedClientId` could also be null by the time the callback fires if `stop()` was called. [fixable]
- 🔵 unsafe_assumptions (L368): The recursive `this.tick()` after spawning is safe (bounded by tasks transitioning from `pending` to `active`), but it runs synchronously on the call stack. With many spawn-policy tasks (e.g., 50+ parallel subtasks), this creates deep recursion. Consider using `queueMicrotask(() => this.tick())` or `setImmediate` to avoid stack overflow in large task trees. [fixable]
server/app.ts
- 🟡 bugs (L1006): `SignalResolveBody` accepts `type: 'human_approval'` but the switch statement in the resolve endpoint has no `case 'human_approval'` branch. A `human_approval` signal will match the gate type via `findActiveSignalTasks` but `isMatch` will remain `false` since no case handles it, so the signal will silently be ignored. Either add a matching case or remove `human_approval` from the schema. [fixable]
- 🟡 missing_tests (L980): The new `POST /api/signals/resolve` endpoint has no integration tests. The existing `/api/tasks/:id/signal` endpoint also lacks tests, but adding a new gate-matching endpoint with per-type dispatch logic (centaur_review, gh_ci, gh_review) without tests is risky, especially the matching logic with optional `repo`/`pr`/`pr_url` fields. [fixable]
- 🔵 style (L1008): The gate config property access pattern `(gc as Record<string, unknown>).repo as string | undefined` is repeated 6 times across 3 switch cases. Consider destructuring the gate config once before the switch: `const { repo: taskRepo, pr: taskPr, pr_url: taskPrUrl } = gc as Record<string, unknown>`. This reduces duplication and makes the matching logic easier to read. [fixable]
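A minimal sketch of how the first and third findings could be addressed together: destructure the gate config once, and add a `never`-based exhaustiveness check so that a gate type accepted by the schema but missing from the switch fails to type-check. The `GateFields` shape, the `isGateMatch` and `assertNever` names, and the per-type matching rules are assumptions for illustration, not the PR's actual code.

```typescript
// Gate types named in the review; the real schema may list others.
type GateType = 'centaur_review' | 'gh_ci' | 'gh_review' | 'human_approval';

// Hypothetical shape shared by the stored gate config and the incoming signal.
interface GateFields {
  repo?: string;
  pr?: number;
  pr_url?: string;
}

// If a new GateType member is added without a case, the call below stops compiling.
function assertNever(value: never): never {
  throw new Error(`unhandled gate type: ${String(value)}`);
}

function isGateMatch(type: GateType, gate: GateFields, signal: GateFields): boolean {
  // Destructure once instead of casting inside every case.
  const { repo: taskRepo, pr: taskPr, pr_url: taskPrUrl } = gate;
  const sameRepoAndPr =
    taskRepo !== undefined &&
    taskPr !== undefined &&
    taskRepo === signal.repo &&
    taskPr === signal.pr;

  switch (type) {
    case 'centaur_review':
      // Assumed rule: match by pr_url, or by repo + pr.
      return (taskPrUrl !== undefined && taskPrUrl === signal.pr_url) || sameRepoAndPr;
    case 'gh_ci':
    case 'gh_review':
      return sameRepoAndPr;
    case 'human_approval':
      // Assumed rule: no discriminator, so any active task of this type matches.
      return true;
    default:
      return assertNever(type);
  }
}
```

With this shape, removing the `human_approval` case makes `type` in the `default` branch no longer `never`, so the compiler flags the gap instead of the signal being silently ignored at runtime.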
server/index.ts
- 🟡 unsafe_assumptions (L240): `createSessionWorktrees(wtId, BASE_REPO, config.repos)` is called synchronously and can throw (e.g., git failures). The `try/catch` handles this, but if worktree creation fails, the function returns `null` and the orchestrator falls back to the pinned session, yet the task is already marked `active` in the orchestrator (line 342). The fallback in the orchestrator's `.then(null)` path sends to the pinned session but the task was spawned for isolation. Consider whether worktree failure should revert the task to `pending` instead. [fixable]
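To make the "revert to `pending`" option concrete, here is a small sketch under stated assumptions: `TaskRecord`, `dispatchWithRevert`, and the injected `createWorktrees` callback are hypothetical stand-ins, not the orchestrator's real types.

```typescript
type TaskStatus = 'pending' | 'active';

// Hypothetical minimal task record.
interface TaskRecord {
  id: string;
  status: TaskStatus;
}

// Marks the task active, then undoes that if the synchronous worktree setup throws.
// Returns true when the task stays active, false when it was reverted.
function dispatchWithRevert(task: TaskRecord, createWorktrees: () => void): boolean {
  task.status = 'active';
  try {
    createWorktrees();
    return true;
  } catch {
    // The task asked for isolation, so put it back in the queue
    // instead of handing it to the pinned session.
    task.status = 'pending';
    return false;
  }
}
```

One trade-off: a reverted task is picked up again on the next tick, so a persistent git failure would retry indefinitely unless a retry cap or a blocked state is added.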
server/__tests__/task-orchestrator.test.ts
- 🔵 missing_tests (L576): The spawn tests verify the happy path and fallback, but don't test the rejection path (line 359-363 in task-orchestrator.ts) where `spawnSession` throws; the task should revert to `pending`. Also missing: a test for multiple spawn tasks being dispatched in parallel (verifying the recursive `tick()` spawns all pending tasks in sequence). [fixable]
| log.info('spawned session for task', { taskId: next.id, clientId }); | ||
| } else { | ||
| log.error('failed to spawn session, falling back to pinned', { taskId: next.id }); | ||
| this.deps.setTaskContext(next.id, this.goalId!); |
🟡 bugs: In the spawn failure callback (.then handler), this.goalId! is used with a non-null assertion, but stop() could have been called between spawn initiation and callback resolution, setting goalId to null. The ! assertion would then pass a null to setTaskContext. Guard with if (!this.goalId) return; before the fallback logic. [fixable]
| } else { | ||
| log.error('failed to spawn session, falling back to pinned', { taskId: next.id }); | ||
| this.deps.setTaskContext(next.id, this.goalId!); | ||
| if (this.pinnedClientId) sendToChat(this.pinnedClientId, prompt); |
🟡 bugs: The spawn-failure fallback sets task context and sends prompt to the pinned session, but does NOT set this.activeTaskId = next.id. This means the orchestrator's status reports no active task, yet a task is being worked on by the pinned session. If another tick() fires (e.g., from a concurrent task completing), getNextExecutable won't return this task (it's already active), but the missing activeTaskId breaks the status reporting contract. Additionally, this.pinnedClientId could also be null by the time the callback fires if stop() was called. [fixable]
|
|
||
| // Don't set activeTaskId — spawned tasks run independently. | ||
| // Continue ticking to find more parallel work. | ||
| this.tick(); |
🔵 unsafe_assumptions: The recursive this.tick() after spawning is safe (bounded by tasks transitioning from pending to active), but it runs synchronously on the call stack. With many spawn-policy tasks (e.g., 50+ parallel subtasks), this creates deep recursion. Consider using queueMicrotask(() => this.tick()) or setImmediate to avoid stack overflow in large task trees. [fixable]
| if (!gc) continue; | ||
|
|
||
| let isMatch = false; | ||
| switch (type) { |
🟡 bugs: SignalResolveBody accepts type: 'human_approval' but the switch statement in the resolve endpoint has no case 'human_approval' branch. A human_approval signal will match the gate type via findActiveSignalTasks but isMatch will remain false since no case handles it — the signal will silently be ignored. Either add a matching case or remove human_approval from the schema. [fixable]
| * External agents (e.g. Centaur) POST here after completing work — | ||
| * they don't need to know task IDs, just the gate parameters. | ||
| */ | ||
| app.post('/api/signals/resolve', (req, res) => { |
🟡 missing_tests: The new POST /api/signals/resolve endpoint has no integration tests. The existing /api/tasks/:id/signal endpoint also lacks tests, but adding a new gate-matching endpoint with per-type dispatch logic (centaur_review, gh_ci, gh_review) without tests is risky — especially the matching logic with optional repo/pr/pr_url fields. [fixable]
| let isMatch = false; | ||
| switch (type) { | ||
| case 'centaur_review': { | ||
| const taskPrUrl = (gc as Record<string, unknown>).pr_url as string | undefined; |
🔵 style: The gate config property access pattern (gc as Record<string, unknown>).repo as string | undefined is repeated 6 times across 3 switch cases. Consider destructuring the gate config once before the switch: const { repo: taskRepo, pr: taskPr, pr_url: taskPrUrl } = gc as Record<string, unknown>. This reduces duplication and makes the matching logic easier to read. [fixable]
| const config = getRepoConfig(); | ||
|
|
||
| try { | ||
| createSessionWorktrees(wtId, BASE_REPO, config.repos); |
🟡 unsafe_assumptions: createSessionWorktrees(wtId, BASE_REPO, config.repos) is called synchronously and can throw (e.g., git failures). The try/catch handles this, but if worktree creation fails, the function returns null and the orchestrator falls back to the pinned session — yet the task is already marked active in the orchestrator (line 342). The fallback in the orchestrator's .then(null) path sends to the pinned session but the task was spawned for isolation. Consider whether worktree failure should revert the task to pending instead. [fixable]
|
|
||
| const [, prompt, goalArg] = spawnSession.mock.calls[0]; | ||
| expect(goalArg).toBe(goal.id); | ||
| expect(prompt).toContain('Spawn task'); |
🔵 missing_tests: The spawn tests verify the happy path and fallback, but don't test the rejection path (line 359-363 in task-orchestrator.ts) where spawnSession throws — the task should revert to pending. Also missing: a test for multiple spawn tasks being dispatched in parallel (verifying the recursive tick() spawns all pending tasks in sequence). [fixable]
Enable the TaskOrchestrator to spawn dedicated headless sessions for individual tasks, unlocking multi-agent goal coordination.

Changes:
- task-orchestrator: implement session_policy 'spawn' path in tick(). Spawned tasks get independent sessions and don't block activeTaskId, allowing parallel execution. Falls back to pinned session on failure.
- index.ts: wire spawnSession dep: creates worktrees, registers in event store, starts headless session with NullTransport.
- app.ts: add POST /api/signals/resolve endpoint for external agents (e.g. Centaur) to resolve wait_for_signal tasks by gate metadata (type + repo + PR) without knowing task IDs.
- api-schemas: add SignalResolveBody zod schema.
- task-store: add findActiveSignalTasks(gateType) query method.
- Tests: 7 new tests covering spawn dispatch, activeTaskId semantics, fallback behavior, and signal task lookup.

Part of multi-agent orchestration initiative (Phase 1 of 4). Next: Centaur signal bridge (Phase 2), PR Shepherd goals (Phase 3).

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
615d0d3 to c000d4f
@centaur-review Please review this PR. This is Phase 1 of the multi-agent orchestration initiative — enables Key areas to review:
dimakis
left a comment
Centaur Review
Found 7 issue(s) (4 warning).
server/task-orchestrator.ts
Sound multi-agent spawn design with correct DFS dispatch and orphan recovery, but the async .then() callbacks read this.goalId/this.pinnedClientId live — creating a race with stop() that can pass null to setTaskContext. The /api/signals/resolve endpoint needs test coverage and has an unhandled human_approval gate type.
- 🟡 bugs (L355): Race condition: `this.goalId!` is read inside an async `.then()` callback, but `stop()` nullifies `this.goalId` (line 141). If the orchestrator is stopped before `spawnSession` resolves and returns null, `this.goalId!` evaluates to `null`, which is passed to `setTaskContext` as a null goalId. Capture `goalId` before the async call (e.g. `const goalId = this.goalId;`) and use the captured value in the callback, with a guard for the null case. [fixable]
- 🟡 bugs (L356): Same race: `this.pinnedClientId` is read inside the async `.then()` callback, but `stop()` nullifies it (line 145). If the orchestrator stops before the spawn-failure fallback fires, the fallback silently does nothing: the task stays `active` with no session and no pinned client to handle it, effectively orphaned until the next tick reclaims it. Consider guarding with `if (this.state !== 'running') return;` at the top of both `.then()` callbacks. [fixable]
- 🔵 unsafe_assumptions (L368): The recursive `this.tick()` call after spawning enables parallel task dispatch, but there's no depth guard. If a goal has many spawn-policy leaf tasks, tick() recurses once per task before any stack frame returns. For a goal with ~100 spawn tasks this could approach stack limits. Consider using a loop or `queueMicrotask(() => this.tick())` to avoid unbounded recursion. [fixable]
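The capture-before-async suggestion in the two race findings can be sketched as follows. `MiniOrchestrator`, its `fallbacks` log, and the `SpawnFn` signature are hypothetical simplifications; the real orchestrator calls `setTaskContext` and `sendToChat` where this sketch only records the fallback.

```typescript
// Hypothetical spawn dependency: resolves to a clientId, or null on failure.
type SpawnFn = (taskId: string) => Promise<string | null>;

interface Fallback {
  taskId: string;
  goalId: string;
  clientId: string;
}

class MiniOrchestrator {
  goalId: string | null = null;
  pinnedClientId: string | null = null;
  // Stand-in for setTaskContext + sendToChat, so the behavior is observable.
  readonly fallbacks: Fallback[] = [];
  private readonly spawnSession: SpawnFn;

  constructor(spawnSession: SpawnFn) {
    this.spawnSession = spawnSession;
  }

  start(goalId: string, pinnedClientId: string): void {
    this.goalId = goalId;
    this.pinnedClientId = pinnedClientId;
  }

  stop(): void {
    this.goalId = null;
    this.pinnedClientId = null;
  }

  dispatch(taskId: string): Promise<void> {
    // Capture before the async boundary; never read this.* with `!` in the callback.
    const goalId = this.goalId;
    const pinnedClientId = this.pinnedClientId;
    if (goalId === null || pinnedClientId === null) return Promise.resolve();

    return this.spawnSession(taskId).then((clientId) => {
      // Stale callback: the orchestrator was stopped, or restarted with another goal.
      if (this.goalId !== goalId) return;
      if (clientId === null) {
        this.fallbacks.push({ taskId, goalId, clientId: pinnedClientId });
      }
    });
  }
}
```

Comparing the captured value against the live field, rather than only checking for null, also covers a `stop()` followed by a `start()` for a different goal while the spawn is still in flight.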
server/app.ts
- 🟡 bugs (L1027): `human_approval` is accepted by the `SignalResolveBody` schema (api-schemas.ts:188) but has no case in the switch statement (lines 1027-1043). Tasks with `type: 'human_approval'` will be fetched from the DB but never matched, silently returning `{ ok: true, matched: [] }`. Either add a matching case or remove `human_approval` from the schema's enum to avoid misleading callers into thinking their resolve request succeeded. [fixable]
- 🟡 missing_tests (L1001): The `/api/signals/resolve` endpoint has no test coverage. The gate-matching logic (centaur_review by pr_url/repo+pr, gh_ci/gh_review by repo+pr) is non-trivial and untested at the HTTP layer. At minimum, add tests for: successful match by repo+pr, match by pr_url, no-match when gate metadata doesn't align, and auth rejection. [fixable]
- 🔵 style (L1029): Repeated `(gc as Record<string, unknown>).fieldName as Type` casts across the switch cases are noisy. Consider extracting a typed interface for gate config metadata (e.g. `{ repo?: string; pr?: number; pr_url?: string }`) or a helper function to reduce the casting boilerplate. [fixable]
server/__tests__/task-orchestrator.test.ts
- 🔵 missing_tests: No test covers the spawn-failure fallback path (spawnSession returning null or rejecting). The existing tests verify the happy path and the missing-spawnSession fallback, but not the case where spawnSession is provided and fails. This is where the `this.goalId!` race condition lives. [fixable]
| log.info('spawned session for task', { taskId: next.id, clientId }); | ||
| } else { | ||
| log.error('failed to spawn session, falling back to pinned', { taskId: next.id }); | ||
| this.deps.setTaskContext(next.id, this.goalId!); |
🟡 bugs: Race condition: this.goalId! is read inside an async .then() callback, but stop() nullifies this.goalId (line 141). If the orchestrator is stopped before spawnSession resolves and returns null, this.goalId! evaluates to null — passed to setTaskContext as a null goalId. Capture goalId before the async call (e.g. const goalId = this.goalId;) and use the captured value in the callback, with a guard for the null case. [fixable]
| } else { | ||
| log.error('failed to spawn session, falling back to pinned', { taskId: next.id }); | ||
| this.deps.setTaskContext(next.id, this.goalId!); | ||
| if (this.pinnedClientId) sendToChat(this.pinnedClientId, prompt); |
🟡 bugs: Same race: this.pinnedClientId is read inside the async .then() callback, but stop() nullifies it (line 145). If the orchestrator stops before the spawn-failure fallback fires, the fallback silently does nothing — the task stays active with no session and no pinned client to handle it, effectively orphaned until the next tick reclaims it. Consider guarding with if (this.state !== 'running') return; at the top of both .then() callbacks. [fixable]
|
|
||
| // Don't set activeTaskId — spawned tasks run independently. | ||
| // Continue ticking to find more parallel work. | ||
| this.tick(); |
🔵 unsafe_assumptions: The recursive this.tick() call after spawning enables parallel task dispatch, but there's no depth guard. If a goal has many spawn-policy leaf tasks, tick() recurses once per task before any stack frame returns. For a goal with ~100 spawn tasks this could approach stack limits. Consider using a loop or queueMicrotask(()=> this.tick()) to avoid unbounded recursion. [fixable]
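Of the two options named here, the loop is the simpler one to illustrate. The sketch below assumes a hypothetical `nextPending` helper standing in for `getNextExecutable` and a `spawn` callback standing in for the fire-and-forget spawn call; neither is the orchestrator's real API.

```typescript
interface SpawnTask {
  id: string;
  status: 'pending' | 'active';
}

// Hypothetical stand-in for getNextExecutable: first task still pending.
function nextPending(tasks: SpawnTask[]): SpawnTask | undefined {
  return tasks.find((t) => t.status === 'pending');
}

// Dispatches every pending spawn task from a single stack frame.
// Terminates because each iteration moves exactly one task from pending to active.
function dispatchAll(tasks: SpawnTask[], spawn: (task: SpawnTask) => void): number {
  let dispatched = 0;
  for (let next = nextPending(tasks); next !== undefined; next = nextPending(tasks)) {
    next.status = 'active';
    spawn(next);
    dispatched += 1;
  }
  return dispatched;
}
```

Stack depth stays constant regardless of how many spawn-policy tasks the goal contains, so no separate depth guard is needed for this path.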
| if (!gc) continue; | ||
|
|
||
| let isMatch = false; | ||
| switch (type) { |
🟡 bugs: human_approval is accepted by the SignalResolveBody schema (api-schemas.ts:188) but has no case in the switch statement (lines 1027-1043). Tasks with type: 'human_approval' will be fetched from the DB but never matched, silently returning { ok: true, matched: [] }. Either add a matching case or remove human_approval from the schema's enum to avoid misleading callers into thinking their resolve request succeeded. [fixable]
| * External agents (e.g. Centaur) POST here after completing work — | ||
| * they don't need to know task IDs, just the gate parameters. | ||
| */ | ||
| app.post('/api/signals/resolve', (req, res) => { |
🟡 missing_tests: The /api/signals/resolve endpoint has no test coverage. The gate-matching logic (centaur_review by pr_url/repo+pr, gh_ci/gh_review by repo+pr) is non-trivial and untested at the HTTP layer. At minimum, add tests for: successful match by repo+pr, match by pr_url, no-match when gate metadata doesn't align, and auth rejection. [fixable]
| let isMatch = false; | ||
| switch (type) { | ||
| case 'centaur_review': { | ||
| const taskPrUrl = (gc as Record<string, unknown>).pr_url as string | undefined; |
🔵 style: Repeated (gc as Record<string, unknown>).fieldName as Type casts across the switch cases are noisy. Consider extracting a typed interface for gate config metadata (e.g. { repo?: string; pr?: number; pr_url?: string }) or a helper function to reduce the casting boilerplate. [fixable]
- Fix race condition: capture goalId/pinnedClientId before async boundary and compare against current state in callbacks. Handles stop()+start() with different goal, not just stop() alone.
- Add spawn depth guard (MAX_SPAWN_DEPTH=50) to prevent runaway recursive tick dispatch for goals with many spawn tasks.
- Fix fallback conflict: when spawn returns null, only claim pinned session if activeTaskId is null; otherwise mark task as blocked.
- Add human_approval case to /api/signals/resolve switch statement.
- Extract typed destructuring for gate config in resolve endpoint.
- Add tests: goal-restart race, pinned-session-busy fallback, gate matching logic for all signal types, endpoint resolve flow.

All 63 tests pass. Addresses all findings from Centaur reviews.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
@centaur-review Please re-review — all findings from the previous two reviews have been addressed:
dimakis
left a comment
Centaur Review
Found 9 issue(s) (3 critical) (4 warning).
server/index.ts
The spawn lifecycle has three critical bugs: double worktree creation with divergent IDs (leaking worktrees), an EventStore entry disconnected from the real session, and await startChat() blocking for the full session lifetime rather than returning early — making the fire-and-forget spawn pattern ineffective.
- 🔴 bugs (L249): Double worktree creation with divergent IDs. `spawnSession` calls `createSessionWorktrees(wtId, BASE_REPO, config.repos)` from `worktree.ts`, creating primary + secondary worktrees. Then `await startChat(...)` internally calls `generateWtId()` (line 799 of chat.ts) to produce a DIFFERENT wtId and creates its own worktrees. The agent runs in the second set; the first set is orphaned and leaked. Either remove the `createSessionWorktrees` call here (let `startChat` handle it) or pass the pre-created worktree path via `startChat`'s `cwd` option to skip its internal worktree creation. [fixable]
- 🔴 bugs (L251): EventStore entry uses the wrong sessionId. `eventStore.upsertSession({ sessionId: wtId, ... })` stores an entry keyed by the spawner's `wtId`, but `startChat` generates its own `wtId` internally and creates a separate EventStore entry for the actual session. The entry created here (with `telosTaskId` linking to the goal) is disconnected from the real session, and the real session won't have `telosTaskId` set. [fixable]
- 🔴 bugs (L261): `await startChat(...)` blocks until `runQueryLoop()` finishes, i.e., the entire agent session lifecycle. This means `spawnSession` only returns the `clientId` after the agent has completed (or crashed). The `.then()` callback in the orchestrator (task-orchestrator.ts:361) that calls `setSessionId()` fires when the session is already over, making the session assignment pointless. The intent appears to be fire-and-forget spawning, but the actual behavior is 'await full session completion then return'. The `startChat` call should not be awaited here, or a callback like `onSessionResolved` should be used to get the clientId early. [fixable]
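For the third finding, a rough sketch of the non-awaited variant. The `StartChat` type here is a simplified, hypothetical stand-in (the real `startChat` also takes a transport and an options object), and `SpawnResult` is an invented return shape used only to show the idea.

```typescript
// Hypothetical stand-in for startChat: the promise settles only when the session ends.
type StartChat = (clientId: string, prompt: string) => Promise<void>;

interface SpawnResult {
  clientId: string;
  // Settles when the session finishes; never rejects.
  done: Promise<'completed' | 'failed'>;
}

function spawnSession(startChat: StartChat, clientId: string, prompt: string): SpawnResult {
  // Start the session without awaiting it, and fold a crash into a value
  // so it cannot surface as an unhandled rejection.
  const done = startChat(clientId, prompt).then(
    () => 'completed' as const,
    () => 'failed' as const,
  );
  // The caller gets the clientId right away, while the session is still running.
  return { clientId, done };
}
```

A caller could record the session assignment immediately from `clientId` and attach cleanup to `done`, so the assignment is made while the session is live rather than after it has ended.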
server/task-orchestrator.ts
- 🟡 bugs (L366): Orphan detection ID format mismatch. `setSessionId(next.id, clientId)` stores the clientId (format `headless:xxx`) as the task's sessionId, but `getActiveSessionIds()` (index.ts:235-241) returns SDK-assigned UUIDs from the registry (via `session.sessionId`). These formats will never match, so if this code path executes while the task is still active, orphan detection will falsely reclaim it. Currently mitigated by timing (the callback fires after session completion), but it's a latent correctness issue. [fixable]
- 🟡 unsafe_assumptions (L147): No spawned session cleanup on `stop()`. When the orchestrator stops, in-flight spawned headless sessions continue running with no way to abort them. The `stop()` method resets local state but doesn't track or abort spawned sessions. Consider tracking spawned clientIds and aborting them on stop, or adding an `abortSpawnedSession` callback to `OrchestratorDeps`. [fixable]
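The cleanup suggestion could look roughly like this. `SpawnedSessionTracker` and `AbortDeps` are hypothetical names; only the `abortSpawnedSession` callback comes from the finding itself, and its exact signature is assumed.

```typescript
// Assumed signature for the callback proposed in the review.
interface AbortDeps {
  abortSpawnedSession: (clientId: string) => void;
}

// Tracks in-flight spawned sessions so stop() can abort them.
class SpawnedSessionTracker {
  private readonly active = new Set<string>();
  private readonly deps: AbortDeps;

  constructor(deps: AbortDeps) {
    this.deps = deps;
  }

  // Call when a spawn succeeds.
  track(clientId: string): void {
    this.active.add(clientId);
  }

  // Call when a spawned session finishes on its own.
  untrack(clientId: string): void {
    this.active.delete(clientId);
  }

  // Call from the orchestrator's stop(): abort whatever is still running.
  stop(): void {
    this.active.forEach((clientId) => this.deps.abortSpawnedSession(clientId));
    this.active.clear();
  }
}
```

Clearing the set after aborting makes a second `stop()` a no-op, which keeps the method safe to call more than once.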
server/app.ts
- 🟡 unsafe_assumptions (L1041): `human_approval` gate type matches ALL active tasks of that type without any discriminator. If multiple tasks are waiting for human approval, a single resolve signal resolves every one of them. Consider requiring a task-specific identifier (e.g., a `gate_id` field) to disambiguate which approval is being granted. [fixable]
- 🟡 missing_tests (L1001): No HTTP integration test for `POST /api/signals/resolve`. The signal-processor unit tests mirror the matching logic, but the actual endpoint (auth guard, Zod validation, response shape, task broadcast, 503 when signalProcessor is null) is untested. Other endpoints in this file have supertest-based route tests. [fixable]
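For the `human_approval` finding, a sketch of what a discriminator could look like. The `gate_id` field is the one proposed in the review; `ApprovalTask` and `selectApprovalTargets` are hypothetical, and refusing to match when no `gate_id` is supplied is one possible policy, not something the PR specifies.

```typescript
// Hypothetical view of an active human_approval task.
interface ApprovalTask {
  id: string;
  gate_id?: string;
}

// Returns only the tasks whose gate_id equals the one in the resolve request.
function selectApprovalTargets(tasks: ApprovalTask[], gateId: string | undefined): ApprovalTask[] {
  // Without a discriminator, resolve nothing rather than everything.
  if (gateId === undefined) return [];
  return tasks.filter((task) => task.gate_id === gateId);
}
```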
server/task-store.ts
- 🔵 style (L508): `findActiveSignalTasks` fetches all active signal tasks from SQLite then filters by gate type in JavaScript via `JSON.parse` (in `rowToTask`). Since `gate_config` is stored as JSON text, the filter could use SQLite's `json_extract(gate_config, '$.type') = ?` to push filtering into the query and avoid deserializing non-matching rows. [fixable]
server/__tests__/signal-processor.test.ts
- 🔵 style (L250): The test describe block `gate matching (mirrors /api/signals/resolve logic)` tests `findActiveSignalTasks` + manual matching in test code, but the actual matching logic lives separately in `app.ts`. If the endpoint logic diverges from these tests, the tests won't catch it. Consider extracting the matching logic into a shared function that both the endpoint and tests use. [fixable]
| const config = getRepoConfig(); | ||
|
|
||
| try { | ||
| createSessionWorktrees(wtId, BASE_REPO, config.repos); |
🔴 bugs: Double worktree creation with divergent IDs. spawnSession calls createSessionWorktrees(wtId, BASE_REPO, config.repos) from worktree.ts, creating primary + secondary worktrees. Then await startChat(...) internally calls generateWtId() (line 799 of chat.ts) to produce a DIFFERENT wtId and creates its own worktrees. The agent runs in the second set; the first set is orphaned and leaked. Either remove the createSessionWorktrees call here (let startChat handle it) or pass the pre-created worktree path via startChat's cwd option to skip its internal worktree creation. [fixable]
| try { | ||
| createSessionWorktrees(wtId, BASE_REPO, config.repos); | ||
|
|
||
| eventStore.upsertSession({ |
🔴 bugs: EventStore entry uses the wrong sessionId. eventStore.upsertSession({ sessionId: wtId, ... }) stores an entry keyed by the spawner's wtId, but startChat generates its own wtId internally and creates a separate EventStore entry for the actual session. The entry created here (with telosTaskId linking to the goal) is disconnected from the real session, and the real session won't have telosTaskId set. [fixable]
| }); | ||
|
|
||
| const transport = new NullTransport(); | ||
| await startChat(transport, clientId, prompt, { |
🔴 bugs: await startChat(...) blocks until runQueryLoop() finishes — i.e., the entire agent session lifecycle. This means spawnSession only returns the clientId after the agent has completed (or crashed). The .then() callback in the orchestrator (task-orchestrator.ts:361) that calls setSessionId() fires when the session is already over, making the session assignment pointless. The intent appears to be fire-and-forget spawning, but the actual behavior is 'await full session completion then return'. The startChat call should not be awaited here, or a callback like onSessionResolved should be used to get the clientId early. [fixable]
| if (this.goalId !== capturedGoalId) return; | ||
|
|
||
| if (clientId) { | ||
| this.deps.store.setSessionId(next.id, clientId); |
🟡 bugs: Orphan detection ID format mismatch. setSessionId(next.id, clientId) stores the clientId (format headless:xxx) as the task's sessionId, but getActiveSessionIds() (index.ts:235-241) returns SDK-assigned UUIDs from the registry (via session.sessionId). These formats will never match, so if this code path executes while the task is still active, orphan detection will falsely reclaim it. Currently mitigated by timing (the callback fires after session completion), but it's a latent correctness issue. [fixable]
| @@ -141,6 +147,7 @@ export class TaskOrchestrator { | |||
| this.specMode = false; | |||
🟡 unsafe_assumptions: No spawned session cleanup on stop(). When the orchestrator stops, in-flight spawned headless sessions continue running with no way to abort them. The stop() method resets local state but doesn't track or abort spawned sessions. Consider tracking spawned clientIds and aborting them on stop, or adding an abortSpawnedSession callback to OrchestratorDeps. [fixable]
| } | ||
| case 'human_approval': { | ||
| // human_approval signals match any active task of this gate type | ||
| isMatch = true; |
🟡 unsafe_assumptions: human_approval gate type matches ALL active tasks of that type without any discriminator. If multiple tasks are waiting for human approval, a single resolve signal resolves every one of them. Consider requiring a task-specific identifier (e.g., a gate_id field) to disambiguate which approval is being granted. [fixable]
| * External agents (e.g. Centaur) POST here after completing work — | ||
| * they don't need to know task IDs, just the gate parameters. | ||
| */ | ||
| app.post('/api/signals/resolve', (req, res) => { |
🟡 missing_tests: No HTTP integration test for POST /api/signals/resolve. The signal-processor unit tests mirror the matching logic, but the actual endpoint (auth guard, Zod validation, response shape, task broadcast, 503 when signalProcessor is null) is untested. Other endpoints in this file have supertest-based route tests. [fixable]
|
|
||
| /** Find active wait_for_signal tasks matching a gate type. */ | ||
| findActiveSignalTasks(gateType: string): Task[] { | ||
| const rows = this.getDb() |
🔵 style: findActiveSignalTasks fetches all active signal tasks from SQLite then filters by gate type in JavaScript via JSON.parse (in rowToTask). Since gate_config is stored as JSON text, the filter could use SQLite's json_extract(gate_config, '$.type') = ? to push filtering into the query and avoid deserializing non-matching rows. [fixable]
| }); | ||
| }); | ||
|
|
||
| describe('gate matching (mirrors /api/signals/resolve logic)', () => { |
🔵 style: The test describe block gate matching (mirrors /api/signals/resolve logic) tests findActiveSignalTasks + manual matching in test code, but the actual matching logic lives separately in app.ts. If the endpoint logic diverges from these tests, the tests won't catch it. Consider extracting the matching logic into a shared function that both the endpoint and tests use. [fixable]
dimakis
left a comment
Centaur Review
Found 7 issue(s) (1 critical) (4 warning).
server/index.ts
Critical double-worktree-creation bug in spawnSession (index.ts creates worktrees then startChat creates more with a different ID); matching logic has a potential type coercion issue; missing HTTP-level tests for the new resolve endpoint.
- 🔴 bugs (L249): Double worktree creation. `spawnSession` calls `createSessionWorktrees(wtId, BASE_REPO, config.repos)` from `worktree.ts`, then calls `startChat(transport, clientId, prompt, { isolation: true })`. Inside `_startChatInner` (chat.ts:799), a new `wtId` is generated and `createSessionWorktrees` from `chat.ts` is called again with `isolation: true`, creating a second, orphaned worktree. The agent session will use the second worktree (from startChat), while the first (from index.ts) is leaked. Fix: either pass `isolation: false` to startChat (and pass a `cwd` pointing to the worktree created on line 249), or remove the explicit `createSessionWorktrees` call on line 249 and let startChat handle it. [fixable]
- 🟡 bugs (L251): `eventStore.upsertSession` is called with `sessionId: wtId`, but `startChat` will internally generate a different `wtId` (chat.ts:799) and register the session under a different ID. The session metadata stored here won't match the actual session created by startChat, since startChat resolves its own sessionId asynchronously via the SDK. [fixable]
server/app.ts
- 🟡 unsafe_assumptions (L1026): gateConfig is cast to Record<string, unknown> and destructured for repo, pr, pr_url. The pr field from the request body is a number (per Zod schema), but taskPr from the gate config is parsed from JSON and could be a number or string depending on how it was stored. The === comparison on line 1031/1036 (taskPr === pr) will fail if one is a string and the other a number. Consider using == or explicit coercion for the pr comparison. [fixable]
- 🟡 missing_tests (L1001): The /api/signals/resolve endpoint has no HTTP-level integration test. The signal-processor test suite mirrors the matching logic in-process but doesn't test the endpoint's auth check, Zod validation, or 503 when signalProcessor is null. An app.test.ts (supertest) test would cover these layers. [fixable]
server/task-orchestrator.ts
- 🟡 unsafe_assumptions (L345): next.sessionPolicy ?? 'reuse' is dead code: sessionPolicy always has a value from the DB (default 'auto'). Tasks created via the API without specifying sessionPolicy get 'auto', which falls through to the else (reuse) branch because 'auto' !== 'spawn'. This works, but 'auto' is undocumented as a policy. If 'auto' should eventually mean something different (e.g., spawn when resources are available), the fallback logic needs updating. Consider making this explicit: policy === 'spawn' is the only check that matters.
server/task-store.ts
- 🔵 style (L509): findActiveSignalTasks fetches all active signal tasks from SQLite, then filters by gate type in JavaScript. The gate type could be pushed into the SQL query using json_extract(gate_config, '$.type') = ? to reduce data transfer and deserialization overhead, especially if the number of active signal tasks grows. [fixable]
server/__tests__/task-orchestrator.test.ts
- 🔵 missing_tests: No test covers the spawnSession error handler when the orchestrator is still on the same goal (lines 386-399 of task-orchestrator.ts). The existing 'blocks task when spawnSession rejects' test validates the task status change, but doesn't verify that broadcastTasks is called in the rejection handler. Also, no test covers the MAX_SPAWN_DEPTH limit being hit: the depth guard at line 405 is untested. [fixable]
    const config = getRepoConfig();

    try {
    createSessionWorktrees(wtId, BASE_REPO, config.repos);
🔴 bugs: Double worktree creation. spawnSession calls createSessionWorktrees(wtId, BASE_REPO, config.repos) from worktree.ts, then calls startChat(transport, clientId, prompt, { isolation: true }). Inside _startChatInner (chat.ts:799), a new wtId is generated and createSessionWorktrees from chat.ts is called again with isolation: true — creating a second, orphaned worktree. The agent session will use the second worktree (from startChat), while the first (from index.ts) is leaked. Fix: either pass isolation: false to startChat (and pass a cwd pointing to the worktree created on line 249), or remove the explicit createSessionWorktrees call on line 249 and let startChat handle it. [fixable]
    try {
    createSessionWorktrees(wtId, BASE_REPO, config.repos);

    eventStore.upsertSession({
🟡 bugs: eventStore.upsertSession is called with sessionId: wtId, but startChat will internally generate a different wtId (chat.ts:799) and register the session under a different ID. The session metadata stored here won't match the actual session created by startChat, since startChat resolves its own sessionId asynchronously via the SDK. [fixable]
    const gc = task.gateConfig;
    if (!gc) continue;

    const { repo: taskRepo, pr: taskPr, pr_url: taskPrUrl } = gc as Record<string, unknown>;
🟡 unsafe_assumptions: gateConfig is cast to Record<string, unknown> and destructured for repo, pr, pr_url. The pr field from the request body is a number (per Zod schema), but taskPr from the gate config is parsed from JSON and could be a number or string depending on how it was stored. The === comparison on line 1031/1036 (taskPr === pr) will fail if one is a string and the other a number. Consider using == or explicit coercion for the pr comparison. [fixable]
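A minimal sketch of the explicit-coercion option, assuming the stored value is either a number or a numeric string. The helper name prMatches is hypothetical and does not appear in the PR; it only illustrates one way to avoid the strict-equality mismatch described above.

```typescript
// Hypothetical helper (not part of the PR): compares a PR number read from
// gate_config, which may be a number or a numeric string, against the
// already-validated numeric value from the request body.
function prMatches(taskPr: unknown, pr: number): boolean {
  // Only numbers and strings are candidates; null, booleans, and objects are not.
  if (typeof taskPr !== 'number' && typeof taskPr !== 'string') return false;
  // Number('') is 0, so reject blank strings before coercing.
  if (typeof taskPr === 'string' && taskPr.trim() === '') return false;
  const stored = Number(taskPr);
  // Number('abc') is NaN; Number.isInteger filters that out.
  return Number.isInteger(stored) && stored === pr;
}
```

Compared with a loose == check, this keeps the comparison strict after coercion, so values such as null or an empty string never match PR 0 by accident.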
     * External agents (e.g. Centaur) POST here after completing work —
     * they don't need to know task IDs, just the gate parameters.
     */
    app.post('/api/signals/resolve', (req, res) => {
🟡 missing_tests: The /api/signals/resolve endpoint has no HTTP-level integration test. The signal-processor test suite mirrors the matching logic in-process but doesn't test the endpoint's auth check, Zod validation, or 503 when signalProcessor is null. An app.test.ts (supertest) test would cover these layers. [fixable]
    this.deps.setTaskContext(next.id, this.goalId);
    this.deps.broadcastTasks();
    this.deps.broadcastStatus(this.getStatus());
    const policy = next.sessionPolicy ?? 'reuse';
🟡 unsafe_assumptions: next.sessionPolicy ?? 'reuse' is dead code — sessionPolicy always has a value from the DB (default 'auto'). Tasks created via the API without specifying sessionPolicy get 'auto', which falls through to the else (reuse) branch because 'auto' !== 'spawn'. This works but 'auto' is undocumented as a policy. If 'auto' should eventually mean something different (e.g., spawn when resources are available), the fallback logic needs updating. Consider making this explicit: policy === 'spawn' is the only check that matters.
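One possible way to make the check explicit, as a sketch only. The SessionPolicy union and the shouldSpawn helper are assumptions for illustration; the comment above confirms only that 'auto' is the DB default and that 'spawn' selects the new path.

```typescript
// Assumed set of policy values, based on the ones named in this review.
type SessionPolicy = 'auto' | 'reuse' | 'spawn';

// Hypothetical helper: only 'spawn' selects the dedicated-session path.
// 'auto', 'reuse', and a missing value all route to the pinned session,
// so no `?? 'reuse'` fallback is needed.
function shouldSpawn(policy: SessionPolicy | null | undefined): boolean {
  return policy === 'spawn';
}
```

If 'auto' later gains its own meaning, this is the single place that would need to change.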
    /** Find active wait_for_signal tasks matching a gate type. */
    findActiveSignalTasks(gateType: string): Task[] {
    const rows = this.getDb()
    .prepare(
🔵 style: findActiveSignalTasks fetches all active signal tasks from SQLite, then filters by gate type in JavaScript. The gate type could be pushed into the SQL query using json_extract(gate_config, '$.type') = ? to reduce data transfer and deserialization overhead, especially if the number of active signal tasks grows. [fixable]
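The suggestion could look roughly like the sketch below. Only the gate_config column and the json_extract expression come from the comment above; the table name tasks, the status and stage_type columns, and the SqliteLike interface are assumptions made so the example is self-contained. The interface mirrors the prepare(...).all(...) shape that SQLite bindings such as better-sqlite3 expose.

```typescript
// Minimal structural types so the sketch does not depend on a SQLite package.
interface StatementLike {
  all(...params: unknown[]): unknown[];
}
interface SqliteLike {
  prepare(sql: string): StatementLike;
}

// Table and column names other than gate_config are hypothetical.
const ACTIVE_SIGNAL_TASKS_SQL = `
  SELECT *
  FROM tasks
  WHERE status = 'active'
    AND stage_type = 'wait_for_signal'
    AND json_extract(gate_config, '$.type') = ?
`;

// Filters by gate type inside SQLite, so rows with a non-matching gate type
// are never returned or deserialized on the JavaScript side.
function findActiveSignalRows(db: SqliteLike, gateType: string): unknown[] {
  return db.prepare(ACTIVE_SIGNAL_TASKS_SQL).all(gateType);
}
```

The gate type is passed as a bound parameter rather than interpolated into the SQL string, which keeps the query safe for externally supplied values.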
…duplicate worktrees
Fixes three critical bugs in spawnSession (Centaur review on #377):
1. Remove duplicate worktree creation: startChat handles worktrees internally via createSessionWorktrees. The pre-creation in spawnSession leaked orphaned worktree directories.
2. Remove duplicate EventStore entry: startChat persists session metadata via the onSessionResolved callback. The pre-creation stored an entry keyed by a different wtId than the real session.
3. Don't await startChat: it blocks until runQueryLoop completes (full session lifetime). Fire-and-forget with .catch() for errors.
Also: push gate_config type filtering into SQLite via json_extract, and document human_approval broad-match as intentional MVP design.
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
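The shape of the fix described in this commit can be sketched as follows. This is an illustration under assumptions, not the code in index.ts: startChat is injected with a simplified signature (the real one also takes a transport), and the onError callback is hypothetical.

```typescript
interface ChatOptions {
  isolation: boolean;
}

// Simplified, assumed signature: the returned promise settles only when the
// whole session has finished.
type StartChat = (clientId: string, prompt: string, opts: ChatOptions) => Promise<void>;

// Hypothetical wiring. There is deliberately no worktree creation and no
// EventStore write here: startChat is assumed to do both itself.
function spawnSession(
  startChat: StartChat,
  clientId: string,
  prompt: string,
  onError: (clientId: string, err: unknown) => void,
): string {
  // Fire-and-forget: awaiting would block for the full session lifetime.
  // The .catch() keeps a failed session from becoming an unhandled rejection.
  void startChat(clientId, prompt, { isolation: true }).catch((err) => onError(clientId, err));
  return clientId;
}
```

Because the call returns before the session ends, the caller can keep dispatching other tasks while the spawned session runs.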
Centaur flagged that taskPr from JSON gate_config could be string or number while the Zod-validated request pr is always number. Using === would silently fail to match. Coerce both sides via Number() for safe comparison.
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Centaur Review
Found 7 issue(s) (1 critical) (3 warning).
Centaur Review
Found 6 issue(s) (1 critical) (2 warning).
Summary
Enable the TaskOrchestrator to spawn dedicated headless sessions for individual tasks, unlocking multi-agent goal coordination. This is Phase 1 of 4 in the multi-agent orchestration initiative.
What changed
- task-orchestrator.ts: Implement session_policy: 'spawn' path in tick(). When a task has sessionPolicy: 'spawn', the orchestrator creates a dedicated headless session instead of routing to the pinned session. Spawned tasks don't set activeTaskId, enabling parallel execution. Falls back to pinned session on failure.
- index.ts: Wire spawnSession dependency: creates worktrees, registers in EventStore, starts headless session with NullTransport.
- app.ts: Add POST /api/signals/resolve endpoint. External agents (e.g. Centaur) can resolve wait_for_signal tasks by gate metadata (type+repo+PR) without knowing task IDs. Authenticated via internal token.
- api-schemas.ts: Add SignalResolveBody zod schema for the new endpoint.
- task-store.ts: Add findActiveSignalTasks(gateType) query method for gate-metadata-based task lookup.
Context: Multi-Agent Orchestration Roadmap
The session_policy column and wait_for_signal stage type already existed in the schema but were unused. This PR activates them.
- Phase 1 (this PR): Multi-session tick() + signal resolve endpoint
- Phase 2: Centaur signal bridge. Centaur POSTs ReviewCompleted to /api/signals/resolve
- Phase 3: PR Shepherd creates Task Board goals instead of ad-hoc sessions
- Phase 4: Scheduled PR discovery trigger
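The spawn path described under "What changed" can be pictured with a simplified sketch. Everything here is an assumption for illustration: MiniOrchestrator, dispatch, sendToPinned, and the Deps shape are hypothetical stand-ins, not the TaskOrchestrator API. The sketch follows the description (spawned tasks leave activeTaskId unset, failures fall back to the pinned session) and adds the stale-goal guard that the review asked for.

```typescript
type SessionPolicy = 'auto' | 'reuse' | 'spawn';

interface Task {
  id: string;
  sessionPolicy: SessionPolicy;
}

// Hypothetical dependencies; the real orchestrator's deps differ.
interface Deps {
  spawnSession(task: Task): Promise<void>; // rejects if the session cannot start
  sendToPinned(task: Task): void;
}

class MiniOrchestrator {
  private goalId: string | null;
  private activeTaskId: string | null = null;
  private readonly deps: Deps;

  constructor(goalId: string, deps: Deps) {
    this.goalId = goalId;
    this.deps = deps;
  }

  getStatus(): { goalId: string | null; activeTaskId: string | null } {
    return { goalId: this.goalId, activeTaskId: this.activeTaskId };
  }

  stop(): void {
    this.goalId = null;
    this.activeTaskId = null;
  }

  dispatch(task: Task): void {
    if (task.sessionPolicy !== 'spawn') {
      this.runOnPinned(task);
      return;
    }
    // Spawn path: activeTaskId stays unset so other tasks can run in parallel.
    const goalAtSpawn = this.goalId;
    this.deps.spawnSession(task).catch(() => {
      // stop() or a goal switch may have happened while the spawn was pending.
      if (this.goalId === null || this.goalId !== goalAtSpawn) return;
      this.runOnPinned(task); // fall back to the pinned session
    });
  }

  private runOnPinned(task: Task): void {
    this.activeTaskId = task.id;
    this.deps.sendToPinned(task);
  }
}
```

Capturing the goal ID before the asynchronous call and comparing it in the rejection handler is one way to avoid acting on state that stop() has already cleared.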
A PR lifecycle would look like:
Key files
- server/task-orchestrator.ts: spawn path in tick()
- server/index.ts: spawnSession wiring
- server/app.ts: POST /api/signals/resolve
- server/task-store.ts: findActiveSignalTasks()
Test plan
🤖 Generated with Claude Code