Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/app/api/projects/[projectId]/cloud-token/route.ts
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ async function projectCloudTokenRoute(req: NextRequest, { routeParams, user }: A
const secret = new TextEncoder().encode(process.env.JWT_SECRET!);
const token = await new SignJWT(payload)
.setProtectedHeader({ alg: "HS256" })
.setExpirationTime("1m")
.setExpirationTime("1h")
.sign(secret);

return Success(token);
Expand Down
115 changes: 90 additions & 25 deletions src/lib/cloud/room.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,7 @@ import {
} from "./types";
import { handleProtocolMessage } from "./protocol";
import { ProjectState } from "../project/project-doc";
import {
migrateProjectDocCore,
readProjectDocVersion,
} from "../project/migrations/project-migration-runner";
import { migrateProjectDocCore, readProjectDocVersion } from "../project/migrations/project-migration-runner";
import { CURRENT_PROJECT_VERSION } from "../project/migrations/project-migrations";

export class ProjectRoom extends DurableObject {
Expand All @@ -31,11 +28,11 @@ export class ProjectRoom extends DurableObject {
sessions: Map<WebSocket, SessionInfo>;
userConnections: Map<string, WebSocket>;
blacklist: Set<string>;
cleanupInterval: ReturnType<typeof setInterval> | null = null;

private isDirty: boolean = false;
private alarmScheduled: boolean = false;
private projectId: string | null = null;
private lastAwarenessCleanup: number = 0;

/** Project schema version of the in-memory doc; the gatekeeper compares
* client-advertised versions against this on connect. */
Expand All @@ -50,7 +47,10 @@ export class ProjectRoom extends DurableObject {
// (esbuild does not guarantee class-field arrow functions are initialized
// before the constructor body runs.)
private handleDocUpdate!: (update: Uint8Array, origin: unknown) => void;
private handleAwarenessUpdate!: (changes: { added: number[]; updated: number[]; removed: number[] }, origin: unknown) => void;
private handleAwarenessUpdate!: (
changes: { added: number[]; updated: number[]; removed: number[] },
origin: unknown,
) => void;

constructor(ctx: DurableObjectState, env: Env) {
super(ctx, env);
Expand All @@ -67,12 +67,24 @@ export class ProjectRoom extends DurableObject {
this.markDirty();
};

this.handleAwarenessUpdate = ({ added }: { added: number[]; updated: number[]; removed: number[] }, origin: unknown): void => {
this.handleAwarenessUpdate = (
{ added, updated }: { added: number[]; updated: number[]; removed: number[] },
origin: unknown,
): void => {
if (origin instanceof WebSocket) {
const session = this.sessions.get(origin);
if (session) {
added.forEach((id: number) => session.clientIds.add(id));
let changed = false;
const toAdd = [...added, ...updated];
toAdd.forEach((id: number) => {
if (!session.clientIds.has(id)) {
session.clientIds.add(id);
changed = true;
}
});
session.lastActivity = Date.now();
// Persist updated clientIds so they survive DO hibernation.
if (changed) this.persistSessionAttachment(origin);
}
}
};
Expand All @@ -89,10 +101,6 @@ export class ProjectRoom extends DurableObject {
this.userConnections = new Map();
this.blacklist = new Set();

// Listen for document updates and handle broadcasting + persistence.
// This is the source of truth for ALL changes to the document.
this.doc.on("update", this.handleDocUpdate);

// Track client IDs when awareness updates come from a WebSocket
this.awareness.on("update", this.handleAwarenessUpdate);

Expand All @@ -111,14 +119,22 @@ export class ProjectRoom extends DurableObject {
);
`);

// Restore project state
// Restore project state from SQLite. Attach the update handler AFTER
// the restore so that re-loading persisted bytes on every DO wake-up
// doesn't trigger scheduleSave / markDirty (which would save identical
// bytes and schedule an unnecessary R2 snapshot).
const cursor = this.ctx.storage.sql.exec("SELECT data FROM project WHERE id = 1;");
for (const row of cursor) {
if (row.data) {
Y.applyUpdate(this.doc, new Uint8Array(row.data as ArrayBuffer));
}
}

// Listen for document updates and handle broadcasting + persistence.
// Attached here (after restore) so only live writes from WS clients
// and server-side migrations trigger the save pipeline.
this.doc.on("update", this.handleDocUpdate);

// Restore blacklist
const blacklistRows = this.ctx.storage.sql.exec("SELECT user_id FROM blacklist;").toArray();
for (const row of blacklistRows) {
Expand All @@ -139,12 +155,52 @@ export class ProjectRoom extends DurableObject {
this.observeDocVersion();
});

// Start periodic stale awareness cleanup
this.startAwarenessCleanup();
// Restore sessions from hibernated WebSockets. Cloudflare DOs can
// hibernate to save memory while WebSockets stay connected; on the
// next message the constructor runs again with empty maps. Without
// this restoration, incoming messages have no session to attach to,
// session activity tracking breaks, webSocketClose finds nothing to
// clean up, and broadcastAwarenessRequest counts wrongly.
const hibernatedSockets = this.ctx.getWebSockets();
for (const ws of hibernatedSockets) {
const attachment = ws.deserializeAttachment() as
| { userId: string; clientIds: number[] }
| null;
if (!attachment) continue;
this.sessions.set(ws, {
clientIds: new Set(attachment.clientIds),
userId: attachment.userId,
lastActivity: Date.now(),
});
this.userConnections.set(attachment.userId, ws);
}
if (hibernatedSockets.length > 0) {
console.log(
`[Room] Restored ${this.sessions.size} session(s) from ${hibernatedSockets.length} hibernated WebSocket(s)`,
);
// Awareness state was lost when the DO hibernated. Ask all
// restored clients to re-broadcast their awareness so we can
// rebuild room.awareness from scratch.
this.broadcastAwarenessRequest();
}

console.log("[Room] Initialized");
}

/**
* Persist the current session state on the WebSocket so it survives
* Cloudflare DO hibernation. Called whenever clientIds or userId changes
* for a session.
*/
private persistSessionAttachment(ws: WebSocket): void {
const session = this.sessions.get(ws);
if (!session) return;
ws.serializeAttachment({
userId: session.userId,
clientIds: Array.from(session.clientIds),
});
}

/**
* Apply pending project-doc migrations to the in-memory doc and persist
* the result. Idempotent: a no-op when the doc is already at
Expand Down Expand Up @@ -179,8 +235,7 @@ export class ProjectRoom extends DurableObject {
this.docMigrationFailed = true;
this.docVersion = outcome.from;
console.error(
`[Room] Doc migration failed at step v${outcome.failedAt} ` +
`(stored v${outcome.from}):`,
`[Room] Doc migration failed at step v${outcome.failedAt} ` + `(stored v${outcome.from}):`,
outcome.error,
);
break;
Expand Down Expand Up @@ -335,12 +390,16 @@ export class ProjectRoom extends DurableObject {
}

/**
* Start periodic cleanup of stale awareness states
* Throttled inline cleanup. Called from message/connect paths instead of
* setInterval — a live timer would prevent the DO from hibernating, which
* keeps it billed continuously. With this approach the DO only does
* cleanup work when traffic is already arriving.
*/
private startAwarenessCleanup(): void {
this.cleanupInterval = setInterval(() => {
this.cleanupStaleAwareness();
}, AWARENESS_CLEANUP_INTERVAL_MS);
maybeCleanupStaleAwareness(): void {
const now = Date.now();
if (now - this.lastAwarenessCleanup < AWARENESS_CLEANUP_INTERVAL_MS) return;
this.lastAwarenessCleanup = now;
this.cleanupStaleAwareness();
}

/**
Expand Down Expand Up @@ -572,9 +631,7 @@ export class ProjectRoom extends DurableObject {
const clientVersionParam = url.searchParams.get("clientVersion");
const clientVersion = clientVersionParam !== null ? Number(clientVersionParam) : NaN;
if (Number.isFinite(clientVersion) && clientVersion < this.docVersion) {
console.log(
`[Room] Rejecting stale client v${clientVersion} (doc at v${this.docVersion})`,
);
console.log(`[Room] Rejecting stale client v${clientVersion} (doc at v${this.docVersion})`);
try {
server.close(4006, `Stale client: update to access v${this.docVersion}`);
} catch {}
Expand All @@ -588,6 +645,9 @@ export class ProjectRoom extends DurableObject {
lastActivity: Date.now(),
});
this.userConnections.set(userId, server);
// Persist immediately so a hibernation-wake before the first
// awareness message can still identify this socket.
this.persistSessionAttachment(server);

// Send current document state (sync step 1) using the same encoder
// pattern as all other outgoing messages — avoids fragile manual byte prepend.
Expand All @@ -611,6 +671,10 @@ export class ProjectRoom extends DurableObject {

console.log(`[Room] User ${userId} connected. Total sessions: ${this.sessions.size}`);

// Opportunistic cleanup on connect — a new client arriving is the
// best moment to drop awareness for clients that quietly went away.
this.maybeCleanupStaleAwareness();

// Request all existing clients to re-broadcast their awareness
// This ensures the new client receives everyone's current state,
// especially important after a DurableObject restart where
Expand Down Expand Up @@ -802,6 +866,7 @@ export class ProjectRoom extends DurableObject {
if (fullMessage.length === 0) return;

handleProtocolMessage(this, fullMessage, ws);
this.maybeCleanupStaleAwareness();
}

scheduleSave(): void {
Expand Down
Loading
Loading