diff --git a/.server-changes/merge-dequeue-snapshot-into-transaction.md b/.server-changes/merge-dequeue-snapshot-into-transaction.md
new file mode 100644
index 0000000000..62c9a0ec6c
--- /dev/null
+++ b/.server-changes/merge-dequeue-snapshot-into-transaction.md
@@ -0,0 +1,6 @@
+---
+area: webapp
+type: improvement
+---
+
+Merge execution snapshot creation into the dequeue taskRun.update transaction, reducing 2 DB commits to 1 per dequeue operation
diff --git a/internal-packages/run-engine/src/engine/systems/dequeueSystem.ts b/internal-packages/run-engine/src/engine/systems/dequeueSystem.ts
index 9476a081fe..15d79e76ba 100644
--- a/internal-packages/run-engine/src/engine/systems/dequeueSystem.ts
+++ b/internal-packages/run-engine/src/engine/systems/dequeueSystem.ts
@@ -3,7 +3,7 @@ import { startSpan } from "@internal/tracing";
 import { assertExhaustive, tryCatch } from "@trigger.dev/core";
 import { DequeuedMessage, RetryOptions, RunAnnotations } from "@trigger.dev/core/v3";
 import { placementTag } from "@trigger.dev/core/v3/serverOnly";
-import { getMaxDuration } from "@trigger.dev/core/v3/isomorphic";
+import { generateInternalId, getMaxDuration, SnapshotId } from "@trigger.dev/core/v3/isomorphic";
 import {
   BackgroundWorker,
   BackgroundWorkerTask,
@@ -416,6 +416,9 @@ export class DequeueSystem {
                 ? undefined
                 : result.task.retryConfig;
 
+              // Pre-generate the snapshot ID so the result can be built without an extra read
+              const snapshotId = generateInternalId();
+
               const lockedTaskRun = await prisma.taskRun.update({
                 where: {
                   id: runId,
@@ -435,6 +438,33 @@ export class DequeueSystem {
                   cliVersion: result.worker.cliVersion,
                   maxDurationInSeconds,
                   maxAttempts: maxAttempts ?? undefined,
+                  executionSnapshots: {
+                    create: {
+                      id: snapshotId,
+                      engine: "V2",
+                      executionStatus: "PENDING_EXECUTING",
+                      description: "Run was dequeued for execution",
+                      // Stored as PENDING rather than DEQUEUED for backwards compatibility with older runners
+                      runStatus: "PENDING",
+                      attemptNumber: result.run.attemptNumber ?? undefined,
+                      previousSnapshotId: snapshot.id,
+                      environmentId: snapshot.environmentId,
+                      environmentType: snapshot.environmentType,
+                      projectId: snapshot.projectId,
+                      organizationId: snapshot.organizationId,
+                      checkpointId: snapshot.checkpointId ?? undefined,
+                      batchId: snapshot.batchId ?? undefined,
+                      completedWaitpoints: {
+                        connect: snapshot.completedWaitpoints.map((w) => ({ id: w.id })),
+                      },
+                      completedWaitpointOrder: snapshot.completedWaitpoints
+                        .filter((c) => c.index !== undefined)
+                        .sort((a, b) => a.index! - b.index!)
+                        .map((w) => w.id),
+                      workerId,
+                      runnerId,
+                    },
+                  },
                 },
                 include: {
                   runtimeEnvironment: true,
@@ -516,44 +546,50 @@ export class DequeueSystem {
                 hasPrivateLink = billingResult.val.hasPrivateLink;
               }
 
-              const newSnapshot = await this.executionSnapshotSystem.createExecutionSnapshot(
-                prisma,
-                {
-                  run: {
-                    id: runId,
-                    status: lockedTaskRun.status,
-                    attemptNumber: lockedTaskRun.attemptNumber,
-                  },
-                  snapshot: {
-                    executionStatus: "PENDING_EXECUTING",
-                    description: "Run was dequeued for execution",
-                  },
-                  previousSnapshotId: snapshot.id,
-                  environmentId: snapshot.environmentId,
-                  environmentType: snapshot.environmentType,
-                  projectId: snapshot.projectId,
-                  organizationId: snapshot.organizationId,
-                  checkpointId: snapshot.checkpointId ?? undefined,
-                  batchId: snapshot.batchId ?? undefined,
-                  completedWaitpoints: snapshot.completedWaitpoints,
-                  workerId,
-                  runnerId,
-                }
-              );
+              // Snapshot was created by the nested write in the taskRun.update above (single
+              // transaction). Rebuild its info from values in scope, using a local timestamp
+              // (no extra DB read), and trigger the event + heartbeat side effects manually.
+              const snapshotCreatedAt = new Date();
+
+              this.$.eventBus.emit("executionSnapshotCreated", {
+                time: snapshotCreatedAt,
+                run: {
+                  id: runId,
+                },
+                snapshot: {
+                  id: snapshotId,
+                  executionStatus: "PENDING_EXECUTING",
+                  description: "Run was dequeued for execution",
+                  runStatus: "PENDING",
+                  attemptNumber: result.run.attemptNumber ?? null,
+                  checkpointId: snapshot.checkpointId ?? null,
+                  workerId: workerId ?? null,
+                  runnerId: runnerId ?? null,
+                  isValid: true,
+                  error: null,
+                  completedWaitpointIds: snapshot.completedWaitpoints.map((wp) => wp.id),
+                },
+              });
+
+              await this.executionSnapshotSystem.enqueueHeartbeatIfNeeded({
+                id: snapshotId,
+                runId,
+                executionStatus: "PENDING_EXECUTING",
+              });
 
               return {
                 version: "1" as const,
                 dequeuedAt: new Date(),
                 workerQueueLength: message.workerQueueLength,
                 snapshot: {
-                  id: newSnapshot.id,
-                  friendlyId: newSnapshot.friendlyId,
-                  executionStatus: newSnapshot.executionStatus,
-                  description: newSnapshot.description,
-                  createdAt: newSnapshot.createdAt,
+                  id: snapshotId,
+                  friendlyId: SnapshotId.toFriendlyId(snapshotId),
+                  executionStatus: "PENDING_EXECUTING" as const,
+                  description: "Run was dequeued for execution",
+                  createdAt: snapshotCreatedAt,
                 },
                 image: result.deployment?.imageReference ?? undefined,
-                checkpoint: newSnapshot.checkpoint ?? undefined,
+                checkpoint: snapshot.checkpoint ?? undefined,
                 completedWaitpoints: snapshot.completedWaitpoints,
                 backgroundWorker: {
                   id: result.worker.id,
diff --git a/internal-packages/run-engine/src/engine/systems/executionSnapshotSystem.ts b/internal-packages/run-engine/src/engine/systems/executionSnapshotSystem.ts
index a224e5a86b..d615c066b8 100644
--- a/internal-packages/run-engine/src/engine/systems/executionSnapshotSystem.ts
+++ b/internal-packages/run-engine/src/engine/systems/executionSnapshotSystem.ts
@@ -518,6 +518,27 @@ export class ExecutionSnapshotSystem {
     return executionResultFromSnapshot(latestSnapshot);
   }
 
+  /**
+   * Enqueues a "heartbeatSnapshot" job for a snapshot when its execution status has a heartbeat
+   * interval, and does nothing otherwise. Use this after nesting a snapshot create inside a
+   * taskRun.update() to replicate the heartbeat that createExecutionSnapshot normally enqueues.
+   */
+  public async enqueueHeartbeatIfNeeded(snapshot: {
+    id: string;
+    runId: string;
+    executionStatus: TaskRunExecutionStatus;
+  }) {
+    const intervalMs = this.#getHeartbeatIntervalMs(snapshot.executionStatus);
+    if (intervalMs !== null) {
+      await this.$.worker.enqueue({
+        id: `heartbeatSnapshot.${snapshot.runId}`,
+        job: "heartbeatSnapshot",
+        payload: { snapshotId: snapshot.id, runId: snapshot.runId },
+        availableAt: new Date(Date.now() + intervalMs),
+      });
+    }
+  }
+
   #getHeartbeatIntervalMs(status: TaskRunExecutionStatus): number | null {
     switch (status) {
       case "PENDING_EXECUTING": {