Skip to content

feat(airflow): Phase-1 Airflow 3.x orchestration scaffold#96

Draft
tabedzki wants to merge 3 commits into
masterfrom
airflow/phase1-scaffold
Draft

feat(airflow): Phase-1 Airflow 3.x orchestration scaffold#96
tabedzki wants to merge 3 commits into
masterfrom
airflow/phase1-scaffold

Conversation

@tabedzki

Copy link
Copy Markdown
Contributor

🤖 AI text below 🤖

Phase-1 Airflow scaffold (parse-only)

First increment of the Airflow migration assessed and planned in #95. No behavior change in production — this adds a self-contained airflow/ tree of skeleton DAGs whose task bodies are TODO stubs. The existing cron pipeline is untouched.

Targets Apache Airflow 3.x (3.2.2).

What's here

  • airflow/dags/
    • nightly_populate.py — the unified Python + MATLAB nightly + alerts DAG. Encodes the FK ordering plus the implicit edges found by auditing the MATLAB make() bodies (see Assessment: migrate orchestration layer to Apache Airflow #95): TowersSessionPsychTask runs after TowersBlock (not just TowersSession), and a barrier (towers_session_batch_complete) gates the subject-cumulative psych tables since they read all prior sessions.
    • ephys_processing.py — linear state-machine chain (transfer → SLURM → transfer → populate), stubbed.
    • imaging_processing.py — the AcquiredTiff seam forking into the Python element chain and the MATLAB SyncImagingBehavior.
  • airflow/plugins/u19/ — stub helpers that wrap existing code (no logic reimplemented): slurm.py (slurm_creator), transfers.py (clusters_paths_and_transfers), matlab.py (matlab -batch), status.py (dual-write recording_process status + LogStatus), callbacks.py (Slack via slack_utils), datajoint_sensor.py.
  • CI.github/workflows/airflow-dag-validation.yml runs a DagBag import-error check via uv sync --only-group airflow + uv run.
  • pyproject.toml — new locked airflow dependency group (apache-airflow 3.2.2 + ssh/standard providers); uv.lock updated.

Validation

All three DAGs parse with zero import errors against Airflow 3.2.2 locally (and in CI via the new workflow).

Not in this PR (tracked in #95)

Real task bodies, dynamic task mapping, deferrable SLURM sensors, deployment, and the open infra questions (PNI hosting, MATLAB host, AcquiredTiff owner, metadata DB).

Refs #95.

Add a parse-only scaffold for migrating the U19 orchestration layer to
Apache Airflow 3.x (assessment + plan in #95).

- airflow/dags: nightly_populate (unified Python+MATLAB nightly + alerts),
  ephys_processing, imaging_processing — TaskFlow DAGs with stubbed task
  bodies. nightly_populate encodes the FK ordering plus the implicit edges
  found in the make() audit (TowersSessionPsychTask after TowersBlock; a
  barrier before the subject-cumulative psych tables).
- airflow/plugins/u19: stub hooks/helpers that wrap existing code
  (slurm_creator, clusters_paths_and_transfers, slack_utils, dual-write of
  recording_process status, matlab -batch). No logic reimplemented.
- CI: .github/workflows/airflow-dag-validation.yml runs a DagBag
  import-error check via `uv sync --only-group airflow` + `uv run`.
- pyproject.toml: add locked `airflow` dependency group (apache-airflow
  3.2.2 + ssh/standard providers); uv.lock updated.

All three DAGs parse with zero import errors against Airflow 3.2.2. Task
bodies are TODO stubs; deployment/infra questions tracked in #95.

Assisted-by: ClaudeCode:claude-opus-4.8
tabedzki added 2 commits June 23, 2026 15:31
The ephys DAG task docstrings referenced STATUS_* names in their
dual_write_status TODOs but never defined them. Add them as named
constants mirroring recording_process_status_dict in params_config, so
each task's status dual-write is explicit and the file is self-coherent.
Implementation should import these from params_config rather than
redeclare, to stay in sync.

Assisted-by: ClaudeCode:claude-opus-4.8
Turn the ephys recording-process scaffold into a working, tested DAG that
reuses (does not reimplement) the existing u19_pipeline code.

Plugins (thin wrappers over u19_pipeline.automatic_job, imported lazily so DAG
parsing stays light):
- status.py: dual_write_status — advances recording_process.Processing status
  + appends LogStatus in one transaction (the issue #95 dual-write contract).
- jobs.py: get_active_job_ids / get_job_row for dynamic task mapping.
- transfers.py / slurm.py: wrap globus + slurm_creator with a dry-run switch.
- slurm_sensor.py: deferrable SlurmJobTrigger + sensor so multi-hour Kilosort
  jobs don't pin a worker slot.
- params.py: per-modality program_selection_params.

DAG: ephys_processing.py now discovers active jobs and dynamically maps one
task-group per job through transfer -> sbatch -> wait -> transfer-back ->
populate_element, dual-writing status after each step.

Tests (airflow/tests): unit tests (dry-run, no DB) + an opt-in integration
test that exercises dual_write_status against the docker-compose MariaDB. The
integration test caught a real bug (DataJoint update1 vs _update). 11 pass.

Tooling: add `airflow-test` dep group + pytest config; CI now runs the unit
tests after DAG validation. uv.lock updated.

Assisted-by: ClaudeCode:claude-opus-4.8
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant