From b2c6438f68d4919255d014d7731019798158804d Mon Sep 17 00:00:00 2001
From: Pingu Carsti
Date: Thu, 18 Jun 2026 13:32:22 +0200
Subject: [PATCH 1/2] added initial zarr support

---
Review notes (text between the "---" line and the first "diff --git" line is
not part of the commit and is ignored when the patch is applied):

* Detection is name-based only: is_zarr_store() accepts str/Path input and
  tests whether the URL path, with trailing "/" removed and lower-cased, ends
  in ".zarr". Query strings do not affect the result because only
  urlsplit(...).path is inspected.
* get_zarr_store() prefers ds_id; otherwise it accepts file_paths only when it
  is a single str/Path or a one-element sequence that looks like a store.
  In every other case it returns None and open_dataset() falls back to
  open_xr_dataset().
* open_dataset() skips rook-native fixes only when ds_id itself looks like a
  Zarr store; a store found through file_paths for a non-Zarr ds_id is still
  passed to apply_dataset_fixes() when apply_fixes is true.
* get_zarr_open_kwargs() adds storage_options only for s3:// stores and only
  when get_s3_storage_options() returns a non-empty value.
* consolidate() maps a Zarr input to itself (same branch as kerchunk files),
  and the guard around get_project_name()/get_catalog() now also excludes a
  Zarr store in collection[0].
* NOTE(review): is_zarr_store() strips surrounding whitespace before testing,
  but get_zarr_store() returns the unstripped string -- confirm callers never
  pass padded paths.
* NOTE(review): this copy of the series had its line breaks collapsed. Line
  structure was restored from the hunk line counts; blank context lines and
  the indentation of unchanged context lines are inferred -- TODO confirm
  against the target tree before applying.

 CHANGELOG.rst                     |  2 +
 pyproject.toml                    |  1 +
 src/rook/utils/ops/consolidate.py | 11 ++++-
 src/rook/utils/ops/helpers.py     | 62 ++++++++++++++++++++++--
 tests/test_ops_consolidate.py     | 15 ++++++
 tests/test_ops_helpers.py         | 79 +++++++++++++++++++++++++++++++
 tests/test_ops_zarr.py            | 42 ++++++++++++++++
 7 files changed, 205 insertions(+), 7 deletions(-)
 create mode 100644 tests/test_ops_zarr.py

diff --git a/CHANGELOG.rst b/CHANGELOG.rst
index 796d360..e3dfc2c 100644
--- a/CHANGELOG.rst
+++ b/CHANGELOG.rst
@@ -7,6 +7,8 @@ Unreleased
 * Added preliminary S3 support by declaring ``s3fs``, allowing direct ``s3://``
   NetCDF inputs, and preparing optional S3 base-path mapping for catalog-backed
   processing paths.
+* Added preliminary support for opening local and S3-backed Zarr stores as
+  operation inputs.
 
 1.1.1 (2026-06-17)
 ==================
diff --git a/pyproject.toml b/pyproject.toml
index e29aba6..f90a63a 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -48,6 +48,7 @@ dependencies = [
   "intake <2.0",
   "fsspec",
   "s3fs",
+  "zarr >=2.13.0,<3.0",
   "pandas >=2.0",
   "SQLAlchemy >=2.0",
   "aiohttp",
diff --git a/src/rook/utils/ops/consolidate.py b/src/rook/utils/ops/consolidate.py
index a656b8b..a6c4dc6 100644
--- a/src/rook/utils/ops/consolidate.py
+++ b/src/rook/utils/ops/consolidate.py
@@ -11,7 +11,13 @@
 
 from rook.catalog import get_catalog
 
-from .helpers import is_kerchunk_file, is_s3_uri, ordered_dict, wrap_sequence
+from .helpers import (
+    is_kerchunk_file,
+    is_s3_uri,
+    is_zarr_store,
+    ordered_dict,
+    wrap_sequence,
+)
 
 
 def to_year(time_string):
@@ -82,6 +88,7 @@ def consolidate(collection, **kwargs):
         not isinstance(collection[0], FileMapper)
         and not is_kerchunk_file(collection[0])
         and not is_s3_uri(collection[0])
+        and not is_zarr_store(collection[0])
     ):
         project = get_project_name(collection[0])
         catalog = get_catalog(project)
@@ -91,7 +98,7 @@ def consolidate(collection, **kwargs):
     time_param = kwargs.get("time")
 
     for dset in collection:
-        if is_kerchunk_file(dset):
+        if is_kerchunk_file(dset) or is_zarr_store(dset):
             filtered_refs[dset] = dset
 
         elif is_s3_uri(dset):
diff --git a/src/rook/utils/ops/helpers.py b/src/rook/utils/ops/helpers.py
index 958ce1d..51844e1 100644
--- a/src/rook/utils/ops/helpers.py
+++ b/src/rook/utils/ops/helpers.py
@@ -5,12 +5,14 @@
 from pathlib import Path
 from urllib.parse import urlsplit
 
+import xarray as xr
 from clisops.utils.dataset_utils import open_xr_dataset
 
 from rook import CONFIG
 from rook.utils.apply_fixes import apply_fixes as apply_dataset_fixes
 
 KERCHUNK_EXTS = (".json", ".zst", ".zstd", ".parquet")
+ZARR_EXT = ".zarr"
 
 
 def wrap_sequence(obj):
@@ -22,10 +24,18 @@ def wrap_sequence(obj):
 
 def open_dataset(ds_id, file_paths, apply_fixes=True):
     """Open an xarray Dataset and optionally apply rook-native fixes."""
-    open_kwargs = get_s3_open_kwargs(ds_id, file_paths)
-    ds = open_xr_dataset(file_paths, **open_kwargs)
-
-    if apply_fixes and not is_kerchunk_file(ds_id):
+    zarr_store = get_zarr_store(ds_id, file_paths)
+    if zarr_store:
+        ds = xr.open_zarr(zarr_store, **get_zarr_open_kwargs(zarr_store))
+    else:
+        open_kwargs = get_s3_open_kwargs(ds_id, file_paths)
+        ds = open_xr_dataset(file_paths, **open_kwargs)
+
+    if (
+        apply_fixes
+        and not is_kerchunk_file(ds_id)
+        and not is_zarr_store(ds_id)
+    ):
         ds = apply_dataset_fixes(ds_id, ds)
 
     return ds
@@ -73,13 +83,55 @@ def is_s3_uri(dset):
     return value.lower().startswith("s3://")
 
 
+def is_zarr_store(dset):
+    """Return True when the input looks like a Zarr store path."""
+    if isinstance(dset, Path):
+        dset = str(dset)
+
+    if not isinstance(dset, str):
+        return False
+
+    value = dset.strip()
+    if not value:
+        return False
+
+    path = urlsplit(value).path.rstrip("/").lower()
+    return path.endswith(ZARR_EXT)
+
+
+def get_zarr_store(ds_id, file_paths):
+    """Return a single Zarr store from a dataset id or resolved file paths."""
+    if is_zarr_store(ds_id):
+        return str(ds_id)
+
+    if isinstance(file_paths, (str, Path)):
+        return str(file_paths) if is_zarr_store(file_paths) else None
+
+    if file_paths and len(file_paths) == 1 and is_zarr_store(file_paths[0]):
+        return str(file_paths[0])
+
+    return None
+
+
+def get_zarr_open_kwargs(store):
+    """Return xarray opener kwargs for a Zarr store."""
+    if not is_s3_uri(store):
+        return {}
+
+    storage_options = get_s3_storage_options()
+    if not storage_options:
+        return {}
+
+    return {"storage_options": storage_options}
+
+
 def get_s3_open_kwargs(ds_id, file_paths):
     """Return opener kwargs for S3-hosted NetCDF inputs."""
     dset = ds_id
     if not isinstance(dset, str) and file_paths:
         dset = str(file_paths[0])
 
-    if not is_s3_uri(dset) or is_kerchunk_file(dset):
+    if not is_s3_uri(dset) or is_kerchunk_file(dset) or is_zarr_store(dset):
         return {}
 
     storage_options = get_s3_storage_options()
diff --git a/tests/test_ops_consolidate.py b/tests/test_ops_consolidate.py
index ea230ff..85c582a 100644
--- a/tests/test_ops_consolidate.py
+++ b/tests/test_ops_consolidate.py
@@ -44,6 +44,21 @@ def fail_dset_to_filepaths(_dset, **_kwargs):
     }
 
 
+def test_consolidate_zarr_bypasses_catalog_and_mapper(monkeypatch):
+    def fail_lookup(*_args, **_kwargs):
+        raise AssertionError("Catalog and file lookup should not run for Zarr input")
+
+    monkeypatch.setattr(consolidate, "get_project_name", fail_lookup)
+    monkeypatch.setattr(consolidate, "get_catalog", fail_lookup)
+    monkeypatch.setattr(consolidate, "dset_to_filepaths", fail_lookup)
+
+    store = "s3://example-bucket/path/example.zarr"
+    collection = DummyCollection([store])
+    result = consolidate.consolidate(collection)
+
+    assert result == {store: store}
+
+
 def test_consolidate_catalog_files_can_use_s3_base_dir(monkeypatch):
     class DummyCatalog:
         def search(self, collection, time):
diff --git a/tests/test_ops_helpers.py b/tests/test_ops_helpers.py
index 1936089..4db4498 100644
--- a/tests/test_ops_helpers.py
+++ b/tests/test_ops_helpers.py
@@ -1,3 +1,5 @@
+import xarray as xr
+
 import rook.utils.ops.helpers as helpers
 
 
@@ -96,6 +98,83 @@ def test_is_s3_uri_false_for_https():
     assert helpers.is_s3_uri("https://example.org/file.nc") is False
 
 
+def test_is_zarr_store_local_path():
+    assert helpers.is_zarr_store("/data/example.zarr") is True
+
+
+def test_is_zarr_store_url_with_query_and_trailing_slash():
+    assert helpers.is_zarr_store("s3://bucket/example.zarr/?token=abc") is True
+
+
+def test_is_zarr_store_netcdf_path():
+    assert helpers.is_zarr_store("s3://bucket/example.nc") is False
+
+
+def test_get_zarr_store_from_catalog_file_paths():
+    assert helpers.get_zarr_store(
+        "project.dataset", ["s3://bucket/example.zarr"]
+    ) == "s3://bucket/example.zarr"
+
+
+def test_open_dataset_opens_local_zarr_store(tmp_path):
+    store = tmp_path / "example.zarr"
+    expected = xr.Dataset({"tas": ("time", [280.0, 281.0])})
+    expected.to_zarr(store, mode="w")
+
+    result = helpers.open_dataset(str(store), str(store), apply_fixes=False)
+
+    xr.testing.assert_equal(result, expected)
+    result.close()
+
+
+def test_open_dataset_passes_s3_options_to_zarr(monkeypatch):
+    calls = {}
+
+    def fake_open_zarr(store, **kwargs):
+        calls["store"] = store
+        calls["kwargs"] = kwargs
+        return "DATASET"
+
+    monkeypatch.setattr(helpers.xr, "open_zarr", fake_open_zarr)
+    monkeypatch.setattr(
+        helpers,
+        "CONFIG",
+        {"s3": {"anon": "true", "endpoint_url": "https://s3.example.org"}},
+    )
+
+    result = helpers.open_dataset(
+        "s3://example-bucket/path/example.zarr",
+        "s3://example-bucket/path/example.zarr",
+        apply_fixes=False,
+    )
+
+    assert result == "DATASET"
+    assert calls == {
+        "store": "s3://example-bucket/path/example.zarr",
+        "kwargs": {
+            "storage_options": {
+                "anon": True,
+                "client_kwargs": {"endpoint_url": "https://s3.example.org"},
+            }
+        },
+    }
+
+
+def test_open_dataset_skips_fixes_for_direct_zarr(monkeypatch):
+    monkeypatch.setattr(helpers.xr, "open_zarr", lambda _store, **_kwargs: "DATASET")
+
+    def fail_apply_fixes(_ds_id, _ds):
+        raise AssertionError("Fix lookup should not be called for direct Zarr input")
+
+    monkeypatch.setattr(helpers, "apply_dataset_fixes", fail_apply_fixes)
+
+    result = helpers.open_dataset(
+        "/data/example.zarr", "/data/example.zarr", apply_fixes=True
+    )
+
+    assert result == "DATASET"
+
+
 def test_get_s3_open_kwargs_for_s3_netcdf(monkeypatch):
     monkeypatch.setattr(
         helpers,
diff --git a/tests/test_ops_zarr.py b/tests/test_ops_zarr.py
new file mode 100644
index 0000000..18f2db0
--- /dev/null
+++ b/tests/test_ops_zarr.py
@@ -0,0 +1,42 @@
+import xarray as xr
+
+from rook.utils.ops.subset import subset
+
+
+def test_subset_local_zarr_store(tmp_path):
+    store = tmp_path / "input.zarr"
+    output_dir = tmp_path / "outputs"
+    output_dir.mkdir()
+
+    dataset = xr.Dataset(
+        {
+            "tas": (
+                ("time", "lat", "lon"),
+                [
+                    [[280.0, 281.0], [282.0, 283.0]],
+                    [[284.0, 285.0], [286.0, 287.0]],
+                    [[288.0, 289.0], [290.0, 291.0]],
+                ],
+            )
+        },
+        coords={
+            "time": xr.date_range("2000-01-01", periods=3),
+            "lat": [-10.0, 10.0],
+            "lon": [0.0, 20.0],
+        },
+    )
+    dataset["time"].attrs["standard_name"] = "time"
+    dataset["lat"].attrs["standard_name"] = "latitude"
+    dataset["lon"].attrs["standard_name"] = "longitude"
+    dataset.to_zarr(store, mode="w")
+
+    result = subset(
+        collection=str(store),
+        output_dir=output_dir,
+        apply_fixes=False,
+    )
+
+    assert len(result.file_uris) == 1
+    with xr.open_dataset(result.file_uris[0]) as output:
+        assert output.sizes["time"] == 3
+        assert output["tas"].shape == (3, 2, 2)

From 814385bafadd007a4eb5af74a1cb6447b7cea533 Mon Sep 17 00:00:00 2001
From: Pingu Carsti
Date: Thu, 18 Jun 2026 13:36:08 +0200
Subject: [PATCH 2/2] update config example for zarr

---
Review notes (ignored when the patch is applied):

* roocs.ini changes are comment-only: they state that the existing [s3]
  options are shared by NetCDF files and Zarr stores and that no separate
  [zarr] section is needed.
* The added test pins that a NetCDF path passed through file_paths with a
  non-Zarr ds_id never reaches xr.open_zarr().

 src/rook/etc/roocs.ini    |  3 +++
 tests/test_ops_helpers.py | 18 ++++++++++++++++++
 2 files changed, 21 insertions(+)

diff --git a/src/rook/etc/roocs.ini b/src/rook/etc/roocs.ini
index bc65849..9e57a90 100644
--- a/src/rook/etc/roocs.ini
+++ b/src/rook/etc/roocs.ini
@@ -5,6 +5,8 @@ intake_catalog_url = https://raw.githubusercontent.com/cp4cds/c3s_34g_manifests/
 
 [s3]
 # Example:
+# These options are shared by NetCDF files and Zarr stores on S3.
+# Zarr stores are detected from paths ending in .zarr; no [zarr] section is needed.
 # base_dir = s3://example-bucket/data
 # anon = true
 # endpoint_url = https://s3.example.org
@@ -16,4 +18,5 @@ intake_catalog_url = https://raw.githubusercontent.com/cp4cds/c3s_34g_manifests/
 
 # Optional per-project override:
 # [project:c3s-cmip6]
+# Catalog paths below this prefix may point to either NetCDF files or .zarr stores.
 # s3_base_dir = s3://example-bucket/cmip6
diff --git a/tests/test_ops_helpers.py b/tests/test_ops_helpers.py
index 4db4498..195e238 100644
--- a/tests/test_ops_helpers.py
+++ b/tests/test_ops_helpers.py
@@ -127,6 +127,24 @@ def test_open_dataset_opens_local_zarr_store(tmp_path):
     result.close()
 
 
+def test_open_dataset_keeps_local_netcdf_path(tmp_path, monkeypatch):
+    path = tmp_path / "example.nc"
+    expected = xr.Dataset({"tas": ("time", [280.0, 281.0])})
+    expected.to_netcdf(path)
+
+    def fail_open_zarr(*_args, **_kwargs):
+        raise AssertionError("NetCDF inputs must not use the Zarr opener")
+
+    monkeypatch.setattr(helpers.xr, "open_zarr", fail_open_zarr)
+
+    result = helpers.open_dataset(
+        "project.dataset", [str(path)], apply_fixes=False
+    )
+
+    xr.testing.assert_equal(result, expected)
+    result.close()
+
+
 def test_open_dataset_passes_s3_options_to_zarr(monkeypatch):
     calls = {}
 