diff --git a/.claude/skills/create-data-source/SKILL.md b/.claude/skills/create-data-source/SKILL.md index c359fe6bd..76cc27ae4 100644 --- a/.claude/skills/create-data-source/SKILL.md +++ b/.claude/skills/create-data-source/SKILL.md @@ -1679,7 +1679,7 @@ Zarr file and mock `fetch_array` or the filesystem layer. - **At least one mock test** per source (see 12d) — no network, no timeout, no xfail - **Target 90%+ line coverage** on the source module when running - with `--slow` (see Step 13b). Use `--cov-report=term-missing` + with `--slow` (see Step 13c). Use `--cov-report=term-missing` to identify uncovered lines. - Run via: `make pytest TOX_ENV=test-data` or `pytest test/data/test_.py -v` @@ -1720,16 +1720,34 @@ Present: ## Step 13 — Run Tests -### 13a. Run the new test file +### 13a. Run non-slow tests first + +Run only the non-slow (offline / mock) tests to verify they all pass: + +```bash +uv run python -m pytest test/data/test_.py -v +``` + +All non-slow tests must pass. Fix failures and re-run until green. + +### 13b. Run slow tests without timeout + +Run the slow (network) tests **without** a `--timeout` CLI flag to +ensure they actually complete successfully without being killed by +a global timeout. The per-test `@pytest.mark.timeout()` decorators +remain on the tests as a safeguard, but the CLI must not impose an +additional cap: ```bash -uv run python -m pytest test/data/test_.py -v --timeout=60 +uv run python -m pytest test/data/test_.py -v --slow ``` -All tests must pass (or `xfail` for network tests). Fix failures -and re-run until green. +All slow tests should either pass or `xfail`. If any test times out +via its `@pytest.mark.timeout()` decorator, investigate whether the +timeout value is too low or the fetch is genuinely hanging, and fix +accordingly. -### 13b. Run coverage report with `--slow` tests +### 13c. Run coverage report with `--slow` tests Run the new test file **with coverage** and the `--slow` flag to include network tests. 
The new data source file must achieve @@ -1737,7 +1755,7 @@ include network tests. The new data source file must achieve ```bash uv run python -m pytest test/data/test_.py -v \ - --slow --timeout=300 \ + --slow \ --cov=earth2studio/data/ \ --cov-report=term-missing \ --cov-fail-under=90 @@ -1759,7 +1777,7 @@ cover the missing lines. Common gaps: Re-run until coverage is at or above 90%. -### 13c. Run the full data test suite (optional but recommended) +### 13d. Run the full data test suite (optional but recommended) ```bash make pytest TOX_ENV=test-data diff --git a/CHANGELOG.md b/CHANGELOG.md index 02635595d..4404fe46b 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -20,6 +20,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 - Added AMSU-A (channels 1–14) and AVHRR channel variables to `E2STUDIO_VOCAB` - Added JPSS ATMS Level 1 BUFR brightness-temperature data source (`JPSS_ATMS`) - Added `JPSSATMSLexicon` for ATMS variable mappings +- Added JPSS CrIS FSR Level 1 spectral radiance data source (`JPSS_CRIS`) +- Added `JPSSCrISLexicon` for CrIS FSR variable mappings ### Changed diff --git a/docs/modules/datasources_dataframe.rst b/docs/modules/datasources_dataframe.rst index 7630255d2..d73f88f65 100644 --- a/docs/modules/datasources_dataframe.rst +++ b/docs/modules/datasources_dataframe.rst @@ -22,6 +22,7 @@ Data sources that provide tabular data as DataFrames. 
data.ISD data.JPSS_ATMS + data.JPSS_CRIS data.MetOpAMSUA data.MetOpAVHRR data.MetOpMHS diff --git a/earth2studio/data/__init__.py b/earth2studio/data/__init__.py index 76adbf64e..709116910 100644 --- a/earth2studio/data/__init__.py +++ b/earth2studio/data/__init__.py @@ -30,6 +30,7 @@ from .isd import ISD from .jpss import JPSS from .jpss_atms import JPSS_ATMS +from .jpss_cris import JPSS_CRIS from .metop_amsua import MetOpAMSUA from .metop_avhrr import MetOpAVHRR from .metop_mhs import MetOpMHS diff --git a/earth2studio/data/jpss_cris.py b/earth2studio/data/jpss_cris.py new file mode 100644 index 000000000..5428bf2e8 --- /dev/null +++ b/earth2studio/data/jpss_cris.py @@ -0,0 +1,1292 @@ +# SPDX-FileCopyrightText: Copyright (c) 2024-2026 NVIDIA CORPORATION & AFFILIATES. +# SPDX-FileCopyrightText: All rights reserved. +# SPDX-License-Identifier: Apache-2.0 +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +from __future__ import annotations + +import asyncio +import hashlib +import os +import pathlib +import shutil +import uuid +from collections.abc import Callable +from concurrent.futures import ThreadPoolExecutor +from dataclasses import dataclass +from datetime import datetime, timedelta +from typing import Any + +import h5py +import nest_asyncio +import numpy as np +import pandas as pd +import pyarrow as pa +import s3fs +from loguru import logger + +from earth2studio.data.utils import ( + datasource_cache_root, + gather_with_concurrency, + prep_data_inputs, +) +from earth2studio.lexicon.base import E2STUDIO_SCHEMA +from earth2studio.lexicon.jpss import JPSSCrISLexicon +from earth2studio.utils.time import TimeTolerance, normalize_time_tolerance +from earth2studio.utils.type import TimeArray, VariableArray + +# --------------------------------------------------------------------------- +# NOAA CrIS S3 bucket layout +# s3://noaa-nesdis-n20-pds/CrIS-FS-SDR///
/SCRIF_*.h5 +# s3://noaa-nesdis-n20-pds/CrIS-SDR-GEO/<year>/<month>/<day>
/GCRSO_*.h5 +# --------------------------------------------------------------------------- + +# S3 bucket per satellite short-name +_SAT_BUCKET_MAP: dict[str, str] = { + "n20": "noaa-nesdis-n20-pds", + "n21": "noaa-nesdis-n21-pds", + "npp": "noaa-nesdis-snpp-pds", +} + +# Platform identifier in filenames +_SAT_PLATFORM_MAP: dict[str, str] = { + "n20": "j01", + "n21": "j02", + "npp": "npp", +} + +# Reverse mapping: platform code -> satellite short-name +_PLATFORM_SAT_MAP: dict[str, str] = {v: k for k, v in _SAT_PLATFORM_MAP.items()} + +# Earliest date with CrIS FSR data on S3 per satellite +_SAT_START_DATE: dict[str, datetime] = { + "npp": datetime(2023, 9, 6), + "n20": datetime(2023, 9, 6), + "n21": datetime(2023, 9, 6), +} + +# --------------------------------------------------------------------------- +# CrIS instrument constants +# --------------------------------------------------------------------------- +_CRIS_NUM_CHANNELS_LW: int = 717 +_CRIS_NUM_CHANNELS_MW: int = 869 +_CRIS_NUM_CHANNELS_SW: int = 637 +_CRIS_NUM_CHANNELS: int = ( + _CRIS_NUM_CHANNELS_LW + _CRIS_NUM_CHANNELS_MW + _CRIS_NUM_CHANNELS_SW +) # 2223 + +_CRIS_NUM_FOR: int = 30 # Fields of Regard per scan line +_CRIS_NUM_FOV: int = 9 # Fields of View per FOR (3x3 detector array) + +# GSI / CRTM sensor_chan numbering for CrIS FSR. +# https://www.star.nesdis.noaa.gov/jpss/documents/UserGuides/CrIS_SDR_Users_Guide1p1_20180405.pdf +# (Table 4, LWIR, MWIR FSR, SWIR FSR) +# +# The physical instrument has 2223 channels (717 + 869 + 637) but the CRTM +# SpcCoeff defines only 2211 *science* channels (713 + 865 + 633). Each band +# has 2 guard channels at the low-wavenumber end and 2 at the high-wavenumber +# end (4 total per band, 12 overall) that are excluded from CRTM and never +# appear in GSI diagnostic files. 
GSI uses contiguous 1-based numbering +# across the three science bands: +# +# Band | JPSS 0-based | sensor_chan | Wavenumber (cm^-1) +# -------+--------------+--------------+-------------------- +# (guard)| 0 .. 1 | 0 (sentinel) | 648.75 -- 649.375 +# LWIR | 2 .. 714 | 1 .. 713 | 650.0 -- 1095.0 +# (guard)| 715 .. 716 | 0 (sentinel) | 1095.625 -- 1096.25 +# (guard)| 717 .. 718 | 0 (sentinel) | 1208.75 -- 1209.375 +# MWIR | 719 .. 1583 | 714 .. 1578 | 1210.0 -- 1750.0 +# (guard)| 1584 .. 1585 | 0 (sentinel) | 1750.625 -- 1751.25 +# (guard)| 1586 .. 1587 | 0 (sentinel) | 2153.75 -- 2154.375 +# SWIR | 1588 .. 2220 | 1579 .. 2211 | 2155.0 -- 2550.0 +# (guard)| 2221 .. 2222 | 0 (sentinel) | 2550.625 -- 2551.25 +# + +# Guard channels are assigned sensor_chan 0 (sentinel for "not in GSI/CRTM"). +_CRIS_NUM_SCIENCE_LW: int = 713 # CRTM science channels per band +_CRIS_NUM_SCIENCE_MW: int = 865 +_CRIS_NUM_SCIENCE_SW: int = 633 +_CRIS_NUM_GUARD_LO: int = 2 # guard channels at low-wavenumber end of each band +_CRIS_NUM_GUARD_HI: int = 2 # guard channels at high-wavenumber end of each band + +# Full (unapodized) sensor_chan mapping — 2223 elements including guard channels. 
+_CRIS_GSI_SENSOR_CHAN: np.ndarray = np.zeros( + _CRIS_NUM_CHANNELS_LW + _CRIS_NUM_CHANNELS_MW + _CRIS_NUM_CHANNELS_SW, + dtype=np.uint16, +) +# LWIR: skip 2 low guards, 713 science channels → 1..713, 2 high guards → 0 +_CRIS_GSI_SENSOR_CHAN[ + _CRIS_NUM_GUARD_LO : _CRIS_NUM_GUARD_LO + _CRIS_NUM_SCIENCE_LW +] = np.arange(1, _CRIS_NUM_SCIENCE_LW + 1, dtype=np.uint16) +# MWIR: skip 2 low guards, 865 science channels → 714..1578, 2 high guards → 0 +_mw_start = _CRIS_NUM_CHANNELS_LW # physical index where MWIR begins +_CRIS_GSI_SENSOR_CHAN[ + _mw_start + + _CRIS_NUM_GUARD_LO : _mw_start + + _CRIS_NUM_GUARD_LO + + _CRIS_NUM_SCIENCE_MW +] = np.arange( + _CRIS_NUM_SCIENCE_LW + 1, + _CRIS_NUM_SCIENCE_LW + _CRIS_NUM_SCIENCE_MW + 1, + dtype=np.uint16, +) +# SWIR: skip 2 low guards, 633 science channels → 1579..2211, 2 high guards → 0 +_sw_start = _CRIS_NUM_CHANNELS_LW + _CRIS_NUM_CHANNELS_MW +_CRIS_GSI_SENSOR_CHAN[ + _sw_start + + _CRIS_NUM_GUARD_LO : _sw_start + + _CRIS_NUM_GUARD_LO + + _CRIS_NUM_SCIENCE_SW +] = np.arange( + _CRIS_NUM_SCIENCE_LW + _CRIS_NUM_SCIENCE_MW + 1, + _CRIS_NUM_SCIENCE_LW + _CRIS_NUM_SCIENCE_MW + _CRIS_NUM_SCIENCE_SW + 1, + dtype=np.uint16, +) + +# Apodized sensor_chan mapping — 2211 science channels only (guard trimmed). +_CRIS_GSI_SENSOR_CHAN_APOD: np.ndarray = np.arange( + 1, + _CRIS_NUM_SCIENCE_LW + _CRIS_NUM_SCIENCE_MW + _CRIS_NUM_SCIENCE_SW + 1, + dtype=np.uint16, +) + +# CrIS wavenumber grid for each channel (cm^-1). +# All three bands use a uniform 0.625 cm^-1 spacing. +# The SDR stores 2 guard channels at the low-wavenumber end of each band, +# so the physical grid starts 2 * 0.625 = 1.25 cm^-1 below the science start. 
+_CRIS_WAVENUMBER: np.ndarray = np.concatenate( + [ + 648.75 + 0.625 * np.arange(_CRIS_NUM_CHANNELS_LW), # LWIR + 1208.75 + 0.625 * np.arange(_CRIS_NUM_CHANNELS_MW), # MWIR + 2153.75 + 0.625 * np.arange(_CRIS_NUM_CHANNELS_SW), # SWIR + ] +).astype(np.float64) + +# Apodized (science-only) wavenumber grid — guard channels removed. +_CRIS_WAVENUMBER_APOD: np.ndarray = np.concatenate( + [ + 650.0 + 0.625 * np.arange(_CRIS_NUM_SCIENCE_LW), # LWIR science + 1210.0 + 0.625 * np.arange(_CRIS_NUM_SCIENCE_MW), # MWIR science + 2155.0 + 0.625 * np.arange(_CRIS_NUM_SCIENCE_SW), # SWIR science + ] +).astype(np.float64) + +# Planck radiation constants for converting spectral radiance (mW/(m^2 sr cm^-1)) +# to brightness temperature (K). +# c1 = 2 h c^2 in mW m^-2 sr^-1 (cm^-1)^-3 (i.e. mW/(m^2 sr cm^-4)) +# c2 = h c / k_B in cm K +_PLANCK_C1: float = 1.191042722543250e-5 # mW / (m^2 sr cm^-4) [CRTM SpcCoeff] +_PLANCK_C2: float = 1.438775246065195 # cm K [CRTM SpcCoeff] + +# --------------------------------------------------------------------------- +# Hamming apodization constants +# --------------------------------------------------------------------------- +# CrIS is a Fourier Transform Spectrometer. The SDR files store unapodized +# (sinc ILS) spectral radiance. NCEP applies Hamming apodization before +# encoding the data into BUFR for GSI assimilation, so UFS/GSI brightness +# temperatures correspond to apodized spectra. +_HAMMING_A0: float = 0.54 +_HAMMING_A1: float = 0.23 # symmetric: a_{-1} = a_{+1} + +# Band offsets into the concatenated 2223-channel array (for per-band apodization). 
+_BAND_SLICES: list[tuple[int, int, int]] = [ + (0, _CRIS_NUM_CHANNELS_LW, _CRIS_NUM_SCIENCE_LW), # LWIR + ( + _CRIS_NUM_CHANNELS_LW, + _CRIS_NUM_CHANNELS_LW + _CRIS_NUM_CHANNELS_MW, + _CRIS_NUM_SCIENCE_MW, + ), # MWIR + ( + _CRIS_NUM_CHANNELS_LW + _CRIS_NUM_CHANNELS_MW, + _CRIS_NUM_CHANNELS, + _CRIS_NUM_SCIENCE_SW, + ), # SWIR +] + + +def _radiance_to_bt( + radiance: np.ndarray, + wavenumber: np.ndarray = _CRIS_WAVENUMBER, +) -> np.ndarray: + """Convert spectral radiance to brightness temperature via inverse Planck. + + Parameters + ---------- + radiance : np.ndarray + Spectral radiance in mW/(m^2 sr cm^-1). Shape ``(n_fov, n_channels)`` + or ``(n_channels,)``. NaN values are preserved. + wavenumber : np.ndarray + Wavenumber grid in cm^-1, shape ``(n_channels,)``. + + Returns + ------- + np.ndarray + Brightness temperature in Kelvin, same shape as *radiance*. + """ + nu = wavenumber # (n_channels,) + # T_B = c2 * nu / ln(1 + c1 * nu^3 / L) + nu3 = nu * nu * nu + with np.errstate(divide="ignore", invalid="ignore"): + bt = _PLANCK_C2 * nu / np.log1p(_PLANCK_C1 * nu3 / radiance) + return bt + + +def _hamming_apodize(radiance: np.ndarray) -> np.ndarray: + """Apply Hamming apodization to unapodized CrIS spectral radiance. + + The 3-tap symmetric Hamming convolution kernel ``[0.23, 0.54, 0.23]`` + is the exact spectral-domain equivalent of multiplying the CrIS + interferogram by ``w(x) = 0.54 + 0.46 cos(pi x / L)``. It is applied + independently to each of the three bands (LWIR, MWIR, SWIR) in radiance + space. The 2 guard channels at each end of each band (4 per band, 12 + total) are trimmed after convolution, reducing the total from 2223 to + 2211 science channels. + + At band-edge channels (first/last of each band) the filter would need a + neighbour outside the band. We use symmetric (reflect) padding so that + the edge value is replicated, which is consistent with the interferogram + being even-symmetric. 
+ + References + ---------- + + - https://www.star.nesdis.noaa.gov/jpss/documents/UserGuides/CrIS_SDR_Users_Guide1p1_20180405.pdf + - https://www.star.nesdis.noaa.gov/jpss/documents/ATBD/D0001-M01-S01-002_JPSS_ATBD_CRIS-SDR_nsr_20180614.pdf + - https://www-cdn.eumetsat.int/files/2022-11/12%20-%20Tobin%20-%20CrIS_spectral_20221019.pdf + + """ + squeeze = radiance.ndim == 1 + if squeeze: + radiance = radiance[np.newaxis, :] + + out_parts: list[np.ndarray] = [] + for band_start, band_end, n_science in _BAND_SLICES: + band = radiance[:, band_start:band_end] # (n_fov, n_band) + # Pad one sample on each side using reflect (symmetric boundary) + padded = np.pad(band, ((0, 0), (1, 1)), mode="reflect") + # 3-tap Hamming convolution + apod = ( + _HAMMING_A1 * padded[:, :-2] + + _HAMMING_A0 * padded[:, 1:-1] + + _HAMMING_A1 * padded[:, 2:] + ) + # Trim guard channels: 2 at low end + 2 at high end → keep n_science + out_parts.append(apod[:, _CRIS_NUM_GUARD_LO : _CRIS_NUM_GUARD_LO + n_science]) + + result = np.concatenate(out_parts, axis=1) + if squeeze: + return result[0] + return result + + +# HDF5 dataset paths in SDR files (CrIS-FS-SDR) +_SDR_GROUP = "All_Data/CrIS-FS-SDR_All" +_SDR_RADIANCE_KEYS = { + "LW": f"{_SDR_GROUP}/ES_RealLW", # shape (n_scan, 30, 9, 717) + "MW": f"{_SDR_GROUP}/ES_RealMW", # shape (n_scan, 30, 9, 869) + "SW": f"{_SDR_GROUP}/ES_RealSW", # shape (n_scan, 30, 9, 637) +} + +# Quality flag datasets +_SDR_QF_KEYS = { + "QF1": f"{_SDR_GROUP}/QF1_SCAN_CRISSDR", # shape (n_scan,) + "QF2": f"{_SDR_GROUP}/QF2_CRISSDR", # shape (n_scan, 9, 3) + "QF3": f"{_SDR_GROUP}/QF3_CRISSDR", # shape (n_scan, 30, 9, 3) +} + +# HDF5 dataset paths in GEO files (CrIS-SDR-GEO) +_GEO_GROUP = "All_Data/CrIS-SDR-GEO_All" +_GEO_KEYS = { + "lat": f"{_GEO_GROUP}/Latitude", # shape (n_scan, 30, 9) + "lon": f"{_GEO_GROUP}/Longitude", # shape (n_scan, 30, 9) + "height": f"{_GEO_GROUP}/Height", # shape (n_scan, 30, 9) + "sat_za": f"{_GEO_GROUP}/SatelliteZenithAngle", # shape (n_scan, 30, 
9) + "sat_aza": f"{_GEO_GROUP}/SatelliteAzimuthAngle", # shape (n_scan, 30, 9) + "sol_za": f"{_GEO_GROUP}/SolarZenithAngle", # shape (n_scan, 30, 9) + "sol_aza": f"{_GEO_GROUP}/SolarAzimuthAngle", # shape (n_scan, 30, 9) + "for_time": f"{_GEO_GROUP}/FORTime", # shape (n_scan, 30) IET µs +} + + +@dataclass +class _CrISAsyncTask: + """Metadata for a single CrIS granule download task (SDR + GEO pair).""" + + sdr_uri: str + geo_uri: str + datetime_min: datetime + datetime_max: datetime + satellite: str + variable: str + modifier: Callable[[Any], Any] + + +@dataclass +class _CrISDecodedGranule: + """Compact decoded data from a single CrIS granule.""" + + lat: np.ndarray # (n_valid,) float32 + lon: np.ndarray # (n_valid,) float32 + sat_za: np.ndarray # (n_valid,) float32 + sat_aza: np.ndarray # (n_valid,) float32 + sol_za: np.ndarray # (n_valid,) float32 + sol_aza: np.ndarray # (n_valid,) float32 + quality: np.ndarray # (n_valid,) uint16 + times: np.ndarray # (n_valid,) datetime64[ms] + radiance: np.ndarray # (n_valid, n_channels) float32 (brightness temp K) + satellite: str + variable: str + + +class JPSS_CRIS: + """JPSS CrIS (Cross-track Infrared Sounder) Full Spectral Resolution (FSR) + Level 1 brightness temperature observations served from NOAA Open Data on + AWS. + + Raw spectral radiance from the HDF5 SDR files is converted to brightness + temperature (K) via the inverse Planck function so that the ``observation`` + column is directly comparable with :class:`~earth2studio.data.UFSObsSat`. + + By default, Hamming apodization is applied to the unapodized (sinc ILS) + radiance before the Planck inversion. This matches the processing used by + NCEP/GSI (which receives Hamming-apodized CrIS radiance via BUFR) and + produces brightness temperatures consistent with + :class:`~earth2studio.data.UFSObsSat`. The 2 guard channels at each end + of each band (4 per band, 12 total) are trimmed during apodization, + yielding 2211 science channels. 
Set ``apodize=False`` to retain the full + 2223 unapodized channels (including 12 guard channels with + ``channel_index=0``). + + Each HDF5 granule contains a small number of scan lines, each with 30 + Fields of Regard (FOR) and 9 Fields of View (FOV) per FOR (3x3 detector + array). In FSR mode the instrument produces 2223 spectral channels: + + - **LWIR** (9.14--15.38 µm, 650--1095 cm^-1): 717 channels at 0.625 cm^-1 + - **MWIR** (5.71--8.26 µm, 1210--1750 cm^-1): 869 channels at 0.625 cm^-1 + - **SWIR** (3.92--4.64 µm, 2155--2550 cm^-1): 637 channels at 0.625 cm^-1 + + When ``apodize=True`` (default), guard channels are trimmed and the output + has 2211 channels with contiguous ``channel_index`` 1--2211. + + When ``apodize=False``, the returned :class:`~pandas.DataFrame` has one row + per FOV per channel including guard channels. The ``channel_index`` column + uses the GSI ``sensor_chan`` numbering convention: + + - **LWIR** channels 0--1 (0-based) → sensor_chan 0 (guard; not in GSI) + - **LWIR** channels 2--714 (0-based) → sensor_chan 1--713 + - **LWIR** channels 715--716 (0-based) → sensor_chan 0 (guard; not in GSI) + - **MWIR** channels 717--718 (0-based) → sensor_chan 0 (guard; not in GSI) + - **MWIR** channels 719--1583 (0-based) → sensor_chan 714--1578 + - **MWIR** channels 1584--1585 (0-based) → sensor_chan 0 (guard; not in GSI) + - **SWIR** channels 1586--1587 (0-based) → sensor_chan 0 (guard; not in GSI) + - **SWIR** channels 1588--2220 (0-based) → sensor_chan 1579--2211 + - **SWIR** channels 2221--2222 (0-based) → sensor_chan 0 (guard; not in GSI) + + Data is stored as paired HDF5 files on S3: + + - **SDR** (``SCRIF_*.h5``): spectral radiance arrays + - **GEO** (``GCRSO_*.h5``): geolocation (lat, lon, angles, time) + + Parameters + ---------- + satellites : list[str] | None, optional + Satellite short-names to query. Valid values are ``"n20"`` + (NOAA-20), ``"n21"`` (NOAA-21), and ``"npp"`` (Suomi NPP). 
+ By default ``None``, which queries all valid satellites. + subsample : int, optional + Temporal sub-sampling factor applied at the granule level. Each + CrIS granule covers roughly 32 seconds of observations; setting + ``subsample=N`` selects every *N*-th granule from the time-ordered + list, reducing data volume by approximately that factor. By + default 1 (no sub-sampling). + apodize : bool, optional + Apply Hamming apodization to the unapodized SDR radiance before + converting to brightness temperature. When ``True`` (default), + the 3-tap Hamming kernel ``[0.23, 0.54, 0.23]`` is convolved + per-band in radiance space and the 2 guard channels at each + end of each band are trimmed, yielding 2211 science + channels that are directly comparable with + :class:`~earth2studio.data.UFSObsSat`. Set to ``False`` to + retain the unapodized spectra with all 2223 channels (including + 12 guard channels with ``channel_index=0``). + + .. note:: + + The spectral-domain 3-tap kernel matches interferogram-domain + Hamming for smooth spectral regions (window channels, + ``sensor_chan`` ≥ 200: agreement < 0.5 K with UFS/GSI). + Channels on sharp CO₂ Q-branch features near 667 cm⁻¹ and + 720 cm⁻¹ may show residuals of 5–20 K because the 3-tap + discrete convolution does not fully replicate the + interferogram-domain apodization applied by NESDIS upstream + of GSI. + time_tolerance : TimeTolerance, optional + Time tolerance window for filtering observations. Accepts a single value + (symmetric +/- window) or a tuple (lower, upper) for asymmetric windows, + by default, np.timedelta64(10, 'm'). 
+ cache : bool, optional + Cache downloaded HDF5 files locally, by default True + verbose : bool, optional + Show download progress bars, by default True + async_timeout : int, optional + Total timeout in seconds for the async fetch, by default 600 + max_workers : int, optional + Maximum number of concurrent S3 fetch tasks, by default 24 + retries : int, optional + Per-file retry count on transient I/O failures, by default 3 + + Warning + ------- + This is a remote data source and can potentially download a large amount + of data to your local machine for large requests. Each CrIS granule pair + (SDR + GEO) is approximately 16 MB. + + Note + ---- + Additional information on the data repository: + + - https://registry.opendata.aws/noaa-jpss/ + - https://www.star.nesdis.noaa.gov/jpss/CrIS.php + - https://www.star.nesdis.noaa.gov/jpss/documents/UserGuides/CrIS_SDR_Users_Guide1p1_20180405.pdf + + Badges + ------ + region:global dataclass:observation product:sat + """ + + SOURCE_ID = "earth2studio.data.JPSS_CRIS" + VALID_SATELLITES = frozenset(["n20", "n21", "npp"]) + + SCHEMA = pa.schema( + [ + E2STUDIO_SCHEMA.field("time"), + E2STUDIO_SCHEMA.field("class"), + E2STUDIO_SCHEMA.field("lat"), + E2STUDIO_SCHEMA.field("lon"), + pa.field( + "scan_angle", + pa.float32(), + nullable=True, + metadata={"description": "SatelliteZenithAngle from CrIS GEO file"}, + ), + E2STUDIO_SCHEMA.field("channel_index"), + E2STUDIO_SCHEMA.field("solza"), + E2STUDIO_SCHEMA.field("solaza"), + E2STUDIO_SCHEMA.field("satellite_za"), + E2STUDIO_SCHEMA.field("satellite_aza"), + E2STUDIO_SCHEMA.field("quality"), + pa.field("satellite", pa.string()), + pa.field("observation", pa.float32()), + pa.field("variable", pa.string()), + ] + ) + + def __init__( + self, + satellites: list[str] | None = None, + subsample: int = 1, + apodize: bool = True, + time_tolerance: TimeTolerance = np.timedelta64(10, "m"), + cache: bool = True, + verbose: bool = True, + async_timeout: int = 600, + max_workers: int = 24, + 
retries: int = 3, + ) -> None: + if satellites is None: + satellites = list(self.VALID_SATELLITES) + else: + invalid = set(satellites) - self.VALID_SATELLITES + if invalid: + raise ValueError( + f"Invalid satellite(s): {invalid}. " + f"Valid options: {sorted(self.VALID_SATELLITES)}" + ) + self._satellites = satellites + self._subsample = max(1, int(subsample)) + self._apodize = apodize + self._cache = cache + self._verbose = verbose + self._max_workers = max_workers + self._retries = retries + self.async_timeout = async_timeout + self._tmp_cache_hash: str | None = None + + try: + nest_asyncio.apply() + loop = asyncio.get_running_loop() + loop.run_until_complete(self._async_init()) + except RuntimeError: + self.fs: s3fs.S3FileSystem | None = None + + lower, upper = normalize_time_tolerance(time_tolerance) + self._tolerance_lower = pd.to_timedelta(lower).to_pytimedelta() + self._tolerance_upper = pd.to_timedelta(upper).to_pytimedelta() + + # ------------------------------------------------------------------ + # Async initialisation + # ------------------------------------------------------------------ + async def _async_init(self) -> None: + """Initialise the async S3 filesystem.""" + self.fs = s3fs.S3FileSystem( + anon=True, + client_kwargs={}, + asynchronous=True, + skip_instance_cache=True, + ) + + def __call__( + self, + time: datetime | list[datetime] | TimeArray, + variable: str | list[str] | VariableArray, + fields: str | list[str] | pa.Schema | None = None, + ) -> pd.DataFrame: + """Fetch CrIS FSR brightness temperature observations. + + Parameters + ---------- + time : datetime | list[datetime] | TimeArray + Timestamps to return data for (UTC). + variable : str | list[str] | VariableArray + Variable names to return (e.g. ``["crisfsr"]``). + fields : str | list[str] | pa.Schema | None, optional + Subset of schema fields to include, by default None (all). + + Returns + ------- + pd.DataFrame + Long-format DataFrame with one row per FOV per channel. 
+ """ + try: + loop = asyncio.get_event_loop() + except RuntimeError: + loop = asyncio.new_event_loop() + asyncio.set_event_loop(loop) + + if self.fs is None: + loop.run_until_complete(self._async_init()) + + try: + df = loop.run_until_complete( + asyncio.wait_for( + self.fetch(time, variable, fields), timeout=self.async_timeout + ) + ) + finally: + if not self._cache: + shutil.rmtree(self.cache, ignore_errors=True) + + return df + + async def fetch( + self, + time: datetime | list[datetime] | TimeArray, + variable: str | list[str] | VariableArray, + fields: str | list[str] | pa.Schema | None = None, + ) -> pd.DataFrame: + """Async implementation of the data fetch. + + Parameters + ---------- + time : datetime | list[datetime] | TimeArray + Timestamps to return data for (UTC). + variable : str | list[str] | VariableArray + Variable names to return. + fields : str | list[str] | pa.Schema | None, optional + Subset of schema fields to include, by default None. + + Returns + ------- + pd.DataFrame + Long-format DataFrame. + """ + if self.fs is None: + raise ValueError( + "File store is not initialised! If calling this function " + "directly, make sure the data source is initialised inside " + "the async loop!" 
+ ) + + session = await self.fs.set_session(refresh=True) + + time_list, variable_list = prep_data_inputs(time, variable) + schema = self.resolve_fields(fields) + self._validate_time(time_list) + pathlib.Path(self.cache).mkdir(parents=True, exist_ok=True) + + # Validate variables + for v in variable_list: + try: + JPSSCrISLexicon[v] # type: ignore + except KeyError: + logger.error(f"Variable id {v} not found in JPSS CrIS lexicon") + raise + + # Discover and download HDF5 file pairs within tolerance windows + tasks = await self._create_tasks(time_list, variable_list) + + # Deduplicate by S3 URI (both SDR and GEO) + uri_set: set[str] = set() + for t in tasks: + uri_set.add(t.sdr_uri) + uri_set.add(t.geo_uri) + + fetch_jobs = [self._fetch_remote_file(uri) for uri in uri_set] + await gather_with_concurrency( + fetch_jobs, + max_workers=self._max_workers, + desc="Fetching CrIS HDF5 files", + verbose=(not self._verbose), + ) + + if session: + await session.close() + + # Decode and compile + df = self._compile_dataframe(tasks, schema) + return df + + async def _create_tasks( + self, + time_list: list[datetime], + variable_list: list[str], + ) -> list[_CrISAsyncTask]: + """Build download tasks by listing the S3 day-directory. + + For each requested time +/- tolerance we list the relevant day + directories on each satellite bucket and select SDR files whose + embedded start-timestamp falls within the tolerance window. The + corresponding GEO file is paired by matching the common filename + fields (platform, date, start time, end time, orbit). + + SDR and GEO directory listings are issued concurrently. 
+ """ + tasks: list[_CrISAsyncTask] = [] + + for v in variable_list: + _, modifier = JPSSCrISLexicon[v] # type: ignore + + for sat in self._satellites: + bucket = _SAT_BUCKET_MAP[sat] + + for t in time_list: + tmin = t + self._tolerance_lower + tmax = t + self._tolerance_upper + + # Iterate over calendar days covered by the window + day = tmin.replace(hour=0, minute=0, second=0, microsecond=0) + end_day = tmax.replace(hour=0, minute=0, second=0, microsecond=0) + + while day <= end_day: + sdr_prefix = ( + f"{bucket}/CrIS-FS-SDR/" + f"{day.year:04d}/{day.month:02d}/{day.day:02d}/" + ) + geo_prefix = ( + f"{bucket}/CrIS-SDR-GEO/" + f"{day.year:04d}/{day.month:02d}/{day.day:02d}/" + ) + + # Issue both listings concurrently + sdr_coro = self.fs._ls(sdr_prefix, detail=False) # type: ignore[union-attr] + geo_coro = self.fs._ls(geo_prefix, detail=False) # type: ignore[union-attr] + + sdr_listing: list[str] = [] + geo_listing: list[str] = [] + try: + sdr_listing, geo_listing = await asyncio.gather( + sdr_coro, geo_coro + ) + except FileNotFoundError: + # One or both directories missing — try individually + try: + sdr_listing = await self.fs._ls( # type: ignore[union-attr] + sdr_prefix, detail=False + ) + except FileNotFoundError: + logger.warning(f"No CrIS data at s3://{sdr_prefix}") + try: + geo_listing = await self.fs._ls( # type: ignore[union-attr] + geo_prefix, detail=False + ) + except FileNotFoundError: + logger.warning(f"No CrIS GEO data at s3://{geo_prefix}") + + if not sdr_listing: + day += timedelta(days=1) + continue + + # Build GEO lookup keyed by granule key + geo_lookup: dict[str, str] = {} + for gpath in geo_listing: + gname = gpath.rsplit("/", 1)[-1] + if gname.startswith("GCRSO_"): + geo_lookup[self._granule_key(gname)] = gpath + + for path in sdr_listing: + fname = path.rsplit("/", 1)[-1] + if not fname.startswith("SCRIF_"): + continue + + file_time = self._parse_filename_time(fname) + if file_time is None: + continue + if tmin <= file_time <= tmax: + 
sdr_key = self._granule_key(fname) + if sdr_key not in geo_lookup: + logger.warning(f"No matching GEO file for {fname}") + continue + tasks.append( + _CrISAsyncTask( + sdr_uri=f"s3://{path}", + geo_uri=f"s3://{geo_lookup[sdr_key]}", + datetime_min=tmin, + datetime_max=tmax, + satellite=sat, + variable=v, + modifier=modifier, + ) + ) + + day += timedelta(days=1) + + return tasks + + # ------------------------------------------------------------------ + # Download helpers + # ------------------------------------------------------------------ + async def _fetch_remote_file(self, s3_uri: str) -> None: + """Download a single HDF5 file to local cache (with retry).""" + local_path = self._cache_path(s3_uri) + if pathlib.Path(local_path).is_file(): + return + + last_exc: Exception | None = None + for attempt in range(1, self._retries + 1): + try: + data = await self.fs._cat_file(s3_uri.replace("s3://", "", 1)) # type: ignore[union-attr] + with open(local_path, "wb") as fh: + fh.write(data) + return + except (OSError, TimeoutError, ConnectionError) as exc: + last_exc = exc + if attempt < self._retries: + await asyncio.sleep(2 ** (attempt - 1)) + + logger.warning(f"Failed to fetch {s3_uri} after {self._retries} retries") + if last_exc is not None: + raise last_exc + + # ------------------------------------------------------------------ + # HDF5 decoding & DataFrame compilation + # ------------------------------------------------------------------ + def _compile_dataframe( + self, + tasks: list[_CrISAsyncTask], + schema: pa.Schema, + ) -> pd.DataFrame: + """Decode cached HDF5 files and assemble the output DataFrame. + + HDF5 decoding is parallelised across threads. Both h5py I/O and + numpy compute release the GIL, so threads give effective speedup + without the serialisation overhead of multiprocessing. + + Each granule is decoded into a compact :class:`_CrISDecodedGranule` + (spatial arrays + 2-D radiance matrix). 
    def _compile_dataframe(
        self,
        tasks: list[_CrISAsyncTask],
        schema: pa.Schema,
    ) -> pd.DataFrame:
        """Decode cached HDF5 files and assemble the output DataFrame.

        HDF5 decoding is parallelised across threads. Both h5py I/O and
        numpy compute release the GIL, so threads give effective speedup
        without the serialisation overhead of multiprocessing.

        Each granule is decoded into a compact :class:`_CrISDecodedGranule`
        (spatial arrays + 2-D radiance matrix). The expensive expansion
        to long-format (one row per channel per FOV) is done once at the
        end over all granules combined, avoiding intermediate DataFrame
        allocation per granule.

        Parameters
        ----------
        tasks : list[_CrISAsyncTask]
            Granule download tasks whose files are expected in the cache.
        schema : pa.Schema
            Output schema; only columns named in it are materialised.

        Returns
        -------
        pd.DataFrame
            Long-format observations (empty, with schema columns, if no
            granule decoded successfully).
        """

        def _decode_one(task: _CrISAsyncTask) -> _CrISDecodedGranule | None:
            # Per-granule worker: returns None (and warns) on any failure
            # so one bad granule never aborts the whole compilation.
            sdr_path = self._cache_path(task.sdr_uri)
            geo_path = self._cache_path(task.geo_uri)

            if not pathlib.Path(sdr_path).is_file():
                logger.warning(f"Cached SDR file missing for {task.sdr_uri}")
                return None
            if not pathlib.Path(geo_path).is_file():
                logger.warning(f"Cached GEO file missing for {task.geo_uri}")
                return None

            try:
                return self._decode_hdf5(sdr_path, geo_path, task)
            except Exception:
                logger.warning(f"Failed to decode {task.sdr_uri}", exc_info=True)
                return None

        # Sub-sample granules (temporal sub-sampling) before decoding
        if self._subsample > 1:
            tasks = tasks[:: self._subsample]

        # Decode all granules in parallel using threads
        n_workers = min(len(tasks), self._max_workers) if tasks else 1
        with ThreadPoolExecutor(max_workers=n_workers) as pool:
            results = list(pool.map(_decode_one, tasks))

        granules = [g for g in results if g is not None]

        if not granules:
            # Nothing decoded — return an empty frame with the right columns
            return pd.DataFrame(columns=schema.names)

        # --- Batch: concatenate compact spatial arrays across granules ---
        all_lat = np.concatenate([g.lat for g in granules])
        all_lon = np.concatenate([g.lon for g in granules])
        all_sat_za = np.concatenate([g.sat_za for g in granules])
        all_sat_aza = np.concatenate([g.sat_aza for g in granules])
        all_sol_za = np.concatenate([g.sol_za for g in granules])
        all_sol_aza = np.concatenate([g.sol_aza for g in granules])
        all_quality = np.concatenate([g.quality for g in granules])
        all_times = np.concatenate([g.times for g in granules])
        all_radiance = np.concatenate([g.radiance for g in granules])  # (N, n_channels)

        n_total = len(all_lat)
        n_channels = all_radiance.shape[1]

        # Build satellite/variable arrays for dedup support
        # (each granule may have a different satellite)
        sat_pieces = [np.broadcast_to(g.satellite, g.lat.shape[0]) for g in granules]
        var_pieces = [np.broadcast_to(g.variable, g.lat.shape[0]) for g in granules]
        all_sat = np.concatenate(sat_pieces)
        all_var = np.concatenate(var_pieces)

        # Free granule list — data is now in the concatenated arrays
        del granules

        # --- Deduplicate at spatial level (before channel expansion) ---
        # This really isn't needed; it's more of a safety net against
        # duplicate granule listings. Keys: (time, lat, lon, satellite),
        # with lat/lon quantised to 0.01 degree.
        time_as_i8 = all_times.view(np.int64)
        lat_i = (all_lat * 100).astype(np.int32)
        lon_i = (all_lon * 100).astype(np.int32)

        # Encode satellite strings as integer codes for lexsort
        unique_sats, sat_codes = np.unique(all_sat, return_inverse=True)
        sat_i = sat_codes.astype(np.int32)

        # lexsort sorts by last key first → primary key is time
        order = np.lexsort((lon_i, lat_i, sat_i, time_as_i8))
        sorted_t = time_as_i8[order]
        sorted_lat = lat_i[order]
        sorted_lon = lon_i[order]
        sorted_sat = sat_i[order]
        # A sorted row is unique iff it differs from its predecessor in any key
        diffs = (
            (sorted_t[1:] != sorted_t[:-1])
            | (sorted_lat[1:] != sorted_lat[:-1])
            | (sorted_lon[1:] != sorted_lon[:-1])
            | (sorted_sat[1:] != sorted_sat[:-1])
        )
        unique_mask: np.ndarray = np.empty(n_total, dtype=bool)
        unique_mask[0] = True
        unique_mask[1:] = diffs
        keep_idx = order[unique_mask]
        keep_idx.sort()  # preserve original order

        if len(keep_idx) < n_total:
            # Duplicates found — filter every per-FOV array consistently
            all_lat = all_lat[keep_idx]
            all_lon = all_lon[keep_idx]
            all_sat_za = all_sat_za[keep_idx]
            all_sat_aza = all_sat_aza[keep_idx]
            all_sol_za = all_sol_za[keep_idx]
            all_sol_aza = all_sol_aza[keep_idx]
            all_quality = all_quality[keep_idx]
            all_times = all_times[keep_idx]
            all_radiance = all_radiance[keep_idx]
            all_sat = all_sat[keep_idx]
            all_var = all_var[keep_idx]
            n_total = len(keep_idx)

        # --- Expand to long-format using PyArrow for efficiency ---
        n_rows = n_total * n_channels

        # Select the correct sensor_chan mapping based on apodization setting
        sensor_chan = (
            _CRIS_GSI_SENSOR_CHAN_APOD if self._apodize else _CRIS_GSI_SENSOR_CHAN
        )

        # Per-FOV columns are repeated across channels; the channel index is
        # tiled across FOVs; the radiance matrix is simply ravelled (row =
        # FOV, column = channel → C-order ravel matches repeat/tile layout).
        arrs: dict[str, pa.Array] = {
            "time": pa.array(np.repeat(all_times, n_channels)),
            "class": pa.DictionaryArray.from_arrays(
                np.zeros(n_rows, dtype=np.int8), ["rad"]
            ),
            "lat": pa.array(np.repeat(all_lat, n_channels), type=pa.float32()),
            "lon": pa.array(np.repeat(all_lon, n_channels), type=pa.float32()),
            "scan_angle": pa.array(
                np.repeat(all_sat_za, n_channels), type=pa.float32()
            ),
            "channel_index": pa.array(
                np.tile(sensor_chan, n_total),
                type=pa.uint16(),
            ),
            "solza": pa.array(np.repeat(all_sol_za, n_channels), type=pa.float32()),
            "solaza": pa.array(np.repeat(all_sol_aza, n_channels), type=pa.float32()),
            "satellite_za": pa.array(
                np.repeat(all_sat_za, n_channels), type=pa.float32()
            ),
            "satellite_aza": pa.array(
                np.repeat(all_sat_aza, n_channels), type=pa.float32()
            ),
            "quality": pa.array(np.repeat(all_quality, n_channels), type=pa.uint16()),
            "observation": pa.array(all_radiance.ravel(), type=pa.float32()),
        }

        # Build satellite and variable using DictionaryArray for memory
        # (note: unique_sats / sat_codes are re-bound here, shadowing the
        # dedup-stage names above — intentional reuse of scratch names)
        unique_sats = list(dict.fromkeys(all_sat))
        sat_codes = {s: i for i, s in enumerate(unique_sats)}
        sat_indices = np.repeat(
            np.array([sat_codes[s] for s in all_sat], dtype=np.int8), n_channels
        )
        arrs["satellite"] = pa.DictionaryArray.from_arrays(sat_indices, unique_sats)

        unique_vars = list(dict.fromkeys(all_var))
        var_codes = {v: i for i, v in enumerate(unique_vars)}
        var_indices = np.repeat(
            np.array([var_codes[v] for v in all_var], dtype=np.int8), n_channels
        )
        arrs["variable"] = pa.DictionaryArray.from_arrays(var_indices, unique_vars)

        # Select only schema-requested columns
        schema_names = [n for n in schema.names if n in arrs]
        tbl = pa.table({n: arrs[n] for n in schema_names})
        result = tbl.to_pandas()

        result.attrs["source"] = self.SOURCE_ID
        return result
    def _decode_hdf5(
        self,
        sdr_path: str,
        geo_path: str,
        task: _CrISAsyncTask,
    ) -> _CrISDecodedGranule | None:
        """Decode a CrIS SDR + GEO HDF5 file pair into compact arrays.

        Returns a :class:`_CrISDecodedGranule` containing spatial arrays
        (one element per valid FOV) and a 2-D radiance matrix
        ``(n_valid, n_channels)``. The expensive channel-expansion into
        long-format rows is deferred to
        :py:meth:`_compile_dataframe`.

        Scan lines whose FORTime falls entirely outside the tolerance
        window are skipped before reading the (much larger) radiance
        arrays.

        Parameters
        ----------
        sdr_path : str
            Local path to the CrIS SDR HDF5 file.
        geo_path : str
            Local path to the CrIS GEO HDF5 file.
        task : _CrISAsyncTask
            Task metadata (tolerance bounds, satellite, variable, modifier).

        Returns
        -------
        _CrISDecodedGranule | None
            ``None`` when no FOV in the granule falls inside the window.
        """
        # IET epoch for vectorized time conversion
        iet_epoch = np.datetime64("1958-01-01T00:00:00", "us")
        tmin_dt64 = np.datetime64(task.datetime_min, "ms")
        tmax_dt64 = np.datetime64(task.datetime_max, "ms")

        # --- Phase 1: Read GEO time first (tiny) to filter scan lines ---
        with h5py.File(geo_path, "r") as geo:
            for_time = geo[_GEO_KEYS["for_time"]][:]  # (n_scan, 30) IET µs

        # Convert FORTime to datetime64 for each (scan, FOR)
        time_dt64 = (iet_epoch + for_time.astype("timedelta64[us]")).astype(
            "datetime64[ms]"
        )
        # A scan line is relevant if ANY FOR in that scan is within window
        # (for_time > 0 excludes fill/invalid timestamps)
        scan_has_valid_time = np.any(
            (time_dt64 >= tmin_dt64) & (time_dt64 <= tmax_dt64) & (for_time > 0),
            axis=1,
        )  # (n_scan,)

        if not scan_has_valid_time.any():
            return None

        valid_scans = np.where(scan_has_valid_time)[0]

        # --- Phase 2: Read only the scan lines we need ---
        with h5py.File(sdr_path, "r") as sdr, h5py.File(geo_path, "r") as geo:
            # HDF5 fancy indexing with sorted indices (efficient for contiguous)
            rad_lw = sdr[_SDR_RADIANCE_KEYS["LW"]][valid_scans]
            rad_mw = sdr[_SDR_RADIANCE_KEYS["MW"]][valid_scans]
            rad_sw = sdr[_SDR_RADIANCE_KEYS["SW"]][valid_scans]

            try:
                qf3 = sdr[_SDR_QF_KEYS["QF3"]][valid_scans]
            except KeyError:
                # Dataset absent — substitute all-zero ("good") flags
                qf3 = np.zeros(
                    (len(valid_scans), _CRIS_NUM_FOR, _CRIS_NUM_FOV, 3),
                    dtype=np.uint8,
                )

            lat = geo[_GEO_KEYS["lat"]][valid_scans]
            lon = geo[_GEO_KEYS["lon"]][valid_scans]
            sat_za = geo[_GEO_KEYS["sat_za"]][valid_scans]
            sat_aza = geo[_GEO_KEYS["sat_aza"]][valid_scans]
            sol_za = geo[_GEO_KEYS["sol_za"]][valid_scans]
            sol_aza = geo[_GEO_KEYS["sol_aza"]][valid_scans]

        # Use the pre-read time but only for valid scans
        for_time = for_time[valid_scans]

        n_scan = len(valid_scans)

        # Concatenate radiance: (n_scan, n_for, n_fov, n_channels)
        radiance = np.concatenate([rad_lw, rad_mw, rad_sw], axis=-1)
        del rad_lw, rad_mw, rad_sw  # free memory early
        n_for = radiance.shape[1]
        n_fov = radiance.shape[2]
        n_channels = radiance.shape[3]
        n_spatial = n_scan * n_for * n_fov

        # Combine QF3 band flags into single uint16 for the quality column.
        # These flags are exposed as metadata; filtering is left to the caller.
        # Layout: bits 0-3 = band 0 flags, bits 4-7 = band 1, bits 8-11 = band 2.
        qf_combined = (
            qf3[:, :, :, 0].astype(np.uint16)
            | (qf3[:, :, :, 1].astype(np.uint16) << 4)
            | (qf3[:, :, :, 2].astype(np.uint16) << 8)
        )
        del qf3

        # Flatten spatial dims
        lat_flat = lat.reshape(-1).astype(np.float32)
        lon_flat = lon.reshape(-1).astype(np.float32)

        # Expand for_time to (n_scan, n_for, n_fov) — all 9 FOVs of a FOR
        # share one timestamp
        for_time_3d = np.broadcast_to(
            for_time[:, :, np.newaxis], (n_scan, n_for, n_fov)
        )
        time_flat = for_time_3d.reshape(-1)

        # Convert times for tolerance filtering
        times_dt64 = (iet_epoch + time_flat.astype("timedelta64[us]")).astype(
            "datetime64[ms]"
        )

        # Valid spatial mask: good lat/lon, positive time, AND within tolerance.
        # The wide lon bounds accept either [-180, 180] or [0, 360) source
        # conventions; values are normalised to [0, 360) below.
        valid_spatial = (
            (lat_flat >= -90.0)
            & (lat_flat <= 90.0)
            & (lon_flat >= -180.1)
            & (lon_flat <= 360.1)
            & (time_flat > 0)
            & (times_dt64 >= tmin_dt64)
            & (times_dt64 <= tmax_dt64)
        )

        if not valid_spatial.any():
            return None

        # Apply spatial mask
        lat_valid = lat_flat[valid_spatial]
        lon_valid = lon_flat[valid_spatial] % 360.0
        times_valid = times_dt64[valid_spatial]

        sat_za_valid = sat_za.reshape(-1)[valid_spatial].astype(np.float32)
        sat_aza_valid = sat_aza.reshape(-1)[valid_spatial].astype(np.float32)
        sol_za_valid = sol_za.reshape(-1)[valid_spatial].astype(np.float32)
        sol_aza_valid = sol_aza.reshape(-1)[valid_spatial].astype(np.float32)
        qf_valid = qf_combined.reshape(-1)[valid_spatial].astype(np.uint16)

        # Radiance: (n_valid, n_channels)
        radiance_valid = radiance.reshape(n_spatial, n_channels)[valid_spatial]
        del radiance  # free the large array

        # Apply modifier
        radiance_valid = task.modifier(radiance_valid).astype(np.float32)

        # Mask out fill values: set them to NaN (kept for now, could filter)
        bad = (radiance_valid <= -1e6) | (radiance_valid >= 1e6)
        if bad.any():
            radiance_valid = radiance_valid.copy()
            radiance_valid[bad] = np.float32("nan")

        # Optional Hamming apodization: smooth the unapodized (sinc ILS)
        # radiance to match the apodized spectra used by GSI/CRTM, then trim
        # the 2 guard channels at each end of each band.
        if self._apodize:
            radiance_valid = _hamming_apodize(radiance_valid)
            wn = _CRIS_WAVENUMBER_APOD
        else:
            wn = _CRIS_WAVENUMBER

        # Convert spectral radiance → brightness temperature (K) so that
        # the observation column is in the same units as UFSObsSat.
        radiance_valid = _radiance_to_bt(radiance_valid, wn).astype(np.float32)

        return _CrISDecodedGranule(
            lat=lat_valid,
            lon=lon_valid,
            sat_za=sat_za_valid,
            sat_aza=sat_aza_valid,
            sol_za=sol_za_valid,
            sol_aza=sol_aza_valid,
            quality=qf_valid,
            times=times_valid,
            radiance=radiance_valid,
            satellite=task.satellite,
            variable=task.variable,
        )

    @staticmethod
    def _parse_filename_time(filename: str) -> datetime | None:
        """Extract the granule start time from a CrIS SDR filename.

        Expected pattern::

            SCRIF_{platform}_d{YYYYMMDD}_t{HHMMSSF}_e{HHMMSSF}_b{orbit}_c{creation}_oebc_ops.h5

        The ``t`` field encodes the start time as HHMMSS followed by a
        tenths-of-second digit.

        Returns ``None`` if the filename does not match.
        """
        parts = filename.split("_")
        # Find the date part (dYYYYMMDD) and time part (tHHMMSSF)
        date_str: str | None = None
        time_str: str | None = None

        for part in parts:
            if part.startswith("d") and len(part) == 9:
                date_str = part[1:]  # YYYYMMDD
            elif part.startswith("t") and len(part) == 8:
                time_str = part[1:7]  # HHMMSS (ignore tenths digit)

        if date_str is None or time_str is None:
            return None

        try:
            return datetime.strptime(date_str + time_str, "%Y%m%d%H%M%S")
        except ValueError:
            return None
+ + Returns ``None`` if the filename does not match. + """ + parts = filename.split("_") + # Find the date part (dYYYYMMDD) and time part (tHHMMSSF) + date_str: str | None = None + time_str: str | None = None + + for part in parts: + if part.startswith("d") and len(part) == 9: + date_str = part[1:] # YYYYMMDD + elif part.startswith("t") and len(part) == 8: + time_str = part[1:7] # HHMMSS (ignore tenths digit) + + if date_str is None or time_str is None: + return None + + try: + return datetime.strptime(date_str + time_str, "%Y%m%d%H%M%S") + except ValueError: + return None + + @staticmethod + def _granule_key(filename: str) -> str: + """Extract the matching key from an SDR or GEO filename. + + Both SDR (SCRIF) and GEO (GCRSO) files share a common key + formed by the platform, date, start-time, end-time, and orbit + fields. The creation timestamp (``_c`` field) differs between + the two products, so it is excluded from the key. + + Parameters + ---------- + filename : str + Filename (basename only) of an SDR or GEO HDF5 file. + + Returns + ------- + str + Key string ``"{platform}_{date}_{start}_{end}_{orbit}"``. + """ + parts = filename.split("_") + # parts: [prefix, platform, d, t, e, b, ...] + if len(parts) >= 6: + return "_".join(parts[1:6]) # platform_d*_t*_e*_b* + return filename # fallback – should not happen for valid files + + @classmethod + def resolve_fields(cls, fields: str | list[str] | pa.Schema | None) -> pa.Schema: + """Convert *fields* parameter into a validated PyArrow schema. + + Parameters + ---------- + fields : str | list[str] | pa.Schema | None + Field specification. + + Returns + ------- + pa.Schema + """ + if fields is None: + return cls.SCHEMA + + if isinstance(fields, str): + fields = [fields] + + if isinstance(fields, pa.Schema): + for field in fields: + if field.name not in cls.SCHEMA.names: + raise KeyError( + f"Field '{field.name}' not in SCHEMA. 
" + f"Available: {cls.SCHEMA.names}" + ) + expected = cls.SCHEMA.field(field.name).type + if field.type != expected: + raise TypeError( + f"Field '{field.name}' type {field.type} != {expected}" + ) + return fields + + selected = [] + for name in fields: + if name not in cls.SCHEMA.names: + raise KeyError( + f"Field '{name}' not in SCHEMA. Available: {cls.SCHEMA.names}" + ) + selected.append(cls.SCHEMA.field(name)) + return pa.schema(selected) + + @property + def cache(self) -> str: + """Get the appropriate cache location.""" + cache_location = os.path.join(datasource_cache_root(), "jpss_cris") + if not self._cache: + if self._tmp_cache_hash is None: + self._tmp_cache_hash = uuid.uuid4().hex[:8] + cache_location = os.path.join( + cache_location, f"tmp_jpss_cris_{self._tmp_cache_hash}" + ) + return cache_location + + def _cache_path(self, s3_uri: str) -> str: + """Deterministic local cache path for an S3 URI.""" + sha = hashlib.sha256(s3_uri.encode()) + return os.path.join(self.cache, sha.hexdigest()) + + @classmethod + def _validate_time(cls, times: list[datetime]) -> None: + """Validate that requested times are within the data range. + + Parameters + ---------- + times : list[datetime] + Date-times to validate. + """ + start_date = min(_SAT_START_DATE.values()) + for t in times: + if t < start_date: + raise ValueError( + f"Requested date time {t} needs to be after " + f"{start_date} for JPSS CrIS" + ) + + @classmethod + def available(cls, time: datetime | np.datetime64) -> bool: + """Check whether data is available for a given time. + + Parameters + ---------- + time : datetime | np.datetime64 + Date-time to check. 
class JPSSCrISLexicon(metaclass=LexiconType):
    """Lexicon for JPSS CrIS (Cross-track Infrared Sounder) FSR data source.

    This lexicon maps the ``crisfsr`` variable to an identity modifier for
    the raw HDF5 spectral-radiance field. The data source itself applies
    optional Hamming apodization (default on) followed by the inverse Planck
    function to convert radiance into brightness temperature (K), so the
    ``observation`` column in the returned DataFrame is directly comparable
    with :class:`~earth2studio.data.UFSObsSat`. Individual channels are
    distinguished by the ``channel_index`` column, which uses the GSI
    ``sensor_chan`` numbering convention.

    The CrIS instrument is a Fourier-transform spectrometer operating in three
    infrared spectral bands aboard Suomi NPP, NOAA-20 (JPSS-1) and NOAA-21
    (JPSS-2). In Full Spectral Resolution (FSR) mode the instrument produces
    2223 channels:

    - **LWIR** (9.14--15.38 µm, 650--1095 cm^-1): 717 channels at 0.625 cm^-1
    - **MWIR** (5.71--8.26 µm, 1210--1750 cm^-1): 869 channels at 0.625 cm^-1
    - **SWIR** (3.92--4.64 µm, 2155--2550 cm^-1): 637 channels at 0.625 cm^-1

    When ``apodize=True`` (default in :class:`~earth2studio.data.JPSS_CRIS`),
    the 2 guard channels at each end of each band (4 per band, 12 total) are
    trimmed after apodization, yielding 2211 science channels with contiguous
    ``channel_index`` 1--2211.

    Notes
    -----
    CrIS Channel Indexing (GSI ``sensor_chan`` convention):

    The CRTM defines 2211 science channels (713 + 865 + 633) across the three
    bands. Each band has 2 guard channels at the low-wavenumber end and 2 at
    the high-wavenumber end that are excluded from CRTM/GSI.

    - sensor_chan 0 : LWIR guard channels 0--1 (not in GSI)
    - sensor_chan 1--713 : LWIR band (713 of 717 physical channels)
    - sensor_chan 0 : LWIR guard channels 715--716 (not in GSI)
    - sensor_chan 0 : MWIR guard channels 717--718 (not in GSI)
    - sensor_chan 714--1578 : MWIR band (865 of 869 physical channels)
    - sensor_chan 0 : MWIR guard channels 1584--1585 (not in GSI)
    - sensor_chan 0 : SWIR guard channels 1586--1587 (not in GSI)
    - sensor_chan 1579--2211: SWIR band (633 of 637 physical channels)
    - sensor_chan 0 : SWIR guard channels 2221--2222 (not in GSI)

    References
    ----------
    - NOAA JPSS CrIS SDR:
      https://www.star.nesdis.noaa.gov/jpss/CrIS.php
    - AWS NOAA-20 open data:
      https://registry.opendata.aws/noaa-nesdis-n20-pds/
    - CrIS SDR documentation:
      https://ncc.nesdis.noaa.gov/documents/documentation/viirs-users-guide-tech-report-142a-v1.3.pdf
    """

    # Number of CrIS spectral channels per band (FSR mode)
    CRIS_NUM_CHANNELS_LW: int = 717
    CRIS_NUM_CHANNELS_MW: int = 869
    CRIS_NUM_CHANNELS_SW: int = 637
    CRIS_NUM_CHANNELS: int = (
        CRIS_NUM_CHANNELS_LW + CRIS_NUM_CHANNELS_MW + CRIS_NUM_CHANNELS_SW
    )  # 2223

    # CrIS spatial geometry
    CRIS_NUM_FOR: int = 30  # Fields of Regard per scan line
    CRIS_NUM_FOV: int = 9  # Fields of View per FOR (3x3 detector array)

    # CrIS spectral band wavenumber ranges (cm^-1).
    # These are the *full physical* band extents (including the 2 guard
    # channels at each end of each band). CRTM/GSI science channels cover a
    # slightly narrower range (650--1095, 1210--1750, 2155--2550 cm^-1)
    # because the guard channels are excluded.
    # NOTE(review): with 2 guard channels *below* each band at 0.625 cm^-1
    # spacing, the physical extents would start at 648.75 / 1208.75 /
    # 2153.75 cm^-1 rather than 650.0 / 1210.0 / 2155.0 as listed here —
    # these tuples look shifted up by two channels relative to the data
    # source's wavenumber grid; confirm against _CRIS_WAVENUMBER.
    CRIS_BAND_RANGES: dict[str, tuple[float, float]] = {
        "LW": (650.0, 1097.5),
        "MW": (1210.0, 1752.5),
        "SW": (2155.0, 2552.5),
    }

    VOCAB: dict[str, str] = {
        "crisfsr": "spectralRadiance",
    }

    @classmethod
    def get_item(cls, val: str) -> tuple[str, Callable[[Any], Any]]:
        """Get CrIS HDF5 dataset key for a standardised variable name.

        Parameters
        ----------
        val : str
            Standardised variable name (``crisfsr``)

        Returns
        -------
        tuple[str, Callable]
            Tuple containing:
            - HDF5 dataset key for spectral radiance
            - Modifier function (identity -- radiance is converted to
              brightness temperature inside the data source)

        Raises
        ------
        KeyError
            If *val* is not a recognised CrIS variable.
        """
        if val not in cls.VOCAB:
            raise KeyError(f"Variable {val} not found in CrIS lexicon")
        return cls.VOCAB[val], lambda x: x
def test_guard_channel_layout():
    """Verify the 2+2 guard channel layout per band.

    Guard channels carry sensor_chan 0; science channels carry the
    contiguous GSI numbering 1..2211 across the three bands.
    """
    # Each band has 2 low-end guards (sensor_chan=0) and 2 high-end guards
    # LWIR: indices 0,1 = guard; 2..714 = science (1..713); 715,716 = guard
    assert _CRIS_GSI_SENSOR_CHAN[0] == 0
    assert _CRIS_GSI_SENSOR_CHAN[1] == 0
    assert _CRIS_GSI_SENSOR_CHAN[2] == 1
    assert _CRIS_GSI_SENSOR_CHAN[714] == 713
    assert _CRIS_GSI_SENSOR_CHAN[715] == 0
    assert _CRIS_GSI_SENSOR_CHAN[716] == 0

    # MWIR: indices 717,718 = guard; 719..1583 = science (714..1578); 1584,1585 = guard
    assert _CRIS_GSI_SENSOR_CHAN[717] == 0
    assert _CRIS_GSI_SENSOR_CHAN[718] == 0
    assert _CRIS_GSI_SENSOR_CHAN[719] == 714
    assert _CRIS_GSI_SENSOR_CHAN[1583] == 1578
    assert _CRIS_GSI_SENSOR_CHAN[1584] == 0
    assert _CRIS_GSI_SENSOR_CHAN[1585] == 0

    # SWIR: indices 1586,1587 = guard; 1588..2220 = science (1579..2211); 2221,2222 = guard
    assert _CRIS_GSI_SENSOR_CHAN[1586] == 0
    assert _CRIS_GSI_SENSOR_CHAN[1587] == 0
    assert _CRIS_GSI_SENSOR_CHAN[1588] == 1579
    assert _CRIS_GSI_SENSOR_CHAN[2220] == 2211
    assert _CRIS_GSI_SENSOR_CHAN[2221] == 0
    assert _CRIS_GSI_SENSOR_CHAN[2222] == 0

    # Total guard channels
    assert (_CRIS_GSI_SENSOR_CHAN == 0).sum() == 12
    # Total science channels
    assert (_CRIS_GSI_SENSOR_CHAN > 0).sum() == 2211


def test_wavenumber_grid_starts():
    """Verify wavenumber grid starts at correct values including guard channels."""
    # LWIR starts at 648.75 (2 guard channels below 650.0)
    np.testing.assert_allclose(_CRIS_WAVENUMBER[0], 648.75)
    np.testing.assert_allclose(_CRIS_WAVENUMBER[1], 649.375)
    np.testing.assert_allclose(_CRIS_WAVENUMBER[2], 650.0)  # first science channel

    # MWIR starts at 1208.75
    np.testing.assert_allclose(_CRIS_WAVENUMBER[717], 1208.75)
    np.testing.assert_allclose(_CRIS_WAVENUMBER[719], 1210.0)  # first science channel

    # SWIR starts at 2153.75
    np.testing.assert_allclose(_CRIS_WAVENUMBER[1586], 2153.75)
    np.testing.assert_allclose(_CRIS_WAVENUMBER[1588], 2155.0)  # first science channel


def test_hamming_apodize_shape():
    """Verify _hamming_apodize trims guard channels correctly.

    Works for both a single 1-D spectrum and a 2-D batch of spectra:
    2223 physical channels in, 2211 science channels out.
    """
    rng = np.random.default_rng(0)
    # Single spectrum
    spec_1d = rng.uniform(1, 50, 2223).astype(np.float32)
    apod_1d = _hamming_apodize(spec_1d)
    assert apod_1d.shape == (2211,)

    # Batch of spectra
    spec_2d = rng.uniform(1, 50, (5, 2223)).astype(np.float32)
    apod_2d = _hamming_apodize(spec_2d)
    assert apod_2d.shape == (5, 2211)


def test_hamming_apodize_constant_spectrum():
    """For a spatially-constant spectrum the Hamming kernel is an identity
    (0.23+0.54+0.23=1.0), so the output should equal the input at science
    channels."""
    val = 42.0
    spec = np.full(2223, val, dtype=np.float64)
    apod = _hamming_apodize(spec)
    np.testing.assert_allclose(apod, val, atol=1e-12)
def _make_mock_hdf5_pair(
    sdr_path: str,
    geo_path: str,
    n_scan: int = 2,
    n_for: int = 30,
    n_fov: int = 9,
    seed: int = 42,
) -> None:
    """Write minimal synthetic CrIS SDR and GEO HDF5 files.

    The *seed* both seeds the radiance RNG and offsets the granule's
    times/coordinates, so successive seeds produce distinct granules.
    """
    n_lw, n_mw, n_sw = 717, 869, 637

    # SDR file: three radiance bands plus all-zero ("good") quality flags
    with h5py.File(sdr_path, "w") as f:
        grp = f.create_group("All_Data/CrIS-FS-SDR_All")
        rng = np.random.default_rng(seed)
        grp.create_dataset(
            "ES_RealLW",
            data=rng.uniform(5, 80, (n_scan, n_for, n_fov, n_lw)).astype(np.float32),
        )
        grp.create_dataset(
            "ES_RealMW",
            data=rng.uniform(1, 30, (n_scan, n_for, n_fov, n_mw)).astype(np.float32),
        )
        grp.create_dataset(
            "ES_RealSW",
            data=rng.uniform(0.1, 10, (n_scan, n_for, n_fov, n_sw)).astype(np.float32),
        )
        grp.create_dataset(
            "QF1_SCAN_CRISSDR",
            data=np.zeros(n_scan, dtype=np.uint8),
        )
        grp.create_dataset(
            "QF3_CRISSDR",
            data=np.zeros((n_scan, n_for, n_fov, 3), dtype=np.uint8),
        )

    # GEO file
    # IET epoch: 1958-01-01T00:00:00. We need microseconds from there.
    # 2024-06-01 12:00:00 UTC in IET microseconds:
    base_dt = datetime(2024, 6, 1, 12, 0, 0)
    iet_epoch = datetime(1958, 1, 1)
    base_iet = int((base_dt - iet_epoch).total_seconds() * 1_000_000)
    # Offset by seed so each granule gets distinct times/coords
    time_offset = (seed - 42) * 32_000_000  # ~32s apart per seed step

    with h5py.File(geo_path, "w") as f:
        grp = f.create_group("All_Data/CrIS-SDR-GEO_All")
        lat_base = 30.0 + (seed - 42) * 5.0  # shift lat per granule
        grp.create_dataset(
            "Latitude",
            data=np.linspace(lat_base, lat_base + 20, n_scan * n_for * n_fov)
            .reshape(n_scan, n_for, n_fov)
            .astype(np.float32),
        )
        lon_base = 250.0 + (seed - 42) * 10.0  # shift lon per granule
        grp.create_dataset(
            "Longitude",
            data=np.linspace(lon_base, lon_base + 40, n_scan * n_for * n_fov)
            .reshape(n_scan, n_for, n_fov)
            .astype(np.float32),
        )
        grp.create_dataset(
            "Height",
            data=np.zeros((n_scan, n_for, n_fov), dtype=np.float32),
        )
        grp.create_dataset(
            "SatelliteZenithAngle",
            data=np.full((n_scan, n_for, n_fov), 25.0, dtype=np.float32),
        )
        grp.create_dataset(
            "SatelliteAzimuthAngle",
            data=np.full((n_scan, n_for, n_fov), 90.0, dtype=np.float32),
        )
        grp.create_dataset(
            "SolarZenithAngle",
            data=np.full((n_scan, n_for, n_fov), 45.0, dtype=np.float32),
        )
        grp.create_dataset(
            "SolarAzimuthAngle",
            data=np.full((n_scan, n_for, n_fov), 180.0, dtype=np.float32),
        )
        # FORTime: (n_scan, n_for) – slight offset per FOR to simulate real data
        for_time = np.full((n_scan, n_for), base_iet + time_offset, dtype=np.int64)
        for s in range(n_scan):
            for fi in range(n_for):
                for_time[s, fi] = (
                    base_iet + time_offset + (s * n_for + fi) * 200_000
                )  # 200 ms step
        grp.create_dataset("FORTime", data=for_time)
@pytest.mark.slow
@pytest.mark.xfail
@pytest.mark.timeout(60)
@pytest.mark.parametrize(
    "time",
    [
        datetime(year=2025, month=1, day=1, hour=0),
        [datetime(year=2025, month=1, day=1, hour=0)],
    ],
)
@pytest.mark.parametrize("variable", ["crisfsr", ["crisfsr"]])
def test_jpss_cris_fetch(time, variable):
    """Network test: fetch real NOAA-20 CrIS data and sanity-check the frame."""
    ds = JPSS_CRIS(
        satellites=["n20"],
        time_tolerance=timedelta(seconds=30),
        cache=False,
        verbose=False,
    )
    df = ds(time, variable)

    assert list(df.columns) == ds.SCHEMA.names
    assert set(df["variable"].unique()).issubset({"crisfsr"})
    assert "observation" in df.columns
    assert "satellite" in df.columns
    assert "channel_index" in df.columns

    if not df.empty:
        # Apodized (default): contiguous sensor_chan 1..2211
        assert df["channel_index"].between(1, 2211).all()
        assert df["lat"].between(-90, 90).all()
        assert df["lon"].between(0, 360).all()
        # Brightness temperature values should be finite and in a
        # physically reasonable range (K).
        assert df["observation"].notna().all()
        assert (df["observation"] > 100).all()  # > 100 K
        assert (df["observation"] < 400).all()  # < 400 K


@pytest.mark.slow
@pytest.mark.xfail
@pytest.mark.timeout(60)
def test_jpss_cris_schema_fields():
    """Network test: `fields=None` yields the full schema; a list of names
    yields exactly those columns in order."""
    ds = JPSS_CRIS(
        satellites=["n20"],
        time_tolerance=timedelta(seconds=30),
        cache=False,
        verbose=False,
    )
    time = datetime(2025, 1, 1, 0)

    df_full = ds(time, ["crisfsr"], fields=None)
    assert list(df_full.columns) == ds.SCHEMA.names

    subset_fields = ["time", "lat", "lon", "observation", "variable"]
    df_subset = ds(time, ["crisfsr"], fields=subset_fields)
    assert list(df_subset.columns) == subset_fields
def test_jpss_cris_call_mock(tmp_path):
    """Exercise the full __call__ path with synthetic HDF5 files (apodized).

    Network and listing layers are mocked out: `_create_tasks` returns a
    single pre-built task and `_cache_path` routes its URIs to the local
    fixture files, so only the decode/compile pipeline runs.
    """
    n_scan, n_for, n_fov = 2, 4, 9  # reduced to keep test fast
    n_channels_apod = 713 + 865 + 633  # 2211 science channels after apodization

    sdr_file = tmp_path / "SCRIF_j01.h5"
    geo_file = tmp_path / "GCRSO_j01.h5"
    _make_mock_hdf5_pair(str(sdr_file), str(geo_file), n_scan, n_for, n_fov)

    sdr_uri = "s3://noaa-nesdis-n20-pds/CrIS-FS-SDR/2024/06/01/SCRIF_j01.h5"
    geo_uri = "s3://noaa-nesdis-n20-pds/CrIS-SDR-GEO/2024/06/01/GCRSO_j01.h5"

    def fake_cache_path(self, s3_uri):
        # Route SDR-product URIs to the SDR fixture, everything else (the
        # GEO product) to the GEO fixture. Explicitly parenthesized: the
        # previous `A or B and C` form applied the "GEO" exclusion only to
        # the "SDR" branch, so a URI containing both "SCRIF" and "GEO"
        # would have been mis-routed to the SDR file.
        if ("SCRIF" in s3_uri or "SDR" in s3_uri) and "GEO" not in s3_uri:
            return str(sdr_file)
        return str(geo_file)

    mock_task = MagicMock(
        sdr_uri=sdr_uri,
        geo_uri=geo_uri,
        datetime_min=datetime(2024, 6, 1, 11, 0),
        datetime_max=datetime(2024, 6, 1, 13, 0),
        satellite="n20",
        variable="crisfsr",
        modifier=lambda x: x,
    )

    with (
        patch.object(JPSS_CRIS, "_fetch_remote_file", return_value=None),
        patch.object(JPSS_CRIS, "_cache_path", fake_cache_path),
        patch.object(JPSS_CRIS, "_create_tasks", return_value=[mock_task]),
    ):
        ds = JPSS_CRIS(satellites=["n20"], cache=False, verbose=False)
        df = ds(datetime(2024, 6, 1, 12), ["crisfsr"])

    assert not df.empty
    assert list(df.columns) == ds.SCHEMA.names
    assert set(df["variable"].unique()) == {"crisfsr"}
    # Apodized: contiguous sensor_chan 1..2211 (no guard channel sentinels)
    assert df["channel_index"].between(1, 2211).all()
    expected_rows = n_scan * n_for * n_fov * n_channels_apod
    assert len(df) == expected_rows
    assert df["satellite"].iloc[0] == "n20"
    assert "quality" in df.columns
    assert (df["quality"] == 0).all()
    # Verify observation values are positive (our mock data uses positive values)
    assert (df["observation"] > 0).all()
df.columns + assert (df["quality"] == 0).all() + # Verify observation values are positive (our mock data uses positive values) + assert (df["observation"] > 0).all() + + +def test_jpss_cris_call_mock_unapodized(tmp_path): + """Exercise the full __call__ path with apodize=False (unapodized).""" + n_scan, n_for, n_fov = 2, 4, 9 + n_channels_raw = 717 + 869 + 637 # 2223 including guard channels + + sdr_file = tmp_path / "SCRIF_j01.h5" + geo_file = tmp_path / "GCRSO_j01.h5" + _make_mock_hdf5_pair(str(sdr_file), str(geo_file), n_scan, n_for, n_fov) + + sdr_uri = "s3://noaa-nesdis-n20-pds/CrIS-FS-SDR/2024/06/01/SCRIF_j01.h5" + geo_uri = "s3://noaa-nesdis-n20-pds/CrIS-SDR-GEO/2024/06/01/GCRSO_j01.h5" + + def fake_cache_path(self, s3_uri): + if "SCRIF" in s3_uri or "SDR" in s3_uri and "GEO" not in s3_uri: + return str(sdr_file) + return str(geo_file) + + mock_task = MagicMock( + sdr_uri=sdr_uri, + geo_uri=geo_uri, + datetime_min=datetime(2024, 6, 1, 11, 0), + datetime_max=datetime(2024, 6, 1, 13, 0), + satellite="n20", + variable="crisfsr", + modifier=lambda x: x, + ) + + with ( + patch.object(JPSS_CRIS, "_fetch_remote_file", return_value=None), + patch.object(JPSS_CRIS, "_cache_path", fake_cache_path), + patch.object(JPSS_CRIS, "_create_tasks", return_value=[mock_task]), + ): + ds = JPSS_CRIS(satellites=["n20"], apodize=False, cache=False, verbose=False) + df = ds(datetime(2024, 6, 1, 12), ["crisfsr"]) + + assert not df.empty + assert list(df.columns) == ds.SCHEMA.names + # Unapodized: sensor_chan includes 0 (guard) and 1..2211 + assert df["channel_index"].between(0, 2211).all() + expected_rows = n_scan * n_for * n_fov * n_channels_raw + assert len(df) == expected_rows + # Guard channels should have sensor_chan 0 + guard_rows = df[df["channel_index"] == 0] + assert len(guard_rows) > 0 # 12 guard channels per FOV + + +def test_jpss_cris_subsample_mock(tmp_path): + """Verify granule-level sub-sampling reduces data by the expected factor.""" + n_scan, n_for, n_fov = 2, 4, 9 
+ n_channels_apod = 713 + 865 + 633 # 2211 (default: apodized) + + # Create 3 distinct granule file pairs (different seeds → different coords) + granule_files = [] + for i in range(3): + sdr_file = tmp_path / f"SCRIF_j01_{i}.h5" + geo_file = tmp_path / f"GCRSO_j01_{i}.h5" + _make_mock_hdf5_pair( + str(sdr_file), str(geo_file), n_scan, n_for, n_fov, seed=42 + i + ) + granule_files.append((str(sdr_file), str(geo_file))) + + sdr_uris = [f"s3://bucket/SDR/granule_{i}.h5" for i in range(3)] + geo_uris = [f"s3://bucket/GEO/granule_{i}.h5" for i in range(3)] + + def fake_cache_path(self, s3_uri): + for i in range(3): + if s3_uri == sdr_uris[i]: + return granule_files[i][0] + if s3_uri == geo_uris[i]: + return granule_files[i][1] + return str(tmp_path / "missing.h5") + + mock_tasks = [ + MagicMock( + sdr_uri=sdr_uris[i], + geo_uri=geo_uris[i], + datetime_min=datetime(2024, 6, 1, 11, 0), + datetime_max=datetime(2024, 6, 1, 13, 0), + satellite="n20", + variable="crisfsr", + modifier=lambda x: x, + ) + for i in range(3) + ] + + rows_per_granule = n_scan * n_for * n_fov * n_channels_apod + + # subsample=1 → all 3 granules + with ( + patch.object(JPSS_CRIS, "_fetch_remote_file", return_value=None), + patch.object(JPSS_CRIS, "_cache_path", fake_cache_path), + patch.object(JPSS_CRIS, "_create_tasks", return_value=list(mock_tasks)), + ): + ds_full = JPSS_CRIS(satellites=["n20"], subsample=1, cache=False, verbose=False) + df_full = ds_full(datetime(2024, 6, 1, 12), ["crisfsr"]) + + assert len(df_full) == 3 * rows_per_granule + + # subsample=2 → granules [0, 2] (every 2nd), so 2 granules + with ( + patch.object(JPSS_CRIS, "_fetch_remote_file", return_value=None), + patch.object(JPSS_CRIS, "_cache_path", fake_cache_path), + patch.object(JPSS_CRIS, "_create_tasks", return_value=list(mock_tasks)), + ): + ds_sub = JPSS_CRIS(satellites=["n20"], subsample=2, cache=False, verbose=False) + df_sub = ds_sub(datetime(2024, 6, 1, 12), ["crisfsr"]) + + assert len(df_sub) == 2 * rows_per_granule 
+ + # subsample=3 → only granule [0], so 1 granule + with ( + patch.object(JPSS_CRIS, "_fetch_remote_file", return_value=None), + patch.object(JPSS_CRIS, "_cache_path", fake_cache_path), + patch.object(JPSS_CRIS, "_create_tasks", return_value=list(mock_tasks)), + ): + ds_sub3 = JPSS_CRIS(satellites=["n20"], subsample=3, cache=False, verbose=False) + df_sub3 = ds_sub3(datetime(2024, 6, 1, 12), ["crisfsr"]) + + assert len(df_sub3) == 1 * rows_per_granule + + +# --------------------------------------------------------------------------- +# Validation / error tests (no network) +# --------------------------------------------------------------------------- +@pytest.mark.timeout(15) +def test_jpss_cris_available(): + assert JPSS_CRIS.available(datetime(2024, 6, 1, 12)) + assert not JPSS_CRIS.available(datetime(2015, 1, 1)) + assert JPSS_CRIS.available(np.datetime64("2024-06-01T12:00")) + assert not JPSS_CRIS.available(np.datetime64("2015-01-01T00:00")) + + +@pytest.mark.timeout(15) +def test_jpss_cris_validate_time(): + with pytest.raises(ValueError): + ds = JPSS_CRIS(satellites=["n20"], cache=False) + ds(datetime(2015, 1, 1), ["crisfsr"]) + + +@pytest.mark.timeout(15) +def test_jpss_cris_invalid_satellite(): + with pytest.raises(ValueError, match="Invalid satellite"): + JPSS_CRIS(satellites=["invalid"]) + + +def test_jpss_cris_exceptions(): + ds = JPSS_CRIS(satellites=["n20"], cache=False, verbose=False) + + with pytest.raises(KeyError): + ds(datetime(2024, 6, 1, 12), ["invalid_variable"]) + + with pytest.raises(KeyError): + ds( + datetime(2024, 6, 1, 12), + ["crisfsr"], + fields=["observation", "variable", "invalid_field"], + ) + + invalid_schema = pa.schema( + [ + pa.field("observation", pa.float32()), + pa.field("variable", pa.string()), + pa.field("nonexistent", pa.float32()), + ] + ) + with pytest.raises(KeyError): + ds(datetime(2024, 6, 1, 12), ["crisfsr"], fields=invalid_schema) + + wrong_type_schema = pa.schema( + [ + pa.field("observation", pa.float32()), + 
pa.field("variable", pa.string()), + pa.field("time", pa.string()), + ] + ) + with pytest.raises(TypeError): + ds(datetime(2024, 6, 1, 12), ["crisfsr"], fields=wrong_type_schema) + + +def test_jpss_cris_tolerance_conversion(): + ds_td = JPSS_CRIS(time_tolerance=timedelta(minutes=30), cache=False, verbose=False) + assert ds_td._tolerance_lower == timedelta(minutes=-30) + assert ds_td._tolerance_upper == timedelta(minutes=30) + + ds_np = JPSS_CRIS( + time_tolerance=np.timedelta64(30, "m"), cache=False, verbose=False + ) + assert ds_np._tolerance_lower == timedelta(minutes=-30) + assert ds_np._tolerance_upper == timedelta(minutes=30) + + ds_asym = JPSS_CRIS( + time_tolerance=(np.timedelta64(-10, "m"), np.timedelta64(60, "m")), + cache=False, + verbose=False, + ) + assert ds_asym._tolerance_lower == timedelta(minutes=-10) + assert ds_asym._tolerance_upper == timedelta(minutes=60) + + +def test_jpss_cris_parse_filename(): + # Valid CrIS SDR filename + t = JPSS_CRIS._parse_filename_time( + "SCRIF_j01_d20250101_t0000391_e0000392_b12345_c20250101010000_oebc_ops.h5" + ) + assert t == datetime(2025, 1, 1, 0, 0, 39) + + # Invalid filename + assert JPSS_CRIS._parse_filename_time("random_file.h5") is None + + # Truncated date + assert JPSS_CRIS._parse_filename_time("SCRIF_j01_d202.h5") is None + + +def test_jpss_cris_granule_key(): + sdr = "SCRIF_j01_d20250101_t0000391_e0000392_b12345_c20250101010000_oebc_ops.h5" + geo = "GCRSO_j01_d20250101_t0000391_e0000392_b12345_c20250101010099_oebc_ops.h5" + # Same granule → same key despite different creation timestamps + assert JPSS_CRIS._granule_key(sdr) == JPSS_CRIS._granule_key(geo) + assert JPSS_CRIS._granule_key(sdr) == "j01_d20250101_t0000391_e0000392_b12345" + + # Different orbit → different key + sdr2 = "SCRIF_j01_d20250101_t0000391_e0000392_b99999_c20250101010000_oebc_ops.h5" + assert JPSS_CRIS._granule_key(sdr) != JPSS_CRIS._granule_key(sdr2)