diff --git a/src/app/api/projects/[projectId]/cloud-token/route.ts b/src/app/api/projects/[projectId]/cloud-token/route.ts index f78032e7..6b940aa0 100644 --- a/src/app/api/projects/[projectId]/cloud-token/route.ts +++ b/src/app/api/projects/[projectId]/cloud-token/route.ts @@ -27,7 +27,7 @@ async function projectCloudTokenRoute(req: NextRequest, { routeParams, user }: A const secret = new TextEncoder().encode(process.env.JWT_SECRET!); const token = await new SignJWT(payload) .setProtectedHeader({ alg: "HS256" }) - .setExpirationTime("1m") + .setExpirationTime("1h") .sign(secret); return Success(token); diff --git a/src/lib/cloud/room.ts b/src/lib/cloud/room.ts index ac971fc0..9af67d3b 100644 --- a/src/lib/cloud/room.ts +++ b/src/lib/cloud/room.ts @@ -18,10 +18,7 @@ import { } from "./types"; import { handleProtocolMessage } from "./protocol"; import { ProjectState } from "../project/project-doc"; -import { - migrateProjectDocCore, - readProjectDocVersion, -} from "../project/migrations/project-migration-runner"; +import { migrateProjectDocCore, readProjectDocVersion } from "../project/migrations/project-migration-runner"; import { CURRENT_PROJECT_VERSION } from "../project/migrations/project-migrations"; export class ProjectRoom extends DurableObject { @@ -31,11 +28,11 @@ export class ProjectRoom extends DurableObject { sessions: Map; userConnections: Map; blacklist: Set; - cleanupInterval: ReturnType | null = null; private isDirty: boolean = false; private alarmScheduled: boolean = false; private projectId: string | null = null; + private lastAwarenessCleanup: number = 0; /** Project schema version of the in-memory doc; the gatekeeper compares * client-advertised versions against this on connect. */ @@ -50,7 +47,10 @@ export class ProjectRoom extends DurableObject { // (esbuild does not guarantee class-field arrow functions are initialized // before the constructor body runs.) private handleDocUpdate!: (update: Uint8Array, origin: unknown) => void; - private handleAwarenessUpdate!: (changes: { added: number[]; updated: number[]; removed: number[] }, origin: unknown) => void; + private handleAwarenessUpdate!: ( + changes: { added: number[]; updated: number[]; removed: number[] }, + origin: unknown, + ) => void; constructor(ctx: DurableObjectState, env: Env) { super(ctx, env); @@ -67,12 +67,24 @@ export class ProjectRoom extends DurableObject { this.markDirty(); }; - this.handleAwarenessUpdate = ({ added }: { added: number[]; updated: number[]; removed: number[] }, origin: unknown): void => { + this.handleAwarenessUpdate = ( + { added, updated }: { added: number[]; updated: number[]; removed: number[] }, + origin: unknown, + ): void => { if (origin instanceof WebSocket) { const session = this.sessions.get(origin); if (session) { - added.forEach((id: number) => session.clientIds.add(id)); + let changed = false; + const toAdd = [...added, ...updated]; + toAdd.forEach((id: number) => { + if (!session.clientIds.has(id)) { + session.clientIds.add(id); + changed = true; + } + }); session.lastActivity = Date.now(); + // Persist updated clientIds so they survive DO hibernation. + if (changed) this.persistSessionAttachment(origin); } } }; @@ -89,10 +101,6 @@ export class ProjectRoom extends DurableObject { this.userConnections = new Map(); this.blacklist = new Set(); - // Listen for document updates and handle broadcasting + persistence. - // This is the source of truth for ALL changes to the document. - this.doc.on("update", this.handleDocUpdate); - // Track client IDs when awareness updates come from a WebSocket this.awareness.on("update", this.handleAwarenessUpdate); @@ -111,7 +119,10 @@ export class ProjectRoom extends DurableObject { ); `); - // Restore project state + // Restore project state from SQLite. Attach the update handler AFTER + // the restore so that re-loading persisted bytes on every DO wake-up + // doesn't trigger scheduleSave / markDirty (which would save identical + // bytes and schedule an unnecessary R2 snapshot). const cursor = this.ctx.storage.sql.exec("SELECT data FROM project WHERE id = 1;"); for (const row of cursor) { if (row.data) { @@ -119,6 +130,11 @@ export class ProjectRoom extends DurableObject { } } + // Listen for document updates and handle broadcasting + persistence. + // Attached here (after restore) so only live writes from WS clients + // and server-side migrations trigger the save pipeline. + this.doc.on("update", this.handleDocUpdate); + // Restore blacklist const blacklistRows = this.ctx.storage.sql.exec("SELECT user_id FROM blacklist;").toArray(); for (const row of blacklistRows) { @@ -139,12 +155,52 @@ export class ProjectRoom extends DurableObject { this.observeDocVersion(); }); - // Start periodic stale awareness cleanup - this.startAwarenessCleanup(); + // Restore sessions from hibernated WebSockets. Cloudflare DOs can + // hibernate to save memory while WebSockets stay connected; on the + // next message the constructor runs again with empty maps. Without + // this restoration, incoming messages have no session to attach to, + // session activity tracking breaks, webSocketClose finds nothing to + // clean up, and broadcastAwarenessRequest counts wrongly. + const hibernatedSockets = this.ctx.getWebSockets(); + for (const ws of hibernatedSockets) { + const attachment = ws.deserializeAttachment() as + | { userId: string; clientIds: number[] } + | null; + if (!attachment) continue; + this.sessions.set(ws, { + clientIds: new Set(attachment.clientIds), + userId: attachment.userId, + lastActivity: Date.now(), + }); + this.userConnections.set(attachment.userId, ws); + } + if (hibernatedSockets.length > 0) { + console.log( + `[Room] Restored ${this.sessions.size} session(s) from ${hibernatedSockets.length} hibernated WebSocket(s)`, + ); + // Awareness state was lost when the DO hibernated. Ask all + // restored clients to re-broadcast their awareness so we can + // rebuild room.awareness from scratch. + this.broadcastAwarenessRequest(); + } console.log("[Room] Initialized"); } + /** + * Persist the current session state on the WebSocket so it survives + * Cloudflare DO hibernation. Called whenever clientIds or userId changes + * for a session. + */ + private persistSessionAttachment(ws: WebSocket): void { + const session = this.sessions.get(ws); + if (!session) return; + ws.serializeAttachment({ + userId: session.userId, + clientIds: Array.from(session.clientIds), + }); + } + /** * Apply pending project-doc migrations to the in-memory doc and persist * the result. Idempotent: a no-op when the doc is already at @@ -179,8 +235,7 @@ export class ProjectRoom extends DurableObject { this.docMigrationFailed = true; this.docVersion = outcome.from; console.error( - `[Room] Doc migration failed at step v${outcome.failedAt} ` + - `(stored v${outcome.from}):`, + `[Room] Doc migration failed at step v${outcome.failedAt} ` + `(stored v${outcome.from}):`, outcome.error, ); break; @@ -335,12 +390,16 @@ export class ProjectRoom extends DurableObject { } /** - * Start periodic cleanup of stale awareness states + * Throttled inline cleanup. Called from message/connect paths instead of + * setInterval — a live timer would prevent the DO from hibernating, which + * keeps it billed continuously. With this approach the DO only does + * cleanup work when traffic is already arriving. */ - private startAwarenessCleanup(): void { - this.cleanupInterval = setInterval(() => { - this.cleanupStaleAwareness(); - }, AWARENESS_CLEANUP_INTERVAL_MS); + maybeCleanupStaleAwareness(): void { + const now = Date.now(); + if (now - this.lastAwarenessCleanup < AWARENESS_CLEANUP_INTERVAL_MS) return; + this.lastAwarenessCleanup = now; + this.cleanupStaleAwareness(); } /** @@ -572,9 +631,7 @@ export class ProjectRoom extends DurableObject { const clientVersionParam = url.searchParams.get("clientVersion"); const clientVersion = clientVersionParam !== null ? Number(clientVersionParam) : NaN; if (Number.isFinite(clientVersion) && clientVersion < this.docVersion) { - console.log( - `[Room] Rejecting stale client v${clientVersion} (doc at v${this.docVersion})`, - ); + console.log(`[Room] Rejecting stale client v${clientVersion} (doc at v${this.docVersion})`); try { server.close(4006, `Stale client: update to access v${this.docVersion}`); } catch {} @@ -588,6 +645,9 @@ export class ProjectRoom extends DurableObject { lastActivity: Date.now(), }); this.userConnections.set(userId, server); + // Persist immediately so a hibernation-wake before the first + // awareness message can still identify this socket. + this.persistSessionAttachment(server); // Send current document state (sync step 1) using the same encoder // pattern as all other outgoing messages — avoids fragile manual byte prepend. @@ -611,6 +671,10 @@ export class ProjectRoom extends DurableObject { console.log(`[Room] User ${userId} connected. Total sessions: ${this.sessions.size}`); + // Opportunistic cleanup on connect — a new client arriving is the + // best moment to drop awareness for clients that quietly went away. + this.maybeCleanupStaleAwareness(); + // Request all existing clients to re-broadcast their awareness // This ensures the new client receives everyone's current state, // especially important after a DurableObject restart where @@ -802,6 +866,7 @@ export class ProjectRoom extends DurableObject { if (fullMessage.length === 0) return; handleProtocolMessage(this, fullMessage, ws); + this.maybeCleanupStaleAwareness(); } scheduleSave(): void { diff --git a/src/lib/cloud/utils.ts b/src/lib/cloud/utils.ts index 5de26ce2..d9c5adc2 100644 --- a/src/lib/cloud/utils.ts +++ b/src/lib/cloud/utils.ts @@ -8,7 +8,6 @@ import * as syncProtocol from "y-protocols/sync"; import * as awarenessProtocol from "y-protocols/awareness"; import { CURRENT_PROJECT_VERSION } from "../project/migrations/project-migrations"; - declare const window: Window & typeof globalThis; /** @@ -28,7 +27,10 @@ type WebsocketProviderOptions = { type WSInternals = { _updateHandler: (update: Uint8Array, origin: unknown) => void; - _awarenessUpdateHandler: (changes: { added: number[]; updated: number[]; removed: number[] }, origin: unknown) => void; + _awarenessUpdateHandler: ( + changes: { added: number[]; updated: number[]; removed: number[] }, + origin: unknown, + ) => void; messageHandlers: Array<(encoder: encoding.Encoder, ...rest: unknown[]) => void>; ws: WebSocket | null; bcconnected: boolean; @@ -38,6 +40,7 @@ type WSInternals = { export class ThrottledWebsocketProvider extends WebsocketProvider { on(event: "document-restored", listener: () => void): this; on(event: "stale-client-version", listener: () => void): this; + on(event: "session-replaced", listener: () => void): this; on(event: Parameters[0], listener: Parameters[1]): this; on(event: string, listener: (...args: unknown[]) => void): this { return super.on( @@ -48,6 +51,7 @@ export class ThrottledWebsocketProvider extends WebsocketProvider { emit(event: "document-restored", args: []): this; emit(event: "stale-client-version", args: []): this; + emit(event: "session-replaced", args: []): this; emit(event: Parameters[0], args: Parameters[1]): this; emit(event: string, args: unknown[]): this { super.emit( @@ -63,11 +67,16 @@ export class ThrottledWebsocketProvider extends WebsocketProvider { private lastMessageTime: number = 0; // Milliseconds (Date.now()) private userIdleTimer: ReturnType | null = null; + // Cap the queue so a long disconnect can't grow it without bound. The + // local Yjs doc is the source of truth — on reconnect, syncStep1/2 will + // re-converge state regardless of what's in the queue. + private readonly MAX_UPDATE_QUEUE = 1000; + // Throttling configuration (all in milliseconds) private readonly SOLO_USER_UPDATE_MS = 1000; // 1s when alone private readonly MULTI_USER_UPDATE_MS = 200; // 200ms with others private readonly MAX_SILENCE_DURATION_MS = 20000; // 20s max silence before ping - private readonly MAX_IDLE_DURATION_MS = 10 * 60 * 1000; // 10 minutes idle timeout + private readonly MAX_IDLE_DURATION_MS = 30 * 1000; // 30 seconds idle timeout private readonly FLUSH_CHECK_INTERVAL_MS = 100; // Check flush every 100ms private readonly ACTIVITY_EVENTS = ["mousedown", "mousemove", "keydown", "touchstart", "scroll"]; @@ -113,14 +122,9 @@ export class ThrottledWebsocketProvider extends WebsocketProvider { ...(options.params ?? {}), clientVersion: String(CURRENT_PROJECT_VERSION), }; - super( - serverUrl, - room, - doc, - { ...options, params, connect: false } as unknown as ConstructorParameters< - typeof WebsocketProvider - >[3], - ); + super(serverUrl, room, doc, { ...options, params, connect: false } as unknown as ConstructorParameters< + typeof WebsocketProvider + >[3]); this.localClientId = doc.clientID; this.boundResetIdleTimer = this.resetUserIdleTimer.bind(this); @@ -132,7 +136,8 @@ export class ThrottledWebsocketProvider extends WebsocketProvider { // Store and set user info BEFORE connecting so awareness is correct from the start if (options.userInfo) { this.userInfo = options.userInfo; - this.awareness.setLocalStateField("user", options.userInfo); + const currentState = this.awareness.getLocalState() || {}; + this.awareness.setLocalState({ ...currentState, user: options.userInfo }); } // Replace default handlers with throttled versions @@ -143,9 +148,7 @@ export class ThrottledWebsocketProvider extends WebsocketProvider { // Handle awareness query (message type 3 = messageQueryAwareness) // When the server requests awareness, immediately send our current state - (this as unknown as WSInternals).messageHandlers[3] = ( - encoder: encoding.Encoder, - ) => { + (this as unknown as WSInternals).messageHandlers[3] = (encoder: encoding.Encoder) => { this.lastMessageTime = Date.now(); // Write awareness update to the encoder (y-websocket will send it) encoding.writeVarUint(encoder, 1); // messageAwareness @@ -190,8 +193,10 @@ export class ThrottledWebsocketProvider extends WebsocketProvider { // Restore user info if it was lost during reconnection // y-websocket may clear awareness state internally during reconnect - if (this.userInfo && !this.awareness.getLocalState()?.user) { - this.awareness.setLocalStateField("user", this.userInfo); + const localState = this.awareness.getLocalState(); + if (this.userInfo && !localState?.user) { + const currentState = localState || {}; + this.awareness.setLocalState({ ...currentState, user: this.userInfo }); } // Queue and send our awareness update @@ -256,20 +261,14 @@ export class ThrottledWebsocketProvider extends WebsocketProvider { console.log("[WS] Session was replaced by another connection. Stopping reconnection."); this.isSessionReplaced = true; - // Cancel any pending reconnects from our custom logic if (this.reconnectTimeout) { clearTimeout(this.reconnectTimeout); this.reconnectTimeout = null; } - // Tell y-websocket to not reconnect this.shouldConnect = false; - - // Disconnect properly to clean up this.disconnect(); - - // Emit custom event for UI to handle - //this.emit("session-replaced", []); + this.emit("session-replaced", []); } /** @@ -347,7 +346,8 @@ export class ThrottledWebsocketProvider extends WebsocketProvider { */ public setUserInfo(userInfo: { name: string; color: string; userId?: string }): void { this.userInfo = userInfo; - this.awareness.setLocalStateField("user", userInfo); + const currentState = this.awareness.getLocalState() || {}; + this.awareness.setLocalState({ ...currentState, user: userInfo }); } /** @@ -415,7 +415,8 @@ export class ThrottledWebsocketProvider extends WebsocketProvider { // Restore user info after cleanup (it will also be restored in onStatusChange) if (this.userInfo) { - this.awareness.setLocalStateField("user", this.userInfo); + const currentState = this.awareness.getLocalState() || {}; + this.awareness.setLocalState({ ...currentState, user: this.userInfo }); } // Clear any pending reconnect @@ -587,11 +588,22 @@ export class ThrottledWebsocketProvider extends WebsocketProvider { /** * Reconnect after wake (visibility restored or network online). - * Only acts if not connected, not idle-disconnected (that case is handled - * by resetUserIdleTimer on next user activity), and not session-replaced. + * + * Visibility/online return is a strong "user is back" signal — strong + * enough that we treat it the same as user input and reset the idle + * state. Previously we bailed out for idle-disconnected sessions, which + * left the user offline (no presence, no incoming updates) until they + * happened to type or move the mouse. */ private handleWakeUp(source: string): void { - if (this.isDestroyed || this.isSessionReplaced || this.isIdleDisconnected) return; + if (this.isDestroyed || this.isSessionReplaced) return; + + // resetUserIdleTimer clears isIdleDisconnected and triggers a + // reconnect when we were idle, so it covers that path. + const wasIdleDisconnected = this.isIdleDisconnected; + this.resetUserIdleTimer(); + if (wasIdleDisconnected) return; + if (!this.wsconnected) { console.log(`[WS] Reconnecting after wake (${source})...`); this.reconnectAttempts = 0; @@ -650,6 +662,11 @@ export class ThrottledWebsocketProvider extends WebsocketProvider { private onThrottledUpdate = (update: Uint8Array, origin: unknown): void => { if (origin !== this) { this.updateQueue.push(update); + // Cap queue length on long disconnects. The local doc still holds + // every change; sync on reconnect will replay them. + if (this.updateQueue.length > this.MAX_UPDATE_QUEUE) { + this.updateQueue.splice(0, this.updateQueue.length - this.MAX_UPDATE_QUEUE); + } } }; @@ -714,18 +731,24 @@ export class ThrottledWebsocketProvider extends WebsocketProvider { } /** - * Flush all pending updates to the server and BroadcastChannel + * Flush all pending updates to the server and BroadcastChannel. + * + * Queues are only cleared if at least one transport (WS or BC) actually + * delivered the message. Otherwise the entries stay queued for the next + * flush — important when the WS is mid-reconnect: the previous code + * dropped the queue regardless, relying on Yjs re-sync to recover. */ public flush(): void { const ws = this.ws; - const isWsConnected = this.wsconnected && ws && ws.readyState === 1; + const internals = this as unknown as WSInternals; + const isWsConnected = this.wsconnected && !!ws && ws.readyState === 1; + const isBcConnected = internals.bcconnected; + const canDeliver = isWsConnected || isBcConnected; // Send document updates - if (this.updateQueue.length > 0) { + if (this.updateQueue.length > 0 && canDeliver) { try { const updates = this.updateQueue; - this.updateQueue = []; - const encoder = encoding.createEncoder(); encoding.writeVarUint(encoder, 0); // sync message type for (const update of updates) { @@ -736,21 +759,19 @@ export class ThrottledWebsocketProvider extends WebsocketProvider { if (isWsConnected) { ws.send(message); } - - if ((this as unknown as WSInternals).bcconnected) { - bc.publish((this as unknown as WSInternals).bcChannel, message, this); + if (isBcConnected) { + bc.publish(internals.bcChannel, message, this); } + this.updateQueue = []; } catch (e) { console.error("[WS] Failed to send document updates:", e); } } // Send awareness updates - if (this.awarenessQueue.size > 0) { + if (this.awarenessQueue.size > 0 && canDeliver) { try { const changedClients = Array.from(this.awarenessQueue); - this.awarenessQueue.clear(); - const encoder = encoding.createEncoder(); encoding.writeVarUint(encoder, 1); // awareness message type encoding.writeVarUint8Array( @@ -762,10 +783,10 @@ export class ThrottledWebsocketProvider extends WebsocketProvider { if (isWsConnected) { ws.send(message); } - - if ((this as unknown as WSInternals).bcconnected) { - bc.publish((this as unknown as WSInternals).bcChannel, message, this); + if (isBcConnected) { + bc.publish(internals.bcChannel, message, this); } + this.awarenessQueue.clear(); } catch (e) { console.error("[WS] Failed to send awareness updates:", e); } @@ -855,10 +876,7 @@ export const allowOnWebsocket = async (userId: string, projectId: string) => { }; const secret = new TextEncoder().encode(process.env.JWT_SECRET!); - const token = await new SignJWT(payload) - .setProtectedHeader({ alg: "HS256" }) - .setExpirationTime("1m") - .sign(secret); + const token = await new SignJWT(payload).setProtectedHeader({ alg: "HS256" }).setExpirationTime("1m").sign(secret); await fetch(`${process.env.NEXT_PUBLIC_CLOUD_URL}/${projectId}/allow`, { method: "POST", headers: { @@ -876,10 +894,7 @@ export const blacklistFromWebsocket = async (userId: string, projectId: string) }; const secret = new TextEncoder().encode(process.env.JWT_SECRET!); - const token = await new SignJWT(payload) - .setProtectedHeader({ alg: "HS256" }) - .setExpirationTime("1m") - .sign(secret); + const token = await new SignJWT(payload).setProtectedHeader({ alg: "HS256" }).setExpirationTime("1m").sign(secret); await fetch(`${process.env.NEXT_PUBLIC_CLOUD_URL}/${projectId}/blacklist`, { method: "POST", headers: { diff --git a/src/lib/cloud/wrangler.toml b/src/lib/cloud/wrangler.toml index 7b1c9985..37e7f2db 100644 --- a/src/lib/cloud/wrangler.toml +++ b/src/lib/cloud/wrangler.toml @@ -9,6 +9,17 @@ compatibility_flags = ["nodejs_compat"] tag = "v1" new_sqlite_classes = ["ProjectRoom"] +# --- Local dev (default env, wrangler dev with no --env flag) --- +# Wrangler simulates DOs and R2 locally when bindings are declared here. + +[[durable_objects.bindings]] +name = "PROJECT_ROOM" +class_name = "ProjectRoom" + +[[r2_buckets]] +binding = "SNAPSHOTS" +bucket_name = "scriptio-snapshots-local" + # --- Production --- [env.production] name = "scriptio-cloud" diff --git a/src/lib/project/project-state.ts b/src/lib/project/project-state.ts index 6f5bc809..9563db0b 100644 --- a/src/lib/project/project-state.ts +++ b/src/lib/project/project-state.ts @@ -234,7 +234,7 @@ export const useCloudSync = ( const [connectionStatus, setConnectionStatus] = useState("disconnected"); const [isCloudSynced, setIsCloudSynced] = useState(false); const [isLockedByServer] = useState(false); - const [isSessionReplaced] = useState(false); + const [isSessionReplaced, setIsSessionReplaced] = useState(false); const [isProjectUnavailable, setIsProjectUnavailable] = useState(false); const [isStaleClient, setIsStaleClient] = useState(false); @@ -274,6 +274,7 @@ export const useCloudSync = ( useEffect(() => { isMountedRef.current = true; setIsProjectUnavailable(false); + setIsSessionReplaced(false); if (!ydoc || !projectId || typeof window === "undefined") { setConnectionStatus("disconnected"); @@ -375,15 +376,10 @@ export const useCloudSync = ( // Status updates cloudProvider.on("status", (e: { status: string }) => { - if (isMountedRef.current) { - setTimeout(() => { - if (isMountedRef.current) { - setConnectionStatus(e.status as ConnectionStatus); - if (e.status === "connected" && cloudProvider.synced) { - setIsCloudSynced(true); - } - } - }, 0); + if (!isMountedRef.current) return; + setConnectionStatus(e.status as ConnectionStatus); + if (e.status === "connected" && cloudProvider.synced) { + setIsCloudSynced(true); } }); @@ -394,6 +390,15 @@ export const useCloudSync = ( } }); + // Surface session-replaced state to the UI so the connection + // indicator and recovery dialogs reflect the terminal state + // (the provider stops reconnecting after this fires). + cloudProvider.on("session-replaced", () => { + if (!isMountedRef.current) return; + setIsSessionReplaced(true); + setConnectionStatus("disconnected"); + }); + // Handle document restore cloudProvider.on("document-restored", async () => { if (!isMountedRef.current) return;