diff --git a/src/rook/io/__init__.py b/src/rook/io/__init__.py new file mode 100644 index 0000000..9c8c139 --- /dev/null +++ b/src/rook/io/__init__.py @@ -0,0 +1,5 @@ +"""Dataset input and output utilities.""" + +from .datasets import open_dataset + +__all__ = ["open_dataset"] diff --git a/src/rook/io/datasets.py b/src/rook/io/datasets.py new file mode 100644 index 0000000..28d5aaf --- /dev/null +++ b/src/rook/io/datasets.py @@ -0,0 +1,128 @@ +"""Utilities for detecting and opening supported datasets.""" + +from pathlib import Path +from urllib.parse import urlsplit + +import xarray as xr +from clisops.utils.dataset_utils import open_xr_dataset + +from rook import config +from rook.utils.apply_fixes import apply_fixes as apply_dataset_fixes + +KERCHUNK_EXTS = (".json", ".zst", ".zstd", ".parquet") +ZARR_EXT = ".zarr" + + +def open_dataset(ds_id, file_paths, apply_fixes=True): + """Open an xarray Dataset and optionally apply rook-native fixes.""" + zarr_store = get_zarr_store(ds_id, file_paths) + if zarr_store: + ds = xr.open_zarr(zarr_store, **get_zarr_open_kwargs(zarr_store)) + else: + open_kwargs = get_s3_open_kwargs(ds_id, file_paths) + ds = open_xr_dataset(file_paths, **open_kwargs) + + if apply_fixes and not is_kerchunk_file(ds_id) and not is_zarr_store(ds_id): + ds = apply_dataset_fixes(ds_id, ds) + + return ds + + +def is_kerchunk_file(dset): + # Keep this local detector in sync with clisops and upstream when possible. + # Rook currently needs URL-aware kerchunk detection before clisops changes land. + """Return True when the input looks like a kerchunk reference file.""" + if isinstance(dset, Path): + dset = str(dset) + + if not isinstance(dset, str): + return False + + value = dset.strip() + if not value: + return False + + if value.lower().startswith("reference://"): + return True + + # Support local paths and URLs, including query fragments. + path = urlsplit(value).path.lower() + return path.endswith(KERCHUNK_EXTS) + + +def is_s3_uri(dset): + """Return True when the input points to an S3 object URI.""" + if isinstance(dset, Path): + dset = str(dset) + + if not isinstance(dset, str): + return False + + value = dset.strip() + if not value: + return False + + return value.lower().startswith("s3://") + + +def is_zarr_store(dset): + """Return True when the input looks like a Zarr store path.""" + if isinstance(dset, Path): + dset = str(dset) + + if not isinstance(dset, str): + return False + + value = dset.strip() + if not value: + return False + + path = urlsplit(value).path.rstrip("/").lower() + return path.endswith(ZARR_EXT) + + +def get_zarr_store(ds_id, file_paths): + """Return a single Zarr store from a dataset id or resolved file paths.""" + if is_zarr_store(ds_id): + return str(ds_id) + + if isinstance(file_paths, (str, Path)): + return str(file_paths) if is_zarr_store(file_paths) else None + + if file_paths and len(file_paths) == 1 and is_zarr_store(file_paths[0]): + return str(file_paths[0]) + + return None + + +def get_zarr_open_kwargs(store): + """Return xarray opener kwargs for a Zarr store.""" + if not is_s3_uri(store): + return {} + + storage_options = get_s3_storage_options() + if not storage_options: + return {} + + return {"storage_options": storage_options} + + +def get_s3_open_kwargs(ds_id, file_paths): + """Return opener kwargs for S3-hosted NetCDF inputs.""" + dset = ds_id + if not isinstance(dset, str) and file_paths: + dset = str(file_paths[0]) + + if not is_s3_uri(dset) or is_kerchunk_file(dset) or is_zarr_store(dset): + return {} + + storage_options = get_s3_storage_options() + if not storage_options: + return {} + + return {"backend_kwargs": {"storage_options": storage_options}} + + +def get_s3_storage_options(): + """Return shared S3 transport options from central configuration.""" + return config.get_s3_storage_options() diff --git a/src/rook/utils/ops/consolidate.py b/src/rook/utils/ops/consolidate.py index a6c4dc6..7500be0 100644 --- a/src/rook/utils/ops/consolidate.py +++ b/src/rook/utils/ops/consolidate.py @@ -10,14 +10,9 @@ from loguru import logger from rook.catalog import get_catalog +from rook.io.datasets import is_kerchunk_file, is_s3_uri, is_zarr_store -from .helpers import ( - is_kerchunk_file, - is_s3_uri, - is_zarr_store, - ordered_dict, - wrap_sequence, -) +from .helpers import ordered_dict, wrap_sequence def to_year(time_string): diff --git a/src/rook/utils/ops/helpers.py b/src/rook/utils/ops/helpers.py index 6f1f5f4..47b05af 100644 --- a/src/rook/utils/ops/helpers.py +++ b/src/rook/utils/ops/helpers.py @@ -1,17 +1,6 @@ """Helper utilities for operation plumbing.""" import collections -from pathlib import Path -from urllib.parse import urlsplit - -import xarray as xr -from clisops.utils.dataset_utils import open_xr_dataset - -from rook import config -from rook.utils.apply_fixes import apply_fixes as apply_dataset_fixes - -KERCHUNK_EXTS = (".json", ".zst", ".zstd", ".parquet") -ZARR_EXT = ".zarr" def wrap_sequence(obj): @@ -21,125 +10,6 @@ def wrap_sequence(obj): return obj -def open_dataset(ds_id, file_paths, apply_fixes=True): - """Open an xarray Dataset and optionally apply rook-native fixes.""" - zarr_store = get_zarr_store(ds_id, file_paths) - if zarr_store: - ds = xr.open_zarr(zarr_store, **get_zarr_open_kwargs(zarr_store)) - else: - open_kwargs = get_s3_open_kwargs(ds_id, file_paths) - ds = open_xr_dataset(file_paths, **open_kwargs) - - if ( - apply_fixes - and not is_kerchunk_file(ds_id) - and not is_zarr_store(ds_id) - ): - ds = apply_dataset_fixes(ds_id, ds) - - return ds - - def ordered_dict(): """Return an OrderedDict instance.""" return collections.OrderedDict() - - -def is_kerchunk_file(dset): - # Keep this local detector in sync with clisops and upstream when possible. - # Rook currently needs URL-aware kerchunk detection before clisops changes land. - """Return True when the input looks like a kerchunk reference file.""" - if isinstance(dset, Path): - dset = str(dset) - - if not isinstance(dset, str): - return False - - value = dset.strip() - if not value: - return False - - if value.lower().startswith("reference://"): - return True - - # Support local paths and URLs, including query fragments. - path = urlsplit(value).path.lower() - return path.endswith(KERCHUNK_EXTS) - - -def is_s3_uri(dset): - """Return True when the input points to an S3 object URI.""" - if isinstance(dset, Path): - dset = str(dset) - - if not isinstance(dset, str): - return False - - value = dset.strip() - if not value: - return False - - return value.lower().startswith("s3://") - - -def is_zarr_store(dset): - """Return True when the input looks like a Zarr store path.""" - if isinstance(dset, Path): - dset = str(dset) - - if not isinstance(dset, str): - return False - - value = dset.strip() - if not value: - return False - - path = urlsplit(value).path.rstrip("/").lower() - return path.endswith(ZARR_EXT) - - -def get_zarr_store(ds_id, file_paths): - """Return a single Zarr store from a dataset id or resolved file paths.""" - if is_zarr_store(ds_id): - return str(ds_id) - - if isinstance(file_paths, (str, Path)): - return str(file_paths) if is_zarr_store(file_paths) else None - - if file_paths and len(file_paths) == 1 and is_zarr_store(file_paths[0]): - return str(file_paths[0]) - - return None - - -def get_zarr_open_kwargs(store): - """Return xarray opener kwargs for a Zarr store.""" - if not is_s3_uri(store): - return {} - - storage_options = get_s3_storage_options() - if not storage_options: - return {} - - return {"storage_options": storage_options} - - -def get_s3_open_kwargs(ds_id, file_paths): - """Return opener kwargs for S3-hosted NetCDF inputs.""" - dset = ds_id - if not isinstance(dset, str) and file_paths: - dset = str(file_paths[0]) - - if not is_s3_uri(dset) or is_kerchunk_file(dset) or is_zarr_store(dset): - return {} - - storage_options = get_s3_storage_options() - if not storage_options: - return {} - - return {"backend_kwargs": {"storage_options": storage_options}} - - -def get_s3_storage_options(): - """Return shared S3 transport options from central configuration.""" - return config.get_s3_storage_options() diff --git a/src/rook/utils/ops/normalise.py b/src/rook/utils/ops/normalise.py index 7ac7ce9..ab17ad6 100644 --- a/src/rook/utils/ops/normalise.py +++ b/src/rook/utils/ops/normalise.py @@ -1,10 +1,12 @@ """Normalise datasets and hold operation results.""" +import pathlib from loguru import logger -from .helpers import open_dataset, ordered_dict -import pathlib +from rook.io.datasets import open_dataset + +from .helpers import ordered_dict def normalise(collection, apply_fixes=True): diff --git a/tests/test_ops_helpers.py b/tests/test_ops_helpers.py index 87381cd..bf10c77 100644 --- a/tests/test_ops_helpers.py +++ b/tests/test_ops_helpers.py @@ -1,7 +1,7 @@ import xarray as xr from rook import config -import rook.utils.ops.helpers as helpers +import rook.io.datasets as helpers def test_open_dataset_applies_fixes(monkeypatch):