Skip to content

Add queued Omnidreams video service#335

Open
gtong-nv wants to merge 2 commits into
mainfrom
dev/gtong/omnidreams-serving
Open

Add queued Omnidreams video service#335
gtong-nv wants to merge 2 commits into
mainfrom
dev/gtong/omnidreams-serving

Conversation

@gtong-nv

@gtong-nv gtong-nv commented Jun 12, 2026

Copy link
Copy Markdown
Collaborator

Add queued OmniDreams video service and HDMap overlay outputs

Summary

  • Add an omnidreams-video-service HTTP server for queued OmniDreams rollouts from a prompt, first frame, and HDMap MP4.
  • Add an omnidreams-video-client multipart upload helper and service API discovery at GET /api.
  • Add output layout options for generated-only video and HDMap raster overlay video, including alpha and black-background threshold controls.
  • Keep the default runner output compatible with existing quality workflows: HDMap conditioning stacked above generated RGB.

What Changed

Queued video service

  • Adds omnidreams.service.server with:
    • POST /generate for multipart prompt, first-frame image, and HDMap MP4 uploads.
    • GET /health for service/queue/model-load state.
    • GET /queue for current and recent job state.
    • Single-worker FIFO execution so concurrent requests serialize GPU-heavy rollouts.
  • Supports both execution modes:
    • in_process: preload one resident OmnidreamsRunner.
    • subprocess: spawn flashdreams-run per job.
  • Derives total_blocks from the shortest uploaded HDMap video and the selected runner's len_t.
  • Expands a single first frame or HDMap upload across all views, or validates per-view uploads.
  • Adds entry points:
    • omnidreams-video-service
    • omnidreams-video-client

API discovery and client options

  • Adds GET /api to advertise request fields, response headers, and route shapes.
  • Adds inferenced_video_only support to service requests and the client CLI.
  • Adds service-managed runner forwarding for:
    • --prompt
    • --first-frame-paths
    • --hdmap-video-paths
    • --total-blocks
    • --inferenced-video-only
    • --hdmap-overlay
    • --hdmap-overlay-alpha
    • --hdmap-overlay-black-threshold
    • --output-dir

HDMap overlay output

  • Adds OmniDreams runner options:
    • hdmap_overlay
    • hdmap_overlay_alpha
    • hdmap_overlay_black_threshold
  • Blends visible HDMap raster pixels over generated RGB while treating black HDMap background pixels as transparent.
  • Performs overlay formatting with vectorized torch tensor ops while the chunk is still on-device, then moves the final canvas to CPU for MP4 writing.
  • Rejects mutually exclusive output modes: inferenced_video_only and hdmap_overlay.
  • Advertises overlay request fields and response headers through /api.

Runner/service robustness

  • Adds release_oneshot_encoders_after_run so the standalone runner can free encoder VRAM after cache initialization, while the resident service can keep encoders warm across jobs.
  • Allows OmniDreams transformer checkpoint loading to fall back to model-aware distributed checkpoint loading when required.
  • Validates overlay alpha and black threshold at service parse time so bad multipart requests return HTTP 400.

Usage

Start the service:

uv run --package flashdreams-omnidreams omnidreams-video-service \
  --host 0.0.0.0 \
  --port 8090

Inspect available APIs:

curl http://127.0.0.1:8090/api

Generate the default comparison video:

uv run --package flashdreams-omnidreams omnidreams-video-client \
  --url http://127.0.0.1:8090/generate \
  --prompt "Driving through a city street" \
  --first-frame /path/to/first_frame.png \
  --hdmap-video /path/to/hdmap.mp4 \
  --output /path/to/omnidreams-output.mp4

Generate only the inferred RGB video:

uv run --package flashdreams-omnidreams omnidreams-video-client \
  --url http://127.0.0.1:8090/generate \
  --prompt "Driving through a city street" \
  --first-frame /path/to/first_frame.png \
  --hdmap-video /path/to/hdmap.mp4 \
  --output /path/to/omnidreams-rgb.mp4 \
  --inferenced-video-only

Generate RGB with HDMap raster overlay:

uv run --package flashdreams-omnidreams omnidreams-video-client \
  --url http://127.0.0.1:8090/generate \
  --prompt "Driving through a city street" \
  --first-frame /path/to/first_frame.png \
  --hdmap-video /path/to/hdmap.mp4 \
  --output /path/to/omnidreams-overlay.mp4 \
  --hdmap-overlay \
  --hdmap-overlay-alpha 0.5 \
  --hdmap-overlay-black-threshold 8

Equivalent overlay curl request:

curl -X POST http://127.0.0.1:8090/generate \
  -F 'prompt=Driving through a city street' \
  -F 'first_frame=@/path/to/first_frame.png' \
  -F 'hdmap_video=@/path/to/hdmap.mp4' \
  -F 'hdmap_overlay=true' \
  -F 'hdmap_overlay_alpha=0.5' \
  -F 'hdmap_overlay_black_threshold=8' \
  -o /path/to/omnidreams-overlay.mp4

Testing

uv run --project integrations/omnidreams pytest \
  integrations/omnidreams/tests/test_runner_output_overlay.py \
  integrations/omnidreams/tests/test_video_service.py \
  integrations/omnidreams/tests/test_recipe_configs.py -q

Result:

19 passed

Notes

  • GPU rollout/generation was not run as part of this PR validation.
  • Existing default MP4 layout is preserved for quality-regression references that expect HDMap stacked above generated frames.
  • Existing unrelated untracked local files are not part of this PR.

@copy-pr-bot

copy-pr-bot Bot commented Jun 12, 2026

Copy link
Copy Markdown

This pull request requires additional validation before any workflows can run on NVIDIA's runners.

Pull request vetters can view their responsibilities here.

Contributors can view more details about this message here.

@gtong-nv gtong-nv marked this pull request as ready for review June 17, 2026 00:17
@greptile-apps

greptile-apps Bot commented Jun 17, 2026

Copy link
Copy Markdown
Contributor

Greptile Summary

This PR introduces a queued HTTP service (omnidreams-video-service) for OmniDreams video generation, supporting both in-process and subprocess execution modes, plus a matching multipart upload client. It also adds a release_oneshot_encoders_after_run flag to the runner config and a distributed-checkpoint fallback in the transformer loader.

  • service/server.py: New aiohttp app with POST /generate (multipart upload → FIFO queue → MP4 response), GET /health, and GET /queue; single-worker asyncio task serialises GPU-heavy rollouts. Contains a cleanup bug: app["worker_task"] is never initialised in create_app, so a failed in-process startup causes _stop_worker to raise KeyError on top of the real error. The shared runner config is also mutated in-place per job — safe today but will need explicit resets as the described overlay/inference-only fields are wired in.
  • runner.py / transformer/__init__.py: Small, targeted changes to support long-lived service usage (release_oneshot_encoders_after_run) and a distributed checkpoint fallback; the latter relies on an error-message substring match that would silently break if the upstream message changes.

Confidence Score: 3/5

The service logic is correct for the happy path, but a failed startup leaves the cleanup handler in a broken state that raises a secondary exception, making the root cause harder to diagnose.

The new server.py has a concrete crash path: if the in-process runner fails to preload, aiohttp's cleanup calls _stop_worker, which unconditionally dereferences app["worker_task"] — a key that was never written — raising KeyError and masking the real startup error. The in-place runner config mutation is safe today but is one missed reset away from cross-job state contamination as the described overlay fields get added. The transformer string-match fallback is fragile but lower-stakes. The runner and client changes are straightforward and low-risk.

integrations/omnidreams/omnidreams/service/server.py — specifically the create_app initialisation, _stop_worker, and _run_job_in_process functions.

Important Files Changed

Filename Overview
integrations/omnidreams/omnidreams/service/server.py New queued HTTP service with a KeyError during cleanup when startup fails, plus unbounded job history and in-place runner config mutation that will become a stale-state risk as new output flags are added.
integrations/omnidreams/omnidreams/runner.py Adds release_oneshot_encoders_after_run flag (default True) guarding an existing release_oneshot_encoders() call; clean and backward-compatible.
integrations/omnidreams/omnidreams/service/client.py New multipart upload client; straightforward aiohttp usage with ExitStack for file handles. No issues found.
integrations/omnidreams/omnidreams/transformer/init.py Adds a try/except ValueError fallback to distributed checkpoint loading; the string-match heuristic is fragile if the upstream error message changes.
integrations/omnidreams/tests/test_video_service.py New unit tests covering frame-count math, view expansion, layout resolution, and command building; all test pure functions with no GPU dependency.
integrations/omnidreams/pyproject.toml Registers two new entry points (omnidreams-video-service, omnidreams-video-client); no issues.

Sequence Diagram

%%{init: {'theme': 'neutral'}}%%
sequenceDiagram
    participant Client
    participant Server as aiohttp Server
    participant Queue as asyncio.Queue
    participant Worker as Single Worker Task
    participant Runner as OmnidreamsRunner (in_process)

    Client->>Server: POST /generate (multipart)
    Server->>Server: "Save uploaded files to work_dir/{job_id}/inputs/"
    Server->>Server: runner_layout() → validate + compute total_blocks
    Server->>Queue: queue.put(GenerationJob)
    Server->>Server: await asyncio.shield(job.future)

    Worker->>Queue: "job = await queue.get()"
    Worker->>Runner: asyncio.to_thread(runner.run)
    Note over Worker,Runner: Mutates runner.config in-place before each job
    Runner-->>Worker: writes MP4 to output_dir
    Worker->>Worker: job.future.set_result(output_path)

    Server-->>Client: FileResponse (MP4 + X-Job-Id header)

    Client->>Server: GET /health
    Server-->>Client: "{ok, loaded, queued, current_job_id}"

    Client->>Server: GET /queue
    Server-->>Client: "{jobs: [...job summaries]}"
Loading
%%{init: {'theme': 'base', 'themeVariables': {"darkMode": true, "background": "#0d1117", "primaryColor": "#21262d", "primaryTextColor": "#e6edf3", "primaryBorderColor": "#8b949e", "lineColor": "#8b949e", "textColor": "#e6edf3", "edgeLabelBackground": "#161b22", "actorBkg": "#21262d", "actorBorder": "#8b949e", "actorTextColor": "#e6edf3", "actorLineColor": "#8b949e", "signalColor": "#8b949e", "signalTextColor": "#e6edf3", "noteBkgColor": "#373320", "noteBorderColor": "#d4a72c", "noteTextColor": "#f0e6c0", "labelBoxBkgColor": "#21262d", "labelBoxBorderColor": "#8b949e", "labelTextColor": "#e6edf3", "loopTextColor": "#e6edf3", "activationBkgColor": "#30363d", "activationBorderColor": "#8b949e"}}}%%
sequenceDiagram
    participant Client
    participant Server as aiohttp Server
    participant Queue as asyncio.Queue
    participant Worker as Single Worker Task
    participant Runner as OmnidreamsRunner (in_process)

    Client->>Server: POST /generate (multipart)
    Server->>Server: "Save uploaded files to work_dir/{job_id}/inputs/"
    Server->>Server: runner_layout() → validate + compute total_blocks
    Server->>Queue: queue.put(GenerationJob)
    Server->>Server: await asyncio.shield(job.future)

    Worker->>Queue: "job = await queue.get()"
    Worker->>Runner: asyncio.to_thread(runner.run)
    Note over Worker,Runner: Mutates runner.config in-place before each job
    Runner-->>Worker: writes MP4 to output_dir
    Worker->>Worker: job.future.set_result(output_path)

    Server-->>Client: FileResponse (MP4 + X-Job-Id header)

    Client->>Server: GET /health
    Server-->>Client: "{ok, loaded, queued, current_job_id}"

    Client->>Server: GET /queue
    Server-->>Client: "{jobs: [...job summaries]}"
Loading

Reviews (1): Last reviewed commit: "Merge branch 'main' into dev/gtong/omnid..." | Re-trigger Greptile

Comment on lines +437 to +452
async def _start_worker(app: web.Application) -> None:
config: ServiceConfig = app["config"]
if config.execution_mode == "in_process":
try:
app["runner_config"] = resolve_runner_config(
config.runner_name,
checkpoint_path=config.checkpoint_path,
device=config.device,
)
logger.info(f"preloading Omnidreams runner {config.runner_name!r}")
app["runner"] = app["runner_config"].setup()
logger.info(f"preloaded Omnidreams runner {config.runner_name!r}")
except Exception as exc:
app["startup_error"] = str(exc)
raise
app["worker_task"] = asyncio.create_task(_worker(app))

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P1 KeyError on worker_task when in-process startup fails

If _start_worker raises during runner preload (lines 441–451), execution exits before app["worker_task"] is set. aiohttp still invokes _stop_worker during cleanup, which unconditionally accesses app["worker_task"] on line 637 — producing a KeyError that masks the real startup error. Initializing app["worker_task"] = None in create_app and guarding _stop_worker with a None check would fix this.

Comment on lines +321 to +323
app["runner"] = None
app["runner_config"] = None
app["startup_error"] = None

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P1 Initialize worker_task alongside the other app keys so _stop_worker can check it safely even if _start_worker raises before reaching asyncio.create_task.

Suggested change
app["runner"] = None
app["runner_config"] = None
app["startup_error"] = None
app["runner"] = None
app["runner_config"] = None
app["startup_error"] = None
app["worker_task"] = None

Note: If this suggestion doesn't match your team's coding style, reply to this and let me know. I'll remember it for next time!

Comment on lines +455 to +461
async def _stop_worker(app: web.Application) -> None:
task: asyncio.Task[None] = app["worker_task"]
task.cancel()
try:
await task
except asyncio.CancelledError:
pass

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P1 Guard _stop_worker against the case where startup raised before the task was created — otherwise cleanup raises a confusing KeyError on top of the original error.

Suggested change
async def _stop_worker(app: web.Application) -> None:
task: asyncio.Task[None] = app["worker_task"]
task.cancel()
try:
await task
except asyncio.CancelledError:
pass
async def _stop_worker(app: web.Application) -> None:
task: asyncio.Task[None] | None = app.get("worker_task")
if task is None:
return
task.cancel()
try:
await task
except asyncio.CancelledError:
pass

Comment on lines +526 to +553
async def _run_job_in_process(*, app: web.Application, job: GenerationJob) -> Path:
runner = app["runner"]
if not isinstance(runner, OmnidreamsRunner):
raise RuntimeError("resident Omnidreams runner is not loaded")

cfg = runner.config
assert isinstance(cfg, OmnidreamsRunnerConfig)
cfg.prompt = job.prompt
cfg.prompts = ()
cfg.first_frame_paths = job.first_frame_paths
cfg.hdmap_video_paths = job.hdmap_video_paths
cfg.total_blocks = job.total_blocks
cfg.output_dir = job.output_dir
cfg.release_oneshot_encoders_after_run = False

logger.info(
f"[{job.job_id}] running resident Omnidreams runner "
f"{job.runner_name!r} total_blocks={job.total_blocks}"
)
await asyncio.to_thread(runner.run)

expected = job.output_dir / f"{job.runner_name}.mp4"
if expected.exists():
return expected
candidates = sorted(job.output_dir.glob("*.mp4"))
if candidates:
return candidates[0]
raise FileNotFoundError(f"resident runner wrote no MP4 in {job.output_dir}")

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 Shared runner config mutated in-place between jobs

_run_job_in_process writes directly into runner.config before every run. This works safely today because the single-worker FIFO queue serialises execution and all six job-scoped fields are explicitly reset per job. The fragility is that the PR description already describes adding inferenced_video_only and hdmap_overlay fields to the runner — any newly introduced field that is conditionally set (e.g. only when True) will silently carry its value from the previous job into the next one if the writer forgets to include a reset. Creating a per-job derived config copy (via derive_config) would make the pattern safe by default.

Note: If this suggestion doesn't match your team's coding style, reply to this and let me know. I'll remember it for next time!

Comment on lines +416 to +418
jobs: dict[str, GenerationJob] = request.app["jobs"]
jobs[job_id] = job
await request.app["queue"].put(job)

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 app["jobs"] dict grows without bound

Completed and failed GenerationJob objects are added to app["jobs"] but never evicted. For a long-lived service that processes many requests, this is a memory leak. Consider capping the history (e.g. keeping only the last N entries) or pruning jobs older than some TTL after finished_at is set.

Comment on lines +312 to +317
except ValueError as exc:
if "Model must be provided for distributed checkpoint loading" not in str(
exc
):
raise
load_checkpoint(config.checkpoint_path, model=self.network)

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 Fragile error-message string match for distributed checkpoint fallback

The fallback path is triggered by matching the substring "Model must be provided for distributed checkpoint loading" inside a caught ValueError. If load_checkpoint's error message is ever rephrased, the fallback silently stops working and the original ValueError propagates as if the feature never existed. A more stable approach would be to pass a sentinel or probe the checkpoint format explicitly, or at minimum log a warning when this fallback activates so it can be observed in production.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant