Optimize same-process job queue with direct notifications and drain-on-stop #426
Conversation
Pull request overview
Optimizes the job queue for same-process deployments by adding direct in-memory notifications/abort handling, improving worker shutdown draining behavior, and reducing unnecessary storage work.
Changes:
- Add same-process fast paths for job submission wakeups and in-process abort delivery.
- Implement drain-on-stop behavior with configurable `stopTimeoutMs` and in-flight job tracking.
- Reduce storage overhead (conditional storage subscriptions/cleanup loop; avoid mid-job `saveProgress` writes) and add tests for same-process behaviors.
Reviewed changes
Copilot reviewed 5 out of 5 changed files in this pull request and generated 3 comments.
| File | Description |
|---|---|
| packages/test/src/test/job-queue/genericJobQueueTests.ts | Adjust teardown timing and add coverage for same-process notification/abort/drain and storage-write avoidance |
| packages/task-graph/src/task/JobQueueFactory.ts | Plumbs stopTimeoutMs through factory options into server creation |
| packages/job-queue/src/job/JobQueueWorker.ts | Adds drain-on-stop, in-flight tracking, local abort controller triggering, deferred-job peek optimization, and removes mid-job progress storage writes |
| packages/job-queue/src/job/JobQueueServer.ts | Skips storage subscriptions when clients are attached, adds local notify/abort helpers, and conditionally starts cleanup loop |
| packages/job-queue/src/job/JobQueueClient.ts | Calls server directly on submit and prefers local abort path before falling back to storage abort |
```ts
// Subscribe to storage changes to wake workers when new work arrives from
// another process. For same-process deployments (at least one attached
// client) the client's submit() calls `handleJobAdded` directly, so a
// storage subscription is redundant — and avoids paying the cost on
// backends (Sqlite/Postgres) whose `subscribeToChanges` throws.
if (this.clients.size === 0) {
  try {
    this.storageUnsubscribe = this.storage.subscribeToChanges(
      (change: QueueChangePayload<Input, Output>) => {
        if (
          change.type === "INSERT" ||
          (change.type === "UPDATE" && change.new?.status === JobStatus.PENDING)
        ) {
          this.notifyWorkers();
        }
      }
    );
  } catch {
    // Storage doesn't support change subscriptions — workers will poll
  }
}
```
`start()` now entirely skips `storage.subscribeToChanges` whenever `clients.size > 0`. Having an attached in-process client does not necessarily mean there are no other processes writing to the same queue; in a mixed deployment this change will remove push notifications for cross-process inserts/updates and force workers to rely on `pollIntervalMs`, potentially regressing latency. Consider always subscribing (and just treating client notifications as an additional fast path), or add an explicit option to disable subscriptions only when the deployment is known to be single-process.
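For illustration, the always-subscribe alternative could look roughly like this. It keeps the shapes from the diff above (`subscribeToChanges`, `notifyWorkers`, `handleJobAdded`) but drops the `clients.size` guard, so this is a sketch of the suggestion, not the PR's code:

```ts
// Sketch: always subscribe so cross-process inserts still push-notify;
// the same-process client path stays as a pure latency fast path.
start(): void {
  try {
    this.storageUnsubscribe = this.storage.subscribeToChanges(
      (change: QueueChangePayload<Input, Output>) => {
        if (
          change.type === "INSERT" ||
          (change.type === "UPDATE" && change.new?.status === JobStatus.PENDING)
        ) {
          this.notifyWorkers(); // a duplicate wake is harmless (idempotent)
        }
      }
    );
  } catch {
    // Backend has no change feed (Sqlite/Postgres): workers fall back to polling.
  }
}

// Same-process fast path is unchanged; for locally submitted jobs it simply
// fires earlier than the storage notification would.
handleJobAdded(): void {
  this.notifyWorkers();
}
```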
```diff
@@ -303,6 +350,9 @@ export class JobQueueWorker<
         return Math.min(delay, this.pollIntervalMs);
       }
     }
+    // No deferred job found — stay on the fast path until another reschedule
+    // observes one.
+    this.hasDeferredJobs = false;
```
`getIdleDelay()` now skips the storage peek unless `hasDeferredJobs` is true, but that flag is only set from `rescheduleJob()`. Jobs that are initially submitted with a future `run_after` (or any deferred job not originating from `rescheduleJob()`) will leave `hasDeferredJobs` false, so the worker can sleep for the full `pollIntervalMs` even though a deferred job is pending and should wake at its `run_after` time. Consider setting `hasDeferredJobs` when the worker observes a deferred PENDING job (e.g., after `next()` returns undefined, do a single peek to see if the earliest PENDING job has `run_after` in the future), or carry `runAfter` into the notify path so the worker can schedule the correct idle delay.
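One possible shape for the suggested fix, as a class-method sketch: `peekNextPending()` is a stand-in for however the storage actually exposes the earliest PENDING job, and the call site is assumed, not taken from the PR:

```ts
// Sketch: after next() finds no runnable job, do one peek so jobs submitted
// with a future run_after still set the flag and produce a correct idle delay.
private async refreshDeferredFlag(): Promise<void> {
  // Hypothetical helper returning the earliest PENDING job, deferred or not.
  const earliest = await this.storage.peekNextPending();
  this.hasDeferredJobs =
    earliest !== undefined &&
    earliest.run_after !== undefined &&
    earliest.run_after.getTime() > Date.now();
}

// Assumed call site in the idle loop, when next() returned undefined:
//   await this.refreshDeferredFlag();
//   const delay = this.getIdleDelay(); // now wakes at run_after, not pollIntervalMs
```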
```ts
await this.limiter.recordJobCompletion();
this.inFlight.delete(job.id);
resolveInFlight();
```
`processSingleJob()` resolves/removes the in-flight tracking entry only after awaiting `this.limiter.recordJobCompletion()`. If `recordJobCompletion()` rejects (e.g., `CompositeLimiter` uses `Promise.all`, so any inner limiter throwing will propagate), the `inFlight` entry may never be deleted and its promise never resolved. That undermines drain-on-stop (the worker may always consider a job "in flight") and leaks memory. Make the in-flight cleanup/resolution unconditional (e.g., wrap `recordJobCompletion()` in its own try/catch/finally, or resolve/delete before awaiting the limiter call).
Suggested change:

```diff
- await this.limiter.recordJobCompletion();
- this.inFlight.delete(job.id);
- resolveInFlight();
+ try {
+   await this.limiter.recordJobCompletion();
+ } finally {
+   this.inFlight.delete(job.id);
+   resolveInFlight();
+ }
```
@copilot rebase on main
Done — rebased onto main in commit
@copilot rebase on main again
Rebased and verified the branch is up to date on
Force-pushed 2d23bd8 to fccec95
Reduce latency and per-call overhead for the default same-process deployment (client attached to server in the same process — typically a browser), while keeping storage as the source of truth for crash recovery and cross-process consumers.

- Submit directly notifies the attached server's workers, bypassing the poll interval. Critical for Sqlite/Postgres whose subscribeToChanges throws — without this their attached workers would never wake.
- worker.stop() drains in-flight jobs up to stopTimeoutMs (default 30s) before aborting, replacing a fixed-heuristic sleep.
- abort() fires the in-memory AbortController first; storage.abort is only written when no local worker is running the job, avoiding a last-writer race between ABORTING and FAILED on async storages.
- updateProgress no longer writes mid-job progress to storage. In-process consumers still get job_progress events; storage subscribers see state transitions only.
- Cleanup loop only starts when at least one retention TTL is set.
- checkForAbortingJobs early-returns when no jobs are active — saves a storage round-trip per loop iteration when idle.
- handleAbort terminal-state guard: prevents the COMPLETED→FAILED clobber when a late abort races a successful completion. Returns early when the job is still in this worker's inFlight map (let processSingleJob's catch own the terminal write); for jobs that have already settled, guards against COMPLETED/FAILED/DISABLED before calling failJob. Side benefit: kills the duplicate job_error emit that fired when both the signal listener and processSingleJob's catch raced to failJob the same abort.
- waitFor registers its resolver before reading storage. A fast same-process abort could previously fire handleJobError between the read and the registration, leaving waitFor to register against an already-finished job and hang forever.
- notify latches a wakePending flag if it fires while the worker is not yet idle. The next waitForWakeOrTimeout consumes it and returns immediately. Without this, a notify arriving during the worker's pre-idle awaits (storage.next, storage.peek) was dropped on the floor — observed under load on IndexedDb.
- subscribeToChanges throws are logged at debug instead of swallowed silently. Sqlite/Postgres throw here by design, but a misconfigured Supabase realtime would otherwise be undiagnosable.
- JobQueueClient.abort no longer emits job_aborting from a finally when storage.abort threw — the throw propagates without misleading observers.
- Storage subscribers observe state transitions only (no mid-job progress).
- Attached same-process servers used to skip storage.subscribeToChanges entirely; current code subscribes on every server (the optimization was reverted during review for cross-process correctness).
- abort() can cause job re-execution on a crash between the in-memory abort firing and FAILED being persisted; handlers must be idempotent.
- worker.stop() now waits up to stopTimeoutMs (default 30s) for drain.
- Strengthen worker.stop drain test: use a slow-but-completing task, wait for PROCESSING (not PROCESSING-or-COMPLETED), assert COMPLETED + a stop duration that proves drain actually waited.
- Add stopTimeoutMs:0 test that asserts stop returns promptly with an in-flight long-running job.
- Add `submit wakes the worker` and `abort resolves quickly` direct-notify tests.
- Add `updateProgress does not write to storage mid-job` test.
- Add `attached server still subscribes to storage changes` test.
- Budgets fit within vitest's 15s testTimeout under heavy parallel load (13 storage backends concurrently); still well below the 60s poll intervals being contrasted against.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
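The notify latch described in the commit message reduces to a small self-contained sketch. Only `wakePending` and `waitForWakeOrTimeout` are names from the message; the class wrapper and field shapes are assumptions:

```ts
// Sketch of the wake latch: a notify() that lands while the worker is
// mid-await (storage.next, storage.peek) is remembered instead of dropped.
class WakeSignal {
  private wakePending = false;
  private wakeResolve: (() => void) | undefined;

  notify(): void {
    const resolve = this.wakeResolve;
    if (resolve) {
      this.wakeResolve = undefined;
      resolve(); // worker is idle: wake it immediately
    } else {
      this.wakePending = true; // worker is busy: latch the wake for later
    }
  }

  async waitForWakeOrTimeout(ms: number): Promise<void> {
    if (this.wakePending) {
      this.wakePending = false; // consume a wake latched during pre-idle awaits
      return;
    }
    await new Promise<void>((resolve) => {
      const timer = setTimeout(() => {
        this.wakeResolve = undefined;
        resolve();
      }, ms);
      this.wakeResolve = () => {
        clearTimeout(timer); // a real wake cancels the idle timeout
        resolve();
      };
    });
  }
}
```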
Force-pushed fccec95 to 6f82868
Summary
This PR optimizes the job queue system for same-process deployments (where a `JobQueueClient` is attached to a `JobQueueServer`) by introducing direct in-memory communication paths and improving graceful shutdown behavior. It eliminates unnecessary storage subscriptions and polling delays for local job submissions and aborts.

Key Changes
Direct job notification: When a `JobQueueClient` submits a job, it now calls `server.handleJobAdded()` directly instead of relying on storage subscriptions or poll intervals. This allows workers to pick up jobs immediately without waiting for the poll interval.

In-process abort handling: Added a `requestAbort()` method to `JobQueueWorker` that fires abort controllers directly. The client's `abort()` method now tries the local path first via `server.abortJob()` before falling back to storage writes for cross-process scenarios.
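A rough shape of that local-first abort, for illustration. `abortJob` and `requestAbort` come from the description above; the boolean return convention is an assumption of this sketch:

```ts
// Sketch: try the in-memory abort first; only write to storage when no
// attached worker owns the job (the cross-process case).
async abort(jobId: string): Promise<void> {
  // Assumed convention: true when an attached worker had the job in flight
  // and fired its AbortController via requestAbort().
  const handledLocally = this.server?.abortJob(jobId) ?? false;
  if (!handledLocally) {
    // Persist ABORTING so a worker in another process observes it on its
    // next checkForAbortingJobs pass.
    await this.storage.abort(jobId);
  }
}
```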
Graceful drain-on-stop: Implemented a two-phase shutdown in `JobQueueWorker.stop()`:
- Wait up to `stopTimeoutMs` (default 30s) for in-flight jobs to complete naturally
- Track in-flight jobs in a `Map<jobId, Promise>` that resolves when each job settles
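A condensed sketch of the two-phase stop, using `Promise.withResolvers()` and `Promise.allSettled()` as described; the `inFlight` field shape and the abort fallback wiring are assumptions:

```ts
// Sketch: phase 1 waits up to stopTimeoutMs for in-flight jobs; phase 2
// aborts whatever remains and then waits for those jobs to settle.
private readonly inFlight = new Map<string, Promise<void>>();

async stop(): Promise<void> {
  const drained = Promise.allSettled([...this.inFlight.values()]);
  const timedOut = await Promise.race([
    drained.then(() => false),
    new Promise<boolean>((resolve) =>
      setTimeout(() => resolve(true), this.stopTimeoutMs)
    ),
  ]);
  if (timedOut) {
    // Drain budget exhausted: fire the in-memory AbortControllers.
    for (const jobId of this.inFlight.keys()) this.requestAbort(jobId);
    await drained; // tracked promises resolve (never reject) when jobs settle
  }
}

// At pickup, processSingleJob registers the tracking entry, e.g.:
//   const { promise, resolve } = Promise.withResolvers<void>();
//   this.inFlight.set(job.id, promise);
//   try { /* run job */ } finally { this.inFlight.delete(job.id); resolve(); }
```

Note that with `stopTimeoutMs: 0` the race resolves almost immediately, so `stop()` aborts in-flight jobs right away, matching the test added in this PR.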
Deferred job optimization: Added a `hasDeferredJobs` flag to skip redundant storage peeks in `getIdleDelay()` when there are no jobs scheduled for the future. The flag is set by `rescheduleJob()` when a deferred job is observed and cleared once a peek confirms nothing is pending.
Progress update optimization: Removed the storage write from `updateProgress()` — mid-job progress is now delivered only via in-memory events. Storage is only touched at terminal transitions (complete/fail/retry/abort), reducing write overhead for same-process deployments.

Conditional storage subscription: The server now skips subscribing to storage changes when at least one client is attached, since those clients will call `handleJobAdded()` directly. This avoids unnecessary subscription overhead on backends like SQLite/Postgres.

Conditional cleanup loop: The cleanup loop only starts if at least one retention TTL is configured, avoiding unnecessary periodic work when no cleanup is needed.
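The conditional cleanup start likely amounts to a guard along these lines; the option names (`completedJobTtlMs`, `failedJobTtlMs`) and helper names here are illustrative, not the package's real keys:

```ts
// Sketch: only spin up the periodic cleanup when some retention TTL is set.
private maybeStartCleanupLoop(): void {
  const hasTtl =
    this.options.completedJobTtlMs !== undefined ||
    this.options.failedJobTtlMs !== undefined;
  if (!hasTtl) return; // nothing will ever expire, so skip the timer entirely
  this.cleanupTimer = setInterval(
    () => void this.deleteExpiredJobs(),
    this.cleanupIntervalMs
  );
}
```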
New configuration option: Added `stopTimeoutMs` to `JobQueueWorkerOptions`, `JobQueueServerOptions`, and `JobQueueFactoryOptions` to control drain timeout behavior.

Implementation Details
- Uses `Promise.withResolvers()` to create promises that resolve (never reject) when jobs settle, allowing `Promise.allSettled()` to wait for all jobs without error handling complexity.
- Skips the `storage.abort()` write when the abort fires locally, preventing last-writer-wins conflicts on async storages.

https://claude.ai/code/session_01UJr5y7wV4ehvFQhaYpZhWh
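For illustration, the new option plumbed through `JobQueueFactoryOptions` might be used like this; the factory function name and surrounding option shape are assumptions, only the `stopTimeoutMs` key itself comes from the PR:

```ts
// Sketch: stopTimeoutMs flows from factory options into server/worker creation.
const server = createJobQueueServer<Input, Output>({
  storage,
  stopTimeoutMs: 30_000, // drain budget before stop() aborts in-flight jobs
});

// stopTimeoutMs: 0 makes stop() return promptly, aborting in-flight jobs.
```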