From b794c9e5527f6afc71dd8b59be42473939c9e63e Mon Sep 17 00:00:00 2001 From: JD Date: Tue, 26 May 2026 18:07:55 +0000 Subject: [PATCH 1/6] fix: throttle SSE part deltas to reduce mobile freezes Accumulate text deltas in a buffer and flush every 50ms, so rapid streaming chunks don't trigger a Solid store mutation + re-render on every single delta event. On mobile this cuts the main-thread blocking caused by KaTeX / highlight.js re-renders during streaming responses. --- packages/ui/src/stores/session-events.ts | 29 ++++++++++++++++++++++-- 1 file changed, 27 insertions(+), 2 deletions(-) diff --git a/packages/ui/src/stores/session-events.ts b/packages/ui/src/stores/session-events.ts index 90430b1f5..cc660d49a 100644 --- a/packages/ui/src/stores/session-events.ts +++ b/packages/ui/src/stores/session-events.ts @@ -327,7 +327,7 @@ function findPendingSyntheticMessageId( if (!record) continue if (record.sessionId !== sessionId) continue if (record.role !== role) continue - if (record.status !== "sending") continue + if (record.status !== "sending" && record.status !== "sent") continue if (!record.isEphemeral) continue return record.id } @@ -453,12 +453,37 @@ function handleMessageUpdate(instanceId: string, event: MessageUpdateEvent | Mes } } +const DELTA_FLUSH_INTERVAL = 50 + +const pendingDeltas = new Map() +let deltaFlushTimer: ReturnType | null = null + +function enqueueDelta(instanceId: string, messageId: string, partId: string, field: string, delta: string) { + const key = `${instanceId}:${messageId}:${partId}:${field}` + const existing = pendingDeltas.get(key) + const accumulated = existing ? existing.delta + delta : delta + pendingDeltas.set(key, { instanceId, messageId, partId, field, delta: accumulated }) + if (deltaFlushTimer === null) { + deltaFlushTimer = setTimeout(flushDeltas, DELTA_FLUSH_INTERVAL) + } +} + +function flushDeltas() { + deltaFlushTimer = null + if (pendingDeltas.size === 0) return + const batch = Array.from(pendingDeltas.values()) + pendingDeltas.clear() + for (const { instanceId, messageId, partId, field, delta } of batch) { + applyPartDeltaV2(instanceId, { messageId, partId, field, delta }) + } +} + function handleMessagePartDelta(instanceId: string, event: MessagePartDeltaEvent): void { const props = event.properties if (!props) return const { messageID, partID, field, delta } = props if (!messageID || !partID || !field || typeof delta !== "string") return - applyPartDeltaV2(instanceId, { messageId: messageID, partId: partID, field, delta }) + enqueueDelta(instanceId, messageID, partID, field, delta) } function handleSessionUpdate(instanceId: string, event: EventSessionUpdated): void { From 9fc0de1af8fd1635a965fcc0375ec200ac2f3212 Mon Sep 17 00:00:00 2001 From: JD Date: Wed, 27 May 2026 01:27:56 +0000 Subject: [PATCH 2/6] fix: prevent text duplication from stale deltas after part updates MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit When message.part.updated arrives with a complete part, applyPartUpdate replaces the part entirely with the server's state. But if deltas were enqueued before the part update arrived and haven't flushed yet, those stale deltas would be applied AFTER the replacement, causing text duplication at the end of assistant messages. Fix: clear pending deltas for a part before applying the full part update, since the update already contains the complete state and any accumulated deltas are now stale. This prevents the race where: 1. Deltas accumulate in the 50ms throttle window 2. message.part.updated arrives and replaces the part with complete text 3. Delta timer expires and concatenates stale deltas → duplicate text --- packages/ui/src/stores/session-events.ts | 18 ++++++++++++++++++ 1 file changed, 18 insertions(+) diff --git a/packages/ui/src/stores/session-events.ts b/packages/ui/src/stores/session-events.ts index cc660d49a..2f473eac7 100644 --- a/packages/ui/src/stores/session-events.ts +++ b/packages/ui/src/stores/session-events.ts @@ -384,6 +384,12 @@ function handleMessageUpdate(instanceId: string, event: MessageUpdateEvent | Mes upsertMessageInfoV2(instanceId, messageInfo, { status: "streaming" }) } + // Clear any pending deltas for this part before applying the full part update. + // The part update contains the complete state from the server, so accumulated + // deltas would be stale and cause duplication if flushed later. + if (part.id) { + clearPendingDeltasForPart(instanceId, messageId, part.id) + } applyPartUpdateV2(instanceId, { ...part, sessionID: sessionId, messageID: messageId }) handleConversationAssistantPartUpdated(instanceId, { ...part, sessionID: sessionId, messageID: messageId }, messageInfo) @@ -468,6 +474,18 @@ function enqueueDelta(instanceId: string, messageId: string, partId: string, fie } } +function clearPendingDeltasForPart(instanceId: string, messageId: string, partId: string) { + const keysToDelete: string[] = [] + for (const key of pendingDeltas.keys()) { + if (key.startsWith(`${instanceId}:${messageId}:${partId}:`)) { + keysToDelete.push(key) + } + } + for (const key of keysToDelete) { + pendingDeltas.delete(key) + } +} + function flushDeltas() { deltaFlushTimer = null if (pendingDeltas.size === 0) return From 105838b5cdc5a4bb99ef9375e5f025967fde06cf Mon Sep 17 00:00:00 2001 From: JD Date: Tue, 9 Jun 2026 10:30:52 +0000 Subject: [PATCH 3/6] fix(ui): flush pending deltas before message.updated to preserve event ordering When deltas are buffered for up to 50ms and message.updated arrives before the flush timer fires, the message could be marked complete/error before pending text mutations are applied. This causes the UI to observe a terminal-status message with stale content. Fix: flush any pending deltas for the message before applying the message.updated event. This preserves the server's event ordering: all delta content is applied first, then the message status/metadata update runs on the complete content. Addresses gatekeeper review blocking finding #1 on PR #536. --- packages/ui/src/stores/session-events.ts | 26 ++++++++++++++++++++++++ 1 file changed, 26 insertions(+) diff --git a/packages/ui/src/stores/session-events.ts b/packages/ui/src/stores/session-events.ts index 2f473eac7..0df24cc6e 100644 --- a/packages/ui/src/stores/session-events.ts +++ b/packages/ui/src/stores/session-events.ts @@ -408,6 +408,14 @@ function handleMessageUpdate(instanceId: string, event: MessageUpdateEvent | Mes const messageId = typeof info.id === "string" ? info.id : undefined if (!sessionId || !messageId) return + // Flush any pending deltas for this message before applying the update. + // Deltas are buffered for up to 50ms; if message.updated arrives before + // the buffer flushes, the message could be marked complete/error with + // stale text mutations still pending. Flushing first preserves the + // server's event ordering: all delta content is applied, then the + // message status/metadata update runs on the complete content. + flushPendingDeltasForMessage(instanceId, messageId) + const timeInfo = (info.time ?? {}) as { created?: number; updated?: number; end?: number } const nextUpdated = typeof timeInfo.end === "number" && timeInfo.end > 0 @@ -486,6 +494,24 @@ function clearPendingDeltasForPart(instanceId: string, messageId: string, partId } } +function flushPendingDeltasForMessage(instanceId: string, messageId: string): void { + const prefix = `${instanceId}:${messageId}:` + for (const key of pendingDeltas.keys()) { + if (key.startsWith(prefix)) { + const pending = pendingDeltas.get(key) + if (pending) { + applyPartDeltaV2(instanceId, { + messageId: pending.messageId, + partId: pending.partId, + field: pending.field, + delta: pending.delta, + }) + pendingDeltas.delete(key) + } + } + } +} + function flushDeltas() { deltaFlushTimer = null if (pendingDeltas.size === 0) return From ff977479dcf01e34db11e09f398bcfc266f90310 Mon Sep 17 00:00:00 2001 From: JD Date: Wed, 10 Jun 2026 22:00:15 +0000 Subject: [PATCH 4/6] refactor(ui): address Greptile review findings on PR #536 MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Apply two P2 (style-level) improvements from Greptile gatekeeper review: 1. Consistent snapshot pattern in flushPendingDeltasForMessage - Collect keys first, then iterate to flush (matches clearPendingDeltasForPart) - Prevents mutation-during-iteration and ensures delete() runs before apply - If applyPartDeltaV2 throws, entry is already removed (no re-apply on timer) 2. Extract delta-buffer to dedicated module - session-events.ts: 854 → 808 lines (under 850, closer to 800 target) - delta-buffer.ts: 80 lines (focused, single-responsibility) - Improves maintainability per AGENTS.md file-length guidelines No functional changes. Build verified. Addresses: https://github.com/Pagecran/CodeNomad/pull/17#discussion_r3383913782 https://github.com/Pagecran/CodeNomad/pull/17#discussion_r3383913841 --- packages/ui/src/stores/delta-buffer.ts | 80 ++++++++++++++++++++++++ packages/ui/src/stores/session-events.ts | 62 +++--------------- 2 files changed, 90 insertions(+), 52 deletions(-) create mode 100644 packages/ui/src/stores/delta-buffer.ts diff --git a/packages/ui/src/stores/delta-buffer.ts b/packages/ui/src/stores/delta-buffer.ts new file mode 100644 index 000000000..82a7436be --- /dev/null +++ b/packages/ui/src/stores/delta-buffer.ts @@ -0,0 +1,80 @@ +/** + * Delta buffer for throttling SSE message.part.delta events. + * + * Accumulates text deltas in a 50ms window to reduce UI churn from + * high-frequency streaming chunks. Provides targeted flush/clear paths + * so full part-update or message-complete events always win over stale + * buffered deltas. + */ + +const DELTA_FLUSH_INTERVAL = 50 + +const pendingDeltas = new Map() +let deltaFlushTimer: ReturnType | null = null + +export function enqueueDelta(instanceId: string, messageId: string, partId: string, field: string, delta: string) { + const key = `${instanceId}:${messageId}:${partId}:${field}` + const existing = pendingDeltas.get(key) + const accumulated = existing ? existing.delta + delta : delta + pendingDeltas.set(key, { instanceId, messageId, partId, field, delta: accumulated }) + if (deltaFlushTimer === null) { + deltaFlushTimer = setTimeout(flushDeltas, DELTA_FLUSH_INTERVAL) + } +} + +export function clearPendingDeltasForPart(instanceId: string, messageId: string, partId: string) { + const keysToDelete: string[] = [] + for (const key of pendingDeltas.keys()) { + if (key.startsWith(`${instanceId}:${messageId}:${partId}:`)) { + keysToDelete.push(key) + } + } + for (const key of keysToDelete) { + pendingDeltas.delete(key) + } +} + +export function flushPendingDeltasForMessage( + instanceId: string, + messageId: string, + applyDelta: (instanceId: string, delta: { messageId: string; partId: string; field: string; delta: string }) => void +): void { + const prefix = `${instanceId}:${messageId}:` + const keysToFlush: string[] = [] + for (const key of pendingDeltas.keys()) { + if (key.startsWith(prefix)) { + keysToFlush.push(key) + } + } + for (const key of keysToFlush) { + const pending = pendingDeltas.get(key) + if (pending) { + pendingDeltas.delete(key) + applyDelta(instanceId, { + messageId: pending.messageId, + partId: pending.partId, + field: pending.field, + delta: pending.delta, + }) + } + } +} + +export function setFlushCallback( + callback: (batch: Array<{ instanceId: string; messageId: string; partId: string; field: string; delta: string }>) => void +) { + // Store callback for flushDeltas to use + flushCallback = callback +} + +let flushCallback: ((batch: Array<{ instanceId: string; messageId: string; partId: string; field: string; delta: string }>) => void) | null = null + +function flushDeltas() { + deltaFlushTimer = null + if (pendingDeltas.size === 0) return + const batch = Array.from(pendingDeltas.values()) + pendingDeltas.clear() + if (flushCallback) { + flushCallback(batch) + } +} diff --git a/packages/ui/src/stores/session-events.ts b/packages/ui/src/stores/session-events.ts index 0df24cc6e..e62ce0120 100644 --- a/packages/ui/src/stores/session-events.ts +++ b/packages/ui/src/stores/session-events.ts @@ -17,6 +17,12 @@ import type { MessageStatus } from "./message-v2/types" import { getLogger } from "../lib/logger" import { requestData } from "../lib/opencode-api" +import { + enqueueDelta, + clearPendingDeltasForPart, + flushPendingDeltasForMessage, + setFlushCallback, +} from "./delta-buffer" import { getPermissionId, getPermissionKind, @@ -414,7 +420,7 @@ function handleMessageUpdate(instanceId: string, event: MessageUpdateEvent | Mes // stale text mutations still pending. Flushing first preserves the // server's event ordering: all delta content is applied, then the // message status/metadata update runs on the complete content. - flushPendingDeltasForMessage(instanceId, messageId) + flushPendingDeltasForMessage(instanceId, messageId, applyPartDeltaV2) const timeInfo = (info.time ?? {}) as { created?: number; updated?: number; end?: number } const nextUpdated = @@ -467,60 +473,12 @@ function handleMessageUpdate(instanceId: string, event: MessageUpdateEvent | Mes } } -const DELTA_FLUSH_INTERVAL = 50 - -const pendingDeltas = new Map() -let deltaFlushTimer: ReturnType | null = null - -function enqueueDelta(instanceId: string, messageId: string, partId: string, field: string, delta: string) { - const key = `${instanceId}:${messageId}:${partId}:${field}` - const existing = pendingDeltas.get(key) - const accumulated = existing ? existing.delta + delta : delta - pendingDeltas.set(key, { instanceId, messageId, partId, field, delta: accumulated }) - if (deltaFlushTimer === null) { - deltaFlushTimer = setTimeout(flushDeltas, DELTA_FLUSH_INTERVAL) - } -} - -function clearPendingDeltasForPart(instanceId: string, messageId: string, partId: string) { - const keysToDelete: string[] = [] - for (const key of pendingDeltas.keys()) { - if (key.startsWith(`${instanceId}:${messageId}:${partId}:`)) { - keysToDelete.push(key) - } - } - for (const key of keysToDelete) { - pendingDeltas.delete(key) - } -} - -function flushPendingDeltasForMessage(instanceId: string, messageId: string): void { - const prefix = `${instanceId}:${messageId}:` - for (const key of pendingDeltas.keys()) { - if (key.startsWith(prefix)) { - const pending = pendingDeltas.get(key) - if (pending) { - applyPartDeltaV2(instanceId, { - messageId: pending.messageId, - partId: pending.partId, - field: pending.field, - delta: pending.delta, - }) - pendingDeltas.delete(key) - } - } - } -} - -function flushDeltas() { - deltaFlushTimer = null - if (pendingDeltas.size === 0) return - const batch = Array.from(pendingDeltas.values()) - pendingDeltas.clear() +// Delta buffer callback setup +setFlushCallback((batch) => { for (const { instanceId, messageId, partId, field, delta } of batch) { applyPartDeltaV2(instanceId, { messageId, partId, field, delta }) } -} +}) function handleMessagePartDelta(instanceId: string, event: MessagePartDeltaEvent): void { const props = event.properties From 512240d96cb99b92c8367670370daf25f915b801 Mon Sep 17 00:00:00 2001 From: JD Date: Sat, 13 Jun 2026 11:39:04 +0000 Subject: [PATCH 5/6] fix: remove trailing whitespace in delta-buffer.ts Addresses whitespace hygiene failure found by CodeNomadBot gatekeeper review. git diff --check was failing on line 3 of delta-buffer.ts. --- packages/ui/src/stores/delta-buffer.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/packages/ui/src/stores/delta-buffer.ts b/packages/ui/src/stores/delta-buffer.ts index 82a7436be..303435b42 100644 --- a/packages/ui/src/stores/delta-buffer.ts +++ b/packages/ui/src/stores/delta-buffer.ts @@ -1,6 +1,6 @@ /** * Delta buffer for throttling SSE message.part.delta events. - * + * * Accumulates text deltas in a 50ms window to reduce UI churn from * high-frequency streaming chunks. Provides targeted flush/clear paths * so full part-update or message-complete events always win over stale From 6ee9b808deb68bab68841e87f107c26a43904826 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Pascal=20Andr=C3=A9?= Date: Sat, 13 Jun 2026 16:31:36 +0200 Subject: [PATCH 6/6] test(ui): cover delta buffer ordering Add targeted node:test coverage for SSE delta buffering, including concatenation, stale part clearing, message-level flushing before updates, and isolation across instances/messages/parts. Keep optimistic synthetic message reconciliation scoped to sending records by removing the unrelated sent-status match. This preserves the original pending-message behavior while the PR focuses on delta throttling. Validation: node --test --import tsx packages/ui/src/stores/delta-buffer.test.ts; npm run typecheck --workspace @codenomad/ui; npm run build:ui. --- packages/ui/src/stores/delta-buffer.test.ts | 102 ++++++++++++++++++++ packages/ui/src/stores/delta-buffer.ts | 9 ++ packages/ui/src/stores/session-events.ts | 2 +- 3 files changed, 112 insertions(+), 1 deletion(-) create mode 100644 packages/ui/src/stores/delta-buffer.test.ts diff --git a/packages/ui/src/stores/delta-buffer.test.ts b/packages/ui/src/stores/delta-buffer.test.ts new file mode 100644 index 000000000..09072783e --- /dev/null +++ b/packages/ui/src/stores/delta-buffer.test.ts @@ -0,0 +1,102 @@ +import assert from "node:assert/strict" +import { afterEach, beforeEach, describe, it } from "node:test" +import { setTimeout as delay } from "node:timers/promises" + +import { + clearPendingDeltasForPart, + enqueueDelta, + flushPendingDeltasForMessage, + resetDeltaBufferForTests, + setFlushCallback, +} from "./delta-buffer.ts" + +type DeltaBatch = Array<{ instanceId: string; messageId: string; partId: string; field: string; delta: string }> + +describe("delta buffer", () => { + beforeEach(() => { + resetDeltaBufferForTests() + }) + + afterEach(() => { + resetDeltaBufferForTests() + }) + + it("concatenates matching deltas and flushes them once", async () => { + const flushed: DeltaBatch[] = [] + setFlushCallback((batch) => flushed.push(batch)) + + enqueueDelta("instance-1", "message-1", "part-1", "text", "hello") + enqueueDelta("instance-1", "message-1", "part-1", "text", " world") + + await delay(75) + + assert.equal(flushed.length, 1) + assert.deepEqual(flushed[0], [ + { instanceId: "instance-1", messageId: "message-1", partId: "part-1", field: "text", delta: "hello world" }, + ]) + }) + + it("clears pending deltas for a full part update before a stale timer flush", async () => { + const flushed: DeltaBatch[] = [] + setFlushCallback((batch) => flushed.push(batch)) + + enqueueDelta("instance-1", "message-1", "part-1", "text", "stale") + clearPendingDeltasForPart("instance-1", "message-1", "part-1") + + await delay(75) + + assert.deepEqual(flushed, []) + }) + + it("flushes pending message deltas before applying message.updated", async () => { + const timerFlushes: DeltaBatch[] = [] + const applied: Array<{ instanceId: string; delta: { messageId: string; partId: string; field: string; delta: string } }> = [] + setFlushCallback((batch) => timerFlushes.push(batch)) + + enqueueDelta("instance-1", "message-1", "part-1", "text", "before update") + flushPendingDeltasForMessage("instance-1", "message-1", (instanceId, delta) => { + applied.push({ instanceId, delta }) + }) + + await delay(75) + + assert.deepEqual(applied, [ + { + instanceId: "instance-1", + delta: { messageId: "message-1", partId: "part-1", field: "text", delta: "before update" }, + }, + ]) + assert.deepEqual(timerFlushes, []) + }) + + it("keeps clear and flush operations isolated by instance, message, and part", async () => { + const timerFlushes: DeltaBatch[] = [] + const applied: Array<{ instanceId: string; delta: { messageId: string; partId: string; field: string; delta: string } }> = [] + setFlushCallback((batch) => timerFlushes.push(batch)) + + enqueueDelta("instance-1", "message-1", "part-1", "text", "drop") + enqueueDelta("instance-1", "message-1", "part-2", "text", "same message") + enqueueDelta("instance-1", "message-2", "part-1", "text", "other message") + enqueueDelta("instance-2", "message-1", "part-1", "text", "other instance") + + clearPendingDeltasForPart("instance-1", "message-1", "part-1") + flushPendingDeltasForMessage("instance-1", "message-1", (instanceId, delta) => { + applied.push({ instanceId, delta }) + }) + + await delay(75) + + assert.deepEqual(applied, [ + { + instanceId: "instance-1", + delta: { messageId: "message-1", partId: "part-2", field: "text", delta: "same message" }, + }, + ]) + assert.deepEqual(timerFlushes, [ + [ + { instanceId: "instance-1", messageId: "message-2", partId: "part-1", field: "text", delta: "other message" }, + { instanceId: "instance-2", messageId: "message-1", partId: "part-1", field: "text", delta: "other instance" }, + ], + ]) + }) +}) diff --git a/packages/ui/src/stores/delta-buffer.ts b/packages/ui/src/stores/delta-buffer.ts index 303435b42..3307257a4 100644 --- a/packages/ui/src/stores/delta-buffer.ts +++ b/packages/ui/src/stores/delta-buffer.ts @@ -67,6 +67,15 @@ export function setFlushCallback( flushCallback = callback } +export function resetDeltaBufferForTests() { + pendingDeltas.clear() + if (deltaFlushTimer !== null) { + clearTimeout(deltaFlushTimer) + deltaFlushTimer = null + } + flushCallback = null +} + let flushCallback: ((batch: Array<{ instanceId: string; messageId: string; partId: string; field: string; delta: string }>) => void) | null = null function flushDeltas() { diff --git a/packages/ui/src/stores/session-events.ts b/packages/ui/src/stores/session-events.ts index e62ce0120..ce3632d0b 100644 --- a/packages/ui/src/stores/session-events.ts +++ b/packages/ui/src/stores/session-events.ts @@ -333,7 +333,7 @@ function findPendingSyntheticMessageId( if (!record) continue if (record.sessionId !== sessionId) continue if (record.role !== role) continue - if (record.status !== "sending" && record.status !== "sent") continue + if (record.status !== "sending") continue if (!record.isEphemeral) continue return record.id }