diff --git a/apps/docs/content/docs/en/tools/agiloft.mdx b/apps/docs/content/docs/en/tools/agiloft.mdx
index b5579d98ff..235300ea25 100644
--- a/apps/docs/content/docs/en/tools/agiloft.mdx
+++ b/apps/docs/content/docs/en/tools/agiloft.mdx
@@ -7,7 +7,7 @@ import { BlockInfoCard } from "@/components/ui/block-info-card"
{/* MANUAL-CONTENT-START:intro */}
diff --git a/apps/sim/app/api/copilot/chat/queries.ts b/apps/sim/app/api/copilot/chat/queries.ts
index 4828a15aa7..9977f5419b 100644
--- a/apps/sim/app/api/copilot/chat/queries.ts
+++ b/apps/sim/app/api/copilot/chat/queries.ts
@@ -4,7 +4,9 @@ import { createLogger } from '@sim/logger'
import { and, desc, eq } from 'drizzle-orm'
import { type NextRequest, NextResponse } from 'next/server'
import { getLatestRunForStream } from '@/lib/copilot/async-runs/repository'
+import { buildEffectiveChatTranscript } from '@/lib/copilot/chat/effective-transcript'
import { getAccessibleCopilotChat } from '@/lib/copilot/chat/lifecycle'
+import { normalizeMessage } from '@/lib/copilot/chat/persisted-message'
import {
authenticateCopilotRequestSessionOnly,
createBadRequestResponse,
@@ -113,11 +115,23 @@ export async function GET(req: NextRequest) {
}
}
+ const normalizedMessages = Array.isArray(chat.messages)
+ ? chat.messages
+ .filter((message): message is Record => Boolean(message))
+ .map(normalizeMessage)
+ : []
+ const effectiveMessages = buildEffectiveChatTranscript({
+ messages: normalizedMessages,
+ activeStreamId: chat.conversationId || null,
+ ...(streamSnapshot ? { streamSnapshot } : {}),
+ })
+
logger.info(`Retrieved chat ${chatId}`)
return NextResponse.json({
success: true,
chat: {
...transformChat(chat),
+ messages: effectiveMessages,
...(streamSnapshot ? { streamSnapshot } : {}),
},
})
diff --git a/apps/sim/app/api/copilot/chat/stop/route.test.ts b/apps/sim/app/api/copilot/chat/stop/route.test.ts
new file mode 100644
index 0000000000..21c32e38a7
--- /dev/null
+++ b/apps/sim/app/api/copilot/chat/stop/route.test.ts
@@ -0,0 +1,160 @@
+/**
+ * @vitest-environment node
+ */
+import { NextRequest } from 'next/server'
+import { beforeEach, describe, expect, it, vi } from 'vitest'
+
+const {
+ mockGetSession,
+ mockSelect,
+ mockFrom,
+ mockWhereSelect,
+ mockLimit,
+ mockUpdate,
+ mockSet,
+ mockWhereUpdate,
+ mockReturning,
+ mockPublishStatusChanged,
+ mockSql,
+} = vi.hoisted(() => ({
+ mockGetSession: vi.fn(),
+ mockSelect: vi.fn(),
+ mockFrom: vi.fn(),
+ mockWhereSelect: vi.fn(),
+ mockLimit: vi.fn(),
+ mockUpdate: vi.fn(),
+ mockSet: vi.fn(),
+ mockWhereUpdate: vi.fn(),
+ mockReturning: vi.fn(),
+ mockPublishStatusChanged: vi.fn(),
+ mockSql: vi.fn((strings: TemplateStringsArray, ...values: unknown[]) => ({ strings, values })),
+}))
+
+vi.mock('@/lib/auth', () => ({
+ getSession: mockGetSession,
+}))
+
+vi.mock('@sim/db', () => ({
+ db: {
+ select: mockSelect,
+ update: mockUpdate,
+ },
+}))
+
+vi.mock('@sim/db/schema', () => ({
+ copilotChats: {
+ id: 'id',
+ userId: 'userId',
+ workspaceId: 'workspaceId',
+ messages: 'messages',
+ conversationId: 'conversationId',
+ },
+}))
+
+vi.mock('drizzle-orm', () => ({
+ and: vi.fn((...conditions: unknown[]) => ({ conditions, type: 'and' })),
+ eq: vi.fn((field: unknown, value: unknown) => ({ field, value, type: 'eq' })),
+ sql: mockSql,
+}))
+
+vi.mock('@/lib/copilot/tasks', () => ({
+ taskPubSub: {
+ publishStatusChanged: mockPublishStatusChanged,
+ },
+}))
+
+import { POST } from '@/app/api/copilot/chat/stop/route'
+
+function createRequest(body: Record) {
+ return new NextRequest('http://localhost:3000/api/copilot/chat/stop', {
+ method: 'POST',
+ body: JSON.stringify(body),
+ headers: { 'Content-Type': 'application/json' },
+ })
+}
+
+describe('copilot chat stop route', () => {
+ beforeEach(() => {
+ vi.clearAllMocks()
+
+ mockGetSession.mockResolvedValue({ user: { id: 'user-1' } })
+
+ mockLimit.mockResolvedValue([
+ {
+ workspaceId: 'ws-1',
+ messages: [{ id: 'stream-1', role: 'user', content: 'hello' }],
+ },
+ ])
+ mockWhereSelect.mockReturnValue({ limit: mockLimit })
+ mockFrom.mockReturnValue({ where: mockWhereSelect })
+ mockSelect.mockReturnValue({ from: mockFrom })
+
+ mockReturning.mockResolvedValue([{ workspaceId: 'ws-1' }])
+ mockWhereUpdate.mockReturnValue({ returning: mockReturning })
+ mockSet.mockReturnValue({ where: mockWhereUpdate })
+ mockUpdate.mockReturnValue({ set: mockSet })
+ })
+
+ it('returns 401 when unauthenticated', async () => {
+ mockGetSession.mockResolvedValueOnce(null)
+
+ const response = await POST(
+ createRequest({
+ chatId: 'chat-1',
+ streamId: 'stream-1',
+ content: '',
+ })
+ )
+
+ expect(response.status).toBe(401)
+ expect(await response.json()).toEqual({ error: 'Unauthorized' })
+ })
+
+ it('is a no-op when the chat is missing', async () => {
+ mockLimit.mockResolvedValueOnce([])
+
+ const response = await POST(
+ createRequest({
+ chatId: 'missing-chat',
+ streamId: 'stream-1',
+ content: '',
+ })
+ )
+
+ expect(response.status).toBe(200)
+ expect(await response.json()).toEqual({ success: true })
+ expect(mockUpdate).not.toHaveBeenCalled()
+ })
+
+ it('appends a stopped assistant message even with no content', async () => {
+ const response = await POST(
+ createRequest({
+ chatId: 'chat-1',
+ streamId: 'stream-1',
+ content: '',
+ })
+ )
+
+ expect(response.status).toBe(200)
+ expect(await response.json()).toEqual({ success: true })
+
+ const setArg = mockSet.mock.calls[0]?.[0]
+ expect(setArg).toBeTruthy()
+ expect(setArg.conversationId).toBeNull()
+ expect(setArg.messages).toBeTruthy()
+
+ const appendedPayload = JSON.parse(setArg.messages.values[1] as string)
+ expect(appendedPayload).toHaveLength(1)
+ expect(appendedPayload[0]).toMatchObject({
+ role: 'assistant',
+ content: '',
+ contentBlocks: [{ type: 'complete', status: 'cancelled' }],
+ })
+
+ expect(mockPublishStatusChanged).toHaveBeenCalledWith({
+ workspaceId: 'ws-1',
+ chatId: 'chat-1',
+ type: 'completed',
+ })
+ })
+})
diff --git a/apps/sim/app/api/copilot/chat/stop/route.ts b/apps/sim/app/api/copilot/chat/stop/route.ts
index 8a742d7080..05e5935aa4 100644
--- a/apps/sim/app/api/copilot/chat/stop/route.ts
+++ b/apps/sim/app/api/copilot/chat/stop/route.ts
@@ -7,6 +7,7 @@ import { z } from 'zod'
import { getSession } from '@/lib/auth'
import { normalizeMessage, type PersistedMessage } from '@/lib/copilot/chat/persisted-message'
import { taskPubSub } from '@/lib/copilot/tasks'
+import { generateId } from '@/lib/core/utils/uuid'
const logger = createLogger('CopilotChatStopAPI')
@@ -70,7 +71,6 @@ export async function POST(req: NextRequest) {
}
const { chatId, streamId, content, contentBlocks } = StopSchema.parse(await req.json())
-
const [row] = await db
.select({
workspaceId: copilotChats.workspaceId,
@@ -106,14 +106,18 @@ export async function POST(req: NextRequest) {
const hasContent = content.trim().length > 0
const hasBlocks = Array.isArray(contentBlocks) && contentBlocks.length > 0
-
- if ((hasContent || hasBlocks) && canAppendAssistant) {
+ const synthesizedStoppedBlocks = hasBlocks
+ ? contentBlocks
+ : hasContent
+ ? [{ type: 'text', channel: 'assistant', content }, { type: 'stopped' }]
+ : [{ type: 'stopped' }]
+ if (canAppendAssistant) {
const normalized = normalizeMessage({
- id: crypto.randomUUID(),
+ id: generateId(),
role: 'assistant',
content,
timestamp: new Date().toISOString(),
- ...(hasBlocks ? { contentBlocks } : {}),
+ contentBlocks: synthesizedStoppedBlocks,
})
const assistantMessage: PersistedMessage = normalized
setClause.messages = sql`${copilotChats.messages} || ${JSON.stringify([assistantMessage])}::jsonb`
diff --git a/apps/sim/app/api/mothership/chats/[chatId]/route.ts b/apps/sim/app/api/mothership/chats/[chatId]/route.ts
index e5fc73f301..cf94fdda83 100644
--- a/apps/sim/app/api/mothership/chats/[chatId]/route.ts
+++ b/apps/sim/app/api/mothership/chats/[chatId]/route.ts
@@ -5,7 +5,9 @@ import { and, eq, sql } from 'drizzle-orm'
import { type NextRequest, NextResponse } from 'next/server'
import { z } from 'zod'
import { getLatestRunForStream } from '@/lib/copilot/async-runs/repository'
+import { buildEffectiveChatTranscript } from '@/lib/copilot/chat/effective-transcript'
import { getAccessibleCopilotChat } from '@/lib/copilot/chat/lifecycle'
+import { normalizeMessage } from '@/lib/copilot/chat/persisted-message'
import {
authenticateCopilotRequestSessionOnly,
createBadRequestResponse,
@@ -93,12 +95,23 @@ export async function GET(
}
}
+ const normalizedMessages = Array.isArray(chat.messages)
+ ? chat.messages
+ .filter((message): message is Record => Boolean(message))
+ .map(normalizeMessage)
+ : []
+ const effectiveMessages = buildEffectiveChatTranscript({
+ messages: normalizedMessages,
+ activeStreamId: chat.conversationId || null,
+ ...(streamSnapshot ? { streamSnapshot } : {}),
+ })
+
return NextResponse.json({
success: true,
chat: {
id: chat.id,
title: chat.title,
- messages: Array.isArray(chat.messages) ? chat.messages : [],
+ messages: effectiveMessages,
conversationId: chat.conversationId || null,
resources: Array.isArray(chat.resources) ? chat.resources : [],
createdAt: chat.createdAt,
diff --git a/apps/sim/app/workspace/[workspaceId]/home/hooks/use-chat.ts b/apps/sim/app/workspace/[workspaceId]/home/hooks/use-chat.ts
index 4b7b9ff5d0..8575e9a1b4 100644
--- a/apps/sim/app/workspace/[workspaceId]/home/hooks/use-chat.ts
+++ b/apps/sim/app/workspace/[workspaceId]/home/hooks/use-chat.ts
@@ -3,16 +3,20 @@ import { createLogger } from '@sim/logger'
import { useQueryClient } from '@tanstack/react-query'
import { usePathname, useRouter } from 'next/navigation'
import { toDisplayMessage } from '@/lib/copilot/chat/display-message'
+import { getLiveAssistantMessageId } from '@/lib/copilot/chat/effective-transcript'
import type {
PersistedFileAttachment,
PersistedMessage,
} from '@/lib/copilot/chat/persisted-message'
-import { MOTHERSHIP_CHAT_API_PATH } from '@/lib/copilot/constants'
+import { normalizeMessage } from '@/lib/copilot/chat/persisted-message'
+import { resolveStreamToolOutcome } from '@/lib/copilot/chat/stream-tool-outcome'
+import { MOTHERSHIP_CHAT_API_PATH, STREAM_STORAGE_KEY } from '@/lib/copilot/constants'
import type {
MothershipStreamV1ErrorPayload,
MothershipStreamV1ToolUI,
} from '@/lib/copilot/generated/mothership-stream-v1'
import {
+ MothershipStreamV1CompletionStatus,
MothershipStreamV1EventType,
MothershipStreamV1ResourceOp,
MothershipStreamV1RunKind,
@@ -172,6 +176,8 @@ const RECONNECT_TAIL_ERROR =
const MAX_RECONNECT_ATTEMPTS = 10
const RECONNECT_BASE_DELAY_MS = 1000
const RECONNECT_MAX_DELAY_MS = 30_000
+const QUEUED_SEND_HANDOFF_STORAGE_KEY = `${STREAM_STORAGE_KEY}:queued-send-handoff`
+const QUEUED_SEND_HANDOFF_CLAIM_STORAGE_KEY = `${STREAM_STORAGE_KEY}:queued-send-handoff-claim`
const logger = createLogger('useChat')
@@ -188,6 +194,100 @@ type ActiveTurn = {
optimisticAssistantMessage: ChatMessage
}
+interface QueuedSendHandoffState {
+ id: string
+ chatId: string
+ workspaceId: string
+ supersededStreamId: string | null
+ userMessageId: string
+ message: string
+ fileAttachments?: FileAttachmentForApi[]
+ contexts?: ChatContext[]
+ requestedAt: number
+}
+
+interface QueuedSendHandoffSeed {
+ id: string
+ chatId: string
+ supersededStreamId: string | null
+ userMessageId?: string
+}
+
+function readQueuedSendHandoffState(): QueuedSendHandoffState | null {
+ if (typeof window === 'undefined') return null
+
+ try {
+ const raw = window.sessionStorage.getItem(QUEUED_SEND_HANDOFF_STORAGE_KEY)
+ if (!raw) return null
+
+ const parsed = JSON.parse(raw) as Partial
+ if (
+ typeof parsed?.id !== 'string' ||
+ typeof parsed.chatId !== 'string' ||
+ typeof parsed.workspaceId !== 'string' ||
+ typeof parsed.userMessageId !== 'string' ||
+ typeof parsed.message !== 'string' ||
+ typeof parsed.requestedAt !== 'number'
+ ) {
+ return null
+ }
+
+ return {
+ id: parsed.id,
+ chatId: parsed.chatId,
+ workspaceId: parsed.workspaceId,
+ supersededStreamId:
+ typeof parsed.supersededStreamId === 'string' ? parsed.supersededStreamId : null,
+ userMessageId: parsed.userMessageId,
+ message: parsed.message,
+ ...(Array.isArray(parsed.fileAttachments)
+ ? { fileAttachments: parsed.fileAttachments as FileAttachmentForApi[] }
+ : {}),
+ ...(Array.isArray(parsed.contexts) ? { contexts: parsed.contexts as ChatContext[] } : {}),
+ requestedAt: parsed.requestedAt,
+ }
+ } catch {
+ return null
+ }
+}
+
+function writeQueuedSendHandoffState(state: QueuedSendHandoffState) {
+ if (typeof window === 'undefined') return
+ window.sessionStorage.setItem(QUEUED_SEND_HANDOFF_STORAGE_KEY, JSON.stringify(state))
+}
+
+function clearQueuedSendHandoffState(expectedId?: string) {
+ if (typeof window === 'undefined') return
+ if (expectedId) {
+ const current = readQueuedSendHandoffState()
+ if (current && current.id !== expectedId) {
+ return
+ }
+ }
+ window.sessionStorage.removeItem(QUEUED_SEND_HANDOFF_STORAGE_KEY)
+}
+
+function readQueuedSendHandoffClaim(): string | null {
+ if (typeof window === 'undefined') return null
+ return window.sessionStorage.getItem(QUEUED_SEND_HANDOFF_CLAIM_STORAGE_KEY)
+}
+
+function writeQueuedSendHandoffClaim(id: string) {
+ if (typeof window === 'undefined') return
+ window.sessionStorage.setItem(QUEUED_SEND_HANDOFF_CLAIM_STORAGE_KEY, id)
+}
+
+function clearQueuedSendHandoffClaim(expectedId?: string) {
+ if (typeof window === 'undefined') return
+ if (expectedId) {
+ const current = readQueuedSendHandoffClaim()
+ if (current && current !== expectedId) {
+ return
+ }
+ }
+ window.sessionStorage.removeItem(QUEUED_SEND_HANDOFF_CLAIM_STORAGE_KEY)
+}
+
function stringParam(value: unknown): string | undefined {
return typeof value === 'string' && value.trim() ? value.trim() : undefined
}
@@ -594,6 +694,122 @@ function parseStreamBatchResponse(value: unknown): StreamBatchResponse {
}
}
+function toRawPersistedContentBlock(block: ContentBlock): Record | null {
+ switch (block.type) {
+ case 'text':
+ return {
+ type: MothershipStreamV1EventType.text,
+ ...(block.subagent ? { lane: 'subagent' } : {}),
+ content: block.content ?? '',
+ }
+ case 'tool_call':
+ if (!block.toolCall) {
+ return null
+ }
+ return {
+ type: MothershipStreamV1EventType.tool,
+ phase: MothershipStreamV1ToolPhase.call,
+ toolCall: {
+ id: block.toolCall.id,
+ name: block.toolCall.name,
+ state: block.toolCall.status,
+ ...(block.toolCall.params ? { params: block.toolCall.params } : {}),
+ ...(block.toolCall.result ? { result: block.toolCall.result } : {}),
+ ...(block.toolCall.calledBy ? { calledBy: block.toolCall.calledBy } : {}),
+ ...(block.toolCall.displayTitle
+ ? {
+ display: {
+ title: block.toolCall.displayTitle,
+ },
+ }
+ : {}),
+ },
+ }
+ case 'subagent':
+ return {
+ type: MothershipStreamV1EventType.span,
+ kind: MothershipStreamV1SpanPayloadKind.subagent,
+ lifecycle: MothershipStreamV1SpanLifecycleEvent.start,
+ content: block.content ?? '',
+ }
+ case 'subagent_end':
+ return {
+ type: MothershipStreamV1EventType.span,
+ kind: MothershipStreamV1SpanPayloadKind.subagent,
+ lifecycle: MothershipStreamV1SpanLifecycleEvent.end,
+ }
+ case 'stopped':
+ return {
+ type: MothershipStreamV1EventType.complete,
+ status: MothershipStreamV1CompletionStatus.cancelled,
+ }
+ default:
+ return null
+ }
+}
+
+function buildAssistantSnapshotMessage(params: {
+ id: string
+ content: string
+ contentBlocks: ContentBlock[]
+ requestId?: string
+}): PersistedMessage {
+ const rawContentBlocks = params.contentBlocks
+ .map(toRawPersistedContentBlock)
+ .filter((block): block is Record => block !== null)
+
+ return normalizeMessage({
+ id: params.id,
+ role: 'assistant',
+ content: params.content,
+ timestamp: new Date().toISOString(),
+ ...(params.requestId ? { requestId: params.requestId } : {}),
+ ...(rawContentBlocks.length > 0 ? { contentBlocks: rawContentBlocks } : {}),
+ })
+}
+
+function markMessageStopped(message: PersistedMessage): PersistedMessage {
+ if (!message.contentBlocks?.some((block) => block.toolCall?.state === 'executing')) {
+ return message
+ }
+
+ const nextBlocks = message.contentBlocks.map((block) => {
+ if (block.toolCall?.state !== 'executing') {
+ return block
+ }
+
+ return {
+ ...block,
+ toolCall: {
+ ...block.toolCall,
+ state: 'cancelled' as const,
+ display: {
+ ...(block.toolCall.display ?? {}),
+ title: 'Stopped by user',
+ },
+ },
+ }
+ })
+
+ if (
+ !nextBlocks.some(
+ (block) =>
+ block.type === MothershipStreamV1EventType.complete &&
+ block.status === MothershipStreamV1CompletionStatus.cancelled
+ )
+ ) {
+ nextBlocks.push({
+ type: MothershipStreamV1EventType.complete,
+ status: MothershipStreamV1CompletionStatus.cancelled,
+ })
+ }
+
+ return normalizeMessage({
+ ...message,
+ contentBlocks: nextBlocks,
+ })
+}
+
function buildChatHistoryHydrationKey(chatHistory: TaskChatHistory): string {
const resourceKey = chatHistory.resources
.map((resource) => `${resource.type}:${resource.id}:${resource.title}`)
@@ -667,22 +883,10 @@ function resolveLiveToolStatus(
payload: Partial<{
status: string
success: boolean
+ output: unknown
}>
): ToolCallStatus {
- switch (payload.status) {
- case MothershipStreamV1ToolOutcome.success:
- return ToolCallStatus.success
- case MothershipStreamV1ToolOutcome.error:
- return ToolCallStatus.error
- case MothershipStreamV1ToolOutcome.cancelled:
- return ToolCallStatus.cancelled
- case MothershipStreamV1ToolOutcome.skipped:
- return ToolCallStatus.skipped
- case MothershipStreamV1ToolOutcome.rejected:
- return ToolCallStatus.rejected
- default:
- return payload.success === true ? ToolCallStatus.success : ToolCallStatus.error
- }
+ return resolveStreamToolOutcome(payload) as ToolCallStatus
}
/** Adds a workflow to the React Query cache with a top-insertion sort order if it doesn't already exist. */
@@ -808,7 +1012,7 @@ export function useChat(
const pathname = usePathname()
const router = useRouter()
const queryClient = useQueryClient()
- const [messages, setMessages] = useState([])
+ const [pendingMessages, setPendingMessages] = useState([])
const [isSending, setIsSending] = useState(false)
const [isReconnecting, setIsReconnecting] = useState(false)
const [error, setError] = useState(null)
@@ -855,6 +1059,22 @@ export function useChat(
const activeResourceIdRef = useRef(effectiveActiveResourceId)
activeResourceIdRef.current = effectiveActiveResourceId
+ const upsertTaskChatHistory = useCallback(
+ (chatId: string, updater: (current: TaskChatHistory) => TaskChatHistory) => {
+ queryClient.setQueryData(taskKeys.detail(chatId), (current) => {
+ const base: TaskChatHistory = current ?? {
+ id: chatId,
+ title: null,
+ messages: [],
+ activeStreamId: null,
+ resources: resourcesRef.current,
+ }
+ return updater(base)
+ })
+ },
+ [queryClient]
+ )
+
const {
previewSession,
previewSessionsById,
@@ -975,6 +1195,7 @@ export function useChat(
(opts: { streamId: string; assistantId: string; gen: number }) => Promise
>(async () => false)
const finalizeRef = useRef<(options?: { error?: boolean }) => void>(() => {})
+ const recoveringQueuedSendHandoffIdRef = useRef(null)
const resetEphemeralPreviewState = useCallback(
(options?: { removeStreamingResource?: boolean }) => {
@@ -1101,7 +1322,7 @@ export function useChat(
setResolvedChatId(undefined)
appliedChatHistoryKeyRef.current = undefined
abortControllerRef.current = null
- setMessages([])
+ setPendingMessages([])
setError(null)
setTransportIdle()
setResources([])
@@ -1111,38 +1332,11 @@ export function useChat(
clearQueueDispatchState()
}, [clearActiveTurn, clearQueueDispatchState, resetEphemeralPreviewState, setTransportIdle])
- const mergeServerMessagesWithActiveTurn = useCallback(
- (serverMessages: ChatMessage[], previousMessages: ChatMessage[]) => {
- const activeTurn = activeTurnRef.current
- if (!activeTurn || !sendingRef.current) {
- return serverMessages
- }
-
- const nextMessages = [...serverMessages]
- const localStreamingUser =
- previousMessages.find(
- (message) => message.id === activeTurn.userMessageId && message.role === 'user'
- ) ?? activeTurn.optimisticUserMessage
- const localStreamingAssistant =
- previousMessages.find(
- (message) => message.id === activeTurn.assistantMessageId && message.role === 'assistant'
- ) ?? activeTurn.optimisticAssistantMessage
-
- if (!nextMessages.some((message) => message.id === localStreamingUser.id)) {
- nextMessages.push(localStreamingUser)
- }
-
- if (!nextMessages.some((message) => message.id === localStreamingAssistant.id)) {
- nextMessages.push(localStreamingAssistant)
- }
-
- return nextMessages
- },
- []
+ const { data: chatHistory } = useChatHistory(resolvedChatId)
+ const messages = useMemo(
+ () => chatHistory?.messages.map(toDisplayMessage) ?? pendingMessages,
+ [chatHistory, pendingMessages]
)
-
- const { data: chatHistory } = useChatHistory(initialChatId)
-
const addResource = useCallback((resource: MothershipResource): boolean => {
if (resourcesRef.current.some((r) => r.type === resource.type && r.id === resource.id)) {
return false
@@ -1268,12 +1462,12 @@ export function useChat(
)
useEffect(() => {
+ const streamOwnerId = chatIdRef.current
+ const navigatedToDifferentChat =
+ sendingRef.current &&
+ initialChatId !== streamOwnerId &&
+ (initialChatId !== undefined || streamOwnerId !== undefined)
if (sendingRef.current) {
- const streamOwnerId = chatIdRef.current
- const navigatedToDifferentChat =
- initialChatId !== streamOwnerId &&
- (initialChatId !== undefined || streamOwnerId !== undefined)
-
if (navigatedToDifferentChat) {
const abandonedChatId = streamOwnerId
// Detach the current UI from the old stream without cancelling it on the server.
@@ -1296,7 +1490,7 @@ export function useChat(
clearActiveTurn()
setResolvedChatId(initialChatId)
appliedChatHistoryKeyRef.current = undefined
- setMessages([])
+ setPendingMessages([])
setError(null)
setTransportIdle()
setResources([])
@@ -1344,13 +1538,6 @@ export function useChat(
if (!activeStreamId && locallyTerminalStreamIdRef.current) {
locallyTerminalStreamIdRef.current = undefined
}
- const shouldPreserveLocalActiveTurn = sendingRef.current && activeTurnRef.current !== null
-
- if (shouldPreserveLocalActiveTurn) {
- setMessages((prev) => mergeServerMessagesWithActiveTurn(mappedMessages, prev))
- } else {
- setMessages(mappedMessages)
- }
void recoverPendingClientWorkflowTools(mappedMessages)
@@ -1399,7 +1586,7 @@ export function useChat(
lastCursorRef.current = '0'
setTransportReconnecting()
- const assistantId = generateId()
+ const assistantId = getLiveAssistantMessageId(activeStreamId)
const reconnect = async () => {
const initialSnapshot = chatHistory.streamSnapshot
@@ -1458,7 +1645,6 @@ export function useChat(
queryClient,
recoverPendingClientWorkflowTools,
seedPreviewSessions,
- mergeServerMessagesWithActiveTurn,
setTransportIdle,
setTransportReconnecting,
])
@@ -1569,22 +1755,41 @@ export function useChat(
const flush = () => {
if (isStale()) return
streamingBlocksRef.current = [...blocks]
- const snapshot: Partial = {
- content: runningText,
- contentBlocks: [...blocks],
- }
- if (streamRequestId) snapshot.requestId = streamRequestId
- setMessages((prev) => {
- if (expectedGen !== undefined && streamGenRef.current !== expectedGen) return prev
- const idx = prev.findIndex((m) => m.id === assistantId)
- if (idx >= 0) {
- return prev.map((m) => (m.id === assistantId ? { ...m, ...snapshot } : m))
+ const activeChatId = chatIdRef.current
+ if (!activeChatId) {
+ const snapshot: Partial = {
+ content: runningText,
+ contentBlocks: [...blocks],
}
- return [
- ...prev,
- { id: assistantId, role: 'assistant' as const, content: '', ...snapshot },
- ]
+ if (streamRequestId) snapshot.requestId = streamRequestId
+ setPendingMessages((prev) => {
+ if (expectedGen !== undefined && streamGenRef.current !== expectedGen) return prev
+ const idx = prev.findIndex((m) => m.id === assistantId)
+ if (idx >= 0) {
+ return prev.map((m) => (m.id === assistantId ? { ...m, ...snapshot } : m))
+ }
+ return [
+ ...prev,
+ { id: assistantId, role: 'assistant' as const, content: '', ...snapshot },
+ ]
+ })
+ return
+ }
+
+ const assistantMessage = buildAssistantSnapshotMessage({
+ id: assistantId,
+ content: runningText,
+ contentBlocks: blocks,
+ ...(streamRequestId ? { requestId: streamRequestId } : {}),
})
+ upsertTaskChatHistory(activeChatId, (current) => ({
+ ...current,
+ messages: [
+ ...current.messages.filter((message) => message.id !== assistantId),
+ assistantMessage,
+ ],
+ activeStreamId: streamIdRef.current ?? current.activeStreamId,
+ }))
}
const flushText = () => {
@@ -1690,14 +1895,23 @@ export function useChat(
const userMsg = pendingUserMsgRef.current
const activeStreamId = streamIdRef.current
if (userMsg && activeStreamId) {
+ const assistantMessage = buildAssistantSnapshotMessage({
+ id:
+ activeTurnRef.current?.assistantMessageId ??
+ getLiveAssistantMessageId(activeStreamId),
+ content: streamingContentRef.current,
+ contentBlocks: streamingBlocksRef.current,
+ })
+ const seededMessages = [userMsg, assistantMessage]
queryClient.setQueryData(taskKeys.detail(payloadChatId), {
id: payloadChatId,
title: null,
- messages: [userMsg],
+ messages: seededMessages,
activeStreamId,
resources: resourcesRef.current,
})
}
+ setPendingMessages([])
if (!workflowIdRef.current) {
window.history.replaceState(
null,
@@ -2273,6 +2487,7 @@ export function useChat(
workspaceId,
router,
queryClient,
+ upsertTaskChatHistory,
addResource,
removeResource,
applyPreviewSessionUpdate,
@@ -2691,28 +2906,32 @@ export function useChat(
[]
)
- const invalidateChatQueries = useCallback(() => {
- const activeChatId = chatIdRef.current
- if (activeChatId) {
- queryClient.invalidateQueries({
- queryKey: taskKeys.detail(activeChatId),
- })
- }
- queryClient.invalidateQueries({ queryKey: taskKeys.list(workspaceId) })
- }, [workspaceId, queryClient])
+ const invalidateChatQueries = useCallback(
+ (options?: { includeDetail?: boolean }) => {
+ const activeChatId = chatIdRef.current
+ if (options?.includeDetail !== false && activeChatId) {
+ queryClient.invalidateQueries({
+ queryKey: taskKeys.detail(activeChatId),
+ })
+ }
+ queryClient.invalidateQueries({ queryKey: taskKeys.list(workspaceId) })
+ },
+ [workspaceId, queryClient]
+ )
const messagesRef = useRef(messages)
messagesRef.current = messages
const finalize = useCallback(
(options?: { error?: boolean }) => {
+ const hasQueuedFollowUp = !options?.error && messageQueueRef.current.length > 0
reconcileTerminalPreviewSessions()
locallyTerminalStreamIdRef.current =
streamIdRef.current ?? activeTurnRef.current?.userMessageId ?? undefined
clearActiveTurn()
setTransportIdle()
abortControllerRef.current = null
- invalidateChatQueries()
+ invalidateChatQueries({ includeDetail: !hasQueuedFollowUp })
if (!options?.error) {
const cid = chatIdRef.current
@@ -2725,7 +2944,7 @@ export function useChat(
return
}
- if (messageQueueRef.current.length > 0) {
+ if (hasQueuedFollowUp) {
void enqueueQueueDispatchRef.current({ type: 'send_head' })
}
},
@@ -2738,7 +2957,9 @@ export function useChat(
message: string,
fileAttachments?: FileAttachmentForApi[],
contexts?: ChatContext[],
- pendingStopOverride?: Promise | null
+ pendingStopOverride?: Promise | null,
+ onOptimisticSendApplied?: () => void,
+ queuedSendHandoff?: QueuedSendHandoffSeed
) => {
if (!message.trim() || !workspaceId) return false
const pendingStop = pendingStopOverride ?? pendingStopPromiseRef.current
@@ -2750,8 +2971,8 @@ export function useChat(
setTransportStreaming()
locallyTerminalStreamIdRef.current = undefined
- const userMessageId = generateId()
- const assistantId = generateId()
+ const userMessageId = queuedSendHandoff?.userMessageId ?? generateId()
+ const assistantId = getLiveAssistantMessageId(userMessageId)
streamIdRef.current = userMessageId
lastCursorRef.current = '0'
@@ -2769,6 +2990,19 @@ export function useChat(
: undefined
const requestChatId = selectedChatIdRef.current ?? chatIdRef.current
+ if (queuedSendHandoff) {
+ writeQueuedSendHandoffState({
+ id: queuedSendHandoff.id,
+ chatId: queuedSendHandoff.chatId,
+ workspaceId,
+ supersededStreamId: queuedSendHandoff.supersededStreamId,
+ userMessageId,
+ message,
+ ...(fileAttachments ? { fileAttachments } : {}),
+ ...(contexts ? { contexts } : {}),
+ requestedAt: Date.now(),
+ })
+ }
const messageContexts = contexts?.map((c) => ({
kind: c.kind,
label: c.label,
@@ -2818,21 +3052,33 @@ export function useChat(
optimisticAssistantMessage,
}
+ if (requestChatId) {
+ await queryClient.cancelQueries({ queryKey: taskKeys.detail(requestChatId) })
+ }
+
const applyOptimisticSend = () => {
+ const assistantSnapshot = buildAssistantSnapshotMessage({
+ id: assistantId,
+ content: '',
+ contentBlocks: [],
+ })
if (requestChatId) {
- queryClient.setQueryData(taskKeys.detail(requestChatId), (old) => {
- if (!old) return undefined
- const nextMessages = old.messages.filter((m) => m.id !== userMessageId)
- return {
- ...old,
- resources: old.resources.filter((r) => r.id !== 'streaming-file'),
- messages: [...nextMessages, cachedUserMsg],
- activeStreamId: userMessageId,
- }
- })
+ upsertTaskChatHistory(requestChatId, (current) => ({
+ ...current,
+ resources: current.resources.filter((resource) => resource.id !== 'streaming-file'),
+ messages: [
+ ...current.messages.filter(
+ (persistedMessage) =>
+ persistedMessage.id !== userMessageId && persistedMessage.id !== assistantId
+ ),
+ cachedUserMsg,
+ assistantSnapshot,
+ ],
+ activeStreamId: userMessageId,
+ }))
}
- setMessages((prev) => {
+ setPendingMessages((prev) => {
const nextMessages = prev.filter((m) => m.id !== userMessageId && m.id !== assistantId)
return [...nextMessages, optimisticUserMessage, optimisticAssistantMessage]
})
@@ -2840,20 +3086,27 @@ export function useChat(
const rollbackOptimisticSend = () => {
if (requestChatId) {
- queryClient.setQueryData(taskKeys.detail(requestChatId), (old) => {
- if (!old) return undefined
- return {
- ...old,
- messages: old.messages.filter((m) => m.id !== userMessageId),
- activeStreamId: old.activeStreamId === userMessageId ? null : old.activeStreamId,
- }
- })
+ upsertTaskChatHistory(requestChatId, (current) => ({
+ ...current,
+ messages: current.messages.filter(
+ (persistedMessage) =>
+ persistedMessage.id !== userMessageId && persistedMessage.id !== assistantId
+ ),
+ activeStreamId:
+ current.activeStreamId === userMessageId ? null : current.activeStreamId,
+ }))
}
- setMessages((prev) => prev.filter((m) => m.id !== userMessageId && m.id !== assistantId))
+ setPendingMessages((prev) =>
+ prev.filter(
+ (pendingMessage) =>
+ pendingMessage.id !== userMessageId && pendingMessage.id !== assistantId
+ )
+ )
}
applyOptimisticSend()
+ onOptimisticSendApplied?.()
consumedByTranscript = true
const abortController = new AbortController()
@@ -2863,8 +3116,9 @@ export function useChat(
if (pendingStop) {
try {
await pendingStop
- // Query invalidation from the stop barrier can briefly stomp the optimistic tail.
- // Re-apply it before the real POST so the mothership UI stays immediate.
+ if (requestChatId) {
+ await queryClient.cancelQueries({ queryKey: taskKeys.detail(requestChatId) })
+ }
applyOptimisticSend()
} catch (err) {
rollbackOptimisticSend()
@@ -2928,6 +3182,10 @@ export function useChat(
throw new Error(errorData.error || `Request failed: ${response.status}`)
}
+ if (queuedSendHandoff) {
+ clearQueuedSendHandoffState(queuedSendHandoff.id)
+ }
+
if (!response.body) throw new Error('No response body')
const streamResult = await processSSEStream(response.body.getReader(), assistantId, gen)
@@ -2986,6 +3244,7 @@ export function useChat(
[
workspaceId,
queryClient,
+ upsertTaskChatHistory,
processSSEStream,
finalize,
resumeOrFinalize,
@@ -3015,6 +3274,69 @@ export function useChat(
},
[workspaceId, startSendMessage]
)
+ useEffect(() => {
+ if (typeof window === 'undefined') return
+
+ const clearClaim = () => {
+ clearQueuedSendHandoffClaim()
+ }
+
+ window.addEventListener('pagehide', clearClaim)
+ window.addEventListener('beforeunload', clearClaim)
+ return () => {
+ window.removeEventListener('pagehide', clearClaim)
+ window.removeEventListener('beforeunload', clearClaim)
+ }
+ }, [])
+ useEffect(() => {
+ if (!workspaceId || !chatHistory || sendingRef.current || pendingStopPromiseRef.current) return
+
+ const handoff = readQueuedSendHandoffState()
+ if (!handoff) return
+ if (handoff.workspaceId !== workspaceId || handoff.chatId !== chatHistory.id) return
+ if (recoveringQueuedSendHandoffIdRef.current === handoff.id) return
+ if (readQueuedSendHandoffClaim() === handoff.id) return
+
+ if (
+ chatHistory.activeStreamId === handoff.userMessageId ||
+ chatHistory.messages.some((message) => message.id === handoff.userMessageId)
+ ) {
+ clearQueuedSendHandoffState(handoff.id)
+ clearQueuedSendHandoffClaim(handoff.id)
+ return
+ }
+
+ if (chatHistory.activeStreamId === handoff.supersededStreamId) {
+ return
+ }
+
+ if (chatHistory.activeStreamId && chatHistory.activeStreamId !== handoff.supersededStreamId) {
+ clearQueuedSendHandoffState(handoff.id)
+ clearQueuedSendHandoffClaim(handoff.id)
+ return
+ }
+
+ recoveringQueuedSendHandoffIdRef.current = handoff.id
+ writeQueuedSendHandoffClaim(handoff.id)
+ void startSendMessage(
+ handoff.message,
+ handoff.fileAttachments,
+ handoff.contexts,
+ null,
+ undefined,
+ {
+ id: handoff.id,
+ chatId: handoff.chatId,
+ supersededStreamId: handoff.supersededStreamId,
+ userMessageId: handoff.userMessageId,
+ }
+ ).finally(() => {
+ if (recoveringQueuedSendHandoffIdRef.current === handoff.id) {
+ recoveringQueuedSendHandoffIdRef.current = null
+ }
+ clearQueuedSendHandoffClaim(handoff.id)
+ })
+ }, [workspaceId, chatHistory, startSendMessage])
const cancelActiveWorkflowExecutions = useCallback(() => {
const execState = useExecutionStore.getState()
const consoleStore = useTerminalConsoleStore.getState()
@@ -3066,6 +3388,7 @@ export function useChat(
}
const wasSending = sendingRef.current
+ const activeChatId = chatIdRef.current
const sid =
streamIdRef.current ||
activeTurnRef.current?.userMessageId ||
@@ -3088,24 +3411,36 @@ export function useChat(
abortControllerRef.current = null
setTransportIdle()
- setMessages((prev) =>
- prev.map((msg) => {
- if (!msg.contentBlocks?.some((b) => b.toolCall?.status === 'executing')) return msg
- const updated = msg.contentBlocks!.map((block) => {
- if (block.toolCall?.status !== 'executing') return block
- return {
- ...block,
- toolCall: {
- ...block.toolCall,
- status: 'cancelled' as const,
- displayTitle: 'Stopped by user',
- },
+ if (activeChatId) {
+ await queryClient.cancelQueries({ queryKey: taskKeys.detail(activeChatId) })
+ upsertTaskChatHistory(activeChatId, (current) => ({
+ ...current,
+ messages: current.messages.map(markMessageStopped),
+ }))
+ } else {
+ setPendingMessages((prev) =>
+ prev.map((msg) => {
+ if (!msg.contentBlocks?.some((block) => block.toolCall?.status === 'executing')) {
+ return msg
}
+ const updatedBlocks = msg.contentBlocks.map((block) => {
+ if (block.toolCall?.status !== 'executing') {
+ return block
+ }
+ return {
+ ...block,
+ toolCall: {
+ ...block.toolCall,
+ status: 'cancelled' as const,
+ displayTitle: 'Stopped by user',
+ },
+ }
+ })
+ updatedBlocks.push({ type: 'stopped' as const })
+ return { ...msg, contentBlocks: updatedBlocks }
})
- updated.push({ type: 'stopped' as const })
- return { ...msg, contentBlocks: updated }
- })
- )
+ )
+ }
// Cancel active run-tool executions before waiting for the server-side stream
// shutdown barrier; otherwise the abort settle can sit behind tool execution teardown.
@@ -3175,6 +3510,7 @@ export function useChat(
persistPartialResponse,
queryClient,
resetEphemeralPreviewState,
+ upsertTaskChatHistory,
clearActiveTurn,
setTransportIdle,
])
@@ -3198,16 +3534,27 @@ export function useChat(
let originalIndex = 0
let removedFromQueue = false
+ const removeQueuedMessage = () => {
+ if (removedFromQueue || action.epoch !== queueDispatchEpochRef.current) {
+ return
+ }
+ removedFromQueue = true
+ setMessageQueue((prev) => prev.filter((queued) => queued.id !== msg.id))
+ }
try {
const currentIndex = messageQueueRef.current.findIndex((queued) => queued.id === msg.id)
if (currentIndex !== -1) {
originalIndex = currentIndex
- removedFromQueue = true
- setMessageQueue((prev) => prev.filter((queued) => queued.id !== msg.id))
}
- const consumed = await startSendMessage(msg.content, msg.fileAttachments, msg.contexts)
+ const consumed = await startSendMessage(
+ msg.content,
+ msg.fileAttachments,
+ msg.contexts,
+ undefined,
+ removeQueuedMessage
+ )
if (!consumed && removedFromQueue && action.epoch === queueDispatchEpochRef.current) {
setMessageQueue((prev) => {
if (prev.some((queued) => queued.id === msg.id)) return prev
@@ -3250,6 +3597,8 @@ export function useChat(
enqueueQueueDispatchRef.current = enqueueQueueDispatch
const removeFromQueue = useCallback((id: string) => {
+ clearQueuedSendHandoffState(id)
+ clearQueuedSendHandoffClaim(id)
setMessageQueue((prev) => prev.filter((m) => m.id !== id))
}, [])
@@ -3272,6 +3621,13 @@ export function useChat(
let originalIndex = initialIndex
let removedFromQueue = false
+ const removeQueuedMessage = () => {
+ if (removedFromQueue || epoch !== queueDispatchEpochRef.current) {
+ return
+ }
+ removedFromQueue = true
+ setMessageQueue((prev) => prev.filter((queued) => queued.id !== msg.id))
+ }
const restoreQueuedMessage = () => {
if (!removedFromQueue || epoch !== queueDispatchEpochRef.current) {
return
@@ -3291,15 +3647,29 @@ export function useChat(
}
originalIndex = currentIndex
- removedFromQueue = true
- setMessageQueue((prev) => prev.filter((queued) => queued.id !== msg.id))
+ const queuedSendHandoff =
+ sendingRef.current && workspaceId
+ ? {
+ id: msg.id,
+ chatId: selectedChatIdRef.current ?? chatIdRef.current ?? '',
+ supersededStreamId:
+ streamIdRef.current ||
+ activeTurnRef.current?.userMessageId ||
+ queryClient.getQueryData(
+ taskKeys.detail(selectedChatIdRef.current ?? chatIdRef.current)
+ )?.activeStreamId ||
+ null,
+ }
+ : undefined
const pendingStop = sendingRef.current ? stopGeneration() : pendingStopPromiseRef.current
const consumed = await startSendMessage(
msg.content,
msg.fileAttachments,
msg.contexts,
- pendingStop
+ pendingStop,
+ removeQueuedMessage,
+ queuedSendHandoff?.chatId ? queuedSendHandoff : undefined
)
if (!consumed) {
@@ -3324,6 +3694,8 @@ export function useChat(
const editQueuedMessage = useCallback((id: string): QueuedMessage | undefined => {
const msg = messageQueueRef.current.find((m) => m.id === id)
if (!msg) return undefined
+ clearQueuedSendHandoffState(id)
+ clearQueuedSendHandoffClaim(id)
setMessageQueue((prev) => prev.filter((m) => m.id !== id))
return msg
}, [])
diff --git a/apps/sim/blocks/registry.ts b/apps/sim/blocks/registry.ts
index 270bea945c..2b2541a4d3 100644
--- a/apps/sim/blocks/registry.ts
+++ b/apps/sim/blocks/registry.ts
@@ -245,9 +245,9 @@ export const registry: Record = {
ashby: AshbyBlock,
athena: AthenaBlock,
attio: AttioBlock,
+ box: BoxBlock,
brandfetch: BrandfetchBlock,
brightdata: BrightDataBlock,
- box: BoxBlock,
browser_use: BrowserUseBlock,
calcom: CalComBlock,
calendly: CalendlyBlock,
diff --git a/apps/sim/hooks/use-task-events.test.ts b/apps/sim/hooks/use-task-events.test.ts
index ac58e6cf27..d62b32696a 100644
--- a/apps/sim/hooks/use-task-events.test.ts
+++ b/apps/sim/hooks/use-task-events.test.ts
@@ -16,9 +16,10 @@ describe('handleTaskStatusEvent', () => {
vi.clearAllMocks()
})
- it('invalidates the task list and completed chat detail', () => {
+ it('invalidates only the task list for completed task events', () => {
handleTaskStatusEvent(
queryClient,
+ 'ws-1',
JSON.stringify({
chatId: 'chat-1',
type: 'completed',
@@ -26,18 +27,16 @@ describe('handleTaskStatusEvent', () => {
})
)
- expect(queryClient.invalidateQueries).toHaveBeenCalledTimes(2)
- expect(queryClient.invalidateQueries).toHaveBeenNthCalledWith(1, {
- queryKey: taskKeys.lists(),
- })
- expect(queryClient.invalidateQueries).toHaveBeenNthCalledWith(2, {
- queryKey: taskKeys.detail('chat-1'),
+ expect(queryClient.invalidateQueries).toHaveBeenCalledTimes(1)
+ expect(queryClient.invalidateQueries).toHaveBeenCalledWith({
+ queryKey: taskKeys.list('ws-1'),
})
})
it('keeps list invalidation only for non-completed task events', () => {
handleTaskStatusEvent(
queryClient,
+ 'ws-1',
JSON.stringify({
chatId: 'chat-1',
type: 'started',
@@ -47,16 +46,16 @@ describe('handleTaskStatusEvent', () => {
expect(queryClient.invalidateQueries).toHaveBeenCalledTimes(1)
expect(queryClient.invalidateQueries).toHaveBeenCalledWith({
- queryKey: taskKeys.lists(),
+ queryKey: taskKeys.list('ws-1'),
})
})
it('preserves list invalidation when task event payload is invalid', () => {
- handleTaskStatusEvent(queryClient, '{')
+ handleTaskStatusEvent(queryClient, 'ws-1', '{')
expect(queryClient.invalidateQueries).toHaveBeenCalledTimes(1)
expect(queryClient.invalidateQueries).toHaveBeenCalledWith({
- queryKey: taskKeys.lists(),
+ queryKey: taskKeys.list('ws-1'),
})
})
})
diff --git a/apps/sim/hooks/use-task-events.ts b/apps/sim/hooks/use-task-events.ts
index 04bff3df49..e10a7599b1 100644
--- a/apps/sim/hooks/use-task-events.ts
+++ b/apps/sim/hooks/use-task-events.ts
@@ -38,19 +38,16 @@ function parseTaskStatusEventPayload(data: unknown): TaskStatusEventPayload | nu
export function handleTaskStatusEvent(
queryClient: Pick,
+ workspaceId: string,
data: unknown
): void {
- queryClient.invalidateQueries({ queryKey: taskKeys.lists() })
+ queryClient.invalidateQueries({ queryKey: taskKeys.list(workspaceId) })
const payload = parseTaskStatusEventPayload(data)
if (!payload) {
logger.warn('Received invalid task_status payload')
return
}
-
- if (payload.type === 'completed' && payload.chatId) {
- queryClient.invalidateQueries({ queryKey: taskKeys.detail(payload.chatId) })
- }
}
/**
@@ -67,7 +64,11 @@ export function useTaskEvents(workspaceId: string | undefined) {
)
eventSource.addEventListener('task_status', (event) => {
- handleTaskStatusEvent(queryClient, event instanceof MessageEvent ? event.data : undefined)
+ handleTaskStatusEvent(
+ queryClient,
+ workspaceId,
+ event instanceof MessageEvent ? event.data : undefined
+ )
})
eventSource.onerror = () => {
diff --git a/apps/sim/lib/copilot/chat/effective-transcript.test.ts b/apps/sim/lib/copilot/chat/effective-transcript.test.ts
new file mode 100644
index 0000000000..285743d37a
--- /dev/null
+++ b/apps/sim/lib/copilot/chat/effective-transcript.test.ts
@@ -0,0 +1,263 @@
+/**
+ * @vitest-environment node
+ */
+
+import { describe, expect, it } from 'vitest'
+import {
+ buildEffectiveChatTranscript,
+ getLiveAssistantMessageId,
+} from '@/lib/copilot/chat/effective-transcript'
+import { normalizeMessage } from '@/lib/copilot/chat/persisted-message'
+import {
+ MothershipStreamV1CompletionStatus,
+ MothershipStreamV1EventType,
+ MothershipStreamV1SessionKind,
+ MothershipStreamV1TextChannel,
+} from '@/lib/copilot/generated/mothership-stream-v1'
+import type { StreamBatchEvent } from '@/lib/copilot/request/session/types'
+
+function toBatchEvent(eventId: number, event: StreamBatchEvent['event']): StreamBatchEvent {
+ return {
+ eventId,
+ streamId: event.stream.streamId,
+ event,
+ }
+}
+
+function buildUserMessage(id: string, content: string) {
+ return normalizeMessage({
+ id,
+ role: 'user',
+ content,
+ timestamp: '2026-04-15T12:00:00.000Z',
+ })
+}
+
+describe('buildEffectiveChatTranscript', () => {
+ it('returns the existing transcript when the stream owner is no longer the trailing user', () => {
+ const messages = [
+ buildUserMessage('stream-1', 'Hello'),
+ normalizeMessage({
+ id: 'assistant-1',
+ role: 'assistant',
+ content: 'Persisted response',
+ timestamp: '2026-04-15T12:00:01.000Z',
+ }),
+ ]
+
+ const result = buildEffectiveChatTranscript({
+ messages,
+ activeStreamId: 'stream-1',
+ streamSnapshot: {
+ events: [
+ toBatchEvent(1, {
+ v: 1,
+ seq: 1,
+ ts: '2026-04-15T12:00:01.000Z',
+ type: MothershipStreamV1EventType.text,
+ stream: { streamId: 'stream-1' },
+ payload: {
+ channel: MothershipStreamV1TextChannel.assistant,
+ text: 'Live response',
+ },
+ }),
+ ],
+ previewSessions: [],
+ status: 'active',
+ },
+ })
+
+ expect(result).toEqual(messages)
+ })
+
+ it('appends a placeholder assistant while an active stream has not produced text yet', () => {
+ const result = buildEffectiveChatTranscript({
+ messages: [buildUserMessage('stream-1', 'Hello')],
+ activeStreamId: 'stream-1',
+ streamSnapshot: {
+ events: [
+ toBatchEvent(1, {
+ v: 1,
+ seq: 1,
+ ts: '2026-04-15T12:00:01.000Z',
+ type: MothershipStreamV1EventType.session,
+ stream: { streamId: 'stream-1' },
+ payload: {
+ kind: MothershipStreamV1SessionKind.start,
+ },
+ }),
+ ],
+ previewSessions: [],
+ status: 'active',
+ },
+ })
+
+ expect(result).toHaveLength(2)
+ expect(result[1]).toEqual(
+ expect.objectContaining({
+ id: getLiveAssistantMessageId('stream-1'),
+ role: 'assistant',
+ content: '',
+ })
+ )
+ })
+
+ it('materializes a live assistant response from redis-backed stream events', () => {
+ const result = buildEffectiveChatTranscript({
+ messages: [buildUserMessage('stream-1', 'Hello')],
+ activeStreamId: 'stream-1',
+ streamSnapshot: {
+ events: [
+ toBatchEvent(1, {
+ v: 1,
+ seq: 1,
+ ts: '2026-04-15T12:00:01.000Z',
+ type: MothershipStreamV1EventType.session,
+ stream: { streamId: 'stream-1' },
+ trace: { requestId: 'req-1' },
+ payload: {
+ kind: MothershipStreamV1SessionKind.trace,
+ requestId: 'req-1',
+ },
+ }),
+ toBatchEvent(2, {
+ v: 1,
+ seq: 2,
+ ts: '2026-04-15T12:00:02.000Z',
+ type: MothershipStreamV1EventType.text,
+ stream: { streamId: 'stream-1' },
+ trace: { requestId: 'req-1' },
+ payload: {
+ channel: MothershipStreamV1TextChannel.assistant,
+ text: 'Live response',
+ },
+ }),
+ ],
+ previewSessions: [],
+ status: 'active',
+ },
+ })
+
+ expect(result).toHaveLength(2)
+ expect(result[1]).toEqual(
+ expect.objectContaining({
+ id: getLiveAssistantMessageId('stream-1'),
+ role: 'assistant',
+ content: 'Live response',
+ requestId: 'req-1',
+ })
+ )
+ })
+
+ it('does not duplicate thinking-only text into a second assistant block', () => {
+ const result = buildEffectiveChatTranscript({
+ messages: [buildUserMessage('stream-1', 'Hello')],
+ activeStreamId: 'stream-1',
+ streamSnapshot: {
+ events: [
+ toBatchEvent(1, {
+ v: 1,
+ seq: 1,
+ ts: '2026-04-15T12:00:01.000Z',
+ type: MothershipStreamV1EventType.text,
+ stream: { streamId: 'stream-1' },
+ payload: {
+ channel: MothershipStreamV1TextChannel.thinking,
+ text: 'Internal reasoning',
+ },
+ }),
+ ],
+ previewSessions: [],
+ status: 'active',
+ },
+ })
+
+ expect(result).toHaveLength(2)
+ expect(result[1]).toEqual(
+ expect.objectContaining({
+ content: 'Internal reasoning',
+ contentBlocks: [
+ expect.objectContaining({
+ type: MothershipStreamV1EventType.text,
+ content: 'Internal reasoning',
+ }),
+ ],
+ })
+ )
+ })
+
+ it('treats user-cancelled tool results as cancelled', () => {
+ const result = buildEffectiveChatTranscript({
+ messages: [buildUserMessage('stream-1', 'Hello')],
+ activeStreamId: 'stream-1',
+ streamSnapshot: {
+ events: [
+ toBatchEvent(1, {
+ v: 1,
+ seq: 1,
+ ts: '2026-04-15T12:00:01.000Z',
+ type: MothershipStreamV1EventType.tool,
+ stream: { streamId: 'stream-1' },
+ payload: {
+ phase: 'result',
+ toolCallId: 'tool-1',
+ toolName: 'workspace_file',
+ executor: 'go',
+ mode: 'sync',
+ success: false,
+ output: {
+ reason: 'user_cancelled',
+ },
+ },
+ }),
+ ],
+ previewSessions: [],
+ status: 'active',
+ },
+ })
+
+ expect(result[1]?.contentBlocks).toEqual([
+ expect.objectContaining({
+ type: MothershipStreamV1EventType.tool,
+ toolCall: expect.objectContaining({
+ id: 'tool-1',
+ name: 'workspace_file',
+ state: MothershipStreamV1CompletionStatus.cancelled,
+ }),
+ }),
+ ])
+ })
+
+ it('materializes a cancelled assistant tail when the stream ends before persistence', () => {
+ const result = buildEffectiveChatTranscript({
+ messages: [buildUserMessage('stream-1', 'Hello')],
+ activeStreamId: 'stream-1',
+ streamSnapshot: {
+ events: [
+ toBatchEvent(1, {
+ v: 1,
+ seq: 1,
+ ts: '2026-04-15T12:00:01.000Z',
+ type: MothershipStreamV1EventType.complete,
+ stream: { streamId: 'stream-1' },
+ payload: {
+ status: MothershipStreamV1CompletionStatus.cancelled,
+ },
+ }),
+ ],
+ previewSessions: [],
+ status: MothershipStreamV1CompletionStatus.cancelled,
+ },
+ })
+
+ expect(result).toHaveLength(2)
+ expect(result[1]?.contentBlocks).toEqual(
+ expect.arrayContaining([
+ expect.objectContaining({
+ type: MothershipStreamV1EventType.complete,
+ status: MothershipStreamV1CompletionStatus.cancelled,
+ }),
+ ])
+ )
+ })
+})
diff --git a/apps/sim/lib/copilot/chat/effective-transcript.ts b/apps/sim/lib/copilot/chat/effective-transcript.ts
new file mode 100644
index 0000000000..339a326d12
--- /dev/null
+++ b/apps/sim/lib/copilot/chat/effective-transcript.ts
@@ -0,0 +1,412 @@
+import { normalizeMessage, type PersistedMessage } from '@/lib/copilot/chat/persisted-message'
+import { resolveStreamToolOutcome } from '@/lib/copilot/chat/stream-tool-outcome'
+import {
+ MothershipStreamV1CompletionStatus,
+ type MothershipStreamV1ErrorPayload,
+ MothershipStreamV1EventType,
+ MothershipStreamV1RunKind,
+ MothershipStreamV1SessionKind,
+ MothershipStreamV1SpanLifecycleEvent,
+ MothershipStreamV1SpanPayloadKind,
+ MothershipStreamV1ToolOutcome,
+ MothershipStreamV1ToolPhase,
+} from '@/lib/copilot/generated/mothership-stream-v1'
+import type { FilePreviewSession } from '@/lib/copilot/request/session/file-preview-session-contract'
+import type { StreamBatchEvent } from '@/lib/copilot/request/session/types'
+
+interface StreamSnapshotLike {
+ events: StreamBatchEvent[]
+ previewSessions: FilePreviewSession[]
+ status: string
+}
+
+interface BuildEffectiveChatTranscriptParams {
+ messages: PersistedMessage[]
+ activeStreamId: string | null
+ streamSnapshot?: StreamSnapshotLike | null
+}
+
+type RawPersistedBlock = Record
+
+export function getLiveAssistantMessageId(streamId: string): string {
+ return `live-assistant:${streamId}`
+}
+
+function isRecord(value: unknown): value is Record {
+ return Boolean(value) && typeof value === 'object' && !Array.isArray(value)
+}
+
+function asPayloadRecord(value: unknown): Record | undefined {
+ return isRecord(value) ? value : undefined
+}
+
+function isTerminalStreamStatus(status: string | null | undefined): boolean {
+ return (
+ status === MothershipStreamV1CompletionStatus.complete ||
+ status === MothershipStreamV1CompletionStatus.error ||
+ status === MothershipStreamV1CompletionStatus.cancelled
+ )
+}
+
+function buildInlineErrorTag(payload: MothershipStreamV1ErrorPayload): string {
+ const message =
+ (typeof payload.displayMessage === 'string' ? payload.displayMessage : undefined) ||
+ (typeof payload.message === 'string' ? payload.message : undefined) ||
+ (typeof payload.error === 'string' ? payload.error : undefined) ||
+ 'An unexpected error occurred'
+ const provider = typeof payload.provider === 'string' ? payload.provider : undefined
+ const code = typeof payload.code === 'string' ? payload.code : undefined
+ return `${JSON.stringify({
+ message,
+ ...(code ? { code } : {}),
+ ...(provider ? { provider } : {}),
+ })}`
+}
+
+function resolveToolDisplayTitle(ui: unknown): string | undefined {
+ if (!isRecord(ui)) return undefined
+ return typeof ui.title === 'string'
+ ? ui.title
+ : typeof ui.phaseLabel === 'string'
+ ? ui.phaseLabel
+ : undefined
+}
+
+function appendTextBlock(
+ blocks: RawPersistedBlock[],
+ content: string,
+ options: {
+ lane?: 'subagent'
+ }
+): void {
+ if (!content) return
+ const last = blocks[blocks.length - 1]
+ if (last?.type === MothershipStreamV1EventType.text && last.lane === options.lane) {
+ last.content = `${typeof last.content === 'string' ? last.content : ''}${content}`
+ return
+ }
+
+ blocks.push({
+ type: MothershipStreamV1EventType.text,
+ ...(options.lane ? { lane: options.lane } : {}),
+ content,
+ })
+}
+
+function buildLiveAssistantMessage(params: {
+ streamId: string
+ events: StreamBatchEvent[]
+ status: string | null | undefined
+}): PersistedMessage | null {
+ const { streamId, events, status } = params
+ const blocks: RawPersistedBlock[] = []
+ const toolIndexById = new Map()
+ const subagentByParentToolCallId = new Map()
+ let activeSubagent: string | undefined
+ let activeSubagentParentToolCallId: string | undefined
+ let activeCompactionId: string | undefined
+ let runningText = ''
+ let lastContentSource: 'main' | 'subagent' | null = null
+ let requestId: string | undefined
+ let lastTimestamp: string | undefined
+
+ const resolveScopedSubagent = (
+ agentId: string | undefined,
+ parentToolCallId: string | undefined
+ ): string | undefined => {
+ if (agentId) return agentId
+ if (parentToolCallId) {
+ const scoped = subagentByParentToolCallId.get(parentToolCallId)
+ if (scoped) return scoped
+ }
+ return activeSubagent
+ }
+
+ const ensureToolBlock = (input: {
+ toolCallId: string
+ toolName: string
+ calledBy?: string
+ displayTitle?: string
+ params?: Record
+ result?: { success: boolean; output?: unknown; error?: string }
+ state?: string
+ }): RawPersistedBlock => {
+ const existingIndex = toolIndexById.get(input.toolCallId)
+ if (existingIndex !== undefined) {
+ const existing = blocks[existingIndex]
+ const existingToolCall = asPayloadRecord(existing.toolCall)
+ existing.toolCall = {
+ ...(existingToolCall ?? {}),
+ id: input.toolCallId,
+ name: input.toolName,
+ state:
+ input.state ??
+ (typeof existingToolCall?.state === 'string' ? existingToolCall.state : 'executing'),
+ ...(input.calledBy ? { calledBy: input.calledBy } : {}),
+ ...(input.params ? { params: input.params } : {}),
+ ...(input.result ? { result: input.result } : {}),
+ ...(input.displayTitle
+ ? {
+ display: {
+ title: input.displayTitle,
+ },
+ }
+ : existingToolCall?.display
+ ? { display: existingToolCall.display }
+ : {}),
+ }
+ return existing
+ }
+
+ const nextBlock: RawPersistedBlock = {
+ type: MothershipStreamV1EventType.tool,
+ phase: MothershipStreamV1ToolPhase.call,
+ toolCall: {
+ id: input.toolCallId,
+ name: input.toolName,
+ state: input.state ?? 'executing',
+ ...(input.calledBy ? { calledBy: input.calledBy } : {}),
+ ...(input.params ? { params: input.params } : {}),
+ ...(input.result ? { result: input.result } : {}),
+ ...(input.displayTitle
+ ? {
+ display: {
+ title: input.displayTitle,
+ },
+ }
+ : {}),
+ },
+ }
+ toolIndexById.set(input.toolCallId, blocks.length)
+ blocks.push(nextBlock)
+ return nextBlock
+ }
+
+ for (const entry of events) {
+ const parsed = entry.event
+ lastTimestamp = parsed.ts
+ if (typeof parsed.trace?.requestId === 'string') {
+ requestId = parsed.trace.requestId
+ }
+ const scopedParentToolCallId =
+ typeof parsed.scope?.parentToolCallId === 'string' ? parsed.scope.parentToolCallId : undefined
+ const scopedAgentId =
+ typeof parsed.scope?.agentId === 'string' ? parsed.scope.agentId : undefined
+ const scopedSubagent = resolveScopedSubagent(scopedAgentId, scopedParentToolCallId)
+
+ switch (parsed.type) {
+ case MothershipStreamV1EventType.session: {
+ if (parsed.payload.kind === MothershipStreamV1SessionKind.chat) {
+ continue
+ }
+ if (parsed.payload.kind === MothershipStreamV1SessionKind.start) {
+ continue
+ }
+ if (parsed.payload.kind === MothershipStreamV1SessionKind.trace) {
+ requestId = parsed.payload.requestId
+ }
+ continue
+ }
+ case MothershipStreamV1EventType.text: {
+ const chunk = parsed.payload.text
+ if (!chunk) {
+ continue
+ }
+ const contentSource: 'main' | 'subagent' = scopedSubagent ? 'subagent' : 'main'
+ const needsBoundaryNewline =
+ lastContentSource !== null &&
+ lastContentSource !== contentSource &&
+ runningText.length > 0 &&
+ !runningText.endsWith('\n')
+ const normalizedChunk = needsBoundaryNewline ? `\n${chunk}` : chunk
+ appendTextBlock(blocks, normalizedChunk, {
+ ...(scopedSubagent ? { lane: 'subagent' as const } : {}),
+ })
+ runningText += normalizedChunk
+ lastContentSource = contentSource
+ continue
+ }
+ case MothershipStreamV1EventType.tool: {
+ const payload = parsed.payload
+ const toolCallId = payload.toolCallId
+ const displayTitle = resolveToolDisplayTitle('ui' in payload ? payload.ui : undefined)
+
+ if ('previewPhase' in payload) {
+ continue
+ }
+
+ if (payload.phase === MothershipStreamV1ToolPhase.args_delta) {
+ continue
+ }
+
+ if (payload.phase === MothershipStreamV1ToolPhase.result) {
+ ensureToolBlock({
+ toolCallId,
+ toolName: payload.toolName,
+ calledBy: scopedSubagent,
+ state: resolveStreamToolOutcome(payload),
+ result: {
+ success: payload.success,
+ ...(payload.output !== undefined ? { output: payload.output } : {}),
+ ...(typeof payload.error === 'string' ? { error: payload.error } : {}),
+ },
+ })
+ continue
+ }
+
+ ensureToolBlock({
+ toolCallId,
+ toolName: payload.toolName,
+ calledBy: scopedSubagent,
+ displayTitle,
+ params: isRecord(payload.arguments) ? payload.arguments : undefined,
+ state: typeof payload.status === 'string' ? payload.status : 'executing',
+ })
+ continue
+ }
+ case MothershipStreamV1EventType.span: {
+ if (parsed.payload.kind !== MothershipStreamV1SpanPayloadKind.subagent) {
+ continue
+ }
+
+ const spanData = asPayloadRecord(parsed.payload.data)
+ const parentToolCallId =
+ scopedParentToolCallId ??
+ (typeof spanData?.tool_call_id === 'string' ? spanData.tool_call_id : undefined)
+ const name = typeof parsed.payload.agent === 'string' ? parsed.payload.agent : scopedAgentId
+ if (parsed.payload.event === MothershipStreamV1SpanLifecycleEvent.start && name) {
+ if (parentToolCallId) {
+ subagentByParentToolCallId.set(parentToolCallId, name)
+ }
+ activeSubagent = name
+ activeSubagentParentToolCallId = parentToolCallId
+ blocks.push({
+ type: MothershipStreamV1EventType.span,
+ kind: MothershipStreamV1SpanPayloadKind.subagent,
+ lifecycle: MothershipStreamV1SpanLifecycleEvent.start,
+ content: name,
+ })
+ continue
+ }
+
+ if (parsed.payload.event === MothershipStreamV1SpanLifecycleEvent.end) {
+ if (spanData?.pending === true) {
+ continue
+ }
+ if (parentToolCallId) {
+ subagentByParentToolCallId.delete(parentToolCallId)
+ }
+ if (
+ !parentToolCallId ||
+ parentToolCallId === activeSubagentParentToolCallId ||
+ name === activeSubagent
+ ) {
+ activeSubagent = undefined
+ activeSubagentParentToolCallId = undefined
+ }
+ blocks.push({
+ type: MothershipStreamV1EventType.span,
+ kind: MothershipStreamV1SpanPayloadKind.subagent,
+ lifecycle: MothershipStreamV1SpanLifecycleEvent.end,
+ })
+ }
+ continue
+ }
+ case MothershipStreamV1EventType.run: {
+ if (parsed.payload.kind === MothershipStreamV1RunKind.compaction_start) {
+ activeCompactionId = `compaction_${entry.eventId}`
+ ensureToolBlock({
+ toolCallId: activeCompactionId,
+ toolName: 'context_compaction',
+ displayTitle: 'Compacting context...',
+ state: 'executing',
+ })
+ continue
+ }
+
+ if (parsed.payload.kind === MothershipStreamV1RunKind.compaction_done) {
+ const compactionId = activeCompactionId ?? `compaction_${entry.eventId}`
+ activeCompactionId = undefined
+ ensureToolBlock({
+ toolCallId: compactionId,
+ toolName: 'context_compaction',
+ displayTitle: 'Compacted context',
+ state: MothershipStreamV1ToolOutcome.success,
+ })
+ }
+ continue
+ }
+ case MothershipStreamV1EventType.error: {
+ const tag = buildInlineErrorTag(parsed.payload)
+ if (runningText.includes(tag)) {
+ continue
+ }
+ const prefix = runningText.length > 0 && !runningText.endsWith('\n') ? '\n' : ''
+ const content = `${prefix}${tag}`
+ appendTextBlock(blocks, content, {
+ ...(scopedSubagent ? { lane: 'subagent' as const } : {}),
+ })
+ runningText += content
+ continue
+ }
+ case MothershipStreamV1EventType.complete: {
+ if (parsed.payload.status === MothershipStreamV1CompletionStatus.cancelled) {
+ blocks.push({
+ type: MothershipStreamV1EventType.complete,
+ status: parsed.payload.status,
+ })
+ }
+ continue
+ }
+ case MothershipStreamV1EventType.resource: {
+ continue
+ }
+ default: {
+ continue
+ }
+ }
+ }
+
+ if (blocks.length === 0 && !runningText && isTerminalStreamStatus(status)) {
+ return null
+ }
+
+ return normalizeMessage({
+ id: getLiveAssistantMessageId(streamId),
+ role: 'assistant',
+ content: runningText,
+ timestamp: lastTimestamp ?? new Date().toISOString(),
+ ...(requestId ? { requestId } : {}),
+ ...(blocks.length > 0 ? { contentBlocks: blocks } : {}),
+ })
+}
+
+export function buildEffectiveChatTranscript({
+ messages,
+ activeStreamId,
+ streamSnapshot,
+}: BuildEffectiveChatTranscriptParams): PersistedMessage[] {
+ if (!activeStreamId || !streamSnapshot) {
+ return messages
+ }
+
+ const trailingMessage = messages[messages.length - 1]
+ if (
+ !trailingMessage ||
+ trailingMessage.role !== 'user' ||
+ trailingMessage.id !== activeStreamId
+ ) {
+ return messages
+ }
+
+ const liveAssistant = buildLiveAssistantMessage({
+ streamId: activeStreamId,
+ events: streamSnapshot.events,
+ status: streamSnapshot.status,
+ })
+ if (!liveAssistant) {
+ return messages
+ }
+
+ return [...messages, liveAssistant]
+}
diff --git a/apps/sim/lib/copilot/chat/post.ts b/apps/sim/lib/copilot/chat/post.ts
index b15e84db69..8581621d1f 100644
--- a/apps/sim/lib/copilot/chat/post.ts
+++ b/apps/sim/lib/copilot/chat/post.ts
@@ -52,7 +52,7 @@ const FileAttachmentSchema = z.object({
})
const ResourceAttachmentSchema = z.object({
- type: z.enum(['workflow', 'table', 'file', 'knowledgebase', 'folder']),
+ type: z.enum(['workflow', 'table', 'file', 'knowledgebase', 'folder', 'task', 'log', 'generic']),
id: z.string().min(1),
title: z.string().optional(),
active: z.boolean().optional(),
@@ -64,6 +64,9 @@ const GENERIC_RESOURCE_TITLE: Record['t
file: 'File',
knowledgebase: 'Knowledge Base',
folder: 'Folder',
+ task: 'Task',
+ log: 'Log',
+ generic: 'Resource',
}
const ChatContextSchema = z.object({
diff --git a/apps/sim/lib/copilot/chat/stream-tool-outcome.ts b/apps/sim/lib/copilot/chat/stream-tool-outcome.ts
new file mode 100644
index 0000000000..863c47f98e
--- /dev/null
+++ b/apps/sim/lib/copilot/chat/stream-tool-outcome.ts
@@ -0,0 +1,46 @@
+import { MothershipStreamV1ToolOutcome } from '@/lib/copilot/generated/mothership-stream-v1'
+
+type TerminalToolOutcome =
+ | typeof MothershipStreamV1ToolOutcome.success
+ | typeof MothershipStreamV1ToolOutcome.error
+ | typeof MothershipStreamV1ToolOutcome.cancelled
+ | typeof MothershipStreamV1ToolOutcome.skipped
+ | typeof MothershipStreamV1ToolOutcome.rejected
+
+interface ResolveStreamToolOutcomeParams {
+ output?: unknown
+ status?: string
+ success?: boolean
+}
+
+function isRecord(value: unknown): value is Record {
+ return Boolean(value) && typeof value === 'object' && !Array.isArray(value)
+}
+
+export function resolveStreamToolOutcome({
+ output,
+ status,
+ success,
+}: ResolveStreamToolOutcomeParams): TerminalToolOutcome {
+ const outputRecord = isRecord(output) ? output : undefined
+ const isCancelled =
+ outputRecord?.reason === 'user_cancelled' ||
+ outputRecord?.cancelledByUser === true ||
+ status === MothershipStreamV1ToolOutcome.cancelled
+
+ if (isCancelled) {
+ return MothershipStreamV1ToolOutcome.cancelled
+ }
+
+ switch (status) {
+ case MothershipStreamV1ToolOutcome.success:
+ case MothershipStreamV1ToolOutcome.error:
+ case MothershipStreamV1ToolOutcome.skipped:
+ case MothershipStreamV1ToolOutcome.rejected:
+ return status
+ default:
+ return success === true
+ ? MothershipStreamV1ToolOutcome.success
+ : MothershipStreamV1ToolOutcome.error
+ }
+}
diff --git a/apps/sim/tools/brightdata/cancel_snapshot.ts b/apps/sim/tools/brightdata/cancel_snapshot.ts
index a5a2328979..8f4823ccc3 100644
--- a/apps/sim/tools/brightdata/cancel_snapshot.ts
+++ b/apps/sim/tools/brightdata/cancel_snapshot.ts
@@ -38,17 +38,17 @@ export const brightDataCancelSnapshotTool: ToolConfig<
}),
},
- transformResponse: async (response: Response) => {
+ transformResponse: async (response: Response, params) => {
if (!response.ok) {
const errorText = await response.text()
throw new Error(errorText || `Cancel snapshot failed with status ${response.status}`)
}
- const data = (await response.json().catch(() => null)) as Record | null
+ await response.json().catch(() => null)
return {
success: true,
output: {
- snapshotId: (data?.snapshot_id as string) ?? null,
+ snapshotId: params?.snapshotId ?? null,
cancelled: true,
},
}
diff --git a/apps/sim/tools/brightdata/discover.ts b/apps/sim/tools/brightdata/discover.ts
index 8f0f8ced6b..153bf465c6 100644
--- a/apps/sim/tools/brightdata/discover.ts
+++ b/apps/sim/tools/brightdata/discover.ts
@@ -1,6 +1,13 @@
+import { createLogger } from '@sim/logger'
+import { DEFAULT_EXECUTION_TIMEOUT_MS } from '@/lib/core/execution-limits'
import type { BrightDataDiscoverParams, BrightDataDiscoverResponse } from '@/tools/brightdata/types'
import type { ToolConfig } from '@/tools/types'
+const logger = createLogger('tools:brightdata:discover')
+
+const POLL_INTERVAL_MS = 3000
+const MAX_POLL_TIME_MS = DEFAULT_EXECUTION_TIMEOUT_MS
+
export const brightDataDiscoverTool: ToolConfig<
BrightDataDiscoverParams,
BrightDataDiscoverResponse
@@ -84,7 +91,7 @@ export const brightDataDiscoverTool: ToolConfig<
},
},
- transformResponse: async (response: Response) => {
+ transformResponse: async (response: Response, params) => {
if (!response.ok) {
const errorText = await response.text()
throw new Error(errorText || `Discover request failed with status ${response.status}`)
@@ -92,37 +99,112 @@ export const brightDataDiscoverTool: ToolConfig<
const data = await response.json()
- let results: Array<{
- url: string | null
- title: string | null
- description: string | null
- relevanceScore: number | null
- content: string | null
- }> = []
-
- const items = Array.isArray(data) ? data : (data?.results ?? data?.data ?? [])
-
- if (Array.isArray(items)) {
- results = items.map((item: Record) => ({
- url: (item.link as string) ?? (item.url as string) ?? null,
- title: (item.title as string) ?? null,
- description: (item.description as string) ?? (item.snippet as string) ?? null,
- relevanceScore: (item.relevance_score as number) ?? null,
- content:
- (item.content as string) ?? (item.text as string) ?? (item.markdown as string) ?? null,
- }))
- }
-
return {
success: true,
output: {
- results,
- query: null,
- totalResults: results.length,
+ results: [],
+ query: params?.query ?? null,
+ totalResults: 0,
+ taskId: data.task_id ?? null,
},
}
},
+ postProcess: async (result, params) => {
+ if (!result.success) return result
+
+ const taskId = result.output.taskId
+ if (!taskId) {
+ return {
+ ...result,
+ success: false,
+ error: 'Discover API did not return a task_id. Cannot poll for results.',
+ }
+ }
+
+ logger.info(`Bright Data Discover task ${taskId} created, polling for results...`)
+
+ let elapsedTime = 0
+
+ while (elapsedTime < MAX_POLL_TIME_MS) {
+ try {
+ const pollResponse = await fetch(
+ `https://api.brightdata.com/discover?task_id=${encodeURIComponent(taskId)}`,
+ {
+ method: 'GET',
+ headers: {
+ Authorization: `Bearer ${params.apiKey}`,
+ },
+ }
+ )
+
+ if (!pollResponse.ok) {
+ return {
+ ...result,
+ success: false,
+ error: `Failed to poll discover results: ${pollResponse.statusText}`,
+ }
+ }
+
+ const data = await pollResponse.json()
+ logger.info(`Bright Data Discover task ${taskId} status: ${data.status}`)
+
+ if (data.status === 'done') {
+ const items = Array.isArray(data.results) ? data.results : []
+
+ const results = items.map((item: Record) => ({
+ url: (item.link as string) ?? (item.url as string) ?? null,
+ title: (item.title as string) ?? null,
+ description: (item.description as string) ?? (item.snippet as string) ?? null,
+ relevanceScore: (item.relevance_score as number) ?? null,
+ content: (item.content as string) ?? null,
+ }))
+
+ return {
+ success: true,
+ output: {
+ results,
+ query: params.query ?? null,
+ totalResults: results.length,
+ },
+ }
+ }
+
+ if (data.status === 'failed' || data.status === 'error') {
+ return {
+ ...result,
+ success: false,
+ error: `Discover task failed: ${data.error ?? 'Unknown error'}`,
+ }
+ }
+
+ await new Promise((resolve) => setTimeout(resolve, POLL_INTERVAL_MS))
+ elapsedTime += POLL_INTERVAL_MS
+ } catch (error) {
+ logger.error('Error polling for discover task:', {
+ message: error instanceof Error ? error.message : String(error),
+ taskId,
+ })
+
+ return {
+ ...result,
+ success: false,
+ error: `Error polling for discover task: ${error instanceof Error ? error.message : String(error)}`,
+ }
+ }
+ }
+
+ logger.warn(
+ `Discover task ${taskId} did not complete within the maximum polling time (${MAX_POLL_TIME_MS / 1000}s)`
+ )
+
+ return {
+ ...result,
+ success: false,
+ error: `Discover task ${taskId} timed out after ${MAX_POLL_TIME_MS / 1000}s. Check status manually.`,
+ }
+ },
+
outputs: {
results: {
type: 'array',
diff --git a/apps/sim/tools/brightdata/download_snapshot.ts b/apps/sim/tools/brightdata/download_snapshot.ts
index c62cfc4c68..555be41aab 100644
--- a/apps/sim/tools/brightdata/download_snapshot.ts
+++ b/apps/sim/tools/brightdata/download_snapshot.ts
@@ -56,7 +56,7 @@ export const brightDataDownloadSnapshotTool: ToolConfig<
}),
},
- transformResponse: async (response: Response) => {
+ transformResponse: async (response: Response, params) => {
if (response.status === 409) {
throw new Error(
'Snapshot is not ready for download. Check the snapshot status first and wait until it is "ready".'
@@ -89,7 +89,7 @@ export const brightDataDownloadSnapshotTool: ToolConfig<
output: {
data,
format: contentType,
- snapshotId: (data[0]?.snapshot_id as string) ?? null,
+ snapshotId: params?.snapshotId ?? null,
},
}
},
diff --git a/apps/sim/tools/brightdata/scrape_url.ts b/apps/sim/tools/brightdata/scrape_url.ts
index 1fe284cd31..1d62d4c6df 100644
--- a/apps/sim/tools/brightdata/scrape_url.ts
+++ b/apps/sim/tools/brightdata/scrape_url.ts
@@ -66,7 +66,7 @@ export const brightDataScrapeUrlTool: ToolConfig<
},
},
- transformResponse: async (response: Response) => {
+ transformResponse: async (response: Response, params) => {
const contentType = response.headers.get('content-type') || ''
if (!response.ok) {
@@ -86,7 +86,7 @@ export const brightDataScrapeUrlTool: ToolConfig<
success: true,
output: {
content,
- url: null,
+ url: params?.url ?? null,
statusCode: response.status,
},
}
diff --git a/apps/sim/tools/brightdata/serp_search.ts b/apps/sim/tools/brightdata/serp_search.ts
index e9ed8ef1de..3acf3b2f07 100644
--- a/apps/sim/tools/brightdata/serp_search.ts
+++ b/apps/sim/tools/brightdata/serp_search.ts
@@ -129,7 +129,7 @@ export const brightDataSerpSearchTool: ToolConfig<
},
},
- transformResponse: async (response: Response) => {
+ transformResponse: async (response: Response, params) => {
if (!response.ok) {
const errorText = await response.text()
throw new Error(errorText || `SERP request failed with status ${response.status}`)
@@ -178,9 +178,14 @@ export const brightDataSerpSearchTool: ToolConfig<
success: true,
output: {
results,
- query: ((data?.general as Record | undefined)?.query as string) ?? null,
+ query:
+ ((data?.general as Record | undefined)?.query as string) ??
+ params?.query ??
+ null,
searchEngine:
- ((data?.general as Record | undefined)?.search_engine as string) ?? null,
+ ((data?.general as Record | undefined)?.search_engine as string) ??
+ params?.searchEngine ??
+ null,
},
}
},
diff --git a/apps/sim/tools/brightdata/types.ts b/apps/sim/tools/brightdata/types.ts
index 3197826996..d0a3e8aa39 100644
--- a/apps/sim/tools/brightdata/types.ts
+++ b/apps/sim/tools/brightdata/types.ts
@@ -131,6 +131,7 @@ export interface BrightDataDiscoverResponse extends ToolResponse {
}>
query: string | null
totalResults: number
+ taskId?: string | null
}
}
diff --git a/bun.lock b/bun.lock
index aaa61ed6da..654302c866 100644
--- a/bun.lock
+++ b/bun.lock
@@ -1,5 +1,6 @@
{
"lockfileVersion": 1,
+ "configVersion": 0,
"workspaces": {
"": {
"name": "simstudio",