Add queued Omnidreams video service#335
Conversation
Greptile SummaryThis PR introduces a queued HTTP service (
Confidence Score: 3/5The service logic is correct for the happy path, but a failed startup leaves the cleanup handler in a broken state that raises a secondary exception, making the root cause harder to diagnose. The new server.py has a concrete crash path: if the in-process runner fails to preload, aiohttp's cleanup calls _stop_worker, which unconditionally dereferences app["worker_task"] — a key that was never written — raising KeyError and masking the real startup error. The in-place runner config mutation is safe today but is one missed reset away from cross-job state contamination as the described overlay fields get added. The transformer string-match fallback is fragile but lower-stakes. The runner and client changes are straightforward and low-risk. integrations/omnidreams/omnidreams/service/server.py — specifically the create_app initialisation, _stop_worker, and _run_job_in_process functions. Important Files Changed
Sequence Diagram%%{init: {'theme': 'neutral'}}%%
sequenceDiagram
participant Client
participant Server as aiohttp Server
participant Queue as asyncio.Queue
participant Worker as Single Worker Task
participant Runner as OmnidreamsRunner (in_process)
Client->>Server: POST /generate (multipart)
Server->>Server: "Save uploaded files to work_dir/{job_id}/inputs/"
Server->>Server: runner_layout() → validate + compute total_blocks
Server->>Queue: queue.put(GenerationJob)
Server->>Server: await asyncio.shield(job.future)
Worker->>Queue: "job = await queue.get()"
Worker->>Runner: asyncio.to_thread(runner.run)
Note over Worker,Runner: Mutates runner.config in-place before each job
Runner-->>Worker: writes MP4 to output_dir
Worker->>Worker: job.future.set_result(output_path)
Server-->>Client: FileResponse (MP4 + X-Job-Id header)
Client->>Server: GET /health
Server-->>Client: "{ok, loaded, queued, current_job_id}"
Client->>Server: GET /queue
Server-->>Client: "{jobs: [...job summaries]}"
%%{init: {'theme': 'base', 'themeVariables': {"darkMode": true, "background": "#0d1117", "primaryColor": "#21262d", "primaryTextColor": "#e6edf3", "primaryBorderColor": "#8b949e", "lineColor": "#8b949e", "textColor": "#e6edf3", "edgeLabelBackground": "#161b22", "actorBkg": "#21262d", "actorBorder": "#8b949e", "actorTextColor": "#e6edf3", "actorLineColor": "#8b949e", "signalColor": "#8b949e", "signalTextColor": "#e6edf3", "noteBkgColor": "#373320", "noteBorderColor": "#d4a72c", "noteTextColor": "#f0e6c0", "labelBoxBkgColor": "#21262d", "labelBoxBorderColor": "#8b949e", "labelTextColor": "#e6edf3", "loopTextColor": "#e6edf3", "activationBkgColor": "#30363d", "activationBorderColor": "#8b949e"}}}%%
sequenceDiagram
participant Client
participant Server as aiohttp Server
participant Queue as asyncio.Queue
participant Worker as Single Worker Task
participant Runner as OmnidreamsRunner (in_process)
Client->>Server: POST /generate (multipart)
Server->>Server: "Save uploaded files to work_dir/{job_id}/inputs/"
Server->>Server: runner_layout() → validate + compute total_blocks
Server->>Queue: queue.put(GenerationJob)
Server->>Server: await asyncio.shield(job.future)
Worker->>Queue: "job = await queue.get()"
Worker->>Runner: asyncio.to_thread(runner.run)
Note over Worker,Runner: Mutates runner.config in-place before each job
Runner-->>Worker: writes MP4 to output_dir
Worker->>Worker: job.future.set_result(output_path)
Server-->>Client: FileResponse (MP4 + X-Job-Id header)
Client->>Server: GET /health
Server-->>Client: "{ok, loaded, queued, current_job_id}"
Client->>Server: GET /queue
Server-->>Client: "{jobs: [...job summaries]}"
Reviews (1): Last reviewed commit: "Merge branch 'main' into dev/gtong/omnid..." | Re-trigger Greptile |
| async def _start_worker(app: web.Application) -> None: | ||
| config: ServiceConfig = app["config"] | ||
| if config.execution_mode == "in_process": | ||
| try: | ||
| app["runner_config"] = resolve_runner_config( | ||
| config.runner_name, | ||
| checkpoint_path=config.checkpoint_path, | ||
| device=config.device, | ||
| ) | ||
| logger.info(f"preloading Omnidreams runner {config.runner_name!r}") | ||
| app["runner"] = app["runner_config"].setup() | ||
| logger.info(f"preloaded Omnidreams runner {config.runner_name!r}") | ||
| except Exception as exc: | ||
| app["startup_error"] = str(exc) | ||
| raise | ||
| app["worker_task"] = asyncio.create_task(_worker(app)) |
There was a problem hiding this comment.
KeyError on worker_task when in-process startup fails
If _start_worker raises during runner preload (lines 441–451), execution exits before app["worker_task"] is set. aiohttp still invokes _stop_worker during cleanup, which unconditionally accesses app["worker_task"] on line 637 — producing a KeyError that masks the real startup error. Initializing app["worker_task"] = None in create_app and guarding _stop_worker with a None check would fix this.
| app["runner"] = None | ||
| app["runner_config"] = None | ||
| app["startup_error"] = None |
There was a problem hiding this comment.
Initialize
worker_task alongside the other app keys so _stop_worker can check it safely even if _start_worker raises before reaching asyncio.create_task.
| app["runner"] = None | |
| app["runner_config"] = None | |
| app["startup_error"] = None | |
| app["runner"] = None | |
| app["runner_config"] = None | |
| app["startup_error"] = None | |
| app["worker_task"] = None |
Note: If this suggestion doesn't match your team's coding style, reply to this and let me know. I'll remember it for next time!
| async def _stop_worker(app: web.Application) -> None: | ||
| task: asyncio.Task[None] = app["worker_task"] | ||
| task.cancel() | ||
| try: | ||
| await task | ||
| except asyncio.CancelledError: | ||
| pass |
There was a problem hiding this comment.
Guard
_stop_worker against the case where startup raised before the task was created — otherwise cleanup raises a confusing KeyError on top of the original error.
| async def _stop_worker(app: web.Application) -> None: | |
| task: asyncio.Task[None] = app["worker_task"] | |
| task.cancel() | |
| try: | |
| await task | |
| except asyncio.CancelledError: | |
| pass | |
| async def _stop_worker(app: web.Application) -> None: | |
| task: asyncio.Task[None] | None = app.get("worker_task") | |
| if task is None: | |
| return | |
| task.cancel() | |
| try: | |
| await task | |
| except asyncio.CancelledError: | |
| pass |
| async def _run_job_in_process(*, app: web.Application, job: GenerationJob) -> Path: | ||
| runner = app["runner"] | ||
| if not isinstance(runner, OmnidreamsRunner): | ||
| raise RuntimeError("resident Omnidreams runner is not loaded") | ||
|
|
||
| cfg = runner.config | ||
| assert isinstance(cfg, OmnidreamsRunnerConfig) | ||
| cfg.prompt = job.prompt | ||
| cfg.prompts = () | ||
| cfg.first_frame_paths = job.first_frame_paths | ||
| cfg.hdmap_video_paths = job.hdmap_video_paths | ||
| cfg.total_blocks = job.total_blocks | ||
| cfg.output_dir = job.output_dir | ||
| cfg.release_oneshot_encoders_after_run = False | ||
|
|
||
| logger.info( | ||
| f"[{job.job_id}] running resident Omnidreams runner " | ||
| f"{job.runner_name!r} total_blocks={job.total_blocks}" | ||
| ) | ||
| await asyncio.to_thread(runner.run) | ||
|
|
||
| expected = job.output_dir / f"{job.runner_name}.mp4" | ||
| if expected.exists(): | ||
| return expected | ||
| candidates = sorted(job.output_dir.glob("*.mp4")) | ||
| if candidates: | ||
| return candidates[0] | ||
| raise FileNotFoundError(f"resident runner wrote no MP4 in {job.output_dir}") |
There was a problem hiding this comment.
Shared runner config mutated in-place between jobs
_run_job_in_process writes directly into runner.config before every run. This works safely today because the single-worker FIFO queue serialises execution and all six job-scoped fields are explicitly reset per job. The fragility is that the PR description already describes adding inferenced_video_only and hdmap_overlay fields to the runner — any newly introduced field that is conditionally set (e.g. only when True) will silently carry its value from the previous job into the next one if the writer forgets to include a reset. Creating a per-job derived config copy (via derive_config) would make the pattern safe by default.
Note: If this suggestion doesn't match your team's coding style, reply to this and let me know. I'll remember it for next time!
| jobs: dict[str, GenerationJob] = request.app["jobs"] | ||
| jobs[job_id] = job | ||
| await request.app["queue"].put(job) |
There was a problem hiding this comment.
app["jobs"] dict grows without bound
Completed and failed GenerationJob objects are added to app["jobs"] but never evicted. For a long-lived service that processes many requests, this is a memory leak. Consider capping the history (e.g. keeping only the last N entries) or pruning jobs older than some TTL after finished_at is set.
| except ValueError as exc: | ||
| if "Model must be provided for distributed checkpoint loading" not in str( | ||
| exc | ||
| ): | ||
| raise | ||
| load_checkpoint(config.checkpoint_path, model=self.network) |
There was a problem hiding this comment.
Fragile error-message string match for distributed checkpoint fallback
The fallback path is triggered by matching the substring "Model must be provided for distributed checkpoint loading" inside a caught ValueError. If load_checkpoint's error message is ever rephrased, the fallback silently stops working and the original ValueError propagates as if the feature never existed. A more stable approach would be to pass a sentinel or probe the checkpoint format explicitly, or at minimum log a warning when this fallback activates so it can be observed in production.
Add queued OmniDreams video service and HDMap overlay outputs
Summary
omnidreams-video-serviceHTTP server for queued OmniDreams rollouts from a prompt, first frame, and HDMap MP4.omnidreams-video-clientmultipart upload helper and service API discovery atGET /api.What Changed
Queued video service
omnidreams.service.serverwith:POST /generatefor multipart prompt, first-frame image, and HDMap MP4 uploads.GET /healthfor service/queue/model-load state.GET /queuefor current and recent job state.in_process: preload one residentOmnidreamsRunner.subprocess: spawnflashdreams-runper job.total_blocksfrom the shortest uploaded HDMap video and the selected runner'slen_t.omnidreams-video-serviceomnidreams-video-clientAPI discovery and client options
GET /apito advertise request fields, response headers, and route shapes.inferenced_video_onlysupport to service requests and the client CLI.--prompt--first-frame-paths--hdmap-video-paths--total-blocks--inferenced-video-only--hdmap-overlay--hdmap-overlay-alpha--hdmap-overlay-black-threshold--output-dirHDMap overlay output
hdmap_overlayhdmap_overlay_alphahdmap_overlay_black_thresholdinferenced_video_onlyandhdmap_overlay./api.Runner/service robustness
release_oneshot_encoders_after_runso the standalone runner can free encoder VRAM after cache initialization, while the resident service can keep encoders warm across jobs.Usage
Start the service:
Inspect available APIs:
Generate the default comparison video:
uv run --package flashdreams-omnidreams omnidreams-video-client \ --url http://127.0.0.1:8090/generate \ --prompt "Driving through a city street" \ --first-frame /path/to/first_frame.png \ --hdmap-video /path/to/hdmap.mp4 \ --output /path/to/omnidreams-output.mp4Generate only the inferred RGB video:
uv run --package flashdreams-omnidreams omnidreams-video-client \ --url http://127.0.0.1:8090/generate \ --prompt "Driving through a city street" \ --first-frame /path/to/first_frame.png \ --hdmap-video /path/to/hdmap.mp4 \ --output /path/to/omnidreams-rgb.mp4 \ --inferenced-video-onlyGenerate RGB with HDMap raster overlay:
uv run --package flashdreams-omnidreams omnidreams-video-client \ --url http://127.0.0.1:8090/generate \ --prompt "Driving through a city street" \ --first-frame /path/to/first_frame.png \ --hdmap-video /path/to/hdmap.mp4 \ --output /path/to/omnidreams-overlay.mp4 \ --hdmap-overlay \ --hdmap-overlay-alpha 0.5 \ --hdmap-overlay-black-threshold 8Equivalent overlay
curlrequest:Testing
Result:
Notes