
Optimize same-process job queue with direct notifications and drain-on-stop#426

Open
sroussey wants to merge 1 commit into main from claude/analyze-queue-optimization-4d0bG

Conversation

@sroussey
Collaborator

Summary

This PR optimizes the job queue system for same-process deployments (where a JobQueueClient is attached to a JobQueueServer) by introducing direct in-memory communication paths and improving graceful shutdown behavior. It eliminates unnecessary storage subscriptions and polling delays for local job submissions and aborts.

Key Changes

  • Direct job notification: When a JobQueueClient submits a job, it now calls server.handleJobAdded() directly instead of relying on storage subscriptions or poll intervals. This allows workers to pick up jobs immediately without waiting for the poll interval.

  • In-process abort handling: Added requestAbort() method to JobQueueWorker that fires abort controllers directly. The client's abort() method now tries the local path first via server.abortJob() before falling back to storage writes for cross-process scenarios.

  • Graceful drain-on-stop: Implemented a two-phase shutdown in JobQueueWorker.stop():

    • Phase 1: Wait up to stopTimeoutMs (default 30s) for in-flight jobs to complete naturally
    • Phase 2: Force-abort any remaining jobs and wait briefly for cleanup
    • Tracks in-flight jobs using a Map<jobId, Promise> that resolves when each job settles
  • Deferred job optimization: Added hasDeferredJobs flag to skip redundant storage peeks in getIdleDelay() when there are no jobs scheduled for the future. The flag is set by rescheduleJob() when a deferred job is observed and cleared once a peek confirms nothing is pending.

  • Progress update optimization: Removed storage write from updateProgress() — mid-job progress is now delivered only via in-memory events. Storage is only touched at terminal transitions (complete/fail/retry/abort), reducing write overhead for same-process deployments.

  • Conditional storage subscription: The server now skips subscribing to storage changes when at least one client is attached, since those clients will call handleJobAdded() directly. This avoids unnecessary subscription overhead on backends like SQLite/Postgres.

  • Conditional cleanup loop: The cleanup loop only starts if at least one retention TTL is configured, avoiding unnecessary periodic work when no cleanup is needed.

  • New configuration option: Added stopTimeoutMs to JobQueueWorkerOptions, JobQueueServerOptions, and JobQueueFactoryOptions to control drain timeout behavior.
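
To make the direct-notification fast path concrete, here is a minimal TypeScript sketch. `SketchServer`, `SketchClient`, and their members are illustrative stand-ins, not the actual workglow `JobQueueServer`/`JobQueueClient` API:

```typescript
// Illustrative sketch only — class and method names stand in for the
// real job-queue API described in this PR.
type Job = { id: string; input?: unknown };

class SketchServer {
  woken: string[] = [];

  // Direct in-memory wakeup: no storage subscription or poll needed.
  handleJobAdded(jobId: string): void {
    this.woken.push(jobId);
  }
}

class SketchClient {
  constructor(private server?: SketchServer) {}

  async submit(job: Job): Promise<void> {
    // 1. Persist the job to storage (elided here) — storage stays the
    //    source of truth for crash recovery and other processes.
    // 2. Fast path: if a server is attached in this process, wake its
    //    workers immediately instead of waiting for the poll interval.
    this.server?.handleJobAdded(job.id);
  }
}

const server = new SketchServer();
const client = new SketchClient(server);
client.submit({ id: "job-1" }).then(() => console.log(server.woken[0])); // "job-1"
```

A detached client (no `server` passed) simply skips the fast path, leaving workers to discover the job via polling or storage change subscriptions.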

Implementation Details

  • In-flight job tracking uses Promise.withResolvers() to create promises that resolve (never reject) when jobs settle, allowing Promise.allSettled() to wait for all jobs without error handling complexity.
  • The abort path avoids race conditions by skipping the storage.abort() write when the abort fires locally, preventing last-writer-wins conflicts on async storages.
  • Deferred job detection is conservative: once a deferred job is observed, the flag stays true until a peek confirms nothing is pending in the future.

https://claude.ai/code/session_01UJr5y7wV4ehvFQhaYpZhWh

@pkg-pr-new

pkg-pr-new Bot commented Apr 23, 2026

Open in StackBlitz

@workglow/cli

npm i https://pkg.pr.new/@workglow/cli@426

@workglow/ai

npm i https://pkg.pr.new/@workglow/ai@426

@workglow/ai-provider

npm i https://pkg.pr.new/@workglow/ai-provider@426

@workglow/job-queue

npm i https://pkg.pr.new/@workglow/job-queue@426

@workglow/knowledge-base

npm i https://pkg.pr.new/@workglow/knowledge-base@426

@workglow/storage

npm i https://pkg.pr.new/@workglow/storage@426

@workglow/task-graph

npm i https://pkg.pr.new/@workglow/task-graph@426

@workglow/tasks

npm i https://pkg.pr.new/@workglow/tasks@426

@workglow/util

npm i https://pkg.pr.new/@workglow/util@426

workglow

npm i https://pkg.pr.new/workglow@426

commit: 6f82868

@github-actions

github-actions Bot commented Apr 23, 2026

Coverage Report

Status Category Percentage Covered / Total
🔵 Lines 58.43% 17473 / 29900
🔵 Statements 58.33% 18016 / 30882
🔵 Functions 61.3% 3197 / 5215
🔵 Branches 47.12% 8351 / 17720
File Coverage: No changed files found.
Generated in workflow #1838 for commit 6f82868 by the Vitest Coverage Report Action

Contributor

Copilot AI left a comment


Pull request overview

Optimizes the job queue for same-process deployments by adding direct in-memory notifications/abort handling, improving worker shutdown draining behavior, and reducing unnecessary storage work.

Changes:

  • Add same-process fast paths for job submission wakeups and in-process abort delivery.
  • Implement drain-on-stop behavior with configurable stopTimeoutMs and in-flight job tracking.
  • Reduce storage overhead (conditional storage subscriptions/cleanup loop; avoid mid-job saveProgress writes) and add tests for same-process behaviors.

Reviewed changes

Copilot reviewed 5 out of 5 changed files in this pull request and generated 3 comments.

Show a summary per file

  • packages/test/src/test/job-queue/genericJobQueueTests.ts — Adjusts teardown timing and adds coverage for same-process notification/abort/drain and storage-write avoidance
  • packages/task-graph/src/task/JobQueueFactory.ts — Plumbs stopTimeoutMs through factory options into server creation
  • packages/job-queue/src/job/JobQueueWorker.ts — Adds drain-on-stop, in-flight tracking, local abort controller triggering, deferred-job peek optimization, and removes mid-job progress storage writes
  • packages/job-queue/src/job/JobQueueServer.ts — Skips storage subscriptions when clients are attached, adds local notify/abort helpers, and conditionally starts cleanup loop
  • packages/job-queue/src/job/JobQueueClient.ts — Calls server directly on submit and prefers local abort path before falling back to storage abort


Comment on lines 141 to 161
// Subscribe to storage changes to wake workers when new work arrives from
// another process. For same-process deployments (at least one attached
// client) the client's submit() calls `handleJobAdded` directly, so a
// storage subscription is redundant — and avoids paying the cost on
// backends (Sqlite/Postgres) whose `subscribeToChanges` throws.
if (this.clients.size === 0) {
  try {
    this.storageUnsubscribe = this.storage.subscribeToChanges(
      (change: QueueChangePayload<Input, Output>) => {
        if (
          change.type === "INSERT" ||
          (change.type === "UPDATE" && change.new?.status === JobStatus.PENDING)
        ) {
          this.notifyWorkers();
        }
      }
    );
  } catch {
    // Storage doesn't support change subscriptions — workers will poll
  }
}

Copilot AI Apr 23, 2026


start() now entirely skips storage.subscribeToChanges whenever clients.size > 0. Having an attached in-process client does not necessarily mean there are no other processes writing to the same queue; in a mixed deployment this change will remove push notifications for cross-process inserts/updates and force workers to rely on pollIntervalMs, potentially regressing latency. Consider always subscribing (and just treating client notifications as an additional fast path), or add an explicit option to disable subscriptions only when the deployment is known to be single-process.

Copilot uses AI. Check for mistakes.
Comment on lines +341 to +355
@@ -303,6 +350,9 @@ export class JobQueueWorker<
return Math.min(delay, this.pollIntervalMs);
}
}
// No deferred job found — stay on the fast path until another reschedule
// observes one.
this.hasDeferredJobs = false;

Copilot AI Apr 23, 2026


getIdleDelay() now skips the storage peek unless hasDeferredJobs is true, but that flag is only set from rescheduleJob(). Jobs that are initially submitted with a future run_after (or any deferred job not originating from rescheduleJob) will leave hasDeferredJobs false, so the worker can sleep for the full pollIntervalMs even though a deferred job is pending and should wake at its run_after time. Consider setting hasDeferredJobs when the worker observes a deferred PENDING job (e.g., after next() returns undefined, do a single peek to see if the earliest PENDING has run_after in the future) or carrying runAfter into the notify path so the worker can schedule the correct idle delay.

Comment on lines +464 to +466
await this.limiter.recordJobCompletion();
this.inFlight.delete(job.id);
resolveInFlight();

Copilot AI Apr 23, 2026


processSingleJob() resolves/removes the in-flight tracking entry only after awaiting this.limiter.recordJobCompletion(). If recordJobCompletion() rejects (e.g., CompositeLimiter uses Promise.all, so any inner limiter throwing will propagate), the inFlight entry may never be deleted and its promise never resolved. That undermines drain-on-stop (the worker may always consider a job “in flight”) and leaks memory. Make the in-flight cleanup/resolution unconditional (e.g., wrap recordJobCompletion() in its own try/catch/finally or resolve/delete before awaiting the limiter call).

Suggested change
await this.limiter.recordJobCompletion();
this.inFlight.delete(job.id);
resolveInFlight();
try {
  await this.limiter.recordJobCompletion();
} finally {
  this.inFlight.delete(job.id);
  resolveInFlight();
}
}

@sroussey
Collaborator Author

@copilot rebase on main

Contributor

Copilot AI commented Apr 25, 2026

@copilot rebase on main

Done — rebased onto main in commit 9aa9b00b. Also addressed the three code review issues from the previous review in the same commit:

  • Server subscription: always subscribe to storage changes (client direct-notify is now a fast path, not a replacement), fixing the mixed-deployment regression.
  • Deferred job detection: removed the hasDeferredJobs flag; getIdleDelay() now always peeks so jobs submitted with a future run_after wake the worker at the right time.
  • In-flight cleanup: wrapped recordJobCompletion() in its own try/finally so inFlight.delete() and resolveInFlight() always run even if the limiter throws, preventing drain-on-stop hangs.

@sroussey
Collaborator Author

@copilot rebase on main again

Contributor

Copilot AI commented Apr 25, 2026

@copilot rebase on main again

Rebased and verified the branch is up to date on main; the current branch head is 9aa9b00b. No UI changes in this PR.

@sroussey sroussey force-pushed the claude/analyze-queue-optimization-4d0bG branch 2 times, most recently from 2d23bd8 to fccec95 on April 25, 2026 at 23:23
Reduce latency and per-call overhead for the default same-process
deployment (client attached to server in the same process — typically
a browser), while keeping storage as the source of truth for crash
recovery and cross-process consumers.

- Submit directly notifies the attached server's workers, bypassing
  the poll interval. Critical for Sqlite/Postgres whose
  subscribeToChanges throws — without this their attached workers
  would never wake.
- worker.stop() drains in-flight jobs up to stopTimeoutMs (default
  30s) before aborting, replacing a fixed-heuristic sleep.
- abort() fires the in-memory AbortController first; storage.abort
  is only written when no local worker is running the job, avoiding
  a last-writer race between ABORTING and FAILED on async storages.
- updateProgress no longer writes mid-job progress to storage.
  In-process consumers still get job_progress events; storage
  subscribers see state transitions only.
- Cleanup loop only starts when at least one retention TTL is set.
- checkForAbortingJobs early-returns when no jobs are active —
  saves a storage round-trip per loop iteration when idle.

- handleAbort terminal-state guard: prevents the COMPLETED→FAILED
  clobber when a late abort races a successful completion. Returns
  early when the job is still in this worker's inFlight map (let
  processSingleJob's catch own the terminal write); for jobs that
  have already settled, guards against COMPLETED/FAILED/DISABLED
  before calling failJob. Side benefit: kills the duplicate
  job_error emit that fired when both the signal listener and
  processSingleJob's catch raced to failJob the same abort.
- waitFor registers its resolver before reading storage. A fast
  same-process abort could previously fire handleJobError between
  the read and the registration, leaving waitFor to register against
  an already-finished job and hang forever.
- notify latches a wakePending flag if it fires while the worker
  is not yet idle. The next waitForWakeOrTimeout consumes it and
  returns immediately. Without this, a notify arriving during the
  worker's pre-idle awaits (storage.next, storage.peek) was dropped
  on the floor — observed under load on IndexedDb.
- subscribeToChanges throws are logged at debug instead of swallowed
  silently. Sqlite/Postgres throw here by design, but a misconfigured
  Supabase realtime would otherwise be undiagnosable.
- JobQueueClient.abort no longer emits job_aborting from a finally
  when storage.abort threw — the throw propagates without misleading
  observers.

- Storage subscribers observe state transitions only (no mid-job
  progress).
- Attached same-process servers used to skip storage.subscribeToChanges
  entirely; current code subscribes on every server (the optimization
  was reverted during review for cross-process correctness).
- abort() can cause job re-execution on a crash between the in-memory
  abort firing and FAILED being persisted; handlers must be idempotent.
- worker.stop() now waits up to stopTimeoutMs (default 30s) for drain.

- Strengthen worker.stop drain test: use a slow-but-completing task,
  wait for PROCESSING (not PROCESSING-or-COMPLETED), assert COMPLETED
  + a stop duration that proves drain actually waited.
- Add stopTimeoutMs:0 test that asserts stop returns promptly with
  an in-flight long-running job.
- Add `submit wakes the worker` and `abort resolves quickly` direct-
  notify tests.
- Add `updateProgress does not write to storage mid-job` test.
- Add `attached server still subscribes to storage changes` test.
- Budgets fit within vitest's 15s testTimeout under heavy parallel
  load (13 storage backends concurrently); still well below the 60s
  poll intervals being contrasted against.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
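
The wakePending latch described in the commit message above can be sketched as follows. `WakeLatch` and its method names are illustrative, not the actual worker API:

```typescript
// Sketch: a notify() arriving while the worker is mid-await (not yet
// idle) is latched, and the next waitForWakeOrTimeout() consumes it
// immediately instead of dropping it on the floor.
class WakeLatch {
  private wakePending = false;
  private wake: (() => void) | undefined;

  notify(): void {
    if (this.wake) {
      // Worker is idle and waiting: wake it right away.
      this.wake();
      this.wake = undefined;
    } else {
      // Worker is busy (e.g. awaiting storage.next/peek): latch the wake.
      this.wakePending = true;
    }
  }

  waitForWakeOrTimeout(timeoutMs: number): Promise<void> {
    if (this.wakePending) {
      this.wakePending = false;
      return Promise.resolve(); // consume the latched wake immediately
    }
    return new Promise((resolve) => {
      const timer = setTimeout(() => {
        this.wake = undefined;
        resolve();
      }, timeoutMs);
      this.wake = () => {
        clearTimeout(timer);
        resolve();
      };
    });
  }
}
```

Without the latch, a notification delivered during the worker's pre-idle awaits would be lost and the worker would sleep for the full idle delay.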
@sroussey sroussey force-pushed the claude/analyze-queue-optimization-4d0bG branch from fccec95 to 6f82868 on April 26, 2026 at 00:22
