Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion CHANGELOG.rst
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,9 @@ Changes
Unreleased
==========

* No changes yet.
* Added preliminary S3 support by declaring ``s3fs``, allowing direct ``s3://``
NetCDF inputs, and preparing optional S3 base-path mapping for catalog-backed
processing paths.

1.1.1 (2026-06-17)
==================
Expand Down
1 change: 1 addition & 0 deletions environment.yml
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ dependencies:
# catalog
- intake <2.0
- fsspec
- s3fs
- pandas >=2.0
- sqlalchemy >=2.0
- aiohttp
Expand Down
1 change: 1 addition & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ dependencies = [
"pydot",
"intake <2.0",
"fsspec",
"s3fs",
"pandas >=2.0",
"SQLAlchemy >=2.0",
"aiohttp",
Expand Down
13 changes: 9 additions & 4 deletions src/rook/catalog/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,13 @@ def __init__(self, project, records):

Records are an OrderedDict of dataset ids with a list of files: {'ds_id': [files]}.
"""
self.base_dir = CONFIG.get(f"project:{project}", {}).get("base_dir")
self.base_url = CONFIG.get(f"project:{project}", {}).get("data_node_root")
project_config = CONFIG.get(f"project:{project}", {})
s3_config = CONFIG.get("s3", {})
self.base_dir = project_config.get("base_dir")
self.s3_base_dir = project_config.get("s3_base_dir") or s3_config.get(
"base_dir"
)
self.base_url = project_config.get("data_node_root")
self.records = records

@property
Expand All @@ -52,15 +57,15 @@ def __len__(self): # noqa: D105
def _records(self, prefix):
new_records = {}
for ds_id, fpaths in self.records.items():
if str(prefix).startswith(("http://", "https://")):
if str(prefix).startswith(("http://", "https://", "s3://")):
new_records[ds_id] = [f"{str(prefix).rstrip('/')}/{fpath.lstrip('/')}" for fpath in fpaths]
else:
new_records[ds_id] = [str(Path(prefix) / fpath) for fpath in fpaths]
return new_records

def files(self):
"""Return matched records with file path."""
return self._records(prefix=self.base_dir)
return self._records(prefix=self.s3_base_dir or self.base_dir)

def download_urls(self):
"""Return matched records with download URL."""
Expand Down
15 changes: 15 additions & 0 deletions src/rook/etc/roocs.ini
Original file line number Diff line number Diff line change
Expand Up @@ -2,3 +2,18 @@

[catalog]
intake_catalog_url = https://raw.githubusercontent.com/cp4cds/c3s_34g_manifests/master/intake/catalogs/c3s.yaml

[s3]
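# Example (explicit keys such as anon/key/secret/token/endpoint_url override
# matching entries supplied through the *_json options):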
# base_dir = s3://example-bucket/data
# anon = true
# endpoint_url = https://s3.example.org
# key = ${S3_ACCESS_KEY_ID}
# secret = ${S3_SECRET_ACCESS_KEY}
# token = ${S3_SESSION_TOKEN}
# storage_options_json = {"anon": true}
# client_kwargs_json = {"endpoint_url": "https://s3.example.org"}

# Optional per-project override:
# [project:c3s-cmip6]
# s3_base_dir = s3://example-bucket/cmip6
11 changes: 9 additions & 2 deletions src/rook/utils/ops/consolidate.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@

from rook.catalog import get_catalog

from .helpers import is_kerchunk_file, ordered_dict, wrap_sequence
from .helpers import is_kerchunk_file, is_s3_uri, ordered_dict, wrap_sequence


def to_year(time_string):
Expand Down Expand Up @@ -78,7 +78,11 @@ def consolidate(collection, **kwargs):

collection = wrap_sequence(collection.value)

if not isinstance(collection[0], FileMapper) and not is_kerchunk_file(collection[0]):
if (
not isinstance(collection[0], FileMapper)
and not is_kerchunk_file(collection[0])
and not is_s3_uri(collection[0])
):
project = get_project_name(collection[0])
catalog = get_catalog(project)

Expand All @@ -90,6 +94,9 @@ def consolidate(collection, **kwargs):
if is_kerchunk_file(dset):
filtered_refs[dset] = dset

elif is_s3_uri(dset):
filtered_refs[dset] = [dset]

elif not catalog:
file_paths = dset_to_filepaths(dset, force=True)

Expand Down
91 changes: 90 additions & 1 deletion src/rook/utils/ops/helpers.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
"""Helper utilities for operation plumbing."""

import collections
import json
from pathlib import Path
from urllib.parse import urlsplit

from clisops.utils.dataset_utils import open_xr_dataset

from rook import CONFIG
from rook.utils.apply_fixes import apply_fixes as apply_dataset_fixes

KERCHUNK_EXTS = (".json", ".zst", ".zstd", ".parquet")
Expand All @@ -20,7 +22,8 @@ def wrap_sequence(obj):

def open_dataset(ds_id, file_paths, apply_fixes=True):
"""Open an xarray Dataset and optionally apply rook-native fixes."""
ds = open_xr_dataset(file_paths)
open_kwargs = get_s3_open_kwargs(ds_id, file_paths)
ds = open_xr_dataset(file_paths, **open_kwargs)

if apply_fixes and not is_kerchunk_file(ds_id):
ds = apply_dataset_fixes(ds_id, ds)
Expand Down Expand Up @@ -53,3 +56,89 @@ def is_kerchunk_file(dset):
# Support local paths and URLs, including query fragments.
path = urlsplit(value).path.lower()
return path.endswith(KERCHUNK_EXTS)


def is_s3_uri(dset):
    """Tell whether ``dset`` is an ``s3://`` object URI.

    ``pathlib.Path`` inputs are converted with ``str()`` before the check;
    every other non-string input yields ``False``. Surrounding whitespace is
    ignored and the scheme is matched case-insensitively.
    """
    text = str(dset) if isinstance(dset, Path) else dset
    if not isinstance(text, str):
        return False

    # An empty or whitespace-only string simply fails the prefix test.
    return text.strip().lower().startswith("s3://")


def get_s3_open_kwargs(ds_id, file_paths):
    """Return opener kwargs for S3-hosted NetCDF inputs.

    Both ``ds_id`` and ``file_paths`` are inspected, so storage options are
    also supplied when ``ds_id`` is a plain dataset identifier whose files
    were resolved to ``s3://`` locations (e.g. through a configured S3 base
    path). ``file_paths`` may be a single path or an iterable of paths.

    Returns
    -------
    dict
        ``{"backend_kwargs": {"storage_options": {...}}}`` when an S3 NetCDF
        input is detected and S3 storage options are configured; otherwise an
        empty dict (no S3 input, a kerchunk reference, or no options).
    """
    # A bare string/Path is one path, not a sequence of characters.
    if isinstance(file_paths, (str, Path)):
        paths = [file_paths]
    else:
        try:
            paths = list(file_paths or ())
        except TypeError:
            # Not iterable: nothing to inspect beyond ds_id.
            paths = []

    # ds_id takes precedence, matching the previous behaviour for direct
    # ``s3://`` inputs; resolved file paths are the fallback.
    s3_target = next(
        (candidate for candidate in (ds_id, *paths) if is_s3_uri(candidate)),
        None,
    )
    if s3_target is None or is_kerchunk_file(s3_target):
        return {}

    storage_options = get_s3_storage_options()
    if not storage_options:
        return {}

    return {"backend_kwargs": {"storage_options": storage_options}}


def get_s3_storage_options():
    """Build fsspec S3 storage options from the ``[s3]`` section of rook config.

    Sources are applied in increasing precedence:

    1. ``storage_options_json`` -- a JSON object used as the base options.
    2. ``client_kwargs_json`` -- a JSON object merged into ``client_kwargs``.
    3. ``endpoint_url`` -- stored as ``client_kwargs["endpoint_url"]``.
    4. ``anon``, ``key``, ``secret``, ``token`` -- set as top-level options
       (``anon`` is coerced to a boolean where recognisable).

    Returns
    -------
    dict
        The assembled options; empty when the section is missing, is not a
        dict, or defines none of the supported keys.
    """
    cfg = CONFIG.get("s3", {})
    if not isinstance(cfg, dict):
        return {}

    options = {}

    raw_options = cfg.get("storage_options_json")
    if raw_options:
        options.update(_parse_json_dict(raw_options))

    # Collect client kwargs separately so they can be merged safely below.
    extra_client_kwargs = {}

    raw_client = cfg.get("client_kwargs_json")
    if raw_client:
        extra_client_kwargs.update(_parse_json_dict(raw_client))

    endpoint_url = cfg.get("endpoint_url")
    if endpoint_url:
        extra_client_kwargs["endpoint_url"] = endpoint_url

    if extra_client_kwargs:
        # ``storage_options_json`` may carry a non-dict ``client_kwargs``
        # (e.g. ``null``); start from an empty dict instead of crashing.
        existing = options.get("client_kwargs")
        merged = dict(existing) if isinstance(existing, dict) else {}
        merged.update(extra_client_kwargs)
        options["client_kwargs"] = merged

    for key in ("anon", "key", "secret", "token"):
        value = cfg.get(key)
        if value is None or value == "":
            continue
        if key == "anon":
            value = _coerce_bool(value)
        options[key] = value

    return options


def _parse_json_dict(value):
    """Return ``value`` as a dict, decoding it from JSON when necessary.

    A value that is already a dict (for instance when the config layer has
    decoded it beforehand) is returned as a shallow copy. Anything that is
    not valid JSON, or that decodes to something other than an object,
    yields an empty dict so a malformed setting never aborts processing.
    """
    if isinstance(value, dict):
        return dict(value)

    try:
        parsed = json.loads(value)
    except (TypeError, ValueError):
        # ValueError covers json.JSONDecodeError and undecodable bytes.
        return {}
    return parsed if isinstance(parsed, dict) else {}


def _coerce_bool(value):
    """Map common textual boolean spellings to ``True``/``False``.

    Booleans and non-string values are returned untouched, as are strings
    that match none of the recognised spellings.
    """
    if not isinstance(value, str):
        # Covers real booleans as well as any other non-text value.
        return value

    spellings = {
        "1": True,
        "true": True,
        "yes": True,
        "on": True,
        "0": False,
        "false": False,
        "no": False,
        "off": False,
    }
    # Fall back to the original (un-normalised) value when unrecognised.
    return spellings.get(value.strip().lower(), value)
86 changes: 86 additions & 0 deletions tests/test_catalog_base.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
from rook.catalog import base
from rook.catalog.base import Result


# Shared fixture: one dataset id mapped to a single relative file path, in the
# ``{'ds_id': [files]}`` shape that ``Result`` is constructed with below.
RECORDS = {
    "project.dataset": [
        "ScenarioMIP/Model/file_201501-210012.nc",
    ]
}


def test_result_files_uses_project_base_dir(monkeypatch):
    """Without S3 settings, ``files()`` prefixes records with the project ``base_dir``."""
    config = {"project:c3s-cmip6": {"base_dir": "/data/CMIP6"}}
    monkeypatch.setattr(base, "CONFIG", config)

    expected = {
        "project.dataset": ["/data/CMIP6/ScenarioMIP/Model/file_201501-210012.nc"]
    }

    assert Result("c3s-cmip6", RECORDS).files() == expected


def test_result_files_uses_global_s3_base_dir(monkeypatch):
    """A global ``[s3] base_dir`` takes over from the project's local ``base_dir``."""
    config = {
        "project:c3s-cmip6": {"base_dir": "/data/CMIP6"},
        "s3": {"base_dir": "s3://example-bucket/data/CMIP6"},
    }
    monkeypatch.setattr(base, "CONFIG", config)

    expected = {
        "project.dataset": [
            "s3://example-bucket/data/CMIP6/ScenarioMIP/Model/file_201501-210012.nc"
        ]
    }

    assert Result("c3s-cmip6", RECORDS).files() == expected


def test_result_files_uses_project_s3_base_dir_override(monkeypatch):
    """A per-project ``s3_base_dir`` wins over the global ``[s3] base_dir``."""
    project_settings = {
        "base_dir": "/data/CMIP6",
        "s3_base_dir": "s3://project-bucket/cmip6",
    }
    config = {
        "project:c3s-cmip6": project_settings,
        "s3": {"base_dir": "s3://global-bucket/data"},
    }
    monkeypatch.setattr(base, "CONFIG", config)

    expected = {
        "project.dataset": [
            "s3://project-bucket/cmip6/ScenarioMIP/Model/file_201501-210012.nc"
        ]
    }

    assert Result("c3s-cmip6", RECORDS).files() == expected


def test_result_download_urls_keep_data_node_root(monkeypatch):
    """``download_urls()`` keeps using ``data_node_root`` even when S3 is configured."""
    project_settings = {
        "base_dir": "/data/CMIP6",
        "data_node_root": "https://example.org/thredds/fileServer/cmip6",
    }
    config = {
        "project:c3s-cmip6": project_settings,
        "s3": {"base_dir": "s3://example-bucket/data/CMIP6"},
    }
    monkeypatch.setattr(base, "CONFIG", config)

    expected = {
        "project.dataset": [
            "https://example.org/thredds/fileServer/cmip6/ScenarioMIP/Model/file_201501-210012.nc"
        ]
    }

    assert Result("c3s-cmip6", RECORDS).download_urls() == expected
56 changes: 56 additions & 0 deletions tests/test_ops_consolidate.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
import rook.utils.ops.consolidate as consolidate
from rook.catalog import base
from rook.catalog.base import Result


class DummyCollection:
Expand All @@ -18,3 +20,57 @@ def fail_get_catalog(_project):
assert result == {
"https://example.org/refs/mydataset.json": "https://example.org/refs/mydataset.json"
}


def test_consolidate_s3_bypasses_catalog_and_mapper(monkeypatch):
    """Direct ``s3://`` inputs must skip project, catalog and file-mapper lookups."""

    def forbid(message):
        # Build a stub that fails the test as soon as it is invoked.
        def _stub(*_args, **_kwargs):
            raise AssertionError(message)

        return _stub

    forbidden_calls = {
        "get_project_name": "Project lookup should not be called for s3 input",
        "get_catalog": "Catalog lookup should not be called for s3 input",
        "dset_to_filepaths": "dset_to_filepaths should not be called for s3 input",
    }
    for attr_name, message in forbidden_calls.items():
        monkeypatch.setattr(consolidate, attr_name, forbid(message))

    uri = "s3://example-bucket/path/file.nc"
    result = consolidate.consolidate(DummyCollection([uri]))

    # The URI is passed through as its own single-entry file list.
    assert result == {uri: [uri]}


def test_consolidate_catalog_files_can_use_s3_base_dir(monkeypatch):
    """Catalog-resolved files are prefixed with the configured S3 base path."""
    relative_file = "ScenarioMIP/Model/file_201501-210012.nc"

    class DummyCatalog:
        def search(self, collection, time):
            assert collection == "dataset"
            assert time is None
            return Result("c3s-cmip6", {"c3s-cmip6.dataset": [relative_file]})

    stubs = {
        "get_project_name": lambda _dset: "c3s-cmip6",
        "derive_ds_id": lambda _dset: "dataset",
        "get_catalog": lambda _project: DummyCatalog(),
    }
    for attr_name, stub in stubs.items():
        monkeypatch.setattr(consolidate, attr_name, stub)

    config = {
        "project:c3s-cmip6": {"base_dir": "/data/CMIP6"},
        "s3": {"base_dir": "s3://example-bucket/data/CMIP6"},
    }
    monkeypatch.setattr(base, "CONFIG", config)

    result = consolidate.consolidate(DummyCollection(["c3s-cmip6.dataset"]))

    expected = {
        "c3s-cmip6.dataset": [
            "s3://example-bucket/data/CMIP6/ScenarioMIP/Model/file_201501-210012.nc"
        ]
    }
    assert result == expected
Loading
Loading