From b2260adaa386d132c9095ef70fa67f51349a9f8f Mon Sep 17 00:00:00 2001 From: Pingu Carsti Date: Wed, 17 Jun 2026 18:05:10 +0200 Subject: [PATCH 1/5] added s3fs and is_s3_uri function --- environment.yml | 1 + pyproject.toml | 1 + src/rook/utils/ops/consolidate.py | 11 +++++++++-- src/rook/utils/ops/helpers.py | 15 +++++++++++++++ tests/test_ops_consolidate.py | 22 ++++++++++++++++++++++ tests/test_ops_helpers.py | 8 ++++++++ 6 files changed, 56 insertions(+), 2 deletions(-) diff --git a/environment.yml b/environment.yml index 6a6b4db..1919ab5 100644 --- a/environment.yml +++ b/environment.yml @@ -32,6 +32,7 @@ dependencies: # catalog - intake <2.0 - fsspec + - s3fs - pandas >=2.0 - sqlalchemy >=2.0 - aiohttp diff --git a/pyproject.toml b/pyproject.toml index 58b50da..e29aba6 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -47,6 +47,7 @@ dependencies = [ "pydot", "intake <2.0", "fsspec", + "s3fs", "pandas >=2.0", "SQLAlchemy >=2.0", "aiohttp", diff --git a/src/rook/utils/ops/consolidate.py b/src/rook/utils/ops/consolidate.py index 1d1876a..a656b8b 100644 --- a/src/rook/utils/ops/consolidate.py +++ b/src/rook/utils/ops/consolidate.py @@ -11,7 +11,7 @@ from rook.catalog import get_catalog -from .helpers import is_kerchunk_file, ordered_dict, wrap_sequence +from .helpers import is_kerchunk_file, is_s3_uri, ordered_dict, wrap_sequence def to_year(time_string): @@ -78,7 +78,11 @@ def consolidate(collection, **kwargs): collection = wrap_sequence(collection.value) - if not isinstance(collection[0], FileMapper) and not is_kerchunk_file(collection[0]): + if ( + not isinstance(collection[0], FileMapper) + and not is_kerchunk_file(collection[0]) + and not is_s3_uri(collection[0]) + ): project = get_project_name(collection[0]) catalog = get_catalog(project) @@ -90,6 +94,9 @@ def consolidate(collection, **kwargs): if is_kerchunk_file(dset): filtered_refs[dset] = dset + elif is_s3_uri(dset): + filtered_refs[dset] = [dset] + elif not catalog: file_paths = dset_to_filepaths(dset, force=True) diff --git a/src/rook/utils/ops/helpers.py b/src/rook/utils/ops/helpers.py index 0f1fa94..f89a149 100644 --- a/src/rook/utils/ops/helpers.py +++ b/src/rook/utils/ops/helpers.py @@ -53,3 +53,18 @@ def is_kerchunk_file(dset): # Support local paths and URLs, including query fragments. path = urlsplit(value).path.lower() return path.endswith(KERCHUNK_EXTS) + + +def is_s3_uri(dset): + """Return True when the input points to an S3 object URI.""" + if isinstance(dset, Path): + dset = str(dset) + + if not isinstance(dset, str): + return False + + value = dset.strip() + if not value: + return False + + return value.lower().startswith("s3://") diff --git a/tests/test_ops_consolidate.py b/tests/test_ops_consolidate.py index 69e6e58..34662d5 100644 --- a/tests/test_ops_consolidate.py +++ b/tests/test_ops_consolidate.py @@ -18,3 +18,25 @@ def fail_get_catalog(_project): assert result == { "https://example.org/refs/mydataset.json": "https://example.org/refs/mydataset.json" } + + +def test_consolidate_s3_bypasses_catalog_and_mapper(monkeypatch): + def fail_get_project_name(_dset): + raise AssertionError("Project lookup should not be called for s3 input") + + def fail_get_catalog(_project): + raise AssertionError("Catalog lookup should not be called for s3 input") + + def fail_dset_to_filepaths(_dset, force=False): + raise AssertionError("dset_to_filepaths should not be called for s3 input") + + monkeypatch.setattr(consolidate, "get_project_name", fail_get_project_name) + monkeypatch.setattr(consolidate, "get_catalog", fail_get_catalog) + monkeypatch.setattr(consolidate, "dset_to_filepaths", fail_dset_to_filepaths) + + collection = DummyCollection(["s3://example-bucket/path/file.nc"]) + result = consolidate.consolidate(collection) + + assert result == { + "s3://example-bucket/path/file.nc": ["s3://example-bucket/path/file.nc"] + } diff --git a/tests/test_ops_helpers.py b/tests/test_ops_helpers.py index e1e9f90..d1131ea 100644 --- a/tests/test_ops_helpers.py +++ b/tests/test_ops_helpers.py @@ -86,3 +86,11 @@ def test_is_kerchunk_file_reference_scheme(): def test_is_kerchunk_file_non_kerchunk_path(): assert helpers.is_kerchunk_file("/data/file.nc") is False + + +def test_is_s3_uri_true(): + assert helpers.is_s3_uri("s3://my-bucket/path/file.nc") is True + + +def test_is_s3_uri_false_for_https(): + assert helpers.is_s3_uri("https://example.org/file.nc") is False From e4a7abc3309edbac7a085fa5353091858e247372 Mon Sep 17 00:00:00 2001 From: Pingu Carsti Date: Wed, 17 Jun 2026 18:12:15 +0200 Subject: [PATCH 2/5] added s3 config in roocs.ini --- src/rook/etc/roocs.ini | 10 +++++ src/rook/utils/ops/helpers.py | 76 ++++++++++++++++++++++++++++++++++- tests/test_ops_helpers.py | 67 ++++++++++++++++++++++++++++++ 3 files changed, 152 insertions(+), 1 deletion(-) diff --git a/src/rook/etc/roocs.ini b/src/rook/etc/roocs.ini index 5078e19..a6fdbd2 100644 --- a/src/rook/etc/roocs.ini +++ b/src/rook/etc/roocs.ini @@ -2,3 +2,13 @@ [catalog] intake_catalog_url = https://raw.githubusercontent.com/cp4cds/c3s_34g_manifests/master/intake/catalogs/c3s.yaml + +[s3] +# Example: +# anon = true +# endpoint_url = https://s3.example.org +# key = ${S3_ACCESS_KEY_ID} +# secret = ${S3_SECRET_ACCESS_KEY} +# token = ${S3_SESSION_TOKEN} +# storage_options_json = {"anon": true} +# client_kwargs_json = {"endpoint_url": "https://s3.example.org"} diff --git a/src/rook/utils/ops/helpers.py b/src/rook/utils/ops/helpers.py index f89a149..1ee16b7 100644 --- a/src/rook/utils/ops/helpers.py +++ b/src/rook/utils/ops/helpers.py @@ -1,11 +1,13 @@ """Helper utilities for operation plumbing.""" import collections +import json from pathlib import Path from urllib.parse import urlsplit from clisops.utils.dataset_utils import open_xr_dataset +from rook import CONFIG from rook.utils.apply_fixes import apply_fixes as apply_dataset_fixes KERCHUNK_EXTS = (".json", ".zst", ".zstd", ".parquet") @@ -20,7 +22,8 @@ def wrap_sequence(obj): def open_dataset(ds_id, file_paths, apply_fixes=True): """Open an xarray Dataset and optionally apply rook-native fixes.""" - ds = open_xr_dataset(file_paths) + open_kwargs = get_s3_open_kwargs(ds_id, file_paths) + ds = open_xr_dataset(file_paths, **open_kwargs) if apply_fixes and not is_kerchunk_file(ds_id): ds = apply_dataset_fixes(ds_id, ds) @@ -68,3 +71,74 @@ def is_s3_uri(dset): return False return value.lower().startswith("s3://") + + +def get_s3_open_kwargs(ds_id, file_paths): + """Return opener kwargs for S3-hosted NetCDF inputs.""" + dset = ds_id + if not isinstance(dset, str) and file_paths: + dset = str(file_paths[0]) + + if not is_s3_uri(dset) or is_kerchunk_file(dset): + return {} + + storage_options = get_s3_storage_options() + if not storage_options: + return {} + + return {"backend_kwargs": {"storage_options": storage_options}} + + +def get_s3_storage_options(): + """Build fsspec S3 storage options from rook config.""" + cfg = CONFIG.get("s3", {}) + if not isinstance(cfg, dict): + return {} + + options = {} + + raw_options = cfg.get("storage_options_json") + if raw_options: + parsed = _parse_json_dict(raw_options) + if parsed: + options.update(parsed) + + raw_client = cfg.get("client_kwargs_json") + if raw_client: + parsed = _parse_json_dict(raw_client) + if parsed: + options["client_kwargs"] = parsed + + endpoint_url = cfg.get("endpoint_url") + if endpoint_url: + options.setdefault("client_kwargs", {})["endpoint_url"] = endpoint_url + + for key in ("anon", "key", "secret", "token"): + value = cfg.get(key) + if value is None or value == "": + continue + if key == "anon": + value = _coerce_bool(value) + options[key] = value + + return options + + +def _parse_json_dict(value): + try: + parsed = json.loads(value) + except (TypeError, json.JSONDecodeError): + return {} + return parsed if isinstance(parsed, dict) else {} + + +def _coerce_bool(value): + if isinstance(value, bool): + return value + if isinstance(value, str): + lowered = value.strip().lower() + if lowered in {"1", "true", "yes", "on"}: + return True + if lowered in {"0", "false", "no", "off"}: + return False + return value diff --git a/tests/test_ops_helpers.py b/tests/test_ops_helpers.py index d1131ea..33bbca2 100644 --- a/tests/test_ops_helpers.py +++ b/tests/test_ops_helpers.py @@ -94,3 +94,70 @@ def test_is_s3_uri_true(): def test_is_s3_uri_false_for_https(): assert helpers.is_s3_uri("https://example.org/file.nc") is False + + +def test_get_s3_open_kwargs_for_s3_netcdf(monkeypatch): + monkeypatch.setattr( + helpers, + "CONFIG", + {"s3": {"anon": "true", "endpoint_url": "https://s3.example.org"}}, + ) + + kwargs = helpers.get_s3_open_kwargs( + "s3://example-bucket/path/file.nc", ["s3://example-bucket/path/file.nc"] + ) + + assert kwargs == { + "backend_kwargs": { + "storage_options": { + "anon": True, + "client_kwargs": {"endpoint_url": "https://s3.example.org"}, + } + } + } + + +def test_get_s3_open_kwargs_skips_kerchunk(monkeypatch): + monkeypatch.setattr( + helpers, + "CONFIG", + {"s3": {"anon": "true", "endpoint_url": "https://s3.example.org"}}, + ) + + kwargs = helpers.get_s3_open_kwargs( + "s3://example-bucket/path/ref.json", ["s3://example-bucket/path/ref.json"] + ) + + assert kwargs == {} + + +def test_open_dataset_passes_s3_backend_kwargs(monkeypatch): + calls = {"open_kwargs": None} + + def fake_open(file_paths, **kwargs): + calls["open_kwargs"] = kwargs + return "DATASET" + + monkeypatch.setattr(helpers, "open_xr_dataset", fake_open) + monkeypatch.setattr(helpers, "is_kerchunk_file", lambda _: False) + monkeypatch.setattr(helpers, "apply_dataset_fixes", lambda _ds_id, ds: ds) + monkeypatch.setattr( + helpers, + "CONFIG", + {"s3": {"anon": "true", "endpoint_url": "https://s3.example.org"}}, + ) + + _ = helpers.open_dataset( + "s3://example-bucket/path/file.nc", + ["s3://example-bucket/path/file.nc"], + apply_fixes=False, + ) + + assert calls["open_kwargs"] == { + "backend_kwargs": { + "storage_options": { + "anon": True, + "client_kwargs": {"endpoint_url": "https://s3.example.org"}, + } + } + } From 6e6990b3c2a335786039731e0ae7358bab7c835f Mon Sep 17 00:00:00 2001 From: Pingu Carsti Date: Wed, 17 Jun 2026 18:48:43 +0200 Subject: [PATCH 3/5] updating s3 config with base dir --- src/rook/catalog/base.py | 13 ++++-- src/rook/etc/roocs.ini | 5 ++ src/rook/utils/ops/helpers.py | 2 +- tests/test_catalog_base.py | 86 +++++++++++++++++++++++++++++++++++ tests/test_ops_helpers.py | 33 ++++++++++++++ 5 files changed, 134 insertions(+), 5 deletions(-) create mode 100644 tests/test_catalog_base.py diff --git a/src/rook/catalog/base.py b/src/rook/catalog/base.py index a5332e6..c3c734e 100644 --- a/src/rook/catalog/base.py +++ b/src/rook/catalog/base.py @@ -37,8 +37,13 @@ def __init__(self, project, records): Records are an OrderedDict of dataset ids with a list of files: {'ds_id': [files]}. """ - self.base_dir = CONFIG.get(f"project:{project}", {}).get("base_dir") - self.base_url = CONFIG.get(f"project:{project}", {}).get("data_node_root") + project_config = CONFIG.get(f"project:{project}", {}) + s3_config = CONFIG.get("s3", {}) + self.base_dir = project_config.get("base_dir") + self.s3_base_dir = project_config.get("s3_base_dir") or s3_config.get( + "base_dir" + ) + self.base_url = project_config.get("data_node_root") self.records = records @property @@ -52,7 +57,7 @@ def __len__(self): # noqa: D105 def _records(self, prefix): new_records = {} for ds_id, fpaths in self.records.items(): - if str(prefix).startswith(("http://", "https://")): + if str(prefix).startswith(("http://", "https://", "s3://")): new_records[ds_id] = [f"{str(prefix).rstrip('/')}/{fpath.lstrip('/')}" for fpath in fpaths] else: new_records[ds_id] = [str(Path(prefix) / fpath) for fpath in fpaths] @@ -60,7 +65,7 @@ def _records(self, prefix): def files(self): """Return matched records with file path.""" - return self._records(prefix=self.base_dir) + return self._records(prefix=self.s3_base_dir or self.base_dir) def download_urls(self): """Return matched records with download URL.""" diff --git a/src/rook/etc/roocs.ini b/src/rook/etc/roocs.ini index a6fdbd2..bc65849 100644 --- a/src/rook/etc/roocs.ini +++ b/src/rook/etc/roocs.ini @@ -5,6 +5,7 @@ intake_catalog_url = https://raw.githubusercontent.com/cp4cds/c3s_34g_manifests/ [s3] # Example: +# base_dir = s3://example-bucket/data # anon = true # endpoint_url = https://s3.example.org # key = ${S3_ACCESS_KEY_ID} @@ -12,3 +13,7 @@ intake_catalog_url = https://raw.githubusercontent.com/cp4cds/c3s_34g_manifests/ # token = ${S3_SESSION_TOKEN} # storage_options_json = {"anon": true} # client_kwargs_json = {"endpoint_url": "https://s3.example.org"} + +# Optional per-project override: +# [project:c3s-cmip6] +# s3_base_dir = s3://example-bucket/cmip6 diff --git a/src/rook/utils/ops/helpers.py b/src/rook/utils/ops/helpers.py index 1ee16b7..958ce1d 100644 --- a/src/rook/utils/ops/helpers.py +++ b/src/rook/utils/ops/helpers.py @@ -107,7 +107,7 @@ def get_s3_storage_options(): if raw_client: parsed = _parse_json_dict(raw_client) if parsed: - options["client_kwargs"] = parsed + options.setdefault("client_kwargs", {}).update(parsed) endpoint_url = cfg.get("endpoint_url") if endpoint_url: diff --git a/tests/test_catalog_base.py b/tests/test_catalog_base.py new file mode 100644 index 0000000..eb20ba5 --- /dev/null +++ b/tests/test_catalog_base.py @@ -0,0 +1,86 @@ +from rook.catalog import base +from rook.catalog.base import Result + + +RECORDS = { + "project.dataset": [ + "ScenarioMIP/Model/file_201501-210012.nc", + ] +} + + +def test_result_files_uses_project_base_dir(monkeypatch): + monkeypatch.setattr( + base, + "CONFIG", + {"project:c3s-cmip6": {"base_dir": "/data/CMIP6"}}, + ) + + result = Result("c3s-cmip6", RECORDS) + + assert result.files() == { + "project.dataset": ["/data/CMIP6/ScenarioMIP/Model/file_201501-210012.nc"] + } + + +def test_result_files_uses_global_s3_base_dir(monkeypatch): + monkeypatch.setattr( + base, + "CONFIG", + { + "project:c3s-cmip6": {"base_dir": "/data/CMIP6"}, + "s3": {"base_dir": "s3://example-bucket/data/CMIP6"}, + }, + ) + + result = Result("c3s-cmip6", RECORDS) + + assert result.files() == { + "project.dataset": [ + "s3://example-bucket/data/CMIP6/ScenarioMIP/Model/file_201501-210012.nc" + ] + } + + +def test_result_files_uses_project_s3_base_dir_override(monkeypatch): + monkeypatch.setattr( + base, + "CONFIG", + { + "project:c3s-cmip6": { + "base_dir": "/data/CMIP6", + "s3_base_dir": "s3://project-bucket/cmip6", + }, + "s3": {"base_dir": "s3://global-bucket/data"}, + }, + ) + + result = Result("c3s-cmip6", RECORDS) + + assert result.files() == { + "project.dataset": [ + "s3://project-bucket/cmip6/ScenarioMIP/Model/file_201501-210012.nc" + ] + } + + +def test_result_download_urls_keep_data_node_root(monkeypatch): + monkeypatch.setattr( + base, + "CONFIG", + { + "project:c3s-cmip6": { + "base_dir": "/data/CMIP6", + "data_node_root": "https://example.org/thredds/fileServer/cmip6", + }, + "s3": {"base_dir": "s3://example-bucket/data/CMIP6"}, + }, + ) + + result = Result("c3s-cmip6", RECORDS) + + assert result.download_urls() == { + "project.dataset": [ + "https://example.org/thredds/fileServer/cmip6/ScenarioMIP/Model/file_201501-210012.nc" + ] + } diff --git a/tests/test_ops_helpers.py b/tests/test_ops_helpers.py index 33bbca2..1936089 100644 --- a/tests/test_ops_helpers.py +++ b/tests/test_ops_helpers.py @@ -117,6 +117,16 @@ def test_get_s3_open_kwargs_for_s3_netcdf(monkeypatch): } +def test_get_s3_open_kwargs_without_s3_config(monkeypatch): + monkeypatch.setattr(helpers, "CONFIG", {}) + + kwargs = helpers.get_s3_open_kwargs( + "s3://example-bucket/path/file.nc", ["s3://example-bucket/path/file.nc"] + ) + + assert kwargs == {} + + def test_get_s3_open_kwargs_skips_kerchunk(monkeypatch): monkeypatch.setattr( helpers, @@ -131,6 +141,29 @@ def test_get_s3_open_kwargs_skips_kerchunk(monkeypatch): assert kwargs == {} +def test_get_s3_storage_options_merges_client_kwargs(monkeypatch): + monkeypatch.setattr( + helpers, + "CONFIG", + { + "s3": { + "storage_options_json": '{"anon": true, "client_kwargs": {"region_name": "eu-west-1"}}', + "client_kwargs_json": '{"use_ssl": false}', + "endpoint_url": "https://s3.example.org", + } + }, + ) + + assert helpers.get_s3_storage_options() == { + "anon": True, + "client_kwargs": { + "region_name": "eu-west-1", + "use_ssl": False, + "endpoint_url": "https://s3.example.org", + }, + } + + def test_open_dataset_passes_s3_backend_kwargs(monkeypatch): calls = {"open_kwargs": None} From 74727b68f2ed516a4c9c7492325acb6464bdbb05 Mon Sep 17 00:00:00 2001 From: Pingu Carsti Date: Wed, 17 Jun 2026 18:53:50 +0200 Subject: [PATCH 4/5] added s3fs consolidate test --- CHANGELOG.rst | 4 +++- tests/test_ops_consolidate.py | 34 ++++++++++++++++++++++++++++++++++ 2 files changed, 37 insertions(+), 1 deletion(-) diff --git a/CHANGELOG.rst b/CHANGELOG.rst index 16ccb28..796d360 100644 --- a/CHANGELOG.rst +++ b/CHANGELOG.rst @@ -4,7 +4,9 @@ Changes Unreleased ========== -* No changes yet. +* Added preliminary S3 support by declaring ``s3fs``, allowing direct ``s3://`` + NetCDF inputs, and preparing optional S3 base-path mapping for catalog-backed + processing paths. 1.1.1 (2026-06-17) ================== diff --git a/tests/test_ops_consolidate.py b/tests/test_ops_consolidate.py index 34662d5..b2b7903 100644 --- a/tests/test_ops_consolidate.py +++ b/tests/test_ops_consolidate.py @@ -1,4 +1,6 @@ import rook.utils.ops.consolidate as consolidate +from rook.catalog import base +from rook.catalog.base import Result class DummyCollection: @@ -40,3 +42,35 @@ def fail_dset_to_filepaths(_dset, force=False): assert result == { "s3://example-bucket/path/file.nc": ["s3://example-bucket/path/file.nc"] } + + +def test_consolidate_catalog_files_can_use_s3_base_dir(monkeypatch): + class DummyCatalog: + def search(self, collection, time): + assert collection == "dataset" + assert time is None + return Result( + "c3s-cmip6", + {"c3s-cmip6.dataset": ["ScenarioMIP/Model/file_201501-210012.nc"]}, + ) + + monkeypatch.setattr(consolidate, "get_project_name", lambda _dset: "c3s-cmip6") + monkeypatch.setattr(consolidate, "derive_ds_id", lambda _dset: "dataset") + monkeypatch.setattr(consolidate, "get_catalog", lambda _project: DummyCatalog()) + monkeypatch.setattr( + base, + "CONFIG", + { + "project:c3s-cmip6": {"base_dir": "/data/CMIP6"}, + "s3": {"base_dir": "s3://example-bucket/data/CMIP6"}, + }, + ) + + collection = DummyCollection(["c3s-cmip6.dataset"]) + result = consolidate.consolidate(collection) + + assert result == { + "c3s-cmip6.dataset": [ + "s3://example-bucket/data/CMIP6/ScenarioMIP/Model/file_201501-210012.nc" + ] + } From 22ce19aecf9d7e284198bae6a5317e639bb0196e Mon Sep 17 00:00:00 2001 From: Pingu Carsti Date: Wed, 17 Jun 2026 18:59:54 +0200 Subject: [PATCH 5/5] lint --- tests/test_ops_consolidate.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/test_ops_consolidate.py b/tests/test_ops_consolidate.py index b2b7903..ea230ff 100644 --- a/tests/test_ops_consolidate.py +++ b/tests/test_ops_consolidate.py @@ -29,7 +29,7 @@ def fail_get_project_name(_dset): def fail_get_catalog(_project): raise AssertionError("Catalog lookup should not be called for s3 input") - def fail_dset_to_filepaths(_dset, force=False): + def fail_dset_to_filepaths(_dset, **_kwargs): raise AssertionError("dset_to_filepaths should not be called for s3 input") monkeypatch.setattr(consolidate, "get_project_name", fail_get_project_name)