Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions src/rook/io/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
"""Dataset input and output utilities."""

from .datasets import open_dataset

__all__ = ["open_dataset"]
128 changes: 128 additions & 0 deletions src/rook/io/datasets.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,128 @@
"""Utilities for detecting and opening supported datasets."""

from pathlib import Path
from urllib.parse import urlsplit

import xarray as xr
from clisops.utils.dataset_utils import open_xr_dataset

from rook import config
from rook.utils.apply_fixes import apply_fixes as apply_dataset_fixes

# Path suffixes that is_kerchunk_file() accepts as kerchunk references.
# They are compared against a lower-cased path, so they must stay lower-case.
KERCHUNK_EXTS = (".json", ".zst", ".zstd", ".parquet")
# Path suffix that is_zarr_store() accepts as a Zarr store (lower-cased match).
ZARR_EXT = ".zarr"


def open_dataset(ds_id, file_paths, apply_fixes=True):
    """Open ``file_paths`` as an xarray Dataset, optionally applying rook fixes.

    When ``get_zarr_store`` identifies a single Zarr store (from ``ds_id`` or
    ``file_paths``) it is opened with ``xr.open_zarr``; every other input is
    handed to clisops' ``open_xr_dataset``. Opener kwargs come from
    ``get_zarr_open_kwargs`` / ``get_s3_open_kwargs`` respectively.

    Fixes are applied only when ``apply_fixes`` is true and ``ds_id`` is
    neither a kerchunk reference nor a Zarr store.
    """
    store = get_zarr_store(ds_id, file_paths)
    if not store:
        s3_kwargs = get_s3_open_kwargs(ds_id, file_paths)
        dataset = open_xr_dataset(file_paths, **s3_kwargs)
    else:
        zarr_kwargs = get_zarr_open_kwargs(store)
        dataset = xr.open_zarr(store, **zarr_kwargs)

    if not apply_fixes:
        return dataset
    # Fixes are looked up by dataset id, so reference/store ids are skipped.
    if is_kerchunk_file(ds_id) or is_zarr_store(ds_id):
        return dataset
    return apply_dataset_fixes(ds_id, dataset)


def is_kerchunk_file(dset):
    """Return True when the input looks like a kerchunk reference file.

    Accepts a ``str`` or ``Path``; any other type and blank strings yield
    False. A ``reference://`` prefix, or a URL/local path ending in one of
    ``KERCHUNK_EXTS``, counts as kerchunk. Matching is case-insensitive.
    """
    # Keep this local detector in sync with clisops and upstream when possible.
    # Rook currently needs URL-aware kerchunk detection before clisops changes land.
    candidate = str(dset) if isinstance(dset, Path) else dset
    if not isinstance(candidate, str):
        return False

    candidate = candidate.strip()
    if candidate == "":
        return False

    if candidate.lower().startswith("reference://"):
        return True

    # Only the path component is inspected, so a query string or fragment
    # following the file name does not hide the extension.
    url_path = urlsplit(candidate).path.lower()
    return url_path.endswith(KERCHUNK_EXTS)


def is_s3_uri(dset):
    """Return True when the input points to an S3 object URI.

    Only ``str`` and ``Path`` inputs are considered. The check is a
    case-insensitive ``s3://`` prefix test on the whitespace-stripped text.
    """
    text = str(dset) if isinstance(dset, Path) else dset
    if not isinstance(text, str):
        return False
    # A blank string cannot start with the prefix, so no separate empty check.
    return text.strip().lower().startswith("s3://")


def is_zarr_store(dset):
    """Return True when the input looks like a Zarr store path.

    Accepts a ``str`` or ``Path``; any other type and blank strings yield
    False. The URL/local path component must end in ``ZARR_EXT``; the match
    is case-insensitive.
    """
    text = str(dset) if isinstance(dset, Path) else dset
    if not isinstance(text, str):
        return False

    text = text.strip()
    if not text:
        return False

    # Trailing slashes are dropped so directory-style stores ("x.zarr/") match.
    normalised = urlsplit(text).path.rstrip("/").lower()
    return normalised.endswith(ZARR_EXT)


def get_zarr_store(ds_id, file_paths):
    """Return a single Zarr store from a dataset id or resolved file paths.

    Lookup order: ``ds_id`` itself, then a scalar ``file_paths`` (``str`` or
    ``Path``), then a one-element ``file_paths`` sequence. The store is
    returned as a string; None means no single Zarr store was identified.
    """
    if is_zarr_store(ds_id):
        return str(ds_id)

    # Pick the one path that could be a store, if there is exactly one.
    if isinstance(file_paths, (str, Path)):
        candidate = file_paths
    elif file_paths and len(file_paths) == 1:
        candidate = file_paths[0]
    else:
        return None

    if is_zarr_store(candidate):
        return str(candidate)
    return None


def get_zarr_open_kwargs(store):
    """Return xarray opener kwargs for a Zarr store.

    The result is ``{"storage_options": ...}`` only when ``store`` is an S3
    URI and non-empty storage options are available; otherwise it is ``{}``.
    """
    if is_s3_uri(store):
        options = get_s3_storage_options()
        if options:
            return {"storage_options": options}
    return {}


def get_s3_open_kwargs(ds_id, file_paths):
    """Return opener kwargs for S3-hosted NetCDF inputs.

    The input is classified from ``ds_id`` when it is a string; otherwise
    from ``file_paths``, which may be a single ``str``/``Path`` or a sequence
    (its first entry is used).

    Returns ``{"backend_kwargs": {"storage_options": ...}}`` when the input is
    an S3 URI that is neither a kerchunk reference nor a Zarr store and
    non-empty storage options are configured; otherwise ``{}``.
    """
    dset = ds_id
    if not isinstance(dset, str) and file_paths:
        # A scalar str/Path must be used whole: indexing a str yields only its
        # first character, and indexing a Path raises TypeError.
        if isinstance(file_paths, (str, Path)):
            dset = str(file_paths)
        else:
            dset = str(file_paths[0])

    # Kerchunk references and Zarr stores get their options elsewhere.
    if not is_s3_uri(dset) or is_kerchunk_file(dset) or is_zarr_store(dset):
        return {}

    storage_options = get_s3_storage_options()
    if not storage_options:
        return {}

    return {"backend_kwargs": {"storage_options": storage_options}}


def get_s3_storage_options():
    """Fetch the shared S3 transport options from central configuration.

    Thin indirection over ``config.get_s3_storage_options``.
    """
    options = config.get_s3_storage_options()
    return options
9 changes: 2 additions & 7 deletions src/rook/utils/ops/consolidate.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,14 +10,9 @@
from loguru import logger

from rook.catalog import get_catalog
from rook.io.datasets import is_kerchunk_file, is_s3_uri, is_zarr_store

from .helpers import (
is_kerchunk_file,
is_s3_uri,
is_zarr_store,
ordered_dict,
wrap_sequence,
)
from .helpers import ordered_dict, wrap_sequence


def to_year(time_string):
Expand Down
130 changes: 0 additions & 130 deletions src/rook/utils/ops/helpers.py
Original file line number Diff line number Diff line change
@@ -1,17 +1,6 @@
"""Helper utilities for operation plumbing."""

import collections
from pathlib import Path
from urllib.parse import urlsplit

import xarray as xr
from clisops.utils.dataset_utils import open_xr_dataset

from rook import config
from rook.utils.apply_fixes import apply_fixes as apply_dataset_fixes

# Path suffixes that is_kerchunk_file() accepts as kerchunk references.
# They are compared against a lower-cased path, so they must stay lower-case.
KERCHUNK_EXTS = (".json", ".zst", ".zstd", ".parquet")
# Path suffix that is_zarr_store() accepts as a Zarr store (lower-cased match).
ZARR_EXT = ".zarr"


def wrap_sequence(obj):
Expand All @@ -21,125 +10,6 @@ def wrap_sequence(obj):
return obj


def open_dataset(ds_id, file_paths, apply_fixes=True):
    """Open ``file_paths`` as an xarray Dataset, optionally applying rook fixes.

    When ``get_zarr_store`` identifies a single Zarr store (from ``ds_id`` or
    ``file_paths``) it is opened with ``xr.open_zarr``; every other input is
    handed to clisops' ``open_xr_dataset``. Opener kwargs come from
    ``get_zarr_open_kwargs`` / ``get_s3_open_kwargs`` respectively.

    Fixes are applied only when ``apply_fixes`` is true and ``ds_id`` is
    neither a kerchunk reference nor a Zarr store.
    """
    store = get_zarr_store(ds_id, file_paths)
    if not store:
        s3_kwargs = get_s3_open_kwargs(ds_id, file_paths)
        dataset = open_xr_dataset(file_paths, **s3_kwargs)
    else:
        zarr_kwargs = get_zarr_open_kwargs(store)
        dataset = xr.open_zarr(store, **zarr_kwargs)

    if not apply_fixes:
        return dataset
    # Fixes are looked up by dataset id, so reference/store ids are skipped.
    if is_kerchunk_file(ds_id) or is_zarr_store(ds_id):
        return dataset
    return apply_dataset_fixes(ds_id, dataset)


def ordered_dict():
    """Create and return a new, empty ``collections.OrderedDict``."""
    fresh = collections.OrderedDict()
    return fresh


def is_kerchunk_file(dset):
    """Return True when the input looks like a kerchunk reference file.

    Accepts a ``str`` or ``Path``; any other type and blank strings yield
    False. A ``reference://`` prefix, or a URL/local path ending in one of
    ``KERCHUNK_EXTS``, counts as kerchunk. Matching is case-insensitive.
    """
    # Keep this local detector in sync with clisops and upstream when possible.
    # Rook currently needs URL-aware kerchunk detection before clisops changes land.
    candidate = str(dset) if isinstance(dset, Path) else dset
    if not isinstance(candidate, str):
        return False

    candidate = candidate.strip()
    if candidate == "":
        return False

    if candidate.lower().startswith("reference://"):
        return True

    # Only the path component is inspected, so a query string or fragment
    # following the file name does not hide the extension.
    url_path = urlsplit(candidate).path.lower()
    return url_path.endswith(KERCHUNK_EXTS)


def is_s3_uri(dset):
    """Return True when the input points to an S3 object URI.

    Only ``str`` and ``Path`` inputs are considered. The check is a
    case-insensitive ``s3://`` prefix test on the whitespace-stripped text.
    """
    text = str(dset) if isinstance(dset, Path) else dset
    if not isinstance(text, str):
        return False
    # A blank string cannot start with the prefix, so no separate empty check.
    return text.strip().lower().startswith("s3://")


def is_zarr_store(dset):
    """Return True when the input looks like a Zarr store path.

    Accepts a ``str`` or ``Path``; any other type and blank strings yield
    False. The URL/local path component must end in ``ZARR_EXT``; the match
    is case-insensitive.
    """
    text = str(dset) if isinstance(dset, Path) else dset
    if not isinstance(text, str):
        return False

    text = text.strip()
    if not text:
        return False

    # Trailing slashes are dropped so directory-style stores ("x.zarr/") match.
    normalised = urlsplit(text).path.rstrip("/").lower()
    return normalised.endswith(ZARR_EXT)


def get_zarr_store(ds_id, file_paths):
    """Return a single Zarr store from a dataset id or resolved file paths.

    Lookup order: ``ds_id`` itself, then a scalar ``file_paths`` (``str`` or
    ``Path``), then a one-element ``file_paths`` sequence. The store is
    returned as a string; None means no single Zarr store was identified.
    """
    if is_zarr_store(ds_id):
        return str(ds_id)

    # Pick the one path that could be a store, if there is exactly one.
    if isinstance(file_paths, (str, Path)):
        candidate = file_paths
    elif file_paths and len(file_paths) == 1:
        candidate = file_paths[0]
    else:
        return None

    if is_zarr_store(candidate):
        return str(candidate)
    return None


def get_zarr_open_kwargs(store):
    """Return xarray opener kwargs for a Zarr store.

    The result is ``{"storage_options": ...}`` only when ``store`` is an S3
    URI and non-empty storage options are available; otherwise it is ``{}``.
    """
    if is_s3_uri(store):
        options = get_s3_storage_options()
        if options:
            return {"storage_options": options}
    return {}


def get_s3_open_kwargs(ds_id, file_paths):
    """Return opener kwargs for S3-hosted NetCDF inputs.

    The input is classified from ``ds_id`` when it is a string; otherwise
    from ``file_paths``, which may be a single ``str``/``Path`` or a sequence
    (its first entry is used).

    Returns ``{"backend_kwargs": {"storage_options": ...}}`` when the input is
    an S3 URI that is neither a kerchunk reference nor a Zarr store and
    non-empty storage options are configured; otherwise ``{}``.
    """
    dset = ds_id
    if not isinstance(dset, str) and file_paths:
        # A scalar str/Path must be used whole: indexing a str yields only its
        # first character, and indexing a Path raises TypeError.
        if isinstance(file_paths, (str, Path)):
            dset = str(file_paths)
        else:
            dset = str(file_paths[0])

    # Kerchunk references and Zarr stores get their options elsewhere.
    if not is_s3_uri(dset) or is_kerchunk_file(dset) or is_zarr_store(dset):
        return {}

    storage_options = get_s3_storage_options()
    if not storage_options:
        return {}

    return {"backend_kwargs": {"storage_options": storage_options}}


def get_s3_storage_options():
    """Fetch the shared S3 transport options from central configuration.

    Thin indirection over ``config.get_s3_storage_options``.
    """
    options = config.get_s3_storage_options()
    return options
6 changes: 4 additions & 2 deletions src/rook/utils/ops/normalise.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
"""Normalise datasets and hold operation results."""

import pathlib

from loguru import logger

from .helpers import open_dataset, ordered_dict
import pathlib
from rook.io.datasets import open_dataset

from .helpers import ordered_dict


def normalise(collection, apply_fixes=True):
Expand Down
2 changes: 1 addition & 1 deletion tests/test_ops_helpers.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import xarray as xr

from rook import config
import rook.utils.ops.helpers as helpers
import rook.io.datasets as helpers


def test_open_dataset_applies_fixes(monkeypatch):
Expand Down