Parallelize updates across variable groups, unifying updates and backfills#529
Pull request overview
This PR updates the reformatter orchestration layer to support parallelizing operational updates the same way backfills are parallelized, with shared worker coordination logic that preserves reader safety for both Zarr v3 and Icechunk-backed datasets.
Changes:
- Unifies update + backfill processing into `DynamicalDataset._process_region_jobs(...)`, adding worker coordination via `_internal/{job_name}/` files.
- Replaces `max_vars_per_backfill_job` with `max_vars_per_job` and removes `kind`/worker partitioning from `RegionJob.get_jobs()` (partitioning is now done by `get_worker_jobs`).
- Adds an Icechunk temp-branch strategy and Zarr v3 "defer metadata write until last worker" behavior; adds docs + tests for these workflows.
Reviewed changes
Copilot reviewed 30 out of 30 changed files in this pull request and generated 4 comments.
| File | Description |
|---|---|
| tests/noaa/gfs/analysis/region_job_test.py | Adjusts expectations for job splitting by max_vars_per_job. |
| tests/noaa/gefs/forecast_35_day/region_job_test.py | Renames test to max_vars_per_job and updates get_jobs() call expectations. |
| tests/noaa/gefs/forecast_35_day/dynamical_dataset_test.py | Updates integration assertion to max_vars_per_job. |
| tests/noaa/gefs/analysis/region_job_test.py | Renames test to max_vars_per_job and updates get_jobs() call expectations. |
| tests/noaa/gefs/analysis/dynamical_dataset_test.py | Updates integration assertion to max_vars_per_job. |
| tests/ecmwf/ifs_ens/forecast_15_day_0_25_degree/region_job_test.py | Updates operational update job count/vars assertions due to per-var job splitting. |
| tests/datasets_test.py | Updates CLI command expectation from process-backfill-region-jobs to backfill. |
| tests/common/test_storage.py | Adds tests for coordination file IO, branch support, and icechunk repo enumeration. |
| tests/common/test_parallel_writes.py | New integration tests for multi-worker coordination and reader safety (Zarr v3 + Icechunk + replicas). |
| tests/common/region_job_test.py | Updates tests for get_jobs() API change and shifts worker partitioning to get_worker_jobs. |
| tests/common/dynamical_dataset_test.py | Updates dataset API tests for new backfill() name and changed orchestration behavior. |
| src/reformatters/noaa/mrms/conus_analysis_hourly/region_job.py | Removes deprecated kind= parameter from get_jobs() call. |
| src/reformatters/noaa/hrrr/region_job.py | Removes deprecated kind= parameter from get_jobs() call. |
| src/reformatters/noaa/gfs/region_job.py | Removes deprecated kind= parameter from get_jobs() call. |
| src/reformatters/noaa/gfs/analysis/region_job.py | Renames max_vars_per_backfill_job to max_vars_per_job. |
| src/reformatters/noaa/gefs/forecast_35_day/region_job.py | Renames max_vars_per_backfill_job to max_vars_per_job; removes kind=. |
| src/reformatters/noaa/gefs/analysis/region_job.py | Renames max_vars_per_backfill_job to max_vars_per_job; removes kind=. |
| src/reformatters/example/region_job.py | Updates commented example removing kind= usage. |
| src/reformatters/ecmwf/ifs_ens/forecast_15_day_0_25_degree/region_job.py | Renames max_vars_per_backfill_job to max_vars_per_job; removes kind=. |
| src/reformatters/ecmwf/aifs_deterministic/forecast/region_job.py | Removes deprecated kind= parameter from get_jobs() call. |
| src/reformatters/dwd/icon_eu/forecast_5_day/region_job.py | Removes deprecated kind= parameter from get_jobs() call. |
| src/reformatters/contrib/uarizona/swann/analysis/region_job.py | Removes deprecated kind= parameter from get_jobs() call. |
| src/reformatters/contrib/noaa/ndvi_cdr/analysis/region_job.py | Removes deprecated kind= parameter from get_jobs() call. |
| src/reformatters/contrib/nasa/smap/level3_36km_v9/region_job.py | Removes deprecated kind= parameter from get_jobs() call. |
| src/reformatters/common/zarr.py | Adds zarr3_only flag and consolidates metadata-copy store filtering logic. |
| src/reformatters/common/storage.py | Adds branch support for Icechunk stores, icechunk_repos(), and coordination file read/write/cleanup. |
| src/reformatters/common/region_job.py | Updates get_jobs() API (no kind/worker partitioning) and unifies variable splitting via max_vars_per_job. |
| src/reformatters/common/dynamical_dataset.py | Introduces unified parallel orchestration with setup/results/finalize phases, Icechunk temp branch flow, and deferred Zarr v3 metadata write. |
| docs/parallel_processing.md | New documentation describing the worker coordination protocol, reader safety, and failure modes. |
| AGENTS.md | Updates contributor docs to reflect parallel updates and adds docs directory listing. |
Force-pushed from `8fdae7c` to `e8a75b4`.
…418) Unify update and backfill codepaths into a shared `_process_region_jobs` method that coordinates parallel writes across Kubernetes indexed jobs.
- Rename `max_vars_per_backfill_job` to `max_vars_per_job`, applying variable group splitting to both updates and backfills
- Remove `kind` parameter from `get_jobs()` and `worker_index`/`workers_total` (partitioning now handled by caller via `get_worker_jobs`)
- Add `backfill()` method, rename CLI command from `process-backfill-region-jobs`
- Add `worker_index`/`workers_total` parameters to `update()` for parallel updates
- Icechunk: use temp branch strategy so readers on main never see partial data
- Zarr v3: defer metadata write until last worker to prevent exposing empty holes
- Worker coordination via `_internal/{job_name}/` files in object store
- Deterministic branch names from job name for safe worker restarts
- Two-pass finalize (reset all repos, then delete branches) with skip-if-already-reset
- Replicas updated before primary so primary drives retry behavior
- `StoreFactory` gains branch support, `icechunk_repos()`, and coordination file I/O
- Add `zarr3_only` parameter to `copy_zarr_metadata`

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
- Split `_finalize` icechunk loop into two passes (reset all, then delete branches) with skip-if-already-reset for safe retries
- Guard icechunk finalization on `branch_name != "main"` to skip it for backfills
- Add required `sort` kwarg to `icechunk_repos()` for explicit ordering
- Remove `worker_index`/`workers_total` from `get_jobs()` (caller handles partitioning)
- Add replica parallel write integration test
- Add K8s `pod_active_deadline` timeout comments on polling loops

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
…tup_info Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
…setup_info
- Fix dict overwrite bug in results merging: use setdefault+extend to properly accumulate source file coords across multiple jobs for the same variable (different regions)
- Split `_collect_results` into `_wait_for_workers` + `_collect_results` so backfills (`update_template_with_results=False`) only wait for completion without loading/deserializing result files
- Serialize `setup_info` as JSON instead of pickle to avoid code-execution risk from untrusted data in `_internal/`

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
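The setdefault+extend fix from the first bullet can be sketched as follows. The function name and the exact shape of the per-job results are assumptions for illustration; the real merge accumulates source file coordinates per variable.

```python
def merge_job_results(per_job_results: list[dict[str, list[str]]]) -> dict[str, list[str]]:
    """Accumulate entries per variable across region jobs.

    Plain assignment (merged[var] = coords) would overwrite the results a
    previous job produced for the same variable in a different region;
    setdefault + extend accumulates them instead.
    """
    merged: dict[str, list[str]] = {}
    for job_result in per_job_results:
        for var, coords in job_result.items():
            merged.setdefault(var, []).extend(coords)
    return merged
```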
Force-pushed from `e8a75b4` to `92f95ca`.
Skip store creation and icechunk commit for workers with zero assigned jobs to avoid committing empty sessions. Split Sentry monitor checkins so worker 0 sends in_progress and the last worker sends ok/error. Co-Authored-By: Claude Haiku 4.5 <noreply@anthropic.com>
Add update_num_variable_groups() on DynamicalDataset that computes the number of variable groups from source_groups and max_vars_per_job. Use it in the 4 datasets that set max_vars_per_job to auto-set workers_total and parallelism on their ReformatCronJob. Extract split_groups into common/iterating.py as a generic function. Co-Authored-By: Claude Haiku 4.5 <noreply@anthropic.com>
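A generic `split_groups` along these lines might look like the sketch below; the actual signature in `common/iterating.py` may differ, and balancing each group's parts evenly is an assumption here.

```python
import math
from collections.abc import Sequence
from typing import TypeVar

T = TypeVar("T")

def split_groups(groups: Sequence[Sequence[T]], max_size: int) -> list[list[T]]:
    """Split each group into roughly equal subgroups of at most max_size items."""
    out: list[list[T]] = []
    for group in groups:
        # Number of parts needed, then an even part size so parts are balanced.
        n_parts = math.ceil(len(group) / max_size)
        part_size = math.ceil(len(group) / n_parts)
        for i in range(0, len(group), part_size):
            out.append(list(group[i : i + part_size]))
    return out
```

`update_num_variable_groups()` would then presumably reduce to `len(split_groups(source_groups, max_vars_per_job))`.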
Co-Authored-By: Claude Haiku 4.5 <noreply@anthropic.com>
…tes" This reverts commit 444f0e6.
Co-Authored-By: Claude Haiku 4.5 <noreply@anthropic.com>
S3FileSystem caches directory listings, so polling loops in _wait_for_workers and _parallel_setup never see new files after the first ls call. Co-Authored-By: Claude Haiku 4.5 <noreply@anthropic.com>
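A polling loop that sidesteps the listing cache might look like this sketch. `invalidate_cache` is a real fsspec `AbstractFileSystem` method; the function name, parameters, and the exact polling logic in `_wait_for_workers` are assumptions.

```python
import time

import fsspec

def poll_for_files(
    fs: fsspec.AbstractFileSystem,
    path: str,
    expected: int,
    timeout_s: float = 600.0,
    interval_s: float = 5.0,
) -> list[str]:
    """Poll a directory listing until `expected` files appear.

    s3fs caches directory listings, so without invalidate_cache() every
    ls() after the first would return the same stale result.
    """
    deadline = time.monotonic() + timeout_s
    files: list[str] = []
    while time.monotonic() < deadline:
        fs.invalidate_cache(path)  # drop the cached listing before re-listing
        files = fs.ls(path, detail=False)
        if len(files) >= expected:
            return files
        time.sleep(interval_s)
    raise TimeoutError(f"only {len(files)} of {expected} files appeared in {path}")
```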
primary_store()/replica_stores() return ALL stores regardless of format. For datasets where the primary is zarr v3 (e.g. ecmwf-ifs-ens), _parallel_setup was calling write_metadata(mode="w") on the production zarr v3 store, wiping all chunk data. Get stores from icechunk_repos instead, which only contains icechunk stores by construction. Co-Authored-By: Claude Haiku 4.5 <noreply@anthropic.com>
Take the most descriptive summary from old and new versions for each doc entry, preferring specificity over brevity. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Use copy_zarr_metadata to expand icechunk dimensions by copying zarr.json and coordinate arrays from the local tmp_store, instead of to_zarr(mode=w) which deletes existing data first. Add a guard in write_metadata that raises ValueError if mode=w is used on a non-Path store, preventing accidental remote data deletion. Co-Authored-By: Claude Haiku 4.5 <noreply@anthropic.com>
Add a log.warning in _process_region_jobs when workers_total doesn't match the number of jobs, guiding operators to the optimal value. Set GEFS forecast-35-day and GEFS analysis to 2x variable groups since their operational updates reprocess the most recent time slice. Co-Authored-By: Claude Haiku 4.5 <noreply@anthropic.com>
Add parallelism guidance to dataset integration guide. Update example dynamical_dataset.py to show setting workers_total via update_num_variable_groups. Fix setup/ready.pkl -> setup/ready.json in parallel_processing.md. Co-Authored-By: Claude Haiku 4.5 <noreply@anthropic.com>
…alysis Forecast datasets reprocess the most recent init_time + process a new one, always producing 2 region jobs (or 2 * num_variable_groups). Analysis datasets almost always stay within one shard, so 1x is right. Co-Authored-By: Claude Haiku 4.5 <noreply@anthropic.com>
Co-Authored-By: Claude Haiku 4.5 <noreply@anthropic.com>
…railing dashes Pod names for indexed jobs include job timestamp + index + random suffix. Reduce max cronjob name from 52 to 42 to fit within the 63-char DNS label limit. Strip trailing dashes after trimming dataset_id to avoid double dashes in names. Co-Authored-By: Claude Haiku 4.5 <noreply@anthropic.com>
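The naming arithmetic can be illustrated with a hypothetical helper (the helper name and signature are assumptions; the 42-character budget leaves room within Kubernetes' 63-character DNS label limit for the indexed job's timestamp, index, and random pod suffix):

```python
def cronjob_name(dataset_id: str, suffix: str, max_len: int = 42) -> str:
    """Build a job name at most max_len characters long.

    Trailing dashes left by the trim are stripped so joining with the
    suffix never produces a double dash.
    """
    budget = max_len - len(suffix) - 1  # reserve room for "-" + suffix
    base = dataset_id[:budget].rstrip("-")
    return f"{base}-{suffix}"
```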
…leanups
- Cache poll results in _parallel_setup and _wait_for_workers to avoid
double read_all_coordination_files calls against object store
- Return result data from _wait_for_workers so _collect_results reuses it
- Use urlparse instead of manual split("://") in _coordination_fs
- Use truthiness check on icechunk_repos list instead of len() > 0
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
…n retry
- Replace broad try/except `IcechunkError` around `create_branch`/`delete_branch` with `list_branches()` membership checks.
- On worker 0 retry, reuse snapshots from the prior `ready.json` so `original_snapshot` stays stable. Without this, a retry after an external main write would refresh `original_snapshot` to match the moved main, bypassing finalize's `from_snapshot_id` skip and silently overwriting the external change.
- Replace the `workers_total != len(all_jobs)` warning with a plain info log including the total job count, since backfills intentionally run fewer workers than jobs.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
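The membership-check pattern from the first bullet, sketched with a duck-typed `repo` so it stays library-agnostic — an icechunk `Repository` exposes `list_branches` and `create_branch`, but the exact call signatures here are assumptions:

```python
def create_branch_if_absent(repo, branch: str, snapshot_id: str) -> bool:
    """Create `branch` only when it doesn't already exist, rather than
    wrapping create_branch in a broad `except IcechunkError`.

    Returns True if the branch was created, False if a prior worker
    attempt already created it (e.g. on restart).
    """
    if branch in repo.list_branches():
        return False
    repo.create_branch(branch, snapshot_id)
    return True
```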
…parallel test coverage
- Replace hardcoded workers_total=2 on 4 forecast datasets (gfs/forecast,
aifs_single, icon_eu, hrrr) with 2 * self.update_num_variable_groups().
Same numeric result today, scales automatically if max_vars_per_job is
added later.
- Remove obsolete "warning logged at runtime" note from the integration guide.
- Add failure-mode tests in test_parallel_writes.py:
- Worker restart idempotence (zarr3 and icechunk).
- Worker 0 retry preserves snapshot in ready.json via setdefault.
- Last worker retry after partial finalize skips already-reset repos
and completes remaining ones.
- Replica-last sort ordering invariant.
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Backfills can have tens of thousands of workers, making the per-file `cat_file` loop in `read_all_coordination_files` prohibitive (N serial GETs).
- Add `StoreFactory.count_coordination_files` for lightweight polling via `ls` only.
- `_wait_for_workers` now counts instead of reading, and returns None. Backfills never load the pickled result bodies.
- `_collect_results` (updates only) waits, then reads once.
- Switch `read_all_coordination_files` from serial `cat_file` to `fs.cat`, which runs concurrently on async backends like s3fs.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
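The batched-read idea is small enough to sketch: fsspec's `fs.cat()` accepts a list of paths and returns a `{path: bytes}` dict, fetching concurrently on async backends like s3fs instead of issuing N serial GETs. The function name mirrors the one in the PR, but this body is an illustrative assumption.

```python
import fsspec

def read_all_coordination_files(fs: fsspec.AbstractFileSystem, prefix: str) -> dict[str, bytes]:
    """Read every coordination file under `prefix` in one batched call."""
    paths = fs.ls(prefix, detail=False)
    if not paths:
        return {}
    # One call; concurrent GETs on async backends, rather than a cat_file loop.
    return fs.cat(paths)
```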
Towards #418
Unify update and backfill codepaths into a shared `_process_region_jobs` method that coordinates parallel writes across Kubernetes indexed jobs.
- Rename `max_vars_per_backfill_job` to `max_vars_per_job`, applying variable group splitting to both updates and backfills
- Remove `kind` parameter from `get_jobs()` and `worker_index`/`workers_total` (partitioning now handled by caller via `get_worker_jobs`)
- Add `backfill()` method, rename CLI command from `process-backfill-region-jobs`
- Add `worker_index`/`workers_total` parameters to `update()` for parallel updates
- Worker coordination via `_internal/{job_name}/` files in object store
- Add `zarr3_only` parameter to `copy_zarr_metadata`
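The coordination flow described above — worker 0 does setup and signals readiness, each worker processes its share, and the last worker to finish finalizes — can be sketched end to end. Every name below (file paths, helper signatures, the `process`/`finalize` callables) is an illustrative assumption, not the PR's exact API.

```python
import json
import time

import fsspec

def run_worker(
    fs: fsspec.AbstractFileSystem,
    job_name: str,
    worker_index: int,
    workers_total: int,
    process,   # callable running this worker's share of region jobs
    finalize,  # callable run exactly once, by the last worker to complete
) -> None:
    prefix = f"_internal/{job_name}"
    if worker_index == 0:
        # Worker 0 performs shared setup and signals readiness.
        fs.pipe(f"{prefix}/setup/ready.json", json.dumps({"ok": True}).encode())
    else:
        # Other workers poll until setup is complete.
        while not fs.exists(f"{prefix}/setup/ready.json"):
            fs.invalidate_cache(prefix)  # s3fs caches listings; force a re-check
            time.sleep(1)
    process()
    # Record completion; whichever worker observes a full count finalizes.
    fs.pipe(f"{prefix}/done/{worker_index}.json", b"{}")
    if len(fs.ls(f"{prefix}/done", detail=False)) == workers_total:
        finalize()
```

With two workers run in sequence, worker 0 writes the readiness file and its completion marker, and worker 1 sees a full completion count and runs `finalize`.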