From 6dddc3f7960d894242724f1231366ad9ff3a4414 Mon Sep 17 00:00:00 2001 From: Waleed Date: Wed, 15 Apr 2026 16:20:39 -0700 Subject: [PATCH 1/2] fix(brightdata): fix async Discover API, echo-back fields, and registry ordering (#4188) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * fix(brightdata): use params for echo-back fields in transformResponse transformResponse receives params as its second argument. Use it to return the original url, query, snapshotId, and searchEngine values instead of hardcoding null or extracting from response data that may not contain them. Co-Authored-By: Claude Opus 4.6 * fix(brightdata): handle async Discover API with polling The Bright Data Discover API is asynchronous — POST /discover returns a task_id, and results must be polled via GET /discover?task_id=... The previous implementation incorrectly treated it as synchronous, always returning empty results. Uses postProcess (matching Firecrawl crawl pattern) to poll every 3s with a 120s timeout until status is "done". Co-Authored-By: Claude Opus 4.6 * fix(brightdata): alphabetize block registry entry Move box before brandfetch/brightdata to maintain alphabetical ordering. Co-Authored-By: Claude Opus 4.6 * lint * fix(brightdata): return error objects instead of throwing in postProcess The executor wraps postProcess in try-catch and falls back to the intermediate transformResponse result on error, which has success: true with empty results. Throwing errors would silently return empty results. Match Firecrawl's pattern: return { ...result, success: false, error } instead of throwing. Also add taskId to BrightDataDiscoverResponse type to eliminate unsafe casts. Co-Authored-By: Claude Opus 4.6 * fix(brightdata): use platform execution timeout for Discover polling Replace hardcoded 120s timeout with DEFAULT_EXECUTION_TIMEOUT_MS to match Firecrawl and other async polling tools. Respects platform- configured limits (300s free, 3000s paid). Co-Authored-By: Claude Opus 4.6 --------- Co-authored-by: Claude Opus 4.6 --- apps/docs/content/docs/en/tools/agiloft.mdx | 2 +- apps/sim/blocks/registry.ts | 2 +- apps/sim/tools/brightdata/cancel_snapshot.ts | 6 +- apps/sim/tools/brightdata/discover.ts | 132 ++++++++++++++---- .../sim/tools/brightdata/download_snapshot.ts | 4 +- apps/sim/tools/brightdata/scrape_url.ts | 4 +- apps/sim/tools/brightdata/serp_search.ts | 11 +- apps/sim/tools/brightdata/types.ts | 1 + bun.lock | 1 + 9 files changed, 126 insertions(+), 37 deletions(-) diff --git a/apps/docs/content/docs/en/tools/agiloft.mdx b/apps/docs/content/docs/en/tools/agiloft.mdx index b5579d98ff..235300ea25 100644 --- a/apps/docs/content/docs/en/tools/agiloft.mdx +++ b/apps/docs/content/docs/en/tools/agiloft.mdx @@ -7,7 +7,7 @@ import { BlockInfoCard } from "@/components/ui/block-info-card" {/* MANUAL-CONTENT-START:intro */} diff --git a/apps/sim/blocks/registry.ts b/apps/sim/blocks/registry.ts index 270bea945c..2b2541a4d3 100644 --- a/apps/sim/blocks/registry.ts +++ b/apps/sim/blocks/registry.ts @@ -245,9 +245,9 @@ export const registry: Record = { ashby: AshbyBlock, athena: AthenaBlock, attio: AttioBlock, + box: BoxBlock, brandfetch: BrandfetchBlock, brightdata: BrightDataBlock, - box: BoxBlock, browser_use: BrowserUseBlock, calcom: CalComBlock, calendly: CalendlyBlock, diff --git a/apps/sim/tools/brightdata/cancel_snapshot.ts b/apps/sim/tools/brightdata/cancel_snapshot.ts index a5a2328979..8f4823ccc3 100644 --- a/apps/sim/tools/brightdata/cancel_snapshot.ts +++ b/apps/sim/tools/brightdata/cancel_snapshot.ts @@ -38,17 +38,17 @@ export const brightDataCancelSnapshotTool: ToolConfig< }), }, - transformResponse: async (response: Response) => { + transformResponse: async (response: Response, params) => { if (!response.ok) { const errorText = await response.text() throw new Error(errorText || `Cancel snapshot failed with status ${response.status}`) } - const data = (await response.json().catch(() => null)) as Record | null + await response.json().catch(() => null) return { success: true, output: { - snapshotId: (data?.snapshot_id as string) ?? null, + snapshotId: params?.snapshotId ?? null, cancelled: true, }, } diff --git a/apps/sim/tools/brightdata/discover.ts b/apps/sim/tools/brightdata/discover.ts index 8f0f8ced6b..153bf465c6 100644 --- a/apps/sim/tools/brightdata/discover.ts +++ b/apps/sim/tools/brightdata/discover.ts @@ -1,6 +1,13 @@ +import { createLogger } from '@sim/logger' +import { DEFAULT_EXECUTION_TIMEOUT_MS } from '@/lib/core/execution-limits' import type { BrightDataDiscoverParams, BrightDataDiscoverResponse } from '@/tools/brightdata/types' import type { ToolConfig } from '@/tools/types' +const logger = createLogger('tools:brightdata:discover') + +const POLL_INTERVAL_MS = 3000 +const MAX_POLL_TIME_MS = DEFAULT_EXECUTION_TIMEOUT_MS + export const brightDataDiscoverTool: ToolConfig< BrightDataDiscoverParams, BrightDataDiscoverResponse @@ -84,7 +91,7 @@ export const brightDataDiscoverTool: ToolConfig< }, }, - transformResponse: async (response: Response) => { + transformResponse: async (response: Response, params) => { if (!response.ok) { const errorText = await response.text() throw new Error(errorText || `Discover request failed with status ${response.status}`) @@ -92,37 +99,112 @@ export const brightDataDiscoverTool: ToolConfig< const data = await response.json() - let results: Array<{ - url: string | null - title: string | null - description: string | null - relevanceScore: number | null - content: string | null - }> = [] - - const items = Array.isArray(data) ? data : (data?.results ?? data?.data ?? []) - - if (Array.isArray(items)) { - results = items.map((item: Record) => ({ - url: (item.link as string) ?? (item.url as string) ?? null, - title: (item.title as string) ?? null, - description: (item.description as string) ?? (item.snippet as string) ?? null, - relevanceScore: (item.relevance_score as number) ?? null, - content: - (item.content as string) ?? (item.text as string) ?? (item.markdown as string) ?? null, - })) - } - return { success: true, output: { - results, - query: null, - totalResults: results.length, + results: [], + query: params?.query ?? null, + totalResults: 0, + taskId: data.task_id ?? null, }, } }, + postProcess: async (result, params) => { + if (!result.success) return result + + const taskId = result.output.taskId + if (!taskId) { + return { + ...result, + success: false, + error: 'Discover API did not return a task_id. Cannot poll for results.', + } + } + + logger.info(`Bright Data Discover task ${taskId} created, polling for results...`) + + let elapsedTime = 0 + + while (elapsedTime < MAX_POLL_TIME_MS) { + try { + const pollResponse = await fetch( + `https://api.brightdata.com/discover?task_id=${encodeURIComponent(taskId)}`, + { + method: 'GET', + headers: { + Authorization: `Bearer ${params.apiKey}`, + }, + } + ) + + if (!pollResponse.ok) { + return { + ...result, + success: false, + error: `Failed to poll discover results: ${pollResponse.statusText}`, + } + } + + const data = await pollResponse.json() + logger.info(`Bright Data Discover task ${taskId} status: ${data.status}`) + + if (data.status === 'done') { + const items = Array.isArray(data.results) ? data.results : [] + + const results = items.map((item: Record) => ({ + url: (item.link as string) ?? (item.url as string) ?? null, + title: (item.title as string) ?? null, + description: (item.description as string) ?? (item.snippet as string) ?? null, + relevanceScore: (item.relevance_score as number) ?? null, + content: (item.content as string) ?? null, + })) + + return { + success: true, + output: { + results, + query: params.query ?? null, + totalResults: results.length, + }, + } + } + + if (data.status === 'failed' || data.status === 'error') { + return { + ...result, + success: false, + error: `Discover task failed: ${data.error ?? 'Unknown error'}`, + } + } + + await new Promise((resolve) => setTimeout(resolve, POLL_INTERVAL_MS)) + elapsedTime += POLL_INTERVAL_MS + } catch (error) { + logger.error('Error polling for discover task:', { + message: error instanceof Error ? error.message : String(error), + taskId, + }) + + return { + ...result, + success: false, + error: `Error polling for discover task: ${error instanceof Error ? error.message : String(error)}`, + } + } + } + + logger.warn( + `Discover task ${taskId} did not complete within the maximum polling time (${MAX_POLL_TIME_MS / 1000}s)` + ) + + return { + ...result, + success: false, + error: `Discover task ${taskId} timed out after ${MAX_POLL_TIME_MS / 1000}s. Check status manually.`, + } + }, + outputs: { results: { type: 'array', diff --git a/apps/sim/tools/brightdata/download_snapshot.ts b/apps/sim/tools/brightdata/download_snapshot.ts index c62cfc4c68..555be41aab 100644 --- a/apps/sim/tools/brightdata/download_snapshot.ts +++ b/apps/sim/tools/brightdata/download_snapshot.ts @@ -56,7 +56,7 @@ export const brightDataDownloadSnapshotTool: ToolConfig< }), }, - transformResponse: async (response: Response) => { + transformResponse: async (response: Response, params) => { if (response.status === 409) { throw new Error( 'Snapshot is not ready for download. Check the snapshot status first and wait until it is "ready".' @@ -89,7 +89,7 @@ export const brightDataDownloadSnapshotTool: ToolConfig< output: { data, format: contentType, - snapshotId: (data[0]?.snapshot_id as string) ?? null, + snapshotId: params?.snapshotId ?? null, }, } }, diff --git a/apps/sim/tools/brightdata/scrape_url.ts b/apps/sim/tools/brightdata/scrape_url.ts index 1fe284cd31..1d62d4c6df 100644 --- a/apps/sim/tools/brightdata/scrape_url.ts +++ b/apps/sim/tools/brightdata/scrape_url.ts @@ -66,7 +66,7 @@ export const brightDataScrapeUrlTool: ToolConfig< }, }, - transformResponse: async (response: Response) => { + transformResponse: async (response: Response, params) => { const contentType = response.headers.get('content-type') || '' if (!response.ok) { @@ -86,7 +86,7 @@ export const brightDataScrapeUrlTool: ToolConfig< success: true, output: { content, - url: null, + url: params?.url ?? null, statusCode: response.status, }, } diff --git a/apps/sim/tools/brightdata/serp_search.ts b/apps/sim/tools/brightdata/serp_search.ts index e9ed8ef1de..3acf3b2f07 100644 --- a/apps/sim/tools/brightdata/serp_search.ts +++ b/apps/sim/tools/brightdata/serp_search.ts @@ -129,7 +129,7 @@ export const brightDataSerpSearchTool: ToolConfig< }, }, - transformResponse: async (response: Response) => { + transformResponse: async (response: Response, params) => { if (!response.ok) { const errorText = await response.text() throw new Error(errorText || `SERP request failed with status ${response.status}`) @@ -178,9 +178,14 @@ export const brightDataSerpSearchTool: ToolConfig< success: true, output: { results, - query: ((data?.general as Record | undefined)?.query as string) ?? null, + query: + ((data?.general as Record | undefined)?.query as string) ?? + params?.query ?? + null, searchEngine: - ((data?.general as Record | undefined)?.search_engine as string) ?? null, + ((data?.general as Record | undefined)?.search_engine as string) ?? + params?.searchEngine ?? + null, }, } }, diff --git a/apps/sim/tools/brightdata/types.ts b/apps/sim/tools/brightdata/types.ts index 3197826996..d0a3e8aa39 100644 --- a/apps/sim/tools/brightdata/types.ts +++ b/apps/sim/tools/brightdata/types.ts @@ -131,6 +131,7 @@ export interface BrightDataDiscoverResponse extends ToolResponse { }> query: string | null totalResults: number + taskId?: string | null } } diff --git a/bun.lock b/bun.lock index aaa61ed6da..654302c866 100644 --- a/bun.lock +++ b/bun.lock @@ -1,5 +1,6 @@ { "lockfileVersion": 1, + "configVersion": 0, "workspaces": { "": { "name": "simstudio", From 377712c9f320e9d0317c716c41dbd1c95bdc7f36 Mon Sep 17 00:00:00 2001 From: Vikhyath Mondreti Date: Wed, 15 Apr 2026 16:43:22 -0700 Subject: [PATCH 2/2] fix(mothership): chat stream structuring + logs resource post fix (#4189) * fix(mothership): chat streaming structure * fix logs resource thinking bug" * address comments * address comments --- apps/sim/app/api/copilot/chat/queries.ts | 14 + .../app/api/copilot/chat/stop/route.test.ts | 160 +++++ apps/sim/app/api/copilot/chat/stop/route.ts | 14 +- .../api/mothership/chats/[chatId]/route.ts | 15 +- .../[workspaceId]/home/hooks/use-chat.ts | 644 ++++++++++++++---- apps/sim/hooks/use-task-events.test.ts | 19 +- apps/sim/hooks/use-task-events.ts | 13 +- .../copilot/chat/effective-transcript.test.ts | 263 +++++++ .../lib/copilot/chat/effective-transcript.ts | 412 +++++++++++ apps/sim/lib/copilot/chat/post.ts | 5 +- .../lib/copilot/chat/stream-tool-outcome.ts | 46 ++ 11 files changed, 1446 insertions(+), 159 deletions(-) create mode 100644 apps/sim/app/api/copilot/chat/stop/route.test.ts create mode 100644 apps/sim/lib/copilot/chat/effective-transcript.test.ts create mode 100644 apps/sim/lib/copilot/chat/effective-transcript.ts create mode 100644 apps/sim/lib/copilot/chat/stream-tool-outcome.ts diff --git a/apps/sim/app/api/copilot/chat/queries.ts b/apps/sim/app/api/copilot/chat/queries.ts index 4828a15aa7..9977f5419b 100644 --- a/apps/sim/app/api/copilot/chat/queries.ts +++ b/apps/sim/app/api/copilot/chat/queries.ts @@ -4,7 +4,9 @@ import { createLogger } from '@sim/logger' import { and, desc, eq } from 'drizzle-orm' import { type NextRequest, NextResponse } from 'next/server' import { getLatestRunForStream } from '@/lib/copilot/async-runs/repository' +import { buildEffectiveChatTranscript } from '@/lib/copilot/chat/effective-transcript' import { getAccessibleCopilotChat } from '@/lib/copilot/chat/lifecycle' +import { normalizeMessage } from '@/lib/copilot/chat/persisted-message' import { authenticateCopilotRequestSessionOnly, createBadRequestResponse, @@ -113,11 +115,23 @@ export async function GET(req: NextRequest) { } } + const normalizedMessages = Array.isArray(chat.messages) + ? chat.messages + .filter((message): message is Record => Boolean(message)) + .map(normalizeMessage) + : [] + const effectiveMessages = buildEffectiveChatTranscript({ + messages: normalizedMessages, + activeStreamId: chat.conversationId || null, + ...(streamSnapshot ? { streamSnapshot } : {}), + }) + logger.info(`Retrieved chat ${chatId}`) return NextResponse.json({ success: true, chat: { ...transformChat(chat), + messages: effectiveMessages, ...(streamSnapshot ? { streamSnapshot } : {}), }, }) diff --git a/apps/sim/app/api/copilot/chat/stop/route.test.ts b/apps/sim/app/api/copilot/chat/stop/route.test.ts new file mode 100644 index 0000000000..21c32e38a7 --- /dev/null +++ b/apps/sim/app/api/copilot/chat/stop/route.test.ts @@ -0,0 +1,160 @@ +/** + * @vitest-environment node + */ +import { NextRequest } from 'next/server' +import { beforeEach, describe, expect, it, vi } from 'vitest' + +const { + mockGetSession, + mockSelect, + mockFrom, + mockWhereSelect, + mockLimit, + mockUpdate, + mockSet, + mockWhereUpdate, + mockReturning, + mockPublishStatusChanged, + mockSql, +} = vi.hoisted(() => ({ + mockGetSession: vi.fn(), + mockSelect: vi.fn(), + mockFrom: vi.fn(), + mockWhereSelect: vi.fn(), + mockLimit: vi.fn(), + mockUpdate: vi.fn(), + mockSet: vi.fn(), + mockWhereUpdate: vi.fn(), + mockReturning: vi.fn(), + mockPublishStatusChanged: vi.fn(), + mockSql: vi.fn((strings: TemplateStringsArray, ...values: unknown[]) => ({ strings, values })), +})) + +vi.mock('@/lib/auth', () => ({ + getSession: mockGetSession, +})) + +vi.mock('@sim/db', () => ({ + db: { + select: mockSelect, + update: mockUpdate, + }, +})) + +vi.mock('@sim/db/schema', () => ({ + copilotChats: { + id: 'id', + userId: 'userId', + workspaceId: 'workspaceId', + messages: 'messages', + conversationId: 'conversationId', + }, +})) + +vi.mock('drizzle-orm', () => ({ + and: vi.fn((...conditions: unknown[]) => ({ conditions, type: 'and' })), + eq: vi.fn((field: unknown, value: unknown) => ({ field, value, type: 'eq' })), + sql: mockSql, +})) + +vi.mock('@/lib/copilot/tasks', () => ({ + taskPubSub: { + publishStatusChanged: mockPublishStatusChanged, + }, +})) + +import { POST } from '@/app/api/copilot/chat/stop/route' + +function createRequest(body: Record) { + return new NextRequest('http://localhost:3000/api/copilot/chat/stop', { + method: 'POST', + body: JSON.stringify(body), + headers: { 'Content-Type': 'application/json' }, + }) +} + +describe('copilot chat stop route', () => { + beforeEach(() => { + vi.clearAllMocks() + + mockGetSession.mockResolvedValue({ user: { id: 'user-1' } }) + + mockLimit.mockResolvedValue([ + { + workspaceId: 'ws-1', + messages: [{ id: 'stream-1', role: 'user', content: 'hello' }], + }, + ]) + mockWhereSelect.mockReturnValue({ limit: mockLimit }) + mockFrom.mockReturnValue({ where: mockWhereSelect }) + mockSelect.mockReturnValue({ from: mockFrom }) + + mockReturning.mockResolvedValue([{ workspaceId: 'ws-1' }]) + mockWhereUpdate.mockReturnValue({ returning: mockReturning }) + mockSet.mockReturnValue({ where: mockWhereUpdate }) + mockUpdate.mockReturnValue({ set: mockSet }) + }) + + it('returns 401 when unauthenticated', async () => { + mockGetSession.mockResolvedValueOnce(null) + + const response = await POST( + createRequest({ + chatId: 'chat-1', + streamId: 'stream-1', + content: '', + }) + ) + + expect(response.status).toBe(401) + expect(await response.json()).toEqual({ error: 'Unauthorized' }) + }) + + it('is a no-op when the chat is missing', async () => { + mockLimit.mockResolvedValueOnce([]) + + const response = await POST( + createRequest({ + chatId: 'missing-chat', + streamId: 'stream-1', + content: '', + }) + ) + + expect(response.status).toBe(200) + expect(await response.json()).toEqual({ success: true }) + expect(mockUpdate).not.toHaveBeenCalled() + }) + + it('appends a stopped assistant message even with no content', async () => { + const response = await POST( + createRequest({ + chatId: 'chat-1', + streamId: 'stream-1', + content: '', + }) + ) + + expect(response.status).toBe(200) + expect(await response.json()).toEqual({ success: true }) + + const setArg = mockSet.mock.calls[0]?.[0] + expect(setArg).toBeTruthy() + expect(setArg.conversationId).toBeNull() + expect(setArg.messages).toBeTruthy() + + const appendedPayload = JSON.parse(setArg.messages.values[1] as string) + expect(appendedPayload).toHaveLength(1) + expect(appendedPayload[0]).toMatchObject({ + role: 'assistant', + content: '', + contentBlocks: [{ type: 'complete', status: 'cancelled' }], + }) + + expect(mockPublishStatusChanged).toHaveBeenCalledWith({ + workspaceId: 'ws-1', + chatId: 'chat-1', + type: 'completed', + }) + }) +}) diff --git a/apps/sim/app/api/copilot/chat/stop/route.ts b/apps/sim/app/api/copilot/chat/stop/route.ts index 8a742d7080..05e5935aa4 100644 --- a/apps/sim/app/api/copilot/chat/stop/route.ts +++ b/apps/sim/app/api/copilot/chat/stop/route.ts @@ -7,6 +7,7 @@ import { z } from 'zod' import { getSession } from '@/lib/auth' import { normalizeMessage, type PersistedMessage } from '@/lib/copilot/chat/persisted-message' import { taskPubSub } from '@/lib/copilot/tasks' +import { generateId } from '@/lib/core/utils/uuid' const logger = createLogger('CopilotChatStopAPI') @@ -70,7 +71,6 @@ export async function POST(req: NextRequest) { } const { chatId, streamId, content, contentBlocks } = StopSchema.parse(await req.json()) - const [row] = await db .select({ workspaceId: copilotChats.workspaceId, @@ -106,14 +106,18 @@ export async function POST(req: NextRequest) { const hasContent = content.trim().length > 0 const hasBlocks = Array.isArray(contentBlocks) && contentBlocks.length > 0 - - if ((hasContent || hasBlocks) && canAppendAssistant) { + const synthesizedStoppedBlocks = hasBlocks + ? contentBlocks + : hasContent + ? [{ type: 'text', channel: 'assistant', content }, { type: 'stopped' }] + : [{ type: 'stopped' }] + if (canAppendAssistant) { const normalized = normalizeMessage({ - id: crypto.randomUUID(), + id: generateId(), role: 'assistant', content, timestamp: new Date().toISOString(), - ...(hasBlocks ? { contentBlocks } : {}), + contentBlocks: synthesizedStoppedBlocks, }) const assistantMessage: PersistedMessage = normalized setClause.messages = sql`${copilotChats.messages} || ${JSON.stringify([assistantMessage])}::jsonb` diff --git a/apps/sim/app/api/mothership/chats/[chatId]/route.ts b/apps/sim/app/api/mothership/chats/[chatId]/route.ts index e5fc73f301..cf94fdda83 100644 --- a/apps/sim/app/api/mothership/chats/[chatId]/route.ts +++ b/apps/sim/app/api/mothership/chats/[chatId]/route.ts @@ -5,7 +5,9 @@ import { and, eq, sql } from 'drizzle-orm' import { type NextRequest, NextResponse } from 'next/server' import { z } from 'zod' import { getLatestRunForStream } from '@/lib/copilot/async-runs/repository' +import { buildEffectiveChatTranscript } from '@/lib/copilot/chat/effective-transcript' import { getAccessibleCopilotChat } from '@/lib/copilot/chat/lifecycle' +import { normalizeMessage } from '@/lib/copilot/chat/persisted-message' import { authenticateCopilotRequestSessionOnly, createBadRequestResponse, @@ -93,12 +95,23 @@ export async function GET( } } + const normalizedMessages = Array.isArray(chat.messages) + ? chat.messages + .filter((message): message is Record => Boolean(message)) + .map(normalizeMessage) + : [] + const effectiveMessages = buildEffectiveChatTranscript({ + messages: normalizedMessages, + activeStreamId: chat.conversationId || null, + ...(streamSnapshot ? { streamSnapshot } : {}), + }) + return NextResponse.json({ success: true, chat: { id: chat.id, title: chat.title, - messages: Array.isArray(chat.messages) ? chat.messages : [], + messages: effectiveMessages, conversationId: chat.conversationId || null, resources: Array.isArray(chat.resources) ? chat.resources : [], createdAt: chat.createdAt, diff --git a/apps/sim/app/workspace/[workspaceId]/home/hooks/use-chat.ts b/apps/sim/app/workspace/[workspaceId]/home/hooks/use-chat.ts index 4b7b9ff5d0..8575e9a1b4 100644 --- a/apps/sim/app/workspace/[workspaceId]/home/hooks/use-chat.ts +++ b/apps/sim/app/workspace/[workspaceId]/home/hooks/use-chat.ts @@ -3,16 +3,20 @@ import { createLogger } from '@sim/logger' import { useQueryClient } from '@tanstack/react-query' import { usePathname, useRouter } from 'next/navigation' import { toDisplayMessage } from '@/lib/copilot/chat/display-message' +import { getLiveAssistantMessageId } from '@/lib/copilot/chat/effective-transcript' import type { PersistedFileAttachment, PersistedMessage, } from '@/lib/copilot/chat/persisted-message' -import { MOTHERSHIP_CHAT_API_PATH } from '@/lib/copilot/constants' +import { normalizeMessage } from '@/lib/copilot/chat/persisted-message' +import { resolveStreamToolOutcome } from '@/lib/copilot/chat/stream-tool-outcome' +import { MOTHERSHIP_CHAT_API_PATH, STREAM_STORAGE_KEY } from '@/lib/copilot/constants' import type { MothershipStreamV1ErrorPayload, MothershipStreamV1ToolUI, } from '@/lib/copilot/generated/mothership-stream-v1' import { + MothershipStreamV1CompletionStatus, MothershipStreamV1EventType, MothershipStreamV1ResourceOp, MothershipStreamV1RunKind, @@ -172,6 +176,8 @@ const RECONNECT_TAIL_ERROR = const MAX_RECONNECT_ATTEMPTS = 10 const RECONNECT_BASE_DELAY_MS = 1000 const RECONNECT_MAX_DELAY_MS = 30_000 +const QUEUED_SEND_HANDOFF_STORAGE_KEY = `${STREAM_STORAGE_KEY}:queued-send-handoff` +const QUEUED_SEND_HANDOFF_CLAIM_STORAGE_KEY = `${STREAM_STORAGE_KEY}:queued-send-handoff-claim` const logger = createLogger('useChat') @@ -188,6 +194,100 @@ type ActiveTurn = { optimisticAssistantMessage: ChatMessage } +interface QueuedSendHandoffState { + id: string + chatId: string + workspaceId: string + supersededStreamId: string | null + userMessageId: string + message: string + fileAttachments?: FileAttachmentForApi[] + contexts?: ChatContext[] + requestedAt: number +} + +interface QueuedSendHandoffSeed { + id: string + chatId: string + supersededStreamId: string | null + userMessageId?: string +} + +function readQueuedSendHandoffState(): QueuedSendHandoffState | null { + if (typeof window === 'undefined') return null + + try { + const raw = window.sessionStorage.getItem(QUEUED_SEND_HANDOFF_STORAGE_KEY) + if (!raw) return null + + const parsed = JSON.parse(raw) as Partial + if ( + typeof parsed?.id !== 'string' || + typeof parsed.chatId !== 'string' || + typeof parsed.workspaceId !== 'string' || + typeof parsed.userMessageId !== 'string' || + typeof parsed.message !== 'string' || + typeof parsed.requestedAt !== 'number' + ) { + return null + } + + return { + id: parsed.id, + chatId: parsed.chatId, + workspaceId: parsed.workspaceId, + supersededStreamId: + typeof parsed.supersededStreamId === 'string' ? parsed.supersededStreamId : null, + userMessageId: parsed.userMessageId, + message: parsed.message, + ...(Array.isArray(parsed.fileAttachments) + ? { fileAttachments: parsed.fileAttachments as FileAttachmentForApi[] } + : {}), + ...(Array.isArray(parsed.contexts) ? { contexts: parsed.contexts as ChatContext[] } : {}), + requestedAt: parsed.requestedAt, + } + } catch { + return null + } +} + +function writeQueuedSendHandoffState(state: QueuedSendHandoffState) { + if (typeof window === 'undefined') return + window.sessionStorage.setItem(QUEUED_SEND_HANDOFF_STORAGE_KEY, JSON.stringify(state)) +} + +function clearQueuedSendHandoffState(expectedId?: string) { + if (typeof window === 'undefined') return + if (expectedId) { + const current = readQueuedSendHandoffState() + if (current && current.id !== expectedId) { + return + } + } + window.sessionStorage.removeItem(QUEUED_SEND_HANDOFF_STORAGE_KEY) +} + +function readQueuedSendHandoffClaim(): string | null { + if (typeof window === 'undefined') return null + return window.sessionStorage.getItem(QUEUED_SEND_HANDOFF_CLAIM_STORAGE_KEY) +} + +function writeQueuedSendHandoffClaim(id: string) { + if (typeof window === 'undefined') return + window.sessionStorage.setItem(QUEUED_SEND_HANDOFF_CLAIM_STORAGE_KEY, id) +} + +function clearQueuedSendHandoffClaim(expectedId?: string) { + if (typeof window === 'undefined') return + if (expectedId) { + const current = readQueuedSendHandoffClaim() + if (current && current !== expectedId) { + return + } + } + window.sessionStorage.removeItem(QUEUED_SEND_HANDOFF_CLAIM_STORAGE_KEY) +} + function stringParam(value: unknown): string | undefined { return typeof value === 'string' && value.trim() ? value.trim() : undefined } @@ -594,6 +694,122 @@ function parseStreamBatchResponse(value: unknown): StreamBatchResponse { } } +function toRawPersistedContentBlock(block: ContentBlock): Record | null { + switch (block.type) { + case 'text': + return { + type: MothershipStreamV1EventType.text, + ...(block.subagent ? { lane: 'subagent' } : {}), + content: block.content ?? '', + } + case 'tool_call': + if (!block.toolCall) { + return null + } + return { + type: MothershipStreamV1EventType.tool, + phase: MothershipStreamV1ToolPhase.call, + toolCall: { + id: block.toolCall.id, + name: block.toolCall.name, + state: block.toolCall.status, + ...(block.toolCall.params ? { params: block.toolCall.params } : {}), + ...(block.toolCall.result ? { result: block.toolCall.result } : {}), + ...(block.toolCall.calledBy ? { calledBy: block.toolCall.calledBy } : {}), + ...(block.toolCall.displayTitle + ? { + display: { + title: block.toolCall.displayTitle, + }, + } + : {}), + }, + } + case 'subagent': + return { + type: MothershipStreamV1EventType.span, + kind: MothershipStreamV1SpanPayloadKind.subagent, + lifecycle: MothershipStreamV1SpanLifecycleEvent.start, + content: block.content ?? '', + } + case 'subagent_end': + return { + type: MothershipStreamV1EventType.span, + kind: MothershipStreamV1SpanPayloadKind.subagent, + lifecycle: MothershipStreamV1SpanLifecycleEvent.end, + } + case 'stopped': + return { + type: MothershipStreamV1EventType.complete, + status: MothershipStreamV1CompletionStatus.cancelled, + } + default: + return null + } +} + +function buildAssistantSnapshotMessage(params: { + id: string + content: string + contentBlocks: ContentBlock[] + requestId?: string +}): PersistedMessage { + const rawContentBlocks = params.contentBlocks + .map(toRawPersistedContentBlock) + .filter((block): block is Record => block !== null) + + return normalizeMessage({ + id: params.id, + role: 'assistant', + content: params.content, + timestamp: new Date().toISOString(), + ...(params.requestId ? { requestId: params.requestId } : {}), + ...(rawContentBlocks.length > 0 ? { contentBlocks: rawContentBlocks } : {}), + }) +} + +function markMessageStopped(message: PersistedMessage): PersistedMessage { + if (!message.contentBlocks?.some((block) => block.toolCall?.state === 'executing')) { + return message + } + + const nextBlocks = message.contentBlocks.map((block) => { + if (block.toolCall?.state !== 'executing') { + return block + } + + return { + ...block, + toolCall: { + ...block.toolCall, + state: 'cancelled' as const, + display: { + ...(block.toolCall.display ?? {}), + title: 'Stopped by user', + }, + }, + } + }) + + if ( + !nextBlocks.some( + (block) => + block.type === MothershipStreamV1EventType.complete && + block.status === MothershipStreamV1CompletionStatus.cancelled + ) + ) { + nextBlocks.push({ + type: MothershipStreamV1EventType.complete, + status: MothershipStreamV1CompletionStatus.cancelled, + }) + } + + return normalizeMessage({ + ...message, + contentBlocks: nextBlocks, + }) +} + function buildChatHistoryHydrationKey(chatHistory: TaskChatHistory): string { const resourceKey = chatHistory.resources .map((resource) => `${resource.type}:${resource.id}:${resource.title}`) @@ -667,22 +883,10 @@ function resolveLiveToolStatus( payload: Partial<{ status: string success: boolean + output: unknown }> ): ToolCallStatus { - switch (payload.status) { - case MothershipStreamV1ToolOutcome.success: - return ToolCallStatus.success - case MothershipStreamV1ToolOutcome.error: - return ToolCallStatus.error - case MothershipStreamV1ToolOutcome.cancelled: - return ToolCallStatus.cancelled - case MothershipStreamV1ToolOutcome.skipped: - return ToolCallStatus.skipped - case MothershipStreamV1ToolOutcome.rejected: - return ToolCallStatus.rejected - default: - return payload.success === true ? ToolCallStatus.success : ToolCallStatus.error - } + return resolveStreamToolOutcome(payload) as ToolCallStatus } /** Adds a workflow to the React Query cache with a top-insertion sort order if it doesn't already exist. */ @@ -808,7 +1012,7 @@ export function useChat( const pathname = usePathname() const router = useRouter() const queryClient = useQueryClient() - const [messages, setMessages] = useState([]) + const [pendingMessages, setPendingMessages] = useState([]) const [isSending, setIsSending] = useState(false) const [isReconnecting, setIsReconnecting] = useState(false) const [error, setError] = useState(null) @@ -855,6 +1059,22 @@ export function useChat( const activeResourceIdRef = useRef(effectiveActiveResourceId) activeResourceIdRef.current = effectiveActiveResourceId + const upsertTaskChatHistory = useCallback( + (chatId: string, updater: (current: TaskChatHistory) => TaskChatHistory) => { + queryClient.setQueryData(taskKeys.detail(chatId), (current) => { + const base: TaskChatHistory = current ?? { + id: chatId, + title: null, + messages: [], + activeStreamId: null, + resources: resourcesRef.current, + } + return updater(base) + }) + }, + [queryClient] + ) + const { previewSession, previewSessionsById, @@ -975,6 +1195,7 @@ export function useChat( (opts: { streamId: string; assistantId: string; gen: number }) => Promise >(async () => false) const finalizeRef = useRef<(options?: { error?: boolean }) => void>(() => {}) + const recoveringQueuedSendHandoffIdRef = useRef(null) const resetEphemeralPreviewState = useCallback( (options?: { removeStreamingResource?: boolean }) => { @@ -1101,7 +1322,7 @@ export function useChat( setResolvedChatId(undefined) appliedChatHistoryKeyRef.current = undefined abortControllerRef.current = null - setMessages([]) + setPendingMessages([]) setError(null) setTransportIdle() setResources([]) @@ -1111,38 +1332,11 @@ export function useChat( clearQueueDispatchState() }, [clearActiveTurn, clearQueueDispatchState, resetEphemeralPreviewState, setTransportIdle]) - const mergeServerMessagesWithActiveTurn = useCallback( - (serverMessages: ChatMessage[], previousMessages: ChatMessage[]) => { - const activeTurn = activeTurnRef.current - if (!activeTurn || !sendingRef.current) { - return serverMessages - } - - const nextMessages = [...serverMessages] - const localStreamingUser = - previousMessages.find( - (message) => message.id === activeTurn.userMessageId && message.role === 'user' - ) ?? activeTurn.optimisticUserMessage - const localStreamingAssistant = - previousMessages.find( - (message) => message.id === activeTurn.assistantMessageId && message.role === 'assistant' - ) ?? activeTurn.optimisticAssistantMessage - - if (!nextMessages.some((message) => message.id === localStreamingUser.id)) { - nextMessages.push(localStreamingUser) - } - - if (!nextMessages.some((message) => message.id === localStreamingAssistant.id)) { - nextMessages.push(localStreamingAssistant) - } - - return nextMessages - }, - [] + const { data: chatHistory } = useChatHistory(resolvedChatId) + const messages = useMemo( + () => chatHistory?.messages.map(toDisplayMessage) ?? pendingMessages, + [chatHistory, pendingMessages] ) - - const { data: chatHistory } = useChatHistory(initialChatId) - const addResource = useCallback((resource: MothershipResource): boolean => { if (resourcesRef.current.some((r) => r.type === resource.type && r.id === resource.id)) { return false @@ -1268,12 +1462,12 @@ export function useChat( ) useEffect(() => { + const streamOwnerId = chatIdRef.current + const navigatedToDifferentChat = + sendingRef.current && + initialChatId !== streamOwnerId && + (initialChatId !== undefined || streamOwnerId !== undefined) if (sendingRef.current) { - const streamOwnerId = chatIdRef.current - const navigatedToDifferentChat = - initialChatId !== streamOwnerId && - (initialChatId !== undefined || streamOwnerId !== undefined) - if (navigatedToDifferentChat) { const abandonedChatId = streamOwnerId // Detach the current UI from the old stream without cancelling it on the server. @@ -1296,7 +1490,7 @@ export function useChat( clearActiveTurn() setResolvedChatId(initialChatId) appliedChatHistoryKeyRef.current = undefined - setMessages([]) + setPendingMessages([]) setError(null) setTransportIdle() setResources([]) @@ -1344,13 +1538,6 @@ export function useChat( if (!activeStreamId && locallyTerminalStreamIdRef.current) { locallyTerminalStreamIdRef.current = undefined } - const shouldPreserveLocalActiveTurn = sendingRef.current && activeTurnRef.current !== null - - if (shouldPreserveLocalActiveTurn) { - setMessages((prev) => mergeServerMessagesWithActiveTurn(mappedMessages, prev)) - } else { - setMessages(mappedMessages) - } void recoverPendingClientWorkflowTools(mappedMessages) @@ -1399,7 +1586,7 @@ export function useChat( lastCursorRef.current = '0' setTransportReconnecting() - const assistantId = generateId() + const assistantId = getLiveAssistantMessageId(activeStreamId) const reconnect = async () => { const initialSnapshot = chatHistory.streamSnapshot @@ -1458,7 +1645,6 @@ export function useChat( queryClient, recoverPendingClientWorkflowTools, seedPreviewSessions, - mergeServerMessagesWithActiveTurn, setTransportIdle, setTransportReconnecting, ]) @@ -1569,22 +1755,41 @@ export function useChat( const flush = () => { if (isStale()) return streamingBlocksRef.current = [...blocks] - const snapshot: Partial = { - content: runningText, - contentBlocks: [...blocks], - } - if (streamRequestId) snapshot.requestId = streamRequestId - setMessages((prev) => { - if (expectedGen !== undefined && streamGenRef.current !== expectedGen) return prev - const idx = prev.findIndex((m) => m.id === assistantId) - if (idx >= 0) { - return prev.map((m) => (m.id === assistantId ? { ...m, ...snapshot } : m)) + const activeChatId = chatIdRef.current + if (!activeChatId) { + const snapshot: Partial = { + content: runningText, + contentBlocks: [...blocks], } - return [ - ...prev, - { id: assistantId, role: 'assistant' as const, content: '', ...snapshot }, - ] + if (streamRequestId) snapshot.requestId = streamRequestId + setPendingMessages((prev) => { + if (expectedGen !== undefined && streamGenRef.current !== expectedGen) return prev + const idx = prev.findIndex((m) => m.id === assistantId) + if (idx >= 0) { + return prev.map((m) => (m.id === assistantId ? { ...m, ...snapshot } : m)) + } + return [ + ...prev, + { id: assistantId, role: 'assistant' as const, content: '', ...snapshot }, + ] + }) + return + } + + const assistantMessage = buildAssistantSnapshotMessage({ + id: assistantId, + content: runningText, + contentBlocks: blocks, + ...(streamRequestId ? { requestId: streamRequestId } : {}), }) + upsertTaskChatHistory(activeChatId, (current) => ({ + ...current, + messages: [ + ...current.messages.filter((message) => message.id !== assistantId), + assistantMessage, + ], + activeStreamId: streamIdRef.current ?? current.activeStreamId, + })) } const flushText = () => { @@ -1690,14 +1895,23 @@ export function useChat( const userMsg = pendingUserMsgRef.current const activeStreamId = streamIdRef.current if (userMsg && activeStreamId) { + const assistantMessage = buildAssistantSnapshotMessage({ + id: + activeTurnRef.current?.assistantMessageId ?? + getLiveAssistantMessageId(activeStreamId), + content: streamingContentRef.current, + contentBlocks: streamingBlocksRef.current, + }) + const seededMessages = [userMsg, assistantMessage] queryClient.setQueryData(taskKeys.detail(payloadChatId), { id: payloadChatId, title: null, - messages: [userMsg], + messages: seededMessages, activeStreamId, resources: resourcesRef.current, }) } + setPendingMessages([]) if (!workflowIdRef.current) { window.history.replaceState( null, @@ -2273,6 +2487,7 @@ export function useChat( workspaceId, router, queryClient, + upsertTaskChatHistory, addResource, removeResource, applyPreviewSessionUpdate, @@ -2691,28 +2906,32 @@ export function useChat( [] ) - const invalidateChatQueries = useCallback(() => { - const activeChatId = chatIdRef.current - if (activeChatId) { - queryClient.invalidateQueries({ - queryKey: taskKeys.detail(activeChatId), - }) - } - queryClient.invalidateQueries({ queryKey: taskKeys.list(workspaceId) }) - }, [workspaceId, queryClient]) + const invalidateChatQueries = useCallback( + (options?: { includeDetail?: boolean }) => { + const activeChatId = chatIdRef.current + if (options?.includeDetail !== false && activeChatId) { + queryClient.invalidateQueries({ + queryKey: taskKeys.detail(activeChatId), + }) + } + queryClient.invalidateQueries({ queryKey: taskKeys.list(workspaceId) }) + }, + [workspaceId, queryClient] + ) const messagesRef = useRef(messages) messagesRef.current = messages const finalize = useCallback( (options?: { error?: boolean }) => { + const hasQueuedFollowUp = !options?.error && messageQueueRef.current.length > 0 reconcileTerminalPreviewSessions() locallyTerminalStreamIdRef.current = streamIdRef.current ?? activeTurnRef.current?.userMessageId ?? undefined clearActiveTurn() setTransportIdle() abortControllerRef.current = null - invalidateChatQueries() + invalidateChatQueries({ includeDetail: !hasQueuedFollowUp }) if (!options?.error) { const cid = chatIdRef.current @@ -2725,7 +2944,7 @@ export function useChat( return } - if (messageQueueRef.current.length > 0) { + if (hasQueuedFollowUp) { void enqueueQueueDispatchRef.current({ type: 'send_head' }) } }, @@ -2738,7 +2957,9 @@ export function useChat( message: string, fileAttachments?: FileAttachmentForApi[], contexts?: ChatContext[], - pendingStopOverride?: Promise | null + pendingStopOverride?: Promise | null, + onOptimisticSendApplied?: () => void, + queuedSendHandoff?: QueuedSendHandoffSeed ) => { if (!message.trim() || !workspaceId) return false const pendingStop = pendingStopOverride ?? pendingStopPromiseRef.current @@ -2750,8 +2971,8 @@ export function useChat( setTransportStreaming() locallyTerminalStreamIdRef.current = undefined - const userMessageId = generateId() - const assistantId = generateId() + const userMessageId = queuedSendHandoff?.userMessageId ?? generateId() + const assistantId = getLiveAssistantMessageId(userMessageId) streamIdRef.current = userMessageId lastCursorRef.current = '0' @@ -2769,6 +2990,19 @@ export function useChat( : undefined const requestChatId = selectedChatIdRef.current ?? chatIdRef.current + if (queuedSendHandoff) { + writeQueuedSendHandoffState({ + id: queuedSendHandoff.id, + chatId: queuedSendHandoff.chatId, + workspaceId, + supersededStreamId: queuedSendHandoff.supersededStreamId, + userMessageId, + message, + ...(fileAttachments ? { fileAttachments } : {}), + ...(contexts ? { contexts } : {}), + requestedAt: Date.now(), + }) + } const messageContexts = contexts?.map((c) => ({ kind: c.kind, label: c.label, @@ -2818,21 +3052,33 @@ export function useChat( optimisticAssistantMessage, } + if (requestChatId) { + await queryClient.cancelQueries({ queryKey: taskKeys.detail(requestChatId) }) + } + const applyOptimisticSend = () => { + const assistantSnapshot = buildAssistantSnapshotMessage({ + id: assistantId, + content: '', + contentBlocks: [], + }) if (requestChatId) { - queryClient.setQueryData(taskKeys.detail(requestChatId), (old) => { - if (!old) return undefined - const nextMessages = old.messages.filter((m) => m.id !== userMessageId) - return { - ...old, - resources: old.resources.filter((r) => r.id !== 'streaming-file'), - messages: [...nextMessages, cachedUserMsg], - activeStreamId: userMessageId, - } - }) + upsertTaskChatHistory(requestChatId, (current) => ({ + ...current, + resources: current.resources.filter((resource) => resource.id !== 'streaming-file'), + messages: [ + ...current.messages.filter( + (persistedMessage) => + persistedMessage.id !== userMessageId && persistedMessage.id !== assistantId + ), + cachedUserMsg, + assistantSnapshot, + ], + activeStreamId: userMessageId, + })) } - setMessages((prev) => { + setPendingMessages((prev) => { const nextMessages = prev.filter((m) => m.id !== userMessageId && m.id !== assistantId) return [...nextMessages, optimisticUserMessage, optimisticAssistantMessage] }) @@ -2840,20 +3086,27 @@ export function useChat( const rollbackOptimisticSend = () => { if (requestChatId) { - queryClient.setQueryData(taskKeys.detail(requestChatId), (old) => { - if (!old) return undefined - return { - ...old, - messages: old.messages.filter((m) => m.id !== userMessageId), - activeStreamId: old.activeStreamId === userMessageId ? null : old.activeStreamId, - } - }) + upsertTaskChatHistory(requestChatId, (current) => ({ + ...current, + messages: current.messages.filter( + (persistedMessage) => + persistedMessage.id !== userMessageId && persistedMessage.id !== assistantId + ), + activeStreamId: + current.activeStreamId === userMessageId ? null : current.activeStreamId, + })) } - setMessages((prev) => prev.filter((m) => m.id !== userMessageId && m.id !== assistantId)) + setPendingMessages((prev) => + prev.filter( + (pendingMessage) => + pendingMessage.id !== userMessageId && pendingMessage.id !== assistantId + ) + ) } applyOptimisticSend() + onOptimisticSendApplied?.() consumedByTranscript = true const abortController = new AbortController() @@ -2863,8 +3116,9 @@ export function useChat( if (pendingStop) { try { await pendingStop - // Query invalidation from the stop barrier can briefly stomp the optimistic tail. - // Re-apply it before the real POST so the mothership UI stays immediate. + if (requestChatId) { + await queryClient.cancelQueries({ queryKey: taskKeys.detail(requestChatId) }) + } applyOptimisticSend() } catch (err) { rollbackOptimisticSend() @@ -2928,6 +3182,10 @@ export function useChat( throw new Error(errorData.error || `Request failed: ${response.status}`) } + if (queuedSendHandoff) { + clearQueuedSendHandoffState(queuedSendHandoff.id) + } + if (!response.body) throw new Error('No response body') const streamResult = await processSSEStream(response.body.getReader(), assistantId, gen) @@ -2986,6 +3244,7 @@ export function useChat( [ workspaceId, queryClient, + upsertTaskChatHistory, processSSEStream, finalize, resumeOrFinalize, @@ -3015,6 +3274,69 @@ export function useChat( }, [workspaceId, startSendMessage] ) + useEffect(() => { + if (typeof window === 'undefined') return + + const clearClaim = () => { + clearQueuedSendHandoffClaim() + } + + window.addEventListener('pagehide', clearClaim) + window.addEventListener('beforeunload', clearClaim) + return () => { + window.removeEventListener('pagehide', clearClaim) + window.removeEventListener('beforeunload', clearClaim) + } + }, []) + useEffect(() => { + if (!workspaceId || !chatHistory || sendingRef.current || pendingStopPromiseRef.current) return + + const handoff = readQueuedSendHandoffState() + if (!handoff) return + if (handoff.workspaceId !== workspaceId || handoff.chatId !== chatHistory.id) return + if (recoveringQueuedSendHandoffIdRef.current === handoff.id) return + if (readQueuedSendHandoffClaim() === handoff.id) return + + if ( + chatHistory.activeStreamId === handoff.userMessageId || + chatHistory.messages.some((message) => message.id === handoff.userMessageId) + ) { + clearQueuedSendHandoffState(handoff.id) + clearQueuedSendHandoffClaim(handoff.id) + return + } + + if (chatHistory.activeStreamId === handoff.supersededStreamId) { + return + } + + if (chatHistory.activeStreamId && chatHistory.activeStreamId !== handoff.supersededStreamId) { + clearQueuedSendHandoffState(handoff.id) + clearQueuedSendHandoffClaim(handoff.id) + return + } + + recoveringQueuedSendHandoffIdRef.current = handoff.id + writeQueuedSendHandoffClaim(handoff.id) + void startSendMessage( + handoff.message, + handoff.fileAttachments, + handoff.contexts, + null, + undefined, + { + id: handoff.id, + chatId: handoff.chatId, + supersededStreamId: handoff.supersededStreamId, + userMessageId: handoff.userMessageId, + } + ).finally(() => { + if (recoveringQueuedSendHandoffIdRef.current === handoff.id) { + recoveringQueuedSendHandoffIdRef.current = null + } + clearQueuedSendHandoffClaim(handoff.id) + }) + }, [workspaceId, chatHistory, startSendMessage]) const cancelActiveWorkflowExecutions = useCallback(() => { const execState = useExecutionStore.getState() const consoleStore = useTerminalConsoleStore.getState() @@ -3066,6 +3388,7 @@ export function useChat( } const wasSending = sendingRef.current + const activeChatId = chatIdRef.current const sid = streamIdRef.current || activeTurnRef.current?.userMessageId || @@ -3088,24 +3411,36 @@ export function useChat( abortControllerRef.current = null setTransportIdle() - setMessages((prev) => - prev.map((msg) => { - if (!msg.contentBlocks?.some((b) => b.toolCall?.status === 'executing')) return msg - const updated = msg.contentBlocks!.map((block) => { - if (block.toolCall?.status !== 'executing') return block - return { - ...block, - toolCall: { - ...block.toolCall, - status: 'cancelled' as const, - displayTitle: 'Stopped by user', - }, + if (activeChatId) { + await queryClient.cancelQueries({ queryKey: taskKeys.detail(activeChatId) }) + upsertTaskChatHistory(activeChatId, (current) => ({ + ...current, + messages: current.messages.map(markMessageStopped), + })) + } else { + setPendingMessages((prev) => + prev.map((msg) => { + if (!msg.contentBlocks?.some((block) => block.toolCall?.status === 'executing')) { + return msg } + const updatedBlocks = msg.contentBlocks.map((block) => { + if (block.toolCall?.status !== 'executing') { + return block + } + return { + ...block, + toolCall: { + ...block.toolCall, + status: 'cancelled' as const, + displayTitle: 'Stopped by user', + }, + } + }) + updatedBlocks.push({ type: 'stopped' as const }) + return { ...msg, contentBlocks: updatedBlocks } }) - updated.push({ type: 'stopped' as const }) - return { ...msg, contentBlocks: updated } - }) - ) + ) + } // Cancel active run-tool executions before waiting for the server-side stream // shutdown barrier; otherwise the abort settle can sit behind tool execution teardown. @@ -3175,6 +3510,7 @@ export function useChat( persistPartialResponse, queryClient, resetEphemeralPreviewState, + upsertTaskChatHistory, clearActiveTurn, setTransportIdle, ]) @@ -3198,16 +3534,27 @@ export function useChat( let originalIndex = 0 let removedFromQueue = false + const removeQueuedMessage = () => { + if (removedFromQueue || action.epoch !== queueDispatchEpochRef.current) { + return + } + removedFromQueue = true + setMessageQueue((prev) => prev.filter((queued) => queued.id !== msg.id)) + } try { const currentIndex = messageQueueRef.current.findIndex((queued) => queued.id === msg.id) if (currentIndex !== -1) { originalIndex = currentIndex - removedFromQueue = true - setMessageQueue((prev) => prev.filter((queued) => queued.id !== msg.id)) } - const consumed = await startSendMessage(msg.content, msg.fileAttachments, msg.contexts) + const consumed = await startSendMessage( + msg.content, + msg.fileAttachments, + msg.contexts, + undefined, + removeQueuedMessage + ) if (!consumed && removedFromQueue && action.epoch === queueDispatchEpochRef.current) { setMessageQueue((prev) => { if (prev.some((queued) => queued.id === msg.id)) return prev @@ -3250,6 +3597,8 @@ export function useChat( enqueueQueueDispatchRef.current = enqueueQueueDispatch const removeFromQueue = useCallback((id: string) => { + clearQueuedSendHandoffState(id) + clearQueuedSendHandoffClaim(id) setMessageQueue((prev) => prev.filter((m) => m.id !== id)) }, []) @@ -3272,6 +3621,13 @@ export function useChat( let originalIndex = initialIndex let removedFromQueue = false + const removeQueuedMessage = () => { + if (removedFromQueue || epoch !== queueDispatchEpochRef.current) { + return + } + removedFromQueue = true + setMessageQueue((prev) => prev.filter((queued) => queued.id !== msg.id)) + } const restoreQueuedMessage = () => { if (!removedFromQueue || epoch !== queueDispatchEpochRef.current) { return @@ -3291,15 +3647,29 @@ export function useChat( } originalIndex = currentIndex - removedFromQueue = true - setMessageQueue((prev) => prev.filter((queued) => queued.id !== msg.id)) + const queuedSendHandoff = + sendingRef.current && workspaceId + ? { + id: msg.id, + chatId: selectedChatIdRef.current ?? chatIdRef.current ?? '', + supersededStreamId: + streamIdRef.current || + activeTurnRef.current?.userMessageId || + queryClient.getQueryData( + taskKeys.detail(selectedChatIdRef.current ?? chatIdRef.current) + )?.activeStreamId || + null, + } + : undefined const pendingStop = sendingRef.current ? stopGeneration() : pendingStopPromiseRef.current const consumed = await startSendMessage( msg.content, msg.fileAttachments, msg.contexts, - pendingStop + pendingStop, + removeQueuedMessage, + queuedSendHandoff?.chatId ? queuedSendHandoff : undefined ) if (!consumed) { @@ -3324,6 +3694,8 @@ export function useChat( const editQueuedMessage = useCallback((id: string): QueuedMessage | undefined => { const msg = messageQueueRef.current.find((m) => m.id === id) if (!msg) return undefined + clearQueuedSendHandoffState(id) + clearQueuedSendHandoffClaim(id) setMessageQueue((prev) => prev.filter((m) => m.id !== id)) return msg }, []) diff --git a/apps/sim/hooks/use-task-events.test.ts b/apps/sim/hooks/use-task-events.test.ts index ac58e6cf27..d62b32696a 100644 --- a/apps/sim/hooks/use-task-events.test.ts +++ b/apps/sim/hooks/use-task-events.test.ts @@ -16,9 +16,10 @@ describe('handleTaskStatusEvent', () => { vi.clearAllMocks() }) - it('invalidates the task list and completed chat detail', () => { + it('invalidates only the task list for completed task events', () => { handleTaskStatusEvent( queryClient, + 'ws-1', JSON.stringify({ chatId: 'chat-1', type: 'completed', @@ -26,18 +27,16 @@ describe('handleTaskStatusEvent', () => { }) ) - expect(queryClient.invalidateQueries).toHaveBeenCalledTimes(2) - expect(queryClient.invalidateQueries).toHaveBeenNthCalledWith(1, { - queryKey: taskKeys.lists(), - }) - expect(queryClient.invalidateQueries).toHaveBeenNthCalledWith(2, { - queryKey: taskKeys.detail('chat-1'), + expect(queryClient.invalidateQueries).toHaveBeenCalledTimes(1) + expect(queryClient.invalidateQueries).toHaveBeenCalledWith({ + queryKey: taskKeys.list('ws-1'), }) }) it('keeps list invalidation only for non-completed task events', () => { handleTaskStatusEvent( queryClient, + 'ws-1', JSON.stringify({ chatId: 'chat-1', type: 'started', @@ -47,16 +46,16 @@ describe('handleTaskStatusEvent', () => { expect(queryClient.invalidateQueries).toHaveBeenCalledTimes(1) expect(queryClient.invalidateQueries).toHaveBeenCalledWith({ - queryKey: taskKeys.lists(), + queryKey: taskKeys.list('ws-1'), }) }) it('preserves list invalidation when task event payload is invalid', () => { - handleTaskStatusEvent(queryClient, '{') + handleTaskStatusEvent(queryClient, 'ws-1', '{') expect(queryClient.invalidateQueries).toHaveBeenCalledTimes(1) expect(queryClient.invalidateQueries).toHaveBeenCalledWith({ - queryKey: taskKeys.lists(), + queryKey: taskKeys.list('ws-1'), }) }) }) diff --git a/apps/sim/hooks/use-task-events.ts b/apps/sim/hooks/use-task-events.ts index 04bff3df49..e10a7599b1 100644 --- a/apps/sim/hooks/use-task-events.ts +++ b/apps/sim/hooks/use-task-events.ts @@ -38,19 +38,16 @@ function parseTaskStatusEventPayload(data: unknown): TaskStatusEventPayload | nu export function handleTaskStatusEvent( queryClient: Pick, + workspaceId: string, data: unknown ): void { - queryClient.invalidateQueries({ queryKey: taskKeys.lists() }) + queryClient.invalidateQueries({ queryKey: taskKeys.list(workspaceId) }) const payload = parseTaskStatusEventPayload(data) if (!payload) { logger.warn('Received invalid task_status payload') return } - - if (payload.type === 'completed' && payload.chatId) { - queryClient.invalidateQueries({ queryKey: taskKeys.detail(payload.chatId) }) - } } /** @@ -67,7 +64,11 @@ export function useTaskEvents(workspaceId: string | undefined) { ) eventSource.addEventListener('task_status', (event) => { - handleTaskStatusEvent(queryClient, event instanceof MessageEvent ? event.data : undefined) + handleTaskStatusEvent( + queryClient, + workspaceId, + event instanceof MessageEvent ? event.data : undefined + ) }) eventSource.onerror = () => { diff --git a/apps/sim/lib/copilot/chat/effective-transcript.test.ts b/apps/sim/lib/copilot/chat/effective-transcript.test.ts new file mode 100644 index 0000000000..285743d37a --- /dev/null +++ b/apps/sim/lib/copilot/chat/effective-transcript.test.ts @@ -0,0 +1,263 @@ +/** + * @vitest-environment node + */ + +import { describe, expect, it } from 'vitest' +import { + buildEffectiveChatTranscript, + getLiveAssistantMessageId, +} from '@/lib/copilot/chat/effective-transcript' +import { normalizeMessage } from '@/lib/copilot/chat/persisted-message' +import { + MothershipStreamV1CompletionStatus, + MothershipStreamV1EventType, + MothershipStreamV1SessionKind, + MothershipStreamV1TextChannel, +} from '@/lib/copilot/generated/mothership-stream-v1' +import type { StreamBatchEvent } from '@/lib/copilot/request/session/types' + +function toBatchEvent(eventId: number, event: StreamBatchEvent['event']): StreamBatchEvent { + return { + eventId, + streamId: event.stream.streamId, + event, + } +} + +function buildUserMessage(id: string, content: string) { + return normalizeMessage({ + id, + role: 'user', + content, + timestamp: '2026-04-15T12:00:00.000Z', + }) +} + +describe('buildEffectiveChatTranscript', () => { + it('returns the existing transcript when the stream owner is no longer the trailing user', () => { + const messages = [ + buildUserMessage('stream-1', 'Hello'), + normalizeMessage({ + id: 'assistant-1', + role: 'assistant', + content: 'Persisted response', + timestamp: '2026-04-15T12:00:01.000Z', + }), + ] + + const result = buildEffectiveChatTranscript({ + messages, + activeStreamId: 'stream-1', + streamSnapshot: { + events: [ + toBatchEvent(1, { + v: 1, + seq: 1, + ts: '2026-04-15T12:00:01.000Z', + type: MothershipStreamV1EventType.text, + stream: { streamId: 'stream-1' }, + payload: { + channel: MothershipStreamV1TextChannel.assistant, + text: 'Live response', + }, + }), + ], + previewSessions: [], + status: 'active', + }, + }) + + expect(result).toEqual(messages) + }) + + it('appends a placeholder assistant while an active stream has not produced text yet', () => { + const result = buildEffectiveChatTranscript({ + messages: [buildUserMessage('stream-1', 'Hello')], + activeStreamId: 'stream-1', + streamSnapshot: { + events: [ + toBatchEvent(1, { + v: 1, + seq: 1, + ts: '2026-04-15T12:00:01.000Z', + type: MothershipStreamV1EventType.session, + stream: { streamId: 'stream-1' }, + payload: { + kind: MothershipStreamV1SessionKind.start, + }, + }), + ], + previewSessions: [], + status: 'active', + }, + }) + + expect(result).toHaveLength(2) + expect(result[1]).toEqual( + expect.objectContaining({ + id: getLiveAssistantMessageId('stream-1'), + role: 'assistant', + content: '', + }) + ) + }) + + it('materializes a live assistant response from redis-backed stream events', () => { + const result = buildEffectiveChatTranscript({ + messages: [buildUserMessage('stream-1', 'Hello')], + activeStreamId: 'stream-1', + streamSnapshot: { + events: [ + toBatchEvent(1, { + v: 1, + seq: 1, + ts: '2026-04-15T12:00:01.000Z', + type: MothershipStreamV1EventType.session, + stream: { streamId: 'stream-1' }, + trace: { requestId: 'req-1' }, + payload: { + kind: MothershipStreamV1SessionKind.trace, + requestId: 'req-1', + }, + }), + toBatchEvent(2, { + v: 1, + seq: 2, + ts: '2026-04-15T12:00:02.000Z', + type: MothershipStreamV1EventType.text, + stream: { streamId: 'stream-1' }, + trace: { requestId: 'req-1' }, + payload: { + channel: MothershipStreamV1TextChannel.assistant, + text: 'Live response', + }, + }), + ], + previewSessions: [], + status: 'active', + }, + }) + + expect(result).toHaveLength(2) + expect(result[1]).toEqual( + expect.objectContaining({ + id: getLiveAssistantMessageId('stream-1'), + role: 'assistant', + content: 'Live response', + requestId: 'req-1', + }) + ) + }) + + it('does not duplicate thinking-only text into a second assistant block', () => { + const result = buildEffectiveChatTranscript({ + messages: [buildUserMessage('stream-1', 'Hello')], + activeStreamId: 'stream-1', + streamSnapshot: { + events: [ + toBatchEvent(1, { + v: 1, + seq: 1, + ts: '2026-04-15T12:00:01.000Z', + type: MothershipStreamV1EventType.text, + stream: { streamId: 'stream-1' }, + payload: { + channel: MothershipStreamV1TextChannel.thinking, + text: 'Internal reasoning', + }, + }), + ], + previewSessions: [], + status: 'active', + }, + }) + + expect(result).toHaveLength(2) + expect(result[1]).toEqual( + expect.objectContaining({ + content: 'Internal reasoning', + contentBlocks: [ + expect.objectContaining({ + type: MothershipStreamV1EventType.text, + content: 'Internal reasoning', + }), + ], + }) + ) + }) + + it('treats user-cancelled tool results as cancelled', () => { + const result = buildEffectiveChatTranscript({ + messages: [buildUserMessage('stream-1', 'Hello')], + activeStreamId: 'stream-1', + streamSnapshot: { + events: [ + toBatchEvent(1, { + v: 1, + seq: 1, + ts: '2026-04-15T12:00:01.000Z', + type: MothershipStreamV1EventType.tool, + stream: { streamId: 'stream-1' }, + payload: { + phase: 'result', + toolCallId: 'tool-1', + toolName: 'workspace_file', + executor: 'go', + mode: 'sync', + success: false, + output: { + reason: 'user_cancelled', + }, + }, + }), + ], + previewSessions: [], + status: 'active', + }, + }) + + expect(result[1]?.contentBlocks).toEqual([ + expect.objectContaining({ + type: MothershipStreamV1EventType.tool, + toolCall: expect.objectContaining({ + id: 'tool-1', + name: 'workspace_file', + state: MothershipStreamV1CompletionStatus.cancelled, + }), + }), + ]) + }) + + it('materializes a cancelled assistant tail when the stream ends before persistence', () => { + const result = buildEffectiveChatTranscript({ + messages: [buildUserMessage('stream-1', 'Hello')], + activeStreamId: 'stream-1', + streamSnapshot: { + events: [ + toBatchEvent(1, { + v: 1, + seq: 1, + ts: '2026-04-15T12:00:01.000Z', + type: MothershipStreamV1EventType.complete, + stream: { streamId: 'stream-1' }, + payload: { + status: MothershipStreamV1CompletionStatus.cancelled, + }, + }), + ], + previewSessions: [], + status: MothershipStreamV1CompletionStatus.cancelled, + }, + }) + + expect(result).toHaveLength(2) + expect(result[1]?.contentBlocks).toEqual( + expect.arrayContaining([ + expect.objectContaining({ + type: MothershipStreamV1EventType.complete, + status: MothershipStreamV1CompletionStatus.cancelled, + }), + ]) + ) + }) +}) diff --git a/apps/sim/lib/copilot/chat/effective-transcript.ts b/apps/sim/lib/copilot/chat/effective-transcript.ts new file mode 100644 index 0000000000..339a326d12 --- /dev/null +++ b/apps/sim/lib/copilot/chat/effective-transcript.ts @@ -0,0 +1,412 @@ +import { normalizeMessage, type PersistedMessage } from '@/lib/copilot/chat/persisted-message' +import { resolveStreamToolOutcome } from '@/lib/copilot/chat/stream-tool-outcome' +import { + MothershipStreamV1CompletionStatus, + type MothershipStreamV1ErrorPayload, + MothershipStreamV1EventType, + MothershipStreamV1RunKind, + MothershipStreamV1SessionKind, + MothershipStreamV1SpanLifecycleEvent, + MothershipStreamV1SpanPayloadKind, + MothershipStreamV1ToolOutcome, + MothershipStreamV1ToolPhase, +} from '@/lib/copilot/generated/mothership-stream-v1' +import type { FilePreviewSession } from '@/lib/copilot/request/session/file-preview-session-contract' +import type { StreamBatchEvent } from '@/lib/copilot/request/session/types' + +interface StreamSnapshotLike { + events: StreamBatchEvent[] + previewSessions: FilePreviewSession[] + status: string +} + +interface BuildEffectiveChatTranscriptParams { + messages: PersistedMessage[] + activeStreamId: string | null + streamSnapshot?: StreamSnapshotLike | null +} + +type RawPersistedBlock = Record + +export function getLiveAssistantMessageId(streamId: string): string { + return `live-assistant:${streamId}` +} + +function isRecord(value: unknown): value is Record { + return Boolean(value) && typeof value === 'object' && !Array.isArray(value) +} + +function asPayloadRecord(value: unknown): Record | undefined { + return isRecord(value) ? value : undefined +} + +function isTerminalStreamStatus(status: string | null | undefined): boolean { + return ( + status === MothershipStreamV1CompletionStatus.complete || + status === MothershipStreamV1CompletionStatus.error || + status === MothershipStreamV1CompletionStatus.cancelled + ) +} + +function buildInlineErrorTag(payload: MothershipStreamV1ErrorPayload): string { + const message = + (typeof payload.displayMessage === 'string' ? payload.displayMessage : undefined) || + (typeof payload.message === 'string' ? payload.message : undefined) || + (typeof payload.error === 'string' ? payload.error : undefined) || + 'An unexpected error occurred' + const provider = typeof payload.provider === 'string' ? payload.provider : undefined + const code = typeof payload.code === 'string' ? payload.code : undefined + return `${JSON.stringify({ + message, + ...(code ? { code } : {}), + ...(provider ? { provider } : {}), + })}` +} + +function resolveToolDisplayTitle(ui: unknown): string | undefined { + if (!isRecord(ui)) return undefined + return typeof ui.title === 'string' + ? ui.title + : typeof ui.phaseLabel === 'string' + ? ui.phaseLabel + : undefined +} + +function appendTextBlock( + blocks: RawPersistedBlock[], + content: string, + options: { + lane?: 'subagent' + } +): void { + if (!content) return + const last = blocks[blocks.length - 1] + if (last?.type === MothershipStreamV1EventType.text && last.lane === options.lane) { + last.content = `${typeof last.content === 'string' ? last.content : ''}${content}` + return + } + + blocks.push({ + type: MothershipStreamV1EventType.text, + ...(options.lane ? { lane: options.lane } : {}), + content, + }) +} + +function buildLiveAssistantMessage(params: { + streamId: string + events: StreamBatchEvent[] + status: string | null | undefined +}): PersistedMessage | null { + const { streamId, events, status } = params + const blocks: RawPersistedBlock[] = [] + const toolIndexById = new Map() + const subagentByParentToolCallId = new Map() + let activeSubagent: string | undefined + let activeSubagentParentToolCallId: string | undefined + let activeCompactionId: string | undefined + let runningText = '' + let lastContentSource: 'main' | 'subagent' | null = null + let requestId: string | undefined + let lastTimestamp: string | undefined + + const resolveScopedSubagent = ( + agentId: string | undefined, + parentToolCallId: string | undefined + ): string | undefined => { + if (agentId) return agentId + if (parentToolCallId) { + const scoped = subagentByParentToolCallId.get(parentToolCallId) + if (scoped) return scoped + } + return activeSubagent + } + + const ensureToolBlock = (input: { + toolCallId: string + toolName: string + calledBy?: string + displayTitle?: string + params?: Record + result?: { success: boolean; output?: unknown; error?: string } + state?: string + }): RawPersistedBlock => { + const existingIndex = toolIndexById.get(input.toolCallId) + if (existingIndex !== undefined) { + const existing = blocks[existingIndex] + const existingToolCall = asPayloadRecord(existing.toolCall) + existing.toolCall = { + ...(existingToolCall ?? {}), + id: input.toolCallId, + name: input.toolName, + state: + input.state ?? + (typeof existingToolCall?.state === 'string' ? existingToolCall.state : 'executing'), + ...(input.calledBy ? { calledBy: input.calledBy } : {}), + ...(input.params ? { params: input.params } : {}), + ...(input.result ? { result: input.result } : {}), + ...(input.displayTitle + ? { + display: { + title: input.displayTitle, + }, + } + : existingToolCall?.display + ? { display: existingToolCall.display } + : {}), + } + return existing + } + + const nextBlock: RawPersistedBlock = { + type: MothershipStreamV1EventType.tool, + phase: MothershipStreamV1ToolPhase.call, + toolCall: { + id: input.toolCallId, + name: input.toolName, + state: input.state ?? 'executing', + ...(input.calledBy ? { calledBy: input.calledBy } : {}), + ...(input.params ? { params: input.params } : {}), + ...(input.result ? { result: input.result } : {}), + ...(input.displayTitle + ? { + display: { + title: input.displayTitle, + }, + } + : {}), + }, + } + toolIndexById.set(input.toolCallId, blocks.length) + blocks.push(nextBlock) + return nextBlock + } + + for (const entry of events) { + const parsed = entry.event + lastTimestamp = parsed.ts + if (typeof parsed.trace?.requestId === 'string') { + requestId = parsed.trace.requestId + } + const scopedParentToolCallId = + typeof parsed.scope?.parentToolCallId === 'string' ? parsed.scope.parentToolCallId : undefined + const scopedAgentId = + typeof parsed.scope?.agentId === 'string' ? parsed.scope.agentId : undefined + const scopedSubagent = resolveScopedSubagent(scopedAgentId, scopedParentToolCallId) + + switch (parsed.type) { + case MothershipStreamV1EventType.session: { + if (parsed.payload.kind === MothershipStreamV1SessionKind.chat) { + continue + } + if (parsed.payload.kind === MothershipStreamV1SessionKind.start) { + continue + } + if (parsed.payload.kind === MothershipStreamV1SessionKind.trace) { + requestId = parsed.payload.requestId + } + continue + } + case MothershipStreamV1EventType.text: { + const chunk = parsed.payload.text + if (!chunk) { + continue + } + const contentSource: 'main' | 'subagent' = scopedSubagent ? 'subagent' : 'main' + const needsBoundaryNewline = + lastContentSource !== null && + lastContentSource !== contentSource && + runningText.length > 0 && + !runningText.endsWith('\n') + const normalizedChunk = needsBoundaryNewline ? `\n${chunk}` : chunk + appendTextBlock(blocks, normalizedChunk, { + ...(scopedSubagent ? { lane: 'subagent' as const } : {}), + }) + runningText += normalizedChunk + lastContentSource = contentSource + continue + } + case MothershipStreamV1EventType.tool: { + const payload = parsed.payload + const toolCallId = payload.toolCallId + const displayTitle = resolveToolDisplayTitle('ui' in payload ? payload.ui : undefined) + + if ('previewPhase' in payload) { + continue + } + + if (payload.phase === MothershipStreamV1ToolPhase.args_delta) { + continue + } + + if (payload.phase === MothershipStreamV1ToolPhase.result) { + ensureToolBlock({ + toolCallId, + toolName: payload.toolName, + calledBy: scopedSubagent, + state: resolveStreamToolOutcome(payload), + result: { + success: payload.success, + ...(payload.output !== undefined ? { output: payload.output } : {}), + ...(typeof payload.error === 'string' ? { error: payload.error } : {}), + }, + }) + continue + } + + ensureToolBlock({ + toolCallId, + toolName: payload.toolName, + calledBy: scopedSubagent, + displayTitle, + params: isRecord(payload.arguments) ? payload.arguments : undefined, + state: typeof payload.status === 'string' ? payload.status : 'executing', + }) + continue + } + case MothershipStreamV1EventType.span: { + if (parsed.payload.kind !== MothershipStreamV1SpanPayloadKind.subagent) { + continue + } + + const spanData = asPayloadRecord(parsed.payload.data) + const parentToolCallId = + scopedParentToolCallId ?? + (typeof spanData?.tool_call_id === 'string' ? spanData.tool_call_id : undefined) + const name = typeof parsed.payload.agent === 'string' ? parsed.payload.agent : scopedAgentId + if (parsed.payload.event === MothershipStreamV1SpanLifecycleEvent.start && name) { + if (parentToolCallId) { + subagentByParentToolCallId.set(parentToolCallId, name) + } + activeSubagent = name + activeSubagentParentToolCallId = parentToolCallId + blocks.push({ + type: MothershipStreamV1EventType.span, + kind: MothershipStreamV1SpanPayloadKind.subagent, + lifecycle: MothershipStreamV1SpanLifecycleEvent.start, + content: name, + }) + continue + } + + if (parsed.payload.event === MothershipStreamV1SpanLifecycleEvent.end) { + if (spanData?.pending === true) { + continue + } + if (parentToolCallId) { + subagentByParentToolCallId.delete(parentToolCallId) + } + if ( + !parentToolCallId || + parentToolCallId === activeSubagentParentToolCallId || + name === activeSubagent + ) { + activeSubagent = undefined + activeSubagentParentToolCallId = undefined + } + blocks.push({ + type: MothershipStreamV1EventType.span, + kind: MothershipStreamV1SpanPayloadKind.subagent, + lifecycle: MothershipStreamV1SpanLifecycleEvent.end, + }) + } + continue + } + case MothershipStreamV1EventType.run: { + if (parsed.payload.kind === MothershipStreamV1RunKind.compaction_start) { + activeCompactionId = `compaction_${entry.eventId}` + ensureToolBlock({ + toolCallId: activeCompactionId, + toolName: 'context_compaction', + displayTitle: 'Compacting context...', + state: 'executing', + }) + continue + } + + if (parsed.payload.kind === MothershipStreamV1RunKind.compaction_done) { + const compactionId = activeCompactionId ?? `compaction_${entry.eventId}` + activeCompactionId = undefined + ensureToolBlock({ + toolCallId: compactionId, + toolName: 'context_compaction', + displayTitle: 'Compacted context', + state: MothershipStreamV1ToolOutcome.success, + }) + } + continue + } + case MothershipStreamV1EventType.error: { + const tag = buildInlineErrorTag(parsed.payload) + if (runningText.includes(tag)) { + continue + } + const prefix = runningText.length > 0 && !runningText.endsWith('\n') ? '\n' : '' + const content = `${prefix}${tag}` + appendTextBlock(blocks, content, { + ...(scopedSubagent ? { lane: 'subagent' as const } : {}), + }) + runningText += content + continue + } + case MothershipStreamV1EventType.complete: { + if (parsed.payload.status === MothershipStreamV1CompletionStatus.cancelled) { + blocks.push({ + type: MothershipStreamV1EventType.complete, + status: parsed.payload.status, + }) + } + continue + } + case MothershipStreamV1EventType.resource: { + continue + } + default: { + continue + } + } + } + + if (blocks.length === 0 && !runningText && isTerminalStreamStatus(status)) { + return null + } + + return normalizeMessage({ + id: getLiveAssistantMessageId(streamId), + role: 'assistant', + content: runningText, + timestamp: lastTimestamp ?? new Date().toISOString(), + ...(requestId ? { requestId } : {}), + ...(blocks.length > 0 ? { contentBlocks: blocks } : {}), + }) +} + +export function buildEffectiveChatTranscript({ + messages, + activeStreamId, + streamSnapshot, +}: BuildEffectiveChatTranscriptParams): PersistedMessage[] { + if (!activeStreamId || !streamSnapshot) { + return messages + } + + const trailingMessage = messages[messages.length - 1] + if ( + !trailingMessage || + trailingMessage.role !== 'user' || + trailingMessage.id !== activeStreamId + ) { + return messages + } + + const liveAssistant = buildLiveAssistantMessage({ + streamId: activeStreamId, + events: streamSnapshot.events, + status: streamSnapshot.status, + }) + if (!liveAssistant) { + return messages + } + + return [...messages, liveAssistant] +} diff --git a/apps/sim/lib/copilot/chat/post.ts b/apps/sim/lib/copilot/chat/post.ts index b15e84db69..8581621d1f 100644 --- a/apps/sim/lib/copilot/chat/post.ts +++ b/apps/sim/lib/copilot/chat/post.ts @@ -52,7 +52,7 @@ const FileAttachmentSchema = z.object({ }) const ResourceAttachmentSchema = z.object({ - type: z.enum(['workflow', 'table', 'file', 'knowledgebase', 'folder']), + type: z.enum(['workflow', 'table', 'file', 'knowledgebase', 'folder', 'task', 'log', 'generic']), id: z.string().min(1), title: z.string().optional(), active: z.boolean().optional(), @@ -64,6 +64,9 @@ const GENERIC_RESOURCE_TITLE: Record['t file: 'File', knowledgebase: 'Knowledge Base', folder: 'Folder', + task: 'Task', + log: 'Log', + generic: 'Resource', } const ChatContextSchema = z.object({ diff --git a/apps/sim/lib/copilot/chat/stream-tool-outcome.ts b/apps/sim/lib/copilot/chat/stream-tool-outcome.ts new file mode 100644 index 0000000000..863c47f98e --- /dev/null +++ b/apps/sim/lib/copilot/chat/stream-tool-outcome.ts @@ -0,0 +1,46 @@ +import { MothershipStreamV1ToolOutcome } from '@/lib/copilot/generated/mothership-stream-v1' + +type TerminalToolOutcome = + | typeof MothershipStreamV1ToolOutcome.success + | typeof MothershipStreamV1ToolOutcome.error + | typeof MothershipStreamV1ToolOutcome.cancelled + | typeof MothershipStreamV1ToolOutcome.skipped + | typeof MothershipStreamV1ToolOutcome.rejected + +interface ResolveStreamToolOutcomeParams { + output?: unknown + status?: string + success?: boolean +} + +function isRecord(value: unknown): value is Record { + return Boolean(value) && typeof value === 'object' && !Array.isArray(value) +} + +export function resolveStreamToolOutcome({ + output, + status, + success, +}: ResolveStreamToolOutcomeParams): TerminalToolOutcome { + const outputRecord = isRecord(output) ? output : undefined + const isCancelled = + outputRecord?.reason === 'user_cancelled' || + outputRecord?.cancelledByUser === true || + status === MothershipStreamV1ToolOutcome.cancelled + + if (isCancelled) { + return MothershipStreamV1ToolOutcome.cancelled + } + + switch (status) { + case MothershipStreamV1ToolOutcome.success: + case MothershipStreamV1ToolOutcome.error: + case MothershipStreamV1ToolOutcome.skipped: + case MothershipStreamV1ToolOutcome.rejected: + return status + default: + return success === true + ? MothershipStreamV1ToolOutcome.success + : MothershipStreamV1ToolOutcome.error + } +}