-
Notifications
You must be signed in to change notification settings - Fork 6
Make review queue jobs resumable and lease-aware #11
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: dev
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,30 @@ | ||
| ALTER TABLE jobs ADD COLUMN IF NOT EXISTS check_run_completed_at TIMESTAMPTZ; | ||
| ALTER TABLE jobs ADD COLUMN IF NOT EXISTS lease_owner TEXT; | ||
| ALTER TABLE jobs ADD COLUMN IF NOT EXISTS lease_expires_at TIMESTAMPTZ; | ||
| ALTER TABLE jobs ADD COLUMN IF NOT EXISTS heartbeat_at TIMESTAMPTZ; | ||
| ALTER TABLE jobs ADD COLUMN IF NOT EXISTS recovery_count INTEGER NOT NULL DEFAULT 0; | ||
| ALTER TABLE jobs ADD COLUMN IF NOT EXISTS last_queue_message_at TIMESTAMPTZ; | ||
| ALTER TABLE file_reviews ADD COLUMN IF NOT EXISTS transient_error_count INTEGER NOT NULL DEFAULT 0; | ||
|
|
||
| CREATE INDEX IF NOT EXISTS jobs_lease_expiry_idx | ||
| ON jobs (lease_expires_at) | ||
| WHERE status = 'running' AND lease_expires_at IS NOT NULL; | ||
|
|
||
| CREATE INDEX IF NOT EXISTS jobs_terminal_check_idx | ||
| ON jobs (status, check_run_completed_at) | ||
| WHERE check_run_id IS NOT NULL AND check_run_completed_at IS NULL; | ||
|
|
||
| CREATE INDEX IF NOT EXISTS jobs_unleased_running_idx | ||
| ON jobs (last_queue_message_at, heartbeat_at) | ||
| WHERE status = 'running' AND lease_expires_at IS NULL; | ||
|
|
||
| DELETE FROM file_reviews fr | ||
| USING ( | ||
| SELECT id, ROW_NUMBER() OVER (PARTITION BY job_id, file_path ORDER BY created_at ASC, id ASC) AS row_number | ||
| FROM file_reviews | ||
| ) ranked | ||
| WHERE fr.id = ranked.id | ||
| AND ranked.row_number > 1; | ||
|
|
||
| CREATE UNIQUE INDEX IF NOT EXISTS file_reviews_job_file_path_key | ||
| ON file_reviews (job_id, file_path); |
| Original file line number | Diff line number | Diff line change | ||||
|---|---|---|---|---|---|---|
|
|
@@ -39,12 +39,12 @@ const DEFAULT_GLOBAL_CONFIG: ModelRouteConfig = { | |||||
| ], | ||||||
| }; | ||||||
|
|
||||||
| function normalizeGlobalConfig(config: any): ModelRouteConfig { | ||||||
| export function normalizeGlobalConfig(config: any): ModelRouteConfig { | ||||||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
The function 'normalizeGlobalConfig' uses 'any' for the 'config' parameter. This bypasses TypeScript's type checking and can lead to runtime errors if the input structure changes. It is better to use 'Partial' or 'unknown' with a type guard to ensure type safety.
Suggested change
|
||||||
| if (!config || !config.main) return DEFAULT_GLOBAL_CONFIG; | ||||||
| return { | ||||||
| main: config.main, | ||||||
| fallbacks: config.fallbacks?.length ? config.fallbacks : DEFAULT_GLOBAL_CONFIG.fallbacks, | ||||||
| size_overrides: config.size_overrides ?? DEFAULT_GLOBAL_CONFIG.size_overrides, | ||||||
| fallbacks: Array.isArray(config.fallbacks) ? config.fallbacks : DEFAULT_GLOBAL_CONFIG.fallbacks, | ||||||
| size_overrides: Array.isArray(config.size_overrides) ? config.size_overrides : DEFAULT_GLOBAL_CONFIG.size_overrides, | ||||||
| }; | ||||||
| } | ||||||
|
|
||||||
|
|
||||||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,53 @@ | ||
| import type { AppBindings } from '@server/env'; | ||
| import { getTerminalJobsNeedingCheckRunCompletion, markJobCheckRunCompleted, recoverExpiredJobLeases } from '@server/db/jobs'; | ||
| import { logger } from '@server/core/logger'; | ||
| import { GitHubService } from '@server/services/github'; | ||
|
|
||
| const MAX_RECOVERY_COUNT = 3; | ||
|
|
||
| export async function recoverJobs(env: AppBindings) { | ||
| try { | ||
| const recovered = await recoverExpiredJobLeases(env, MAX_RECOVERY_COUNT); | ||
| for (const jobId of recovered.requeuedJobIds) { | ||
| await env.REVIEW_QUEUE.send({ | ||
| jobId, | ||
| deliveryId: crypto.randomUUID(), | ||
| phase: 'review', | ||
| }); | ||
| } | ||
|
|
||
| if (recovered.requeuedJobIds.length > 0 || recovered.failedJobs.length > 0) { | ||
| logger.warn('Expired job leases recovered', { | ||
| requeued: recovered.requeuedJobIds.length, | ||
| failed: recovered.failedJobs.length, | ||
| }); | ||
| } | ||
| } catch (err) { | ||
| logger.error('Failed to recover expired job leases', err instanceof Error ? err : new Error(String(err))); | ||
| } | ||
| } | ||
|
|
||
| export async function completeTerminalCheckRuns(env: AppBindings) { | ||
| const jobs = await getTerminalJobsNeedingCheckRunCompletion(env); | ||
| for (const job of jobs) { | ||
| if (!job.check_run_id) continue; | ||
|
|
||
| try { | ||
| const github = new GitHubService(env, job.installation_id); | ||
| await github.updateCheckRun(job.owner, job.repo, job.check_run_id, { | ||
| status: 'completed', | ||
| conclusion: job.status === 'superseded' ? 'neutral' : 'failure', | ||
| title: job.status === 'superseded' ? 'Review superseded' : 'Review failed', | ||
| summary: job.error_msg ?? (job.status === 'superseded' ? 'Superseded by a newer commit or job.' : 'Review failed.'), | ||
| }); | ||
| await markJobCheckRunCompleted(env, job.id); | ||
| } catch (error) { | ||
| logger.error(`Failed to complete terminal check run for job ${job.id}`, error instanceof Error ? error : new Error(String(error))); | ||
| } | ||
| } | ||
| } | ||
|
|
||
| export async function runOpportunisticJobMaintenance(env: AppBindings) { | ||
| await recoverJobs(env); | ||
| await completeTerminalCheckRuns(env); | ||
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The function 'normalizeGlobalConfig' uses 'any' for the 'config' parameter. This bypasses TypeScript's type checking and can lead to runtime errors if the input structure changes. It is better to use 'Partial' or 'unknown' with a type guard to ensure type safety.