
Eval recipe pipelines and config reorg#810

Open
pzharrington wants to merge 18 commits into NVIDIA:main from pzharrington:vnv-recipe-extend

Conversation

@pzharrington
Collaborator

Earth2Studio Pull Request

Description

An incremental update to the eval recipe that makes it more flexible across model types by introducing a Pipeline interface. The description below is copied from the README. The PR also reorganizes the config system to reduce the bloat that came from supporting multiple models and inference campaigns.

Pipeline interface

All inference logic is driven by a Pipeline — an abstract base class
(src/pipeline.py) that separates per-work-item inference from the shared
scaffolding (work iteration, output filtering, ensemble injection, zarr
writes). Subclasses implement three methods:

  • setup(cfg, device): load models, move them to the device, cache coordinate metadata
  • build_total_coords(times, ensemble_size): define the full zarr output coordinate system
  • run_item(item, data_source, device): yield (tensor, coords) pairs for one work item

The base class Pipeline.run() handles everything else: iterating work
items, building the output variable filter, injecting the ensemble dimension,
and writing to the OutputManager.
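A minimal stand-in illustrates this division of labor. This is a hypothetical simplification: the real src/pipeline.py also performs output filtering, ensemble injection, and threaded zarr writes inside run(), and its method signatures carry more arguments.

```python
from abc import ABC, abstractmethod

# Hypothetical, stripped-down stand-in for the Pipeline ABC described above.
class Pipeline(ABC):
    @abstractmethod
    def setup(self, cfg, device):
        """Load models, move them to device, cache coordinate metadata."""

    @abstractmethod
    def build_total_coords(self, times, ensemble_size):
        """Define the full output coordinate system."""

    @abstractmethod
    def run_item(self, item, data_source, device):
        """Yield (tensor, coords) pairs for one work item."""

    def run(self, work_items, data_source, output_mgr, device):
        # Shared scaffolding: iterate work items and hand every yielded
        # (tensor, coords) pair to the output manager.
        for item in work_items:
            for tensor, coords in self.run_item(item, data_source, device):
                output_mgr.write(tensor, coords)

# Tiny concrete subclass showing the contract.
class EchoPipeline(Pipeline):
    def setup(self, cfg, device):
        self.scale = cfg.get("scale", 1)

    def build_total_coords(self, times, ensemble_size):
        return {"time": list(times), "ensemble": list(range(ensemble_size))}

    def run_item(self, item, data_source, device):
        yield item * self.scale, {"item": item}

class ListOutput:
    """Stand-in for OutputManager that collects writes in memory."""
    def __init__(self):
        self.rows = []
    def write(self, tensor, coords):
        self.rows.append((tensor, coords))

pipe = EchoPipeline()
pipe.setup({"scale": 10}, device=None)
out = ListOutput()
pipe.run(work_items=[1, 2, 3], data_source=None, output_mgr=out, device=None)
print(out.rows)  # [(10, {'item': 1}), (20, {'item': 2}), (30, {'item': 3})]
```

The point of the split is that a subclass only describes what one work item produces; everything about where the results go lives once, in the base class.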

Two built-in pipelines are provided:

  • ForecastPipeline (pipeline=forecast) — prognostic rollout with
    optional diagnostic models. Yields one output per lead-time step.
  • DiagnosticPipeline (pipeline=diagnostic) — diagnostic-only (no
    prognostic model). Yields a single output per work item.

Custom pipelines

To add a custom inference loop, subclass Pipeline and set pipeline in
your Hydra config to the fully-qualified class name:

# my_pipeline.py
from src.pipeline import Pipeline

class MyPipeline(Pipeline):
    def setup(self, cfg, device):
        ...
    def build_total_coords(self, times, ensemble_size):
        ...
    def run_item(self, item, data_source, device):
        ...
        yield x, coords

# In your Hydra config, override:
pipeline: my_pipeline.MyPipeline

Custom pipelines inherit the full shared machinery — distributed output
management, ensemble dimension handling, threaded zarr writes — for free.
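The fully-qualified-name lookup can be implemented with importlib. This is a sketch of the mechanism only; the actual build_pipeline in src/pipeline.py may differ, and its registry of built-in names ("forecast", "diagnostic") is elided here.

```python
import importlib

def build_pipeline(name: str):
    # Hypothetical resolver: built-in names map directly; anything with a
    # dot is treated as a fully-qualified module.path.ClassName.
    builtins = {}  # e.g. {"forecast": ForecastPipeline, "diagnostic": DiagnosticPipeline}
    if name in builtins:
        return builtins[name]
    module_name, _, class_name = name.rpartition(".")
    module = importlib.import_module(module_name)
    return getattr(module, class_name)

# Demonstrated with a stdlib class, since my_pipeline.MyPipeline is not on disk here:
cls = build_pipeline("collections.OrderedDict")
print(cls.__name__)  # OrderedDict
```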

Checklist

  • I am familiar with the Contributing Guidelines.
  • New or existing tests cover these changes.
  • The documentation is up to date with these changes.
  • The CHANGELOG.md is up to date with these changes.
  • An issue is linked to this pull request.
  • Assess and address Greptile feedback (Greptile is an AI code-review bot for guidance; use discretion, as addressing all feedback is not required).

Dependencies

@pzharrington pzharrington self-assigned this Apr 10, 2026
@pzharrington pzharrington requested a review from NickGeneva April 10, 2026 01:04
@pzharrington pzharrington marked this pull request as ready for review April 10, 2026 01:06
@pzharrington
Collaborator Author

/blossom-ci

@greptile-apps
Contributor

greptile-apps bot commented Apr 10, 2026

Greptile Summary

This PR introduces a Pipeline ABC (src/pipeline.py) that decouples per-work-item inference from shared scaffolding (output coord building, ensemble injection, zarr writes). Two built-in implementations are provided — ForecastPipeline and DiagnosticPipeline — along with resume/progress-tracking support via filesystem markers. The config system is reorganised so that predownload settings now live in default.yaml and campaign-level configs override the model and IC sweep.

Confidence Score: 5/5

Safe to merge — no P0/P1 defects found; all findings are minor style/documentation suggestions.

The pipeline abstraction is well-designed and correctly handles distributed barriers, ensemble injection, output filtering, and threaded-write flushing. The resume marker system is globally consistent across ranks. All three findings are P2 and do not affect correctness in any currently exercised code path.

No files require special attention for merge safety.

Important Files Changed

  • recipes/eval/src/pipeline.py: New file — well-structured ABC with two built-in implementations; shared run() correctly handles output filtering, ensemble injection, and threaded-write flushing before markers.
  • recipes/eval/src/output.py: Refactored — OutputManager.__init__ slimmed down; validate_output_store added for deferred store creation; flush() for resume sync; build_forecast_coords / build_diagnostic_coords extracted as standalone helpers.
  • recipes/eval/src/work.py: Added write_marker, filter_completed_items, clear_progress, and progress_dir; also fixes _parse_initial_times to treat start_times: null as absent so campaign configs can use the IC-block path.
  • recipes/eval/main.py: Cleanly delegates to build_pipeline / pipeline.setup / pipeline.run; resume early-exit is globally consistent because filter_completed_items reads from the same shared filesystem across all ranks.
  • recipes/eval/predownload.py: Split into _predownload_forecast and _predownload_diagnostic; custom pipelines (fully-qualified class names) will hit a ValueError with a message that doesn't hint users need to implement their own pre-download step.
  • recipes/eval/test/_multigpu_worker.py: Updated to use ForecastPipeline; manually assigns attributes instead of calling pipeline.setup(), bypassing the run_on_rank0_first barrier inside load_prognostic.
  • recipes/eval/test/test_resume.py: New — comprehensive coverage of progress tracking, resume/flush, and pipeline resume integration.
  • recipes/eval/test/test_pipeline.py: New — covers ABC enforcement, registry lookup, custom-class-by-FQN path, and the stub pipeline's build/run methods.
  • recipes/eval/test/test_diagnostic_inference.py: New — covers single-IC, multi-IC, ensemble, and empty-work-item paths for DiagnosticPipeline.
  • recipes/eval/cfg/default.yaml: Adds pipeline, resume, and predownload keys; moves predownload config into the shared default.
  • recipes/eval/cfg/campaign/fcn3_2024_full.yaml: New campaign config for FCN3 full-year ensemble; correctly sets start_times: null to activate the IC-block path and resume: true for multi-job splitting.
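For orientation, the config keys called out above could combine in a campaign override roughly like this. This is a hypothetical fragment assembled only from the keys named in this review, not the contents of the actual fcn3_2024_full.yaml:

```yaml
# Hypothetical campaign override; key names taken from the review above.
pipeline: forecast
resume: true        # allow splitting a large campaign across multiple jobs
start_times: null   # activate the IC-block path instead of explicit times
```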

Reviews (1): Last reviewed commit: "Merge branch 'main' into vnv-recipe-exte..."

Comment on lines +349 to +352
if self._executor is not None:
    for f in self._futures:
        f.result()
    self._futures.clear()
P2 flush() leaves stale futures on exception

If any future raises, f.result() re-raises immediately and self._futures.clear() is never reached. __exit__ will then encounter the same failed future again (it wraps f.result() in try/except, so it's safe), but the list isn't left in a clean state after a failed flush() call. Consider using a swap-and-clear pattern to guarantee the list is cleared:

Suggested change:

def flush(self) -> None:
    """Wait for all pending threaded writes to complete.

    Called between work items during resume runs to guarantee that all
    zarr writes have landed before a completion marker is written.
    Raises immediately if any pending write failed.
    """
    if self._executor is not None:
        futures, self._futures = self._futures, []
        for f in futures:
            f.result()
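The swap-and-clear idiom is easy to verify in isolation. A generic sketch, not the recipe's OutputManager:

```python
from concurrent.futures import ThreadPoolExecutor

class ThreadedWriter:
    """Toy writer using the suggested swap-and-clear flush (generic sketch)."""

    def __init__(self):
        self._executor = ThreadPoolExecutor(max_workers=2)
        self._futures = []

    def submit(self, fn):
        self._futures.append(self._executor.submit(fn))

    def flush(self):
        if self._executor is not None:
            # Swap first, so the list is empty even if result() raises below.
            futures, self._futures = self._futures, []
            for f in futures:
                f.result()

w = ThreadedWriter()
w.submit(lambda: 1 / 0)  # a "write" that fails in the worker thread
try:
    w.flush()
except ZeroDivisionError:
    pass
print(len(w._futures))  # 0 -- no stale futures survive a failed flush
w._executor.shutdown()
```

With the original clear()-after-loop ordering, the failed future would still be in the list here.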

Comment on lines +152 to +158

# --- Sentinel file ------------------------------------------------------
if dist.distributed:
    torch.distributed.barrier()

if dist.rank == 0:
    sp = sentinel_path(cfg)
P2 Custom pipelines not supported with an unhelpful error message

build_pipeline() in pipeline.py accepts fully-qualified class names for custom pipelines, but predownload.py only handles "forecast" and "diagnostic". If a user sets pipeline: my_module.MyPipeline and runs predownload.py, they get ValueError: Unknown pipeline 'my_module.MyPipeline'. Expected 'forecast' or 'diagnostic'. with no hint that they need to implement their own pre-download step. Consider improving the error message:

Suggested change:

else:
    raise ValueError(
        f"Unknown built-in pipeline '{pipeline}'. "
        "predownload.py only supports 'forecast' and 'diagnostic'. "
        "For custom pipelines, implement pre-download logic directly "
        "or extend this script."
    )

Comment on lines +155 to +163

with OutputManager(cfg) as output_mgr:
    output_mgr.validate_output_store(total_coords, list(VARIABLES))
    pipeline.run(
        work_items=my_items,
        prognostic=prognostic,
        data_source=data_source,
        output_mgr=output_mgr,
        nsteps=nsteps,
        output_variables=list(VARIABLES),
        device=dist.device,
    )
P2 Bypasses pipeline.setup() distributed barriers

The pipeline is assembled via direct attribute assignment instead of pipeline.setup(cfg, device). ForecastPipeline.setup() contains the comment "All ranks must participate in model loading for barrier correctness" because load_prognostic uses run_on_rank0_first. This is safe here only because Persistence never triggers those barriers, but the pattern could mislead future test maintainers. Consider calling pipeline.setup(cfg, dist.device) or at least leaving a comment explaining why direct assignment is used.
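The rank-0-first pattern referenced here can be sketched generically. The simulation below uses threads and threading.Barrier so it runs without torch.distributed; the recipe's run_on_rank0_first helper may differ in details.

```python
import threading

def run_on_rank0_first(rank, barrier, fn):
    """Run fn on rank 0 before any other rank (generic sketch).

    Every rank must call this: each rank hits the barrier exactly once,
    so a rank that skips the call would deadlock the others -- the hazard
    the review comment warns about when tests bypass setup().
    """
    if rank == 0:
        fn()            # e.g. download/cache model weights
        barrier.wait()  # release the waiting ranks
    else:
        barrier.wait()  # block until rank 0 is done
        fn()            # now safe to load from the shared cache

# Simulate two ranks with threads.
order = []
lock = threading.Lock()

def work(rank):
    with lock:
        order.append(rank)

b = threading.Barrier(2)
threads = [
    threading.Thread(target=run_on_rank0_first, args=(r, b, lambda r=r: work(r)))
    for r in (0, 1)
]
for t in threads:
    t.start()
for t in threads:
    t.join()
print(order)  # [0, 1] -- rank 0 always finishes before rank 1 starts
```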

