Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGELOG.rst
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ Unreleased
* Added preliminary S3 support by declaring ``s3fs``, allowing direct ``s3://``
NetCDF inputs, and preparing optional S3 base-path mapping for catalog-backed
processing paths.
* Added preliminary support for opening local and S3-backed Zarr stores as
operation inputs.

1.1.1 (2026-06-17)
==================
Expand Down
1 change: 1 addition & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ dependencies = [
"intake <2.0",
"fsspec",
"s3fs",
"zarr >=2.13.0,<3.0",
"pandas >=2.0",
"SQLAlchemy >=2.0",
"aiohttp",
Expand Down
3 changes: 3 additions & 0 deletions src/rook/etc/roocs.ini
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ intake_catalog_url = https://raw.githubusercontent.com/cp4cds/c3s_34g_manifests/

[s3]
# Example:
# These options are shared by NetCDF files and Zarr stores on S3.
# Zarr stores are detected from paths ending in .zarr; no [zarr] section is needed.
# base_dir = s3://example-bucket/data
# anon = true
# endpoint_url = https://s3.example.org
Expand All @@ -16,4 +18,5 @@ intake_catalog_url = https://raw.githubusercontent.com/cp4cds/c3s_34g_manifests/

# Optional per-project override:
# [project:c3s-cmip6]
# Catalog paths below the s3_base_dir prefix may point to either NetCDF files or .zarr stores.
# s3_base_dir = s3://example-bucket/cmip6
11 changes: 9 additions & 2 deletions src/rook/utils/ops/consolidate.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,13 @@

from rook.catalog import get_catalog

from .helpers import is_kerchunk_file, is_s3_uri, ordered_dict, wrap_sequence
from .helpers import (
is_kerchunk_file,
is_s3_uri,
is_zarr_store,
ordered_dict,
wrap_sequence,
)


def to_year(time_string):
Expand Down Expand Up @@ -82,6 +88,7 @@ def consolidate(collection, **kwargs):
not isinstance(collection[0], FileMapper)
and not is_kerchunk_file(collection[0])
and not is_s3_uri(collection[0])
and not is_zarr_store(collection[0])
):
project = get_project_name(collection[0])
catalog = get_catalog(project)
Expand All @@ -91,7 +98,7 @@ def consolidate(collection, **kwargs):
time_param = kwargs.get("time")

for dset in collection:
if is_kerchunk_file(dset):
if is_kerchunk_file(dset) or is_zarr_store(dset):
filtered_refs[dset] = dset

elif is_s3_uri(dset):
Expand Down
62 changes: 57 additions & 5 deletions src/rook/utils/ops/helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,14 @@
from pathlib import Path
from urllib.parse import urlsplit

import xarray as xr
from clisops.utils.dataset_utils import open_xr_dataset

from rook import CONFIG
from rook.utils.apply_fixes import apply_fixes as apply_dataset_fixes

KERCHUNK_EXTS = (".json", ".zst", ".zstd", ".parquet")
ZARR_EXT = ".zarr"


def wrap_sequence(obj):
Expand All @@ -22,10 +24,18 @@ def wrap_sequence(obj):

def open_dataset(ds_id, file_paths, apply_fixes=True):
    """Open an xarray Dataset and optionally apply rook-native fixes.

    When ``get_zarr_store`` identifies a single Zarr store (either ``ds_id``
    itself or the only entry of ``file_paths``), the store is opened with
    ``xr.open_zarr``. Every other input is opened with ``open_xr_dataset``
    using the kwargs returned by ``get_s3_open_kwargs``.

    ``apply_fixes`` only takes effect when ``ds_id`` is neither a Kerchunk
    file nor a Zarr store path.
    """
    zarr_store = get_zarr_store(ds_id, file_paths)
    if zarr_store:
        ds = xr.open_zarr(zarr_store, **get_zarr_open_kwargs(zarr_store))
    else:
        open_kwargs = get_s3_open_kwargs(ds_id, file_paths)
        ds = open_xr_dataset(file_paths, **open_kwargs)

    # Inputs addressed directly by a Kerchunk or Zarr path are never passed
    # to the fix machinery; only the dataset id is checked here, so a catalog
    # id that resolves to a Zarr store still gets its fixes applied.
    direct_reference = is_kerchunk_file(ds_id) or is_zarr_store(ds_id)
    if apply_fixes and not direct_reference:
        ds = apply_dataset_fixes(ds_id, ds)

    return ds
Expand Down Expand Up @@ -73,13 +83,55 @@ def is_s3_uri(dset):
return value.lower().startswith("s3://")


def is_zarr_store(dset):
    """Return True when the input looks like a Zarr store path.

    Accepts a string or ``Path``. The value is parsed with ``urlsplit`` so
    that any query string or fragment is ignored, trailing slashes are
    dropped, and the remaining path is compared case-insensitively against
    ``ZARR_EXT``. Anything that is not a string or ``Path``, or is blank,
    yields False.
    """
    if not isinstance(dset, (str, Path)):
        return False

    text = str(dset).strip()
    if not text:
        return False

    url_path = urlsplit(text).path
    return url_path.rstrip("/").lower().endswith(ZARR_EXT)


def get_zarr_store(ds_id, file_paths):
    """Return a single Zarr store from a dataset id or resolved file paths.

    ``ds_id`` wins when it is itself a Zarr store. Otherwise ``file_paths``
    is considered: either a lone string/``Path``, or a sequence holding
    exactly one entry. The chosen candidate is returned as a string when it
    is a Zarr store; in every other case the result is None.
    """
    if is_zarr_store(ds_id):
        return str(ds_id)

    if isinstance(file_paths, (str, Path)):
        candidate = file_paths
    elif file_paths and len(file_paths) == 1:
        candidate = file_paths[0]
    else:
        # Empty input or several paths: not a single store.
        return None

    return str(candidate) if is_zarr_store(candidate) else None


def get_zarr_open_kwargs(store):
    """Return xarray opener kwargs for a Zarr store.

    For an S3 URI the result carries the options from
    ``get_s3_storage_options()`` under ``"storage_options"``, provided they
    are non-empty. Non-S3 stores, and S3 stores without configured options,
    get an empty dict.
    """
    # Only consult the S3 configuration when the store actually lives on S3.
    storage_options = get_s3_storage_options() if is_s3_uri(store) else None
    return {"storage_options": storage_options} if storage_options else {}


def get_s3_open_kwargs(ds_id, file_paths):
"""Return opener kwargs for S3-hosted NetCDF inputs."""
dset = ds_id
if not isinstance(dset, str) and file_paths:
dset = str(file_paths[0])

if not is_s3_uri(dset) or is_kerchunk_file(dset):
if not is_s3_uri(dset) or is_kerchunk_file(dset) or is_zarr_store(dset):
return {}

storage_options = get_s3_storage_options()
Expand Down
15 changes: 15 additions & 0 deletions tests/test_ops_consolidate.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,21 @@ def fail_dset_to_filepaths(_dset, **_kwargs):
}


def test_consolidate_zarr_bypasses_catalog_and_mapper(monkeypatch):
    """A Zarr store input is mapped to itself without any catalog or file lookup."""

    def fail_lookup(*_args, **_kwargs):
        raise AssertionError("Catalog and file lookup should not run for Zarr input")

    # Any of these being called means the Zarr short-circuit was missed.
    for attr in ("get_project_name", "get_catalog", "dset_to_filepaths"):
        monkeypatch.setattr(consolidate, attr, fail_lookup)

    store = "s3://example-bucket/path/example.zarr"
    result = consolidate.consolidate(DummyCollection([store]))

    assert result == {store: store}


def test_consolidate_catalog_files_can_use_s3_base_dir(monkeypatch):
class DummyCatalog:
def search(self, collection, time):
Expand Down
97 changes: 97 additions & 0 deletions tests/test_ops_helpers.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
import xarray as xr

import rook.utils.ops.helpers as helpers


Expand Down Expand Up @@ -96,6 +98,101 @@ def test_is_s3_uri_false_for_https():
assert helpers.is_s3_uri("https://example.org/file.nc") is False


def test_is_zarr_store_local_path():
    """A plain local path ending in ``.zarr`` is recognised as a Zarr store."""
    local_store = "/data/example.zarr"
    assert helpers.is_zarr_store(local_store) is True


def test_is_zarr_store_url_with_query_and_trailing_slash():
    """A query string and a trailing slash do not hide the ``.zarr`` suffix."""
    decorated_url = "s3://bucket/example.zarr/?token=abc"
    assert helpers.is_zarr_store(decorated_url) is True


def test_is_zarr_store_netcdf_path():
    """A NetCDF object on S3 is not mistaken for a Zarr store."""
    netcdf_url = "s3://bucket/example.nc"
    assert helpers.is_zarr_store(netcdf_url) is False


def test_get_zarr_store_from_catalog_file_paths():
    """A catalog id resolving to one ``.zarr`` path yields that path as the store."""
    store = "s3://bucket/example.zarr"
    resolved = helpers.get_zarr_store("project.dataset", [store])
    assert resolved == store


def test_open_dataset_opens_local_zarr_store(tmp_path):
    """A Zarr store written to disk round-trips through ``helpers.open_dataset``."""
    store = tmp_path / "example.zarr"
    expected = xr.Dataset({"tas": ("time", [280.0, 281.0])})
    expected.to_zarr(store, mode="w")

    result = helpers.open_dataset(str(store), str(store), apply_fixes=False)
    try:
        xr.testing.assert_equal(result, expected)
    finally:
        # Close even when the comparison fails so the store is not left open.
        result.close()


def test_open_dataset_keeps_local_netcdf_path(tmp_path, monkeypatch):
    """NetCDF inputs keep using the regular opener and never reach ``xr.open_zarr``."""
    path = tmp_path / "example.nc"
    expected = xr.Dataset({"tas": ("time", [280.0, 281.0])})
    expected.to_netcdf(path)

    def fail_open_zarr(*_args, **_kwargs):
        raise AssertionError("NetCDF inputs must not use the Zarr opener")

    monkeypatch.setattr(helpers.xr, "open_zarr", fail_open_zarr)

    result = helpers.open_dataset(
        "project.dataset", [str(path)], apply_fixes=False
    )
    try:
        xr.testing.assert_equal(result, expected)
    finally:
        # Close even when the comparison fails so the file handle is released.
        result.close()


def test_open_dataset_passes_s3_options_to_zarr(monkeypatch):
    """The configured S3 options reach ``xr.open_zarr`` as ``storage_options``."""
    store = "s3://example-bucket/path/example.zarr"
    s3_config = {"anon": "true", "endpoint_url": "https://s3.example.org"}
    calls = {}

    def fake_open_zarr(store, **kwargs):
        # Record exactly what the helper handed to xarray.
        calls.update(store=store, kwargs=kwargs)
        return "DATASET"

    monkeypatch.setattr(helpers.xr, "open_zarr", fake_open_zarr)
    monkeypatch.setattr(helpers, "CONFIG", {"s3": s3_config})

    result = helpers.open_dataset(store, store, apply_fixes=False)

    expected_kwargs = {
        "storage_options": {
            "anon": True,
            "client_kwargs": {"endpoint_url": "https://s3.example.org"},
        }
    }
    assert result == "DATASET"
    assert calls == {"store": store, "kwargs": expected_kwargs}


def test_open_dataset_skips_fixes_for_direct_zarr(monkeypatch):
    """With a Zarr path as the dataset id, ``apply_fixes=True`` does not trigger fixes."""
    store = "/data/example.zarr"

    def fake_open_zarr(_store, **_kwargs):
        return "DATASET"

    def fail_apply_fixes(_ds_id, _ds):
        raise AssertionError("Fix lookup should not be called for direct Zarr input")

    monkeypatch.setattr(helpers.xr, "open_zarr", fake_open_zarr)
    monkeypatch.setattr(helpers, "apply_dataset_fixes", fail_apply_fixes)

    result = helpers.open_dataset(store, store, apply_fixes=True)

    assert result == "DATASET"


def test_get_s3_open_kwargs_for_s3_netcdf(monkeypatch):
monkeypatch.setattr(
helpers,
Expand Down
42 changes: 42 additions & 0 deletions tests/test_ops_zarr.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
import xarray as xr

from rook.utils.ops.subset import subset


def test_subset_local_zarr_store(tmp_path):
    """``subset`` on a local Zarr store writes one output holding the full 3x2x2 field."""
    store = tmp_path / "input.zarr"
    output_dir = tmp_path / "outputs"
    output_dir.mkdir()

    tas_values = [
        [[280.0, 281.0], [282.0, 283.0]],
        [[284.0, 285.0], [286.0, 287.0]],
        [[288.0, 289.0], [290.0, 291.0]],
    ]
    coords = {
        "time": xr.date_range("2000-01-01", periods=3),
        "lat": [-10.0, 10.0],
        "lon": [0.0, 20.0],
    }
    dataset = xr.Dataset(
        {"tas": (("time", "lat", "lon"), tas_values)},
        coords=coords,
    )

    # Tag each coordinate with its standard name before writing the store.
    standard_names = (("time", "time"), ("lat", "latitude"), ("lon", "longitude"))
    for coord, standard_name in standard_names:
        dataset[coord].attrs["standard_name"] = standard_name
    dataset.to_zarr(store, mode="w")

    result = subset(
        collection=str(store),
        output_dir=output_dir,
        apply_fixes=False,
    )

    uris = result.file_uris
    assert len(uris) == 1
    with xr.open_dataset(uris[0]) as output:
        assert output.sizes["time"] == 3
        assert output["tas"].shape == (3, 2, 2)