feat(airflow): Phase-1 Airflow 3.x orchestration scaffold#96
Draft
tabedzki wants to merge 3 commits into
Draft
Conversation
Add a parse-only scaffold for migrating the U19 orchestration layer to Apache Airflow 3.x (assessment + plan in #95). - airflow/dags: nightly_populate (unified Python+MATLAB nightly + alerts), ephys_processing, imaging_processing — TaskFlow DAGs with stubbed task bodies. nightly_populate encodes the FK ordering plus the implicit edges found in the make() audit (TowersSessionPsychTask after TowersBlock; a barrier before the subject-cumulative psych tables). - airflow/plugins/u19: stub hooks/helpers that wrap existing code (slurm_creator, clusters_paths_and_transfers, slack_utils, dual-write of recording_process status, matlab -batch). No logic reimplemented. - CI: .github/workflows/airflow-dag-validation.yml runs a DagBag import-error check via `uv sync --only-group airflow` + `uv run`. - pyproject.toml: add locked `airflow` dependency group (apache-airflow 3.2.2 + ssh/standard providers); uv.lock updated. All three DAGs parse with zero import errors against Airflow 3.2.2. Task bodies are TODO stubs; deployment/infra questions tracked in #95. Assisted-by: ClaudeCode:claude-opus-4.8
The ephys DAG task docstrings referenced STATUS_* names in their dual_write_status TODOs but never defined them. Add them as named constants mirroring recording_process_status_dict in params_config, so each task's status dual-write is explicit and the file is self-coherent. Implementation should import these from params_config rather than redeclare, to stay in sync. Assisted-by: ClaudeCode:claude-opus-4.8
Turn the ephys recording-process scaffold into a working, tested DAG that reuses (does not reimplement) the existing u19_pipeline code. Plugins (thin wrappers over u19_pipeline.automatic_job, imported lazily so DAG parsing stays light): - status.py: dual_write_status — advances recording_process.Processing status + appends LogStatus in one transaction (the issue #95 dual-write contract). - jobs.py: get_active_job_ids / get_job_row for dynamic task mapping. - transfers.py / slurm.py: wrap globus + slurm_creator with a dry-run switch. - slurm_sensor.py: deferrable SlurmJobTrigger + sensor so multi-hour Kilosort jobs don't pin a worker slot. - params.py: per-modality program_selection_params. DAG: ephys_processing.py now discovers active jobs and dynamically maps one task-group per job through transfer -> sbatch -> wait -> transfer-back -> populate_element, dual-writing status after each step. Tests (airflow/tests): unit tests (dry-run, no DB) + an opt-in integration test that exercises dual_write_status against the docker-compose MariaDB. The integration test caught a real bug (DataJoint update1 vs _update). 11 pass. Tooling: add `airflow-test` dep group + pytest config; CI now runs the unit tests after DAG validation. uv.lock updated. Assisted-by: ClaudeCode:claude-opus-4.8
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
🤖 AI text below 🤖
Phase-1 Airflow scaffold (parse-only)
First increment of the Airflow migration assessed and planned in #95. No behavior change in production — this adds a self-contained
airflow/tree of skeleton DAGs whose task bodies are TODO stubs. The existing cron pipeline is untouched.Targets Apache Airflow 3.x (3.2.2).
What's here
airflow/dags/nightly_populate.py— the unified Python + MATLAB nightly + alerts DAG. Encodes the FK ordering plus the implicit edges found by auditing the MATLABmake()bodies (see Assessment: migrate orchestration layer to Apache Airflow #95):TowersSessionPsychTaskruns afterTowersBlock(not justTowersSession), and a barrier (towers_session_batch_complete) gates the subject-cumulative psych tables since they read all prior sessions.ephys_processing.py— linear state-machine chain (transfer → SLURM → transfer → populate), stubbed.imaging_processing.py— theAcquiredTiffseam forking into the Python element chain and the MATLABSyncImagingBehavior.airflow/plugins/u19/— stub helpers that wrap existing code (no logic reimplemented):slurm.py(slurm_creator),transfers.py(clusters_paths_and_transfers),matlab.py(matlab -batch),status.py(dual-writerecording_processstatus +LogStatus),callbacks.py(Slack viaslack_utils),datajoint_sensor.py..github/workflows/airflow-dag-validation.ymlruns aDagBagimport-error check viauv sync --only-group airflow+uv run.pyproject.toml— new lockedairflowdependency group (apache-airflow 3.2.2 + ssh/standard providers);uv.lockupdated.Validation
All three DAGs parse with zero import errors against Airflow 3.2.2 locally (and in CI via the new workflow).
Not in this PR (tracked in #95)
Real task bodies, dynamic task mapping, deferrable SLURM sensors, deployment, and the open infra questions (PNI hosting, MATLAB host,
AcquiredTiffowner, metadata DB).Refs #95.