
feat: implement async scheduling admission control #661

Open
eric-tramel wants to merge 11 commits into epic/645-async-scheduling from scheduling-yolo

Conversation

@eric-tramel
Contributor

📋 Summary

Implements the issue 645 async scheduling epic by splitting runtime control into explicit scheduler task admission and concrete model-request admission, with typed scheduling metadata, AIMD-backed request leases, capacity snapshots, and correlated observability. This PR also updates architecture/docs/Fern assets and includes benchmark evidence from live GPT-5.5 and GPT-5 Nano traffic.

🔗 Related Issue

Refs #645

🔄 Changes

✨ Added

  • Generator-facing SchedulingMetadata and validation in the config package.
  • Engine-side task scheduling bridge, fair ready queue views, and task admission leases.
  • ModelRequestExecutor and AdaptiveRequestAdmissionController for per-attempt provider/model/domain admission.
  • Scheduler/request observability events, runtime correlation, capacity snapshots, and a deterministic benchmark harness.
  • Focused unit/regression tests for scheduling metadata, task admission, fair queue behavior, request admission, model request execution, async scheduler behavior, and capacity reporting.
  • HTML benchmark/QA report plus condensed live benchmark summaries under artifacts/645-live-bench*.
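
For readers less familiar with AIMD (additive increase, multiplicative decrease), the idea behind the request leases can be sketched roughly as follows. This is a simplified illustration, not the AdaptiveRequestAdmissionController itself: the class name, field names, and default values are all assumptions made for the example.

```python
from dataclasses import dataclass


@dataclass
class AimdLimit:
    """Hypothetical AIMD concurrency limit; not the PR's controller."""

    limit: float = 4.0        # current concurrency ceiling
    min_limit: float = 1.0
    max_limit: float = 64.0
    increase: float = 1.0     # additive step after a successful request
    decrease: float = 0.5     # multiplicative factor after a rate-limit signal
    in_flight: int = 0

    def try_acquire(self) -> bool:
        """Admit a request only while in-flight work is below the limit."""
        if self.in_flight < int(self.limit):
            self.in_flight += 1
            return True
        return False

    def release(self, rate_limited: bool) -> None:
        """Return the slot and adjust the limit based on the outcome."""
        self.in_flight = max(0, self.in_flight - 1)
        if rate_limited:
            self.limit = max(self.min_limit, self.limit * self.decrease)
        else:
            self.limit = min(self.max_limit, self.limit + self.increase)
```

Each success nudges the ceiling up by a fixed step, while a rate-limit signal cuts it by a fixed fraction, so the limit backs off quickly under pressure and recovers gradually.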

🔧 Changed

  • Rewired the async scheduler so root and downstream work use the same ready queue, admission, task lease, worker spawn, and release flow.
  • Routed model clients through request admission and exposed request event sinks at the model-call boundary.
  • Updated architecture docs, devnotes, and Fern docs/assets from throttle-oriented language to request-admission terminology.
  • Tightened issue 645 plan contracts around task admission, request admission, migration cleanup, and capacity vocabulary.

🗑️ Removed

  • Removed legacy scheduling hint resolver code and tests.
  • Removed transport-level throttle manager/client wrapper code and tests in favor of request admission.

🔍 Attention Areas

⚠️ Reviewers: This is intentionally a large draft PR against the epic branch.

  • packages/data-designer-engine/src/data_designer/engine/dataset_builders/async_scheduler.py — central runtime control flow and lease lifecycle.
  • packages/data-designer-engine/src/data_designer/engine/models/clients/request_admission.py — AIMD request admission state machine and exact request lease accounting.
  • packages/data-designer-engine/src/data_designer/engine/models/clients/model_request_executor.py — concrete model-call attempt boundary and release outcome classification.
  • packages/data-designer-engine/src/data_designer/engine/observability.py — scheduler/request event contracts used by benchmark evidence.
  • reports/async-scheduling-epic-benchmark-report.html — high-level QA and live benchmark report.

🧪 Testing

  • .venv/bin/ruff check packages scripts tests_e2e
  • .venv/bin/ruff format --check packages scripts tests_e2e
  • git diff --check
  • Config package tests: 570 passed
  • Engine package tests: 1,995 passed
  • Interface package tests: 899 passed, 1 skipped
  • Benchmark schema and derived metric checks passed
  • Stale throttle/scheduling term cleanup gates passed, excluding expected migration/removal references
  • Live GPT-5.5 benchmark lanes: max_parallel, AIMD, short/long, fan-in/fan-out, bottleneck workloads
  • Live GPT-5 Nano 1024+ benchmark lanes: cap scale, AIMD scale, fan scale, mixed GPT-5.5 -> Nano pipeline
  • make test as a single aggregate command was not rerun; equivalent package suites above passed
  • E2E tests added/updated: N/A for this PR; live provider benchmark artifacts were collected instead

✅ Checklist

  • Follows commit message conventions
  • Commits are signed off (DCO)
  • Architecture docs updated

Notes

Raw live benchmark traces were left local because the full artifact tree is roughly 519 MB. This PR includes the condensed README/combined-summary artifacts and the standalone HTML report so reviewers can inspect the benchmark evidence without committing the full JSONL timelines.

eric-tramel and others added 10 commits May 14, 2026 10:46
Signed-off-by: Eric W. Tramel <eric.tramel@gmail.com>
Co-authored-by: greptile-apps[bot] <165735046+greptile-apps[bot]@users.noreply.github.com>
Resolve the second review pass over plans/645 by making the Markdown spec and UML source agree on task admission, request admission, capacity, telemetry, benchmark, migration, and issue-map contracts.

Key updates include canonical event names, richer AsyncCapacityPlan fields, request waiter and cancellation semantics, timed wakeups, retry/salvage lease ordering, and clearer public/internal documentation boundaries.

Signed-off-by: Eric W. Tramel <eric.tramel@gmail.com>
@github-actions
Contributor

github-actions Bot commented May 14, 2026

MkDocs preview: https://f0b2b167.dd-docs-preview.pages.dev

Fern preview: https://nvidia-preview-pr-661.docs.buildwithfern.com/nemo/datadesigner

Notebook tutorials are rendered without execution outputs in previews.

@eric-tramel eric-tramel marked this pull request as ready for review May 18, 2026 16:39
@eric-tramel eric-tramel requested a review from a team as a code owner May 18, 2026 16:39
@nabinchha
Contributor

Thanks for putting this together, @eric-tramel — this is a substantial reshaping of the runtime control surfaces and the new module ownership reads cleanly. Here are my thoughts.

Summary

This PR splits runtime control into explicit scheduler task admission (FairTaskQueue + TaskAdmissionController) and provider/model request admission (AdaptiveRequestAdmissionController + ModelRequestExecutor), retires the old ThrottleManager/ThrottledModelClient stack, adds an observability/capacity surface, and updates architecture docs and the published Fern site to match. The implementation matches the PR description and the contracts laid out under plans/645/.

Findings

Warnings — Worth addressing

packages/data-designer-engine/src/data_designer/engine/dataset_builders/scheduling/completion.py:347 — _stable_task_id duplicates stable_task_id and uses a function-local import

  • What: completion.py defines _stable_task_id(task) with the same hashing recipe as resources.stable_task_id(task), plus an in-function import hashlib.
  • Why: This is both a DRY violation and a STYLEGUIDE.md violation ("Place imports at module level, not inside functions"). It also creates real drift risk: _stable_task_id produces the IDs we compare against in mark_enqueued, while stable_task_id is the one the resolver/scheduler actually attaches to SchedulableTask. If either drifts (e.g. someone changes the hash algorithm), mark_enqueued will silently stop matching and frontier dedup breaks.
  • Suggestion: Drop the local function and import the public one from resources:
from data_designer.engine.dataset_builders.scheduling.resources import stable_task_id
# ...
self._frontier = {task for task in self._frontier if stable_task_id(task) not in wanted}

packages/data-designer-engine/src/data_designer/engine/dataset_builders/scheduling/queue.py:93,108 — blocked accumulator is collected but never read

  • What: FairTaskQueue.select_next builds a blocked: list[tuple[float, int, TaskGroupKey]] and appends ineligible heap entries to it, but the list is never consumed before the function returns.
  • Why: Dead code. It's not flagged by F841 (the list is mutated, just never read), so it'll quietly stick around. It also reads as if it was meant to feed diagnostics or a stable backoff that didn't make it into the PR.
  • Suggestion: Either remove the local entirely if it's leftover from refactoring, or wire it into the _DispatchOutcome/explain_blocked path you already have so callers see which groups got skipped during selection.
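
If the second option is preferred, one possible shape is to return the skipped groups alongside the selection. The sketch below is hypothetical: Selection, its fields, and the simplified select_next signature are invented for illustration and do not reflect FairTaskQueue's actual interface or the _DispatchOutcome type.

```python
import heapq
from dataclasses import dataclass, field
from typing import Optional


@dataclass
class Selection:
    """Hypothetical result type: the chosen group plus the groups skipped on the way."""

    group: Optional[str]
    skipped: list[str] = field(default_factory=list)


def select_next(heap: list[tuple[float, int, str]], eligible: set[str]) -> Selection:
    """Non-mutating selection: scan a copy of the heap in virtual-time order."""
    scratch = list(heap)  # a shallow copy of a valid heap is itself a valid heap
    skipped: list[str] = []
    while scratch:
        _, _, group = heapq.heappop(scratch)
        if group in eligible:
            return Selection(group=group, skipped=skipped)
        skipped.append(group)  # now read by the caller, so no longer dead
    return Selection(group=None, skipped=skipped)
```

Callers that only need the winner can ignore skipped, while a diagnostics path can report which groups were passed over and in what order.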

packages/data-designer-engine/src/data_designer/engine/models/request_admission/controller.py:639-640, model_request_executor.py:245-246, dataset_builders/async_scheduler.py:369-370 — Event sink exceptions are silently swallowed

  • What: Three independent paths emit observability events through a try / except Exception: return (or continue).
  • Why: We lose any signal that the sink itself is misbehaving (e.g. a buggy custom RequestAdmissionEventSink that always raises). Given the new benchmark/QA story leans heavily on these events, a stuck sink would be effectively invisible.
  • Suggestion: Log at logging.WARNING once per kind (or with rate limiting), and document the contract on Sink.emit_* ("must not raise; failures are dropped"). At a minimum, attach exc_info=True so users can find the broken sink in their logs.
except Exception:
    logger.warning("Admission event sink raised; dropping event", exc_info=True)
    return

packages/data-designer-engine/src/data_designer/engine/models/request_admission/controller.py:229-281 — acquire_async polls every ~50 ms instead of being woken

  • What: The async path loops with self._lock: ... ; wait = self._wait_seconds_locked(...) then await asyncio.sleep(wait) where _wait_seconds_locked clamps wait to a minimum candidate of 0.05. The threading _condition.notify_all() paths only wake acquire_sync waiters; the async coroutine is on asyncio.sleep and isn't notified by them.
  • Why: Under contention, every async lease release adds up to ~50 ms of additional latency before the next waiter sees its assigned lease. The benchmark harness will mask this on hot endpoints, but it's a measurable ceiling on scheduler-driven async traffic, and "wakeup" is in the contracts language but not implemented for asyncio.
  • Suggestion: Add a per-resource asyncio.Event (or a small list of asyncio.Futures) that _admit_waiters_locked and release set when an async waiter is admitted; switch acquire_async to asyncio.wait_for(event.wait(), timeout=wait) so we still bound the wait by deadline but stop polling. Alternatively, document the polling cadence as intentional in plans/645/request-admission.md and pull 0.05 out as a named module constant (e.g. _ASYNC_WAIT_POLL_INTERVAL_S) so it's easy to find and tune.
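
To make the event-based option concrete, here is a minimal sketch under simplifying assumptions: a single resource, a fixed capacity, and all callers on one event loop. AsyncSlotGate and its methods are hypothetical names, not the controller's API. In the real controller, releases can also arrive from other threads, so setting the event would likely need to go through loop.call_soon_threadsafe; the sketch does not cover that.

```python
import asyncio
import time


class AsyncSlotGate:
    """Hypothetical single-resource gate: waiters sleep on an Event instead of polling.

    Construct it from inside a running event loop.
    """

    def __init__(self, capacity: int) -> None:
        self._capacity = capacity
        self._in_flight = 0
        self._released = asyncio.Event()

    async def acquire(self, timeout: float) -> bool:
        """Wait for a free slot, bounded by the deadline; return False on timeout."""
        deadline = time.monotonic() + timeout
        while self._in_flight >= self._capacity:
            remaining = deadline - time.monotonic()
            if remaining <= 0:
                return False
            self._released.clear()
            try:
                await asyncio.wait_for(self._released.wait(), timeout=remaining)
            except asyncio.TimeoutError:
                return False
        self._in_flight += 1
        return True

    def release(self) -> None:
        """Free a slot and wake every waiter; each one re-checks capacity."""
        self._in_flight -= 1
        self._released.set()
```

A waiter here resumes on the next loop iteration after release rather than after the remainder of a sleep interval, and the deadline still bounds the total wait.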

packages/data-designer-engine/src/data_designer/engine/models/request_admission/controller.py:196-227 — acquire_sync will block the event loop if called from async code

  • What: acquire_sync calls self._condition.wait(timeout=wait), which holds a real OS-level wait inside a threading.Condition.
  • Why: There's no enforcement that this method is never reached from an asyncio task. _execute_sync in ModelRequestExecutor is the intended caller, but a future caller (e.g. a custom column generator that re-enters the controller) could deadlock the loop. The async/sync executor split assumes everyone reads the contract.
  • Suggestion: Either rename to acquire_blocking and add a docstring warning, or assert asyncio.get_running_loop() raises (i.e. we're not on a loop) at the top of the method:
def acquire_sync(self, item: RequestAdmissionItem) -> RequestAdmissionLease:
    try:
        asyncio.get_running_loop()
    except RuntimeError:
        pass
    else:
        raise RuntimeError(
            "acquire_sync would block the running event loop; use acquire_async instead."
        )
    ...

packages/data-designer-config/src/data_designer/config/run_config.py — RunConfig.throttle removed with no deprecation path

  • What: ThrottleConfig and RunConfig.throttle are deleted outright. ConfigBase enables extra="forbid", so any user code that still does RunConfig(throttle=...) will fail Pydantic validation with "Extra inputs are not permitted".
  • Why: This is a real public-API breaking change. The PR description correctly calls it out, but a sudden Pydantic error on first run will be confusing — users will see a validator stack trace, not "throttle has been replaced by request admission".
  • Suggestion: Add a model_validator(mode="before") that detects throttle in the input dict and raises a typed ConfigValidationError (or your equivalent canonical error) with a one-line "Replaced by adaptive request admission; see X." This keeps extra="forbid" honest while giving users a humane error. If we want to be even friendlier, accept and warn for one release. Either way, this should be in the release notes when the epic lands on main.
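
The check itself can be small. The sketch below keeps it as a plain function so it stays independent of the config classes; the intent is that a model_validator(mode="before") classmethod on RunConfig would call it with the raw input. RemovedConfigFieldError, _REMOVED_FIELDS, and the hint text are placeholders, not names from the codebase.

```python
from typing import Any

# Hypothetical mapping of removed fields to one-line migration hints.
_REMOVED_FIELDS = {
    "throttle": "Replaced by adaptive request admission, which is engine-internal.",
}


class RemovedConfigFieldError(ValueError):
    """Placeholder for the project's canonical config error type."""


def reject_removed_fields(data: Any) -> Any:
    """Raise a targeted error for removed keys; pass everything else through unchanged."""
    if isinstance(data, dict):
        for name, hint in _REMOVED_FIELDS.items():
            if name in data:
                raise RemovedConfigFieldError(f"RunConfig.{name} has been removed. {hint}")
    return data


# Intended wiring (assumption, shown as a comment so this block has no Pydantic dependency):
#
#     @model_validator(mode="before")
#     @classmethod
#     def _reject_removed_fields(cls, data: Any) -> Any:
#         return reject_removed_fields(data)
```

Because the check runs before field validation, the user sees the migration hint instead of the generic extra-field message.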

packages/data-designer-engine/src/data_designer/engine/column_generators/generators/base.py:166-208 — Endpoint-bucket typing forces runtime isinstance casts

  • What: endpoints: dict[tuple[str, str, str], dict[str, object]] is built up with bucket["aliases"] and bucket["caps"], and the code defends each access with if isinstance(cast_aliases, list): cast_aliases.append(...).
  • Why: The runtime checks are there only to satisfy the loose dict[str, object] typing — they can't actually fail because the only writer is this same function. The result is harder to read than the underlying logic.
  • Suggestion: Lift a small private dataclass:
from dataclasses import dataclass, field

@dataclass
class _EndpointBucket:
    aliases: list[str] = field(default_factory=list)
    caps: list[int] = field(default_factory=list)

endpoints: dict[tuple[str, str, str], _EndpointBucket] = {}
...
bucket = endpoints.setdefault(endpoint, _EndpointBucket())
bucket.aliases.append(alias)
bucket.caps.append(cap)

This drops the isinstance ladder and tightens the typing without expanding the public surface.

packages/data-designer-engine/src/data_designer/engine/dataset_builders/async_scheduler.py:1042-1085 — Generic except Exception in task body classifies non-LLM bugs as drop-row events

  • What: When a task raises something that isn't in RETRYABLE_MODEL_ERRORS, the scheduler treats it as a non-retryable drop, logs a warning, and continues. So a KeyError/TypeError/AttributeError from our own scheduler/generator code looks identical to "model returned a JSON that fails validation".
  • Why: That makes legitimate engine bugs invisible behind row drops. The early-shutdown gate will eventually fire if this is widespread, but a one-off bug on a hot column path can quietly drain a large portion of a run.
  • Suggestion: Either bump the existing log to logger.error(..., exc_info=True) for non-retryable, non-ProviderError/Generation*Error types, or add an explicit allowlist of "expected provider/generator errors" and re-raise anything else into a typed DatasetGenerationError that the run knows is internal. Tests already exercise the success/failure mix; a unit test that asserts KeyError from a generator raises rather than silently drops the row would lock this in.
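
A rough sketch of the allowlist approach, with stand-in types: ProviderError and GenerationValidationError below are local placeholders for the real error classes, and InternalTaskError stands in for the typed DatasetGenerationError mentioned above. The run_row helper is invented for the example and is not the scheduler's task body.

```python
class ProviderError(Exception):
    """Stand-in for the provider error type named in this review."""


class GenerationValidationError(Exception):
    """Stand-in for an expected generator-side validation failure."""


class InternalTaskError(RuntimeError):
    """Hypothetical wrapper marking a failure as an engine bug, not a bad row."""


# Errors that legitimately mean "drop this row and keep going".
EXPECTED_ROW_ERRORS: tuple[type[BaseException], ...] = (
    ProviderError,
    GenerationValidationError,
)


def run_row(task, row):
    """Return (result, dropped). Expected errors drop the row; anything else escalates."""
    try:
        return task(row), False
    except EXPECTED_ROW_ERRORS:
        return None, True
    except Exception as exc:
        raise InternalTaskError(f"unexpected {type(exc).__name__} in task body") from exc
```

With this split, a KeyError from engine code surfaces as an internal error with the original exception chained, rather than as one more dropped row.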

packages/data-designer-config/src/data_designer/config/run_config.py:55-57 — Note glued into the Attributes: section

  • What: The "Request admission is engine-internal in V1 and is not exposed as a public run-config knob." sentence sits inside the Google-style Attributes: block where every other entry documents an attribute.
  • Why: Tools that parse the docstring (and reviewers skimming) will read this as a malformed attribute entry. It also looks like a dangling note from a previous iteration.
  • Suggestion: Move it into a separate Notes: section below Attributes:, or up into the class-level summary paragraph so it's clearly a meta-note, not a field.

packages/data-designer-engine/src/data_designer/engine/dataset_builders/scheduling/task_policies.py:176-181 — Bounded-borrow release silently discards "credit" for non-borrowing tasks

  • What: BoundedBorrowTaskAdmissionPolicy.on_release always returns negative debt_changes for every leased resource, regardless of whether the lease originally borrowed. _apply_delta clamps to zero, so no debt goes negative — but a task that never borrowed still "repays" debt incurred by someone else.
  • Why: This is probably intentional cross-lease debt repayment (and repay_on_withheld_peer_pressure=True hints at that), but it's not obvious from the code or the test names. A future contributor reading this without architecture/ context could easily change it.
  • Suggestion: Add a one-line comment on on_release explaining "any release in the group repays group-level debt up to zero", and add a test that pins the behaviour: borrow once, release a non-borrowing lease, observe debt drops by the full lease size.
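
The behaviour such a test would pin can be modelled in a few lines. GroupDebt is a hypothetical toy, not BoundedBorrowTaskAdmissionPolicy; it only captures the "any release repays group debt, clamped at zero" rule as I understand it from the code.

```python
class GroupDebt:
    """Hypothetical model of group-level debt with clamp-at-zero repayment."""

    def __init__(self) -> None:
        self.debt = 0

    def on_borrow(self, amount: int) -> None:
        """Record capacity borrowed beyond the group's share."""
        self.debt += amount

    def on_release(self, lease_size: int) -> None:
        """Any release repays group debt, whether or not this lease borrowed."""
        self.debt = max(0, self.debt - lease_size)


# Borrow once, then release a lease that never borrowed.
group = GroupDebt()
group.on_borrow(3)
group.on_release(2)   # debt drops by the full lease size: 3 -> 1
group.on_release(2)   # clamped at zero rather than going negative: 1 -> 0
```

A test against the real policy would assert the same two transitions on its debt_changes output.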

Suggestions — Take it or leave it

packages/data-designer-engine/src/data_designer/engine/observability.py:35-39 — RuntimeCorrelationProvider.set shadows the builtin

  • What: The method name set shadows Python's set() builtin within method bodies and reads oddly at call sites (runtime_correlation_provider.set(...)).
  • Suggestion: push or set_correlation would be more grep-able. Not a blocker.

packages/data-designer-engine/src/data_designer/engine/models/request_admission/resolver.py:20-40 — RequestResourceResolver is a stateless one-method class

  • What: The class has no instance state and a single resolve(...) method.
  • Suggestion: A free function resolve_request_resource(...) would convey "no state" more directly. If the plan is to swap implementations later, a Protocol with one method captures that without the class indirection. YAGNI either way.

packages/data-designer-engine/src/data_designer/engine/observability.py:100-101,128-132 — snapshot/request_resource_key typed as object | None

  • What: SchedulerAdmissionEvent.snapshot, RequestAdmissionEvent.request_resource_key, request_group_key, and pressure_snapshot are all object | None (presumably to avoid circular imports at runtime).
  • Suggestion: With TYPE_CHECKING-only imports you can keep the precise types in static checkers without taking a runtime cost. Sinks consuming these events would then get autocomplete instead of poking at object.
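
As a sketch of the pattern: the import sits behind TYPE_CHECKING and the annotation is quoted, so nothing is resolved at runtime. ExampleEvent and the module path request_admission_types are placeholders, since I have not checked where RequestResourceKey actually lives.

```python
from dataclasses import dataclass
from typing import TYPE_CHECKING

if TYPE_CHECKING:
    # Evaluated by static type checkers only; this import never runs.
    # The module path is a placeholder, not the PR's actual layout.
    from request_admission_types import RequestResourceKey


@dataclass(frozen=True)
class ExampleEvent:
    """Simplified stand-in for an observability event."""

    kind: str
    # Quoted so the name is never looked up at runtime.
    request_resource_key: "RequestResourceKey | None" = None
```

Static checkers then see the precise type on the field, while importing the module at runtime does not pull in the request-admission package.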

packages/data-designer-engine/src/data_designer/engine/dataset_builders/scheduling/resources.py:57-61 — stable_task_id uses SHA-1 where a non-cryptographic hash would do

  • What: We hash a small key string with SHA-1 and truncate to 16 hex chars per dispatch.
  • Suggestion: For task identity only, hashlib.blake2b(raw, digest_size=8).hexdigest() is a touch faster and signals "not cryptographic" by name. Not measurable on its own; mostly a readability nudge.
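
For reference, the swap keeps the 16-character ID length, because an 8-byte digest renders as 16 hex characters. The helper name short_task_id and the sample key format are made up for the example.

```python
import hashlib


def short_task_id(key: str) -> str:
    """Hypothetical helper: 8-byte BLAKE2b digest rendered as 16 hex characters."""
    return hashlib.blake2b(key.encode("utf-8"), digest_size=8).hexdigest()


# Same input always yields the same ID, so it remains usable as a stable task key.
task_id = short_task_id("column=summary|row_group=3")
```

Note that changing the hash changes every existing ID, which only matters if IDs are persisted or compared across versions.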

Duplication between FairTaskQueue and RequestFairQueue (scheduling/queue.py and models/request_admission/queue.py)

  • What: Two virtual-time fair queues with very similar shape but slightly different commit contracts and item types.
  • Suggestion: Wait until a third one shows up before extracting a base class — STYLEGUIDE.md says rule-of-three for DRY. Just flagging that the next reviewer touching either will likely want to skim both.

packages/data-designer-engine/src/data_designer/engine/dataset_builders/scheduling/completion.py:159-160 — Empty mark_complete body

  • What: The compatibility hook is just a docstring with no body, which works because docstrings count as a no-op statement, but reads as "did the author forget to implement this?".
  • Suggestion: Add an explicit pass or ... so it parses as intentionally empty at a glance.

packages/data-designer-config/src/data_designer/config/scheduling.py:103-118 — SchedulingMetadataError is defined after SchedulingMetadata

  • What: __post_init__ references SchedulingMetadataError which is defined a few classes below. It works (Python resolves at call time) but it's an unusual layout in this codebase.
  • Suggestion: Move the error class above the dataclass that raises it — mirrors the rest of data_designer.config.errors and makes the file read top-down.

What Looks Good

  • Lease lifecycle hygiene. Both controllers track controller_generation, distinguish duplicate / unknown_lease / stale_lease, and have tests that pin the duplicate-release-doesn't-double-credit invariant. That's exactly the kind of thing that bit the old throttle manager.
  • Two-phase fair queue (select_next → commit). Non-mutating selection plus a sequence_version check makes "select, fail to admit, retry next eligible group" possible without losing fairness. Clean design.
  • Cancellation correctness in acquire_async. The race where CancelledError arrives after a lease was assigned but before the loop noticed is explicitly handled and tested (test_async_cancellation_after_waiter_assignment_releases_lease). Easy to get wrong; it's right here.
  • Doc + plan alignment. architecture/dataset-builders.md, architecture/models.md, the user-facing docs/concepts/architecture-and-performance.md, and the Fern mirrors all move in lockstep with the code, and plans/645/ documents the contracts at the level of detail this kind of refactor needs.
  • Linter clean. ruff check and ruff format --check pass on every file I touched.
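
For anyone skimming the two-phase queue point above, the select-then-commit idea reduces to something like the following. This is a deliberately stripped-down FIFO, not the virtual-time fair queue in the PR; TwoPhaseQueue and its version counter are illustrative assumptions.

```python
class TwoPhaseQueue:
    """Hypothetical FIFO with non-mutating selection and a versioned commit."""

    def __init__(self) -> None:
        self._items: list[str] = []
        self._version = 0  # bumped on every mutation

    def push(self, item: str) -> None:
        self._items.append(item)
        self._version += 1

    def select_next(self):
        """Peek without mutating; return (item, version) or None if empty."""
        if not self._items:
            return None
        return self._items[0], self._version

    def commit(self, item: str, version: int) -> bool:
        """Remove the item only if the queue is unchanged since selection."""
        if version != self._version or not self._items or self._items[0] != item:
            return False
        self._items.pop(0)
        self._version += 1
        return True
```

If admission fails between the two phases, the caller simply does not commit, and the queue is left exactly as it was.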

Verdict

Needs changes — the duplicated _stable_task_id (with the in-function import), the dead blocked accumulator, the swallowed observability-sink exceptions, the polling/blocking semantics on acquire_async/acquire_sync, and the unguarded RunConfig.throttle removal are all worth addressing before this lands on main. None are critical blockers individually, but collectively they're the kind of subtle issues that become hard to track down once the epic ships.


This review was generated by an AI assistant.
