diff --git a/.claude/skills/create-data-source/SKILL.md b/.claude/skills/create-data-source/SKILL.md index c359fe6bd..08aef638c 100644 --- a/.claude/skills/create-data-source/SKILL.md +++ b/.claude/skills/create-data-source/SKILL.md @@ -1718,724 +1718,14 @@ Present: --- -## Step 13 — Run Tests +## Step 13 — Validate, Test, Submit & Review -### 13a. Run the new test file - -```bash -uv run python -m pytest test/data/test_.py -v --timeout=60 -``` - -All tests must pass (or `xfail` for network tests). Fix failures -and re-run until green. - -### 13b. Run coverage report with `--slow` tests - -Run the new test file **with coverage** and the `--slow` flag to -include network tests. The new data source file must achieve -**at least 90% line coverage**: - -```bash -uv run python -m pytest test/data/test_.py -v \ - --slow --timeout=300 \ - --cov=earth2studio/data/ \ - --cov-report=term-missing \ - --cov-fail-under=90 -``` - -- `--slow` enables network tests (marked `@pytest.mark.slow`) -- `--cov=earth2studio/data/` scopes coverage to the - new source module only -- `--cov-report=term-missing` shows which lines are not covered -- `--cov-fail-under=90` fails the run if coverage is below 90% - -If coverage is below 90%, add additional tests or mock tests to -cover the missing lines. Common gaps: - -- Error handling branches (e.g., empty products, invalid data) -- Edge cases in parsing (e.g., missing fields, corrupt records) -- Cache property paths (`cache=True` vs `cache=False`) -- `resolve_fields` with different input types - -Re-run until coverage is at or above 90%. - -### 13c. Run the full data test suite (optional but recommended) - -```bash -make pytest TOX_ENV=test-data -``` - -Confirm no regressions. - ---- - -## Step 14 — Validate Variables, Summarize, and Sanity-Check Plots - -Create a **standalone Python script** in the repo root. This is for -PR reviewer reference only and should **NOT** be committed to the -repo. - -### 14a. 
Script templates - -**For DataSource / ForecastSource** (gridded data): - -```python -"""Sanity-check plot for data source. - -This script is for PR review only — do NOT commit to the repo. -Run it to produce a quick visualization confirming the source works. -""" -import matplotlib.pyplot as plt -import numpy as np - -from earth2studio.data import SourceName - -# Fetch a sample -ds = SourceName(cache=False) -time = ... # pick a recent valid time -variables = ["t2m", "msl", "u10m"] # 1-3 representative variables -data = ds(time, variables) - -# Plot contours for each variable -fig, axes = plt.subplots(1, len(variables), figsize=(6 * len(variables), 5)) -if len(variables) == 1: - axes = [axes] - -for ax, var in zip(axes, variables): - im = data.sel(variable=var).isel(time=0).plot(ax=ax, cmap="turbo") - ax.set_title(f"{var}") - -plt.suptitle(f" — {time}", y=1.02) -plt.tight_layout() -plt.savefig("sanity_check_.png", dpi=150, bbox_inches="tight") -print("Saved: sanity_check_.png") -``` - -**For DataFrameSource / ForecastFrameSource** (sparse data): - -```python -"""Sanity-check plot for data source. - -This script is for PR review only — do NOT commit to the repo. -Run it to produce a quick visualization confirming the source works. -""" -import cartopy.crs as ccrs -import cartopy.feature as cfeature -import matplotlib.pyplot as plt -import numpy as np - -from earth2studio.data import SourceName - -# Fetch a sample -ds = SourceName(cache=False) -time = ... 
# pick a recent valid time -variables = ["var1", "var2"] # 1-3 representative variables -df = ds(time, variables) - -# Convert lon from [0,360] to [-180,180] for cartopy -df["lon_plt"] = df["lon"].where(df["lon"] <= 180, df["lon"] - 360) - -# Plot scatter on globe for each variable (cartopy projection) -fig, axes = plt.subplots( - 1, len(variables), - figsize=(8 * len(variables), 5), - subplot_kw={"projection": ccrs.Robinson()}, -) -if len(variables) == 1: - axes = [axes] - -for ax, var in zip(axes, variables): - subset = df[df["variable"] == var] - obs = subset["observation"].values - # Use percentile clipping to avoid outliers blowing out the colorbar - vmin, vmax = np.percentile(obs[np.isfinite(obs)], [2, 98]) - - ax.set_global() - ax.add_feature(cfeature.COASTLINE, linewidth=0.5) - ax.add_feature(cfeature.BORDERS, linewidth=0.3, alpha=0.5) - - sc = ax.scatter( - subset["lon_plt"], subset["lat"], - c=obs, s=2, cmap="turbo", alpha=0.8, - vmin=vmin, vmax=vmax, edgecolors="none", - transform=ccrs.PlateCarree(), - ) - ax.set_title(f"{var} ({len(subset)} obs)\nrange: {vmin:.0f}–{vmax:.0f} (p2–p98)") - plt.colorbar(sc, ax=ax, shrink=0.6, label="Observation", - orientation="horizontal", pad=0.05) - -plt.suptitle(f" — {time}", y=1.0) -plt.tight_layout() -plt.savefig("sanity_check_.png", dpi=150, bbox_inches="tight") -print("Saved: sanity_check_.png") -``` - -> **Tip:** Always use `cartopy` with a proper map projection -> (Robinson, PlateCarree, etc.) for geospatial scatter plots. -> Without a projection, satellite swath data looks distorted and -> hard to interpret. Use percentile-clipped `vmin`/`vmax` and -> `s >= 2` marker size — without clipping, outliers compress the -> colorbar and make the plot appear blank. - -### 14b. 
Validate all lexicon variables - -**Every variable in the lexicon must be validated against real data.** -A variable that exists in the lexicon but consistently produces missing -or invalid data should be **removed** from the lexicon and -`E2STUDIO_VOCAB`. - -Run a validation script that fetches a representative sample and -checks every variable: - -```python -"""Variable validation for . - -Checks every lexicon variable against real data to confirm it -returns valid observations. Variables with very low valid-data -rates should be removed from the lexicon. -""" -from datetime import datetime - -from earth2studio.data import SourceName -from earth2studio.lexicon import SourceNameLexicon - -ds = SourceName(cache=True) -time = ... # pick a recent valid time -all_vars = list(SourceNameLexicon.VOCAB.keys()) -df = ds(time, all_vars) - -print(f"{'Variable':<16} {'Obs Count':>10} {'Valid %':>8} {'Min':>10} {'Max':>10}") -print("-" * 60) -for var in sorted(all_vars): - sub = df[df["variable"] == var] - n_total = len(sub) - n_valid = sub["observation"].notna().sum() - pct = (n_valid / n_total * 100) if n_total > 0 else 0 - vmin = sub["observation"].min() if n_valid > 0 else float("nan") - vmax = sub["observation"].max() if n_valid > 0 else float("nan") - flag = " *** REMOVE" if pct < 10 else "" - print(f"{var:<16} {n_total:>10} {pct:>7.1f}% {vmin:>10.2f} {vmax:>10.2f}{flag}") -``` - -**Action required:** - -- Variables with **< 10% valid data** must be removed from: - 1. The lexicon class `VOCAB` dict - 2. `E2STUDIO_VOCAB` in `earth2studio/lexicon/base.py` - 3. The class docstring (note the removal and reason) -- Variables with **10-50% valid data** should be kept but documented - with a note explaining the low coverage (e.g., day/night switching, - quality filtering) -- After removing variables, re-run `make lint` and tests - -### 14c. 
Summarize variables and time range to user - -Before generating sanity-check plots, **present a summary table** to -the user covering: - -1. **All valid variables** — name, description, observation count, - value range -2. **Removed variables** — name, reason for removal (e.g., "> 97% - missing data") -3. **Valid time range** — earliest and latest data available from the - remote store -4. **Typical data density** — observations per orbit/file, - approximate global coverage cadence - -Example summary format: - -> **Variable Summary for MetOpAMSUA (14 channels):** -> -> | Variable | Freq (GHz) | Layer | Obs/orbit | BT Range (K) | -> |----------|-----------|-------|-----------|---------------| -> | amsua01 | 23.8 | Surface | 22,950 | 142–291 | -> | amsua02 | 31.4 | Surface | 22,950 | 145–290 | -> | ... | | | | | -> -> **Removed:** `amsua15` (89.0 GHz) — L1B product marks ~97% of -> measurements as missing due to quality filtering. -> -> **Time range:** 2007-10-22 (Metop-A launch) to present. -> Metop-B (2012-09-17 to present), Metop-C (2018-11-07 to present). -> -> **Data density:** ~767 scan lines × 30 FOVs = 23,010 obs per orbit, -> ~14 orbits/day, global coverage twice daily. - -This summary helps the user understand what they are getting from the -data source and verify it matches their expectations. - -### 14d. Create sanity-check plot script - -Create a **standalone Python script** in the repo root. This is for -PR reviewer reference only and should **NOT** be committed to the -repo. See 14a above for templates. - -### 14e. 
Run the script - -Execute the script and **verify the output images exist and look -reasonable**: - -```bash -python sanity_check_.py -``` - -Check that: - -- The script runs without errors -- Output PNGs are generated -- Data points appear in expected geographic regions -- Values are in physically reasonable ranges (e.g., temperatures - 200–320 K, not 0 or NaN) -- For DataFrame sources: observations form recognizable swath or - station patterns - -### 14f. **[CONFIRM — Sanity-Check Plots]** - -**You MUST ask the user to visually inspect the generated plot(s) -before proceeding.** Do not skip this step even if the script ran -without errors — a successful run does not guarantee the plots are -correct (e.g., empty axes, wrong colorbar range, garbled data). - -Tell the user the absolute path to the generated image file(s) and -ask them to open and inspect the output: - -> The sanity-check script ran successfully and saved the following -> plot: -> -> `/absolute/path/to/sanity_check_.png` -> -> **Please open this image and confirm it looks correct.** Check: +> **This step is handled by the `validate-data-source` skill.** > -> 1. Data points are visible on the axes (not blank/empty) -> 2. Geographic coverage matches expectations (global swaths, -> regional stations, etc.) -> 3. Colorbar values are in physically reasonable ranges -> 4. No obvious artifacts (all-NaN regions, garbled coordinates) -> -> Does the plot look correct? - -**Do not proceed to Step 15 until the user explicitly confirms the -plots look correct.** If the user reports problems (empty plots, -wrong ranges, missing coverage), debug and fix the issue, re-run -the script, and ask the user to inspect again. - -The output images will be uploaded to the PR description in -Step 15. 
- ---- - -## Step 15 — Branch, Commit & Open PR - -### **[CONFIRM — Ready to Submit]** - -Before proceeding, confirm with the user: - - > All implementation steps are complete: - > - > - Data source class implemented with correct method ordering - > - Lexicon class created with variable mappings - > - **All lexicon variables validated against real data** (low-validity - > variables removed) - > - **Variable summary and time range presented to user** - > - E2STUDIO_VOCAB / E2STUDIO_SCHEMA updated (if needed) - > - Registered in `__init__.py` files - > - Documentation RST files updated with badges - > - Reference URLs included in docstrings - > - CHANGELOG.md updated - > - Format, lint, and license checks pass - > - Unit tests written and passing - > - Dependencies in `data` extras group confirmed - > - Sanity-check plots generated and confirmed by user - > - > Ready to create a branch, commit, and prepare a PR? - -### 15a. Create branch and commit - -```bash -git checkout -b feat/data-source- -git add earth2studio/data/.py \ - earth2studio/data/__init__.py \ - earth2studio/data/utils.py \ - earth2studio/lexicon/.py \ - earth2studio/lexicon/__init__.py \ - earth2studio/lexicon/base.py \ - test/data/test_.py \ - docs/modules/datasources_*.rst \ - pyproject.toml \ - CHANGELOG.md -git commit -m "feat: add data source - -Add for . -Includes lexicon, unit tests, and documentation." -``` - -Do **NOT** add the sanity-check script or its output image. - -### 15b. Identify the fork remote and push branch - -The working repository is typically a **fork** of -`NVIDIA/earth2studio`. Before pushing, confirm which git remote -points to the user's fork: - -```bash -git remote -v -``` - -Ask the user: - -> Which git remote is your fork of `NVIDIA/earth2studio`? -> (Usually `origin` — e.g., `git@github.com:/earth2studio.git`) - -Then push the feature branch to the **fork** remote: - -```bash -git push -u feat/data-source- -``` - -### 15c. 
Open Pull Request (fork → NVIDIA/earth2studio) - -> **Important:** PRs must be opened **from the fork** to the -> **upstream source repository** `NVIDIA/earth2studio`. The branch -> lives on the fork; the PR targets `main` (or the appropriate -> base branch) on the upstream repo. - -Use `gh pr create` with explicit `--repo` and `--head` flags: - -```bash -gh pr create \ - --repo NVIDIA/earth2studio \ - --base main \ - --head :feat/data-source- \ - --title "feat: add data source" \ - --body "..." -``` - -Where `` is the GitHub username that owns the fork -(e.g., `NickGeneva`). - -The PR should follow the repo's template and -include the data licensing section, the sanity-check script inside -a `
` block, and the sanity-check images uploaded to the -PR body: - -````markdown -## Description - -Add `` for . - -Closes # (if applicable) - -### Data source details - -| Property | Value | -|---|---| -| **Source type** | DataSource / ForecastSource / DataFrameSource / ForecastFrameSource | -| **Remote store** | | -| **Format** | GRIB2 / NetCDF / Zarr / CSV / etc. | -| **Spatial resolution** | X deg x Y deg (NxM grid) | -| **Temporal resolution** | Hourly / 6-hourly / daily / etc. | -| **Date range** | YYYY-MM-DD to present | -| **Region** | Global / Regional | -| **Authentication** | Anonymous / API key / etc. | - -### Data licensing - -> **License**: -> **URL**: -> -> "Open data, freely available for commercial and -> non-commercial use" or "Requires attribution, non-commercial -> use only"> - -### Dependencies added - -| Package | Version | License | License URL | Reason | -|---|---|---|---|---| -| `` | `>=X.Y` | | [link]() | | - -*(or "No new dependencies needed")* - -When filling this table, look up each new dependency's license: - -1. Check the package's PyPI page (`https://pypi.org/project//`) - or its repository for the license type -2. Link directly to the license file (e.g., GitHub raw LICENSE or - PyPI license classifier URL) -3. Flag any **non-permissive licenses** (GPL, AGPL, SSPL) — these - may be incompatible with the project's Apache-2.0 license and - require team review before merging - -### Validation - -See sanity-check validation in PR comments below (posted in Step 15d). - -## Checklist - -- [x] I am familiar with the [Contributing Guidelines][contrib]. -- [x] New or existing tests cover these changes. -- [x] The documentation is up to date with these changes. -- [x] The [CHANGELOG.md][changelog] is up to date with these changes. -- [ ] An [issue][issues] is linked to this pull request. -- [ ] Assess and address Greptile feedback (AI code review bot). 
- -[contrib]: https://github.com/NVIDIA/earth2studio/blob/main/CONTRIBUTING.md -[changelog]: https://github.com/NVIDIA/earth2studio/blob/main/CHANGELOG.md -[issues]: https://github.com/NVIDIA/earth2studio/issues - -## Dependencies - - - -### 15d. Post sanity-check plot as PR comment - -After the PR is created, post the sanity-check visualization as a -separate **PR comment** so it is immediately visible to reviewers -without expanding a `
` block. This serves as quick visual -evidence that the data source produces correct output. - -#### Image upload limitation - -**GitHub has no CLI or REST API for uploading images to PR comments.** -The uploads endpoint (`uploads.github.com`) and GraphQL mutations do -not support attaching local image files to issue/PR comments. The -only way to embed an image is via the browser's drag-and-drop editor -or by referencing an already-hosted URL. - -**Practical workflow:** - -1. Post the PR comment **without** the image — include the - validation table, the full sanity-check script, and a placeholder - line: `> **TODO:** Attach sanity-check image by editing this - comment in the browser.` -2. Tell the user: *"The image is at ``. Edit the PR - comment in your browser and drag the file into the editor to - embed it."* -3. Use `gh api -X PATCH` to update the comment body (via - `-F body=@/tmp/comment.md`) if the text needs revision later. - -Do **not** waste time trying `curl` uploads, GraphQL file mutations, -or the `uploads.github.com` asset endpoint — they do not work for -issue/PR comment images. - -#### Writing the comment body - -Write the comment body to a temp file first, then post it. This -avoids shell quoting issues with heredocs containing backticks and -markdown: - -```bash -# 1. Write body to a temp file (use your editor tool, not heredoc) -# Include: summary table, key findings, script in
- -# 2. Post the comment -gh api -X POST repos/NVIDIA/earth2studio/issues//comments \ - -F "body=@/tmp/pr_comment_body.md" \ - --jq '.html_url' -``` - -#### Comment content template - -Adapt to the source type. Include only relevant rows — omit -satellite-specific rows for gridded sources, omit grid dimensions -for DataFrame sources, etc. - -```markdown -## Sanity-Check Validation - -**Source:** `` — -**Time:** YYYY-MM-DD HH:MM UTC -**Parameters:** - -| Metric | Value | -|--------|-------| -| Output shape / rows | | -| Variables | | -| Value range | | -| Lat range | ° | -| Lon range | ° | -| Missing / NaN | | - -*Add source-specific rows as needed (channels, satellites, grid -dimensions, lead times, etc.)* - -**Key findings:** -- -- -- - -> **TODO:** Attach sanity-check image by editing this comment in -> the browser. - -
-Sanity-check script (click to expand) - -PASTE THE FULL WORKING SCRIPT HERE — not a truncated excerpt. -The script must be copy-pasteable and produce the plot end-to-end. - -
-``` - -**Important:** Always paste the **complete, runnable** script — not -a shortened version. Reviewers should be able to reproduce the plot -by copying the script directly. - -#### Comparison sources - -If a **comparison data source** exists for the same physical -quantity (e.g., UFSObsSat for satellite BT, ERA5 for reanalysis -fields), include a side-by-side comparison in the same comment. -Briefly summarize expected differences: - -- Raw vs QC-subsetted data (different observation counts) -- Different spatial coverage or resolution -- Different time windows or update cadence -- Unit or coordinate convention differences - -#### Finalize - -After posting, inform the user of: - -1. The comment URL -2. The local path to the image file for manual attachment -3. Instructions: *"Edit the comment in your browser and drag the - image file into the editor to embed it."* - -> **Note:** The sanity-check image and script are for PR review -> purposes only — they must NOT be committed to the repository. - ---- - -## Step 16 — Automated Code Review (Greptile) - -After the PR is created and pushed, an automated code review from -**greptile-apps** (Greptile) will be posted as PR review comments. -Wait for this review, then process the feedback. - -### 16a. Wait for Greptile review - -Poll for review comments from `greptile-apps[bot]` every 30 seconds -for up to **5 minutes**. Time out gracefully if no review arrives: - -```bash -# Poll loop — check every 30s, timeout after 5 minutes (10 attempts) -for i in $(seq 1 10); do - REVIEW_ID=$(gh api repos/NVIDIA/earth2studio/pulls//reviews \ - --jq '.[] | select(.user.login == "greptile-apps[bot]") | .id' 2>/dev/null) - if [ -n "$REVIEW_ID" ]; then - echo "Greptile review found: $REVIEW_ID" - break - fi - echo "Attempt $i/10 — no review yet, waiting 30s..." - sleep 30 -done -``` - -If no review after 5 minutes, inform the user: - -> Greptile hasn't posted a review after 5 minutes. 
This can happen if -> the review bot is busy or the PR hasn't triggered it. You can: -> -> 1. Ask me to check again later -> 2. Skip this step and proceed without automated review -> 3. Manually request a review from Greptile on the PR page - -### 16b. Pull and parse review comments - -Once the review is posted, fetch all comments: - -```bash -# Get all review comments on the PR -gh api repos/NVIDIA/earth2studio/pulls//comments \ - --jq '.[] | select(.user.login == "greptile-apps[bot]") | - {path: .path, line: .diff_hunk, body: .body}' -``` - -Also fetch the top-level review body: - -```bash -gh api repos/NVIDIA/earth2studio/pulls//reviews \ - --jq '.[] | select(.user.login == "greptile-apps[bot]") | .body' -``` - -### 16c. Categorize and present to user - -Parse each comment and categorize it: - -| Category | Description | Default action | -|---|---|---| -| **Bug / correctness** | Logic errors, wrong behavior | Fix | -| **Style / convention** | Naming, formatting, patterns | Fix if valid | -| **Performance** | Inefficiency, resource waste | Evaluate | -| **Documentation** | Missing/wrong docs, docstrings | Fix | -| **Suggestion** | Alternative approach, nice-to-have | User decides | -| **False positive** | Incorrect or irrelevant feedback | Dismiss | - -### **[CONFIRM — Review Triage]** - -Present each comment to the user in a summary table: - -```markdown -| # | File | Line | Category | Summary | Proposed Action | -|---|------|------|----------|---------|-----------------| -| 1 | metop_amsua.py | 142 | Bug | Missing null check | Fix: add guard | -| 2 | metop_avhrr.py | 305 | Style | Use f-string | Fix: convert | -| 3 | metop.py | 45 | Suggestion | Add type alias | Skip: not needed | -| ... | ... | ... | ... | ... | ... | -``` - -For each comment, briefly explain: -- What Greptile flagged -- Whether you agree or disagree (with reasoning) -- Your proposed fix (or why to skip) - -Ask the user to confirm which comments to address. 
The user may: -- Accept all proposed fixes -- Select specific fixes -- Override your recommendation on any comment -- Add their own fixes - -### 16d. Implement fixes - -For each accepted fix: - -1. Make the code change -2. Run `make format && make lint` after all fixes -3. Run the relevant tests -4. Commit with a message like: - `fix: address code review feedback (Greptile)` - -### 16e. Respond to review comments - -For each Greptile comment, post a reply on the PR: - -**For fixed comments:** - -```bash -gh api repos/NVIDIA/earth2studio/pulls//comments//replies \ - -f body="Fixed in . " -``` - -**For dismissed comments (false positives / won't fix):** - -```bash -gh api repos/NVIDIA/earth2studio/pulls//comments//replies \ - -f body="Won't fix — " -``` - -### 16f. Push and resolve - -```bash -git push origin -``` - -After pushing, resolve all addressed review threads if possible. - -Inform the user of the final state: -- How many comments were fixed -- How many were dismissed (with reasons) -- Any remaining open threads +> Invoke the `validate-data-source` skill to complete the remaining +> steps: running tests with coverage, validating variables against +> real data, generating sanity-check plots, opening a PR, and +> processing automated code review. --- diff --git a/.claude/skills/validate-data-source/SKILL.md b/.claude/skills/validate-data-source/SKILL.md new file mode 100644 index 000000000..526a93a70 --- /dev/null +++ b/.claude/skills/validate-data-source/SKILL.md @@ -0,0 +1,793 @@ +--- +name: validate-data-source +description: Validate a newly created Earth2Studio data source by running tests, checking coverage, validating variables against real data, generating sanity-check plots, and opening a PR with automated code review. Use this skill after completing data source implementation (create-data-source skill Steps 0-12) to run the full validation, submission, and review pipeline. 
+argument-hint: Name of the data source class and test file (optional — will be inferred from recent changes if not provided) +--- + +# Validate Data Source + +> **Python Environment:** This project uses **uv** for dependency +> management. Always use the local `.venv` virtual environment +> (`source .venv/bin/activate` or prefix with `uv run python`) for all +> Python commands — installing packages, running tests, executing +> scripts, etc. Use `uv add` / `uv pip install` / `uv lock` instead of `pip install`. + +Validate a newly created Earth2Studio data source by running tests, +checking coverage, validating variables, generating sanity-check plots, +and opening a PR. This skill picks up after the `create-data-source` +skill completes implementation (Steps 0-12). + +Each confirmation gate marked by: + +```markdown +### **[CONFIRM — ]** +``` + +requires **explicit user approval** before proceeding. + +--- + +## Step 1 — Run Tests + +### 1a. Run the new test file + +```bash +uv run python -m pytest test/data/test_<filename>.py -v --timeout=60 +``` + +All tests must pass (or `xfail` for network tests). Fix failures +and re-run until green. + +### 1b. Run coverage report with `--slow` tests + +Run the new test file **with coverage** and the `--slow` flag to +include network tests. The new data source file must achieve +**at least 90% line coverage**: + +```bash +uv run python -m pytest test/data/test_<filename>.py -v \ + --slow --timeout=300 \ + --cov=earth2studio/data/<filename> \ + --cov-report=term-missing \ + --cov-fail-under=90 +``` + +- `--slow` enables network tests (marked `@pytest.mark.slow`) +- `--cov=earth2studio/data/<filename>` scopes coverage to the + new source module only +- `--cov-report=term-missing` shows which lines are not covered +- `--cov-fail-under=90` fails the run if coverage is below 90% + +If coverage is below 90%, add additional tests or mock tests to +cover the missing lines. 
Common gaps: + +- Error handling branches (e.g., empty products, invalid data) +- Edge cases in parsing (e.g., missing fields, corrupt records) +- Cache property paths (`cache=True` vs `cache=False`) +- `resolve_fields` with different input types + +Re-run until coverage is at or above 90%. + +### 1c. Run the full data test suite (optional but recommended) + +```bash +make pytest TOX_ENV=test-data +``` + +Confirm no regressions. + +--- + +## Step 2 — Validate Variables, Summarize, and Sanity-Check Plots + +Create a **standalone Python script** in the repo root. This is for +PR reviewer reference only and should **NOT** be committed to the +repo. + +### 2a. Script templates + +**For DataSource / ForecastSource** (gridded data): + +```python +"""Sanity-check plot for <SourceName> data source. + +This script is for PR review only — do NOT commit to the repo. +Run it to produce a quick visualization confirming the source works. +""" +import matplotlib.pyplot as plt +import numpy as np + +from earth2studio.data import SourceName + +# Fetch a sample +ds = SourceName(cache=False) +time = ... # pick a recent valid time +variables = ["t2m", "msl", "u10m"] # 1-3 representative variables +data = ds(time, variables) + +# Plot contours for each variable +fig, axes = plt.subplots(1, len(variables), figsize=(6 * len(variables), 5)) +if len(variables) == 1: + axes = [axes] + +for ax, var in zip(axes, variables): + im = data.sel(variable=var).isel(time=0).plot(ax=ax, cmap="turbo") + ax.set_title(f"{var}") + +plt.suptitle(f"<SourceName> — {time}", y=1.02) +plt.tight_layout() +plt.savefig("sanity_check_<source_name>.png", dpi=150, bbox_inches="tight") +print("Saved: sanity_check_<source_name>.png") +``` + +**For DataFrameSource / ForecastFrameSource** (sparse data): + +```python +"""Sanity-check plot for <SourceName> data source. + +This script is for PR review only — do NOT commit to the repo. +Run it to produce a quick visualization confirming the source works. 
+""" +import cartopy.crs as ccrs +import cartopy.feature as cfeature +import matplotlib.pyplot as plt +import numpy as np + +from earth2studio.data import SourceName + +# Fetch a sample +ds = SourceName(cache=False) +time = ... # pick a recent valid time +variables = ["var1", "var2"] # 1-3 representative variables +df = ds(time, variables) + +# Convert lon from [0,360] to [-180,180] for cartopy +df["lon_plt"] = df["lon"].where(df["lon"] <= 180, df["lon"] - 360) + +# Plot scatter on globe for each variable (cartopy projection) +fig, axes = plt.subplots( + 1, len(variables), + figsize=(8 * len(variables), 5), + subplot_kw={"projection": ccrs.Robinson()}, +) +if len(variables) == 1: + axes = [axes] + +for ax, var in zip(axes, variables): + subset = df[df["variable"] == var] + obs = subset["observation"].values + # Use percentile clipping to avoid outliers blowing out the colorbar + vmin, vmax = np.percentile(obs[np.isfinite(obs)], [2, 98]) + + ax.set_global() + ax.add_feature(cfeature.COASTLINE, linewidth=0.5) + ax.add_feature(cfeature.BORDERS, linewidth=0.3, alpha=0.5) + + sc = ax.scatter( + subset["lon_plt"], subset["lat"], + c=obs, s=2, cmap="turbo", alpha=0.8, + vmin=vmin, vmax=vmax, edgecolors="none", + transform=ccrs.PlateCarree(), + ) + ax.set_title(f"{var} ({len(subset)} obs)\nrange: {vmin:.0f}–{vmax:.0f} (p2–p98)") + plt.colorbar(sc, ax=ax, shrink=0.6, label="Observation", + orientation="horizontal", pad=0.05) + +plt.suptitle(f"<SourceName> — {time}", y=1.0) +plt.tight_layout() +plt.savefig("sanity_check_<source_name>.png", dpi=150, bbox_inches="tight") +print("Saved: sanity_check_<source_name>.png") +``` + +> **Tip:** Always use `cartopy` with a proper map projection +> (Robinson, PlateCarree, etc.) for geospatial scatter plots. +> Without a projection, satellite swath data looks distorted and +> hard to interpret. 
Use percentile-clipped `vmin`/`vmax` and +> `s >= 2` marker size — without clipping, outliers compress the +> colorbar and make the plot appear blank. + +### 2b. Validate all lexicon variables + +**Every variable in the lexicon must be validated against real data.** +A variable that exists in the lexicon but consistently produces missing +or invalid data should be **removed** from the lexicon and +`E2STUDIO_VOCAB`. + +Run a validation script that fetches a representative sample and +checks every variable: + +```python +"""Variable validation for <SourceName>. + +Checks every lexicon variable against real data to confirm it +returns valid observations. Variables with very low valid-data +rates should be removed from the lexicon. +""" +from datetime import datetime + +from earth2studio.data import SourceName +from earth2studio.lexicon import SourceNameLexicon + +ds = SourceName(cache=True) +time = ... # pick a recent valid time +all_vars = list(SourceNameLexicon.VOCAB.keys()) +df = ds(time, all_vars) + +print(f"{'Variable':<16} {'Obs Count':>10} {'Valid %':>8} {'Min':>10} {'Max':>10}") +print("-" * 60) +for var in sorted(all_vars): + sub = df[df["variable"] == var] + n_total = len(sub) + n_valid = sub["observation"].notna().sum() + pct = (n_valid / n_total * 100) if n_total > 0 else 0 + vmin = sub["observation"].min() if n_valid > 0 else float("nan") + vmax = sub["observation"].max() if n_valid > 0 else float("nan") + flag = " *** REMOVE" if pct < 10 else "" + print(f"{var:<16} {n_total:>10} {pct:>7.1f}% {vmin:>10.2f} {vmax:>10.2f}{flag}") +``` + +**Action required:** + +- Variables with **< 10% valid data** must be removed from: + 1. The lexicon class `VOCAB` dict + 2. `E2STUDIO_VOCAB` in `earth2studio/lexicon/base.py` + 3. 
The class docstring (note the removal and reason) +- Variables with **10-50% valid data** should be kept but documented + with a note explaining the low coverage (e.g., day/night switching, + quality filtering) +- After removing variables, re-run `make lint` and tests + +### 2c. Summarize variables and time range to user + +Before generating sanity-check plots, **present a summary table** to +the user covering: + +1. **All valid variables** — name, description, observation count, + value range +2. **Removed variables** — name, reason for removal (e.g., "> 97% + missing data") +3. **Valid time range** — earliest and latest data available from the + remote store +4. **Typical data density** — observations per orbit/file, + approximate global coverage cadence + +Example summary format: + +> **Variable Summary for MetOpAMSUA (14 channels):** +> +> | Variable | Freq (GHz) | Layer | Obs/orbit | BT Range (K) | +> |----------|-----------|-------|-----------|---------------| +> | amsua01 | 23.8 | Surface | 22,950 | 142–291 | +> | amsua02 | 31.4 | Surface | 22,950 | 145–290 | +> | ... | | | | | +> +> **Removed:** `amsua15` (89.0 GHz) — L1B product marks ~97% of +> measurements as missing due to quality filtering. +> +> **Time range:** 2007-10-22 (Metop-A launch) to present. +> Metop-B (2012-09-17 to present), Metop-C (2018-11-07 to present). +> +> **Data density:** ~767 scan lines × 30 FOVs = 23,010 obs per orbit, +> ~14 orbits/day, global coverage twice daily. + +This summary helps the user understand what they are getting from the +data source and verify it matches their expectations. + +### 2d. Create sanity-check plot script + +Create a **standalone Python script** in the repo root. This is for +PR reviewer reference only and should **NOT** be committed to the +repo. See 2a above for templates. + +### 2e. 
Run the script
+
+Execute the script and **verify the output images exist and look
+reasonable**:
+
+```bash
+uv run python sanity_check_<source_name>.py
+```
+
+Check that:
+
+- The script runs without errors
+- Output PNGs are generated
+- Data points appear in expected geographic regions
+- Values are in physically reasonable ranges (e.g., temperatures
+  200–320 K, not 0 or NaN)
+- For DataFrame sources: observations form recognizable swath or
+  station patterns
+
+### 2f. **[CONFIRM — Sanity-Check Plots]**
+
+**You MUST ask the user to visually inspect the generated plot(s)
+before proceeding.** Do not skip this step even if the script ran
+without errors — a successful run does not guarantee the plots are
+correct (e.g., empty axes, wrong colorbar range, garbled data).
+
+Tell the user the absolute path to the generated image file(s) and
+ask them to open and inspect the output:
+
+> The sanity-check script ran successfully and saved the following
+> plot:
+>
+> `/absolute/path/to/sanity_check_<source_name>.png`
+>
+> **Please open this image and confirm it looks correct.** Check:
+>
+> 1. Data points are visible on the axes (not blank/empty)
+> 2. Geographic coverage matches expectations (global swaths,
+>    regional stations, etc.)
+> 3. Colorbar values are in physically reasonable ranges
+> 4. No obvious artifacts (all-NaN regions, garbled coordinates)
+>
+> Does the plot look correct?
+
+**Do not proceed to Step 3 until the user explicitly confirms the
+plots look correct.** If the user reports problems (empty plots,
+wrong ranges, missing coverage), debug and fix the issue, re-run
+the script, and ask the user to inspect again.
+
+The output images will be uploaded to the PR description in
+Step 3. 
+ +--- + +## Step 3 — Branch, Commit & Open PR + +### **[CONFIRM — Ready to Submit]** + +Before proceeding, confirm with the user: + + > All implementation steps are complete: + > + > - Data source class implemented with correct method ordering + > - Lexicon class created with variable mappings + > - **All lexicon variables validated against real data** (low-validity + > variables removed) + > - **Variable summary and time range presented to user** + > - E2STUDIO_VOCAB / E2STUDIO_SCHEMA updated (if needed) + > - Registered in `__init__.py` files + > - Documentation RST files updated with badges + > - Reference URLs included in docstrings + > - CHANGELOG.md updated + > - Format, lint, and license checks pass + > - Unit tests written and passing + > - Dependencies in `data` extras group confirmed + > - Sanity-check plots generated and confirmed by user + > + > Ready to create a branch, commit, and prepare a PR? + +### 3a. Create branch and commit + +```bash +git checkout -b feat/data-source-<name> +git add earth2studio/data/<filename>.py \ + earth2studio/data/__init__.py \ + earth2studio/data/utils.py \ + earth2studio/lexicon/<filename>.py \ + earth2studio/lexicon/__init__.py \ + earth2studio/lexicon/base.py \ + test/data/test_<filename>.py \ + docs/modules/datasources_*.rst \ + pyproject.toml \ + CHANGELOG.md +git commit -m "feat: add <SourceName> data source + +Add <SourceName> <source_type> for <brief description>. +Includes lexicon, unit tests, and documentation." +``` + +Do **NOT** add the sanity-check script or its output image. + +### 3b. Identify the fork remote and push branch + +The working repository is typically a **fork** of +`NVIDIA/earth2studio`. Before pushing, confirm which git remote +points to the user's fork: + +```bash +git remote -v +``` + +Ask the user: + +> Which git remote is your fork of `NVIDIA/earth2studio`? 
+> (Usually `origin` — e.g., `git@github.com:<user>/earth2studio.git`) + +Then push the feature branch to the **fork** remote: + +```bash +git push -u <fork-remote> feat/data-source-<name> +``` + +### 3c. Open Pull Request (fork → NVIDIA/earth2studio) + +> **Important:** PRs must be opened **from the fork** to the +> **upstream source repository** `NVIDIA/earth2studio`. The branch +> lives on the fork; the PR targets `main` (or the appropriate +> base branch) on the upstream repo. + +Use `gh pr create` with explicit `--repo` and `--head` flags: + +```bash +gh pr create \ + --repo NVIDIA/earth2studio \ + --base main \ + --head <fork-owner>:feat/data-source-<name> \ + --title "feat: add <SourceName> data source" \ + --body "..." +``` + +Where `<fork-owner>` is the GitHub username that owns the fork +(e.g., `NickGeneva`). + +The PR should follow the repo's template and +include the data licensing section, the sanity-check script inside +a `<details>` block, and the sanity-check images uploaded to the +PR body: + +````markdown +## Description + +Add `<ClassName>` <source_type> for <brief description of what +the data source provides>. + +Closes #<issue_number> (if applicable) + +### Data source details + +| Property | Value | +|---|---| +| **Source type** | DataSource / ForecastSource / DataFrameSource / ForecastFrameSource | +| **Remote store** | <URL to data documentation> | +| **Format** | GRIB2 / NetCDF / Zarr / CSV / etc. | +| **Spatial resolution** | X deg x Y deg (NxM grid) | +| **Temporal resolution** | Hourly / 6-hourly / daily / etc. | +| **Date range** | YYYY-MM-DD to present | +| **Region** | Global / Regional | +| **Authentication** | Anonymous / API key / etc. 
| + +### Data licensing + +> **License**: <Name of the data license> +> **URL**: <Link to the license terms> +> +> <Brief summary of key restrictions or permissions, e.g., +> "Open data, freely available for commercial and +> non-commercial use" or "Requires attribution, non-commercial +> use only"> + +### Dependencies added + +| Package | Version | License | License URL | Reason | +|---|---|---|---|---| +| `<package>` | `>=X.Y` | <License name> | [link](<URL to license>) | <brief reason> | + +*(or "No new dependencies needed")* + +When filling this table, look up each new dependency's license: + +1. Check the package's PyPI page (`https://pypi.org/project/<package>/`) + or its repository for the license type +2. Link directly to the license file (e.g., GitHub raw LICENSE or + PyPI license classifier URL) +3. Flag any **non-permissive licenses** (GPL, AGPL, SSPL) — these + may be incompatible with the project's Apache-2.0 license and + require team review before merging + +### Validation + +See sanity-check validation in PR comments below (posted in Step 3d). + +## Checklist + +- [x] I am familiar with the [Contributing Guidelines][contrib]. +- [x] New or existing tests cover these changes. +- [x] The documentation is up to date with these changes. +- [x] The [CHANGELOG.md][changelog] is up to date with these changes. +- [ ] An [issue][issues] is linked to this pull request. +- [ ] Assess and address Greptile feedback (AI code review bot). + +[contrib]: https://github.com/NVIDIA/earth2studio/blob/main/CONTRIBUTING.md +[changelog]: https://github.com/NVIDIA/earth2studio/blob/main/CHANGELOG.md +[issues]: https://github.com/NVIDIA/earth2studio/issues + +## Dependencies + +<List any new packages added to pyproject.toml, or "None"> +```` + +### 3d. Post sanity-check plot as PR comment + +After the PR is created, post the sanity-check visualization as a +separate **PR comment** so it is immediately visible to reviewers +without expanding a `<details>` block. 
This serves as quick visual +evidence that the data source produces correct output. + +#### Image upload limitation + +**GitHub has no CLI or REST API for uploading images to PR comments.** +The uploads endpoint (`uploads.github.com`) and GraphQL mutations do +not support attaching local image files to issue/PR comments. The +only way to embed an image is via the browser's drag-and-drop editor +or by referencing an already-hosted URL. + +**Practical workflow:** + +1. Post the PR comment **without** the image — include the + validation table, the full sanity-check script, and a placeholder + line: `> **TODO:** Attach sanity-check image by editing this + comment in the browser.` +2. Tell the user: *"The image is at `<local_path>`. Edit the PR + comment in your browser and drag the file into the editor to + embed it."* +3. Use `gh api -X PATCH` to update the comment body (via + `-F body=@/tmp/comment.md`) if the text needs revision later. + +Do **not** waste time trying `curl` uploads, GraphQL file mutations, +or the `uploads.github.com` asset endpoint — they do not work for +issue/PR comment images. + +#### Writing the comment body + +Write the comment body to a temp file first, then post it. This +avoids shell quoting issues with heredocs containing backticks and +markdown: + +```bash +# 1. Write body to a temp file (use your editor tool, not heredoc) +# Include: summary table, key findings, script in <details> + +# 2. Post the comment +gh api -X POST repos/NVIDIA/earth2studio/issues/<PR_NUMBER>/comments \ + -F "body=@/tmp/pr_comment_body.md" \ + --jq '.html_url' +``` + +#### Comment content template + +Adapt to the source type. Include only relevant rows — omit +satellite-specific rows for gridded sources, omit grid dimensions +for DataFrame sources, etc. 
+ +```markdown +## Sanity-Check Validation + +**Source:** `<ClassName>` — <brief description> +**Time:** YYYY-MM-DD HH:MM UTC +**Parameters:** <source-specific context, e.g., tolerance, satellite, product> + +| Metric | Value | +|--------|-------| +| Output shape / rows | <shape for DataArray, row count for DataFrame> | +| Variables | <list or count> | +| Value range | <min>–<max> <unit> | +| Lat range | <min>–<max>° | +| Lon range | <min>–<max>° | +| Missing / NaN | <count or 0> | + +*Add source-specific rows as needed (channels, satellites, grid +dimensions, lead times, etc.)* + +**Key findings:** +- <bullet summarizing physical reasonableness> +- <bullet on spatial pattern / coverage> +- <bullet on comparison with reference source, if applicable> + +> **TODO:** Attach sanity-check image by editing this comment in +> the browser. + +<details> +<summary>Sanity-check script (click to expand)</summary> + +PASTE THE FULL WORKING SCRIPT HERE — not a truncated excerpt. +The script must be copy-pasteable and produce the plot end-to-end. + +</details> +``` + +**Important:** Always paste the **complete, runnable** script — not +a shortened version. Reviewers should be able to reproduce the plot +by copying the script directly. + +#### Comparison sources + +If a **comparison data source** exists for the same physical +quantity (e.g., UFSObsSat for satellite BT, ERA5 for reanalysis +fields), include a side-by-side comparison in the same comment. +Briefly summarize expected differences: + +- Raw vs QC-subsetted data (different observation counts) +- Different spatial coverage or resolution +- Different time windows or update cadence +- Unit or coordinate convention differences + +#### Finalize + +After posting, inform the user of: + +1. The comment URL +2. The local path to the image file for manual attachment +3. 
Instructions: *"Edit the comment in your browser and drag the + image file into the editor to embed it."* + +> **Note:** The sanity-check image and script are for PR review +> purposes only — they must NOT be committed to the repository. + +--- + +## Step 4 — Automated Code Review (Greptile) + +After the PR is created and pushed, an automated code review from +**greptile-apps** (Greptile) will be posted as PR review comments. +Wait for this review, then process the feedback. + +### 4a. Wait for Greptile review + +Poll for review comments from `greptile-apps[bot]` every 30 seconds +for up to **5 minutes**. Time out gracefully if no review arrives: + +```bash +# Poll loop — check every 30s, timeout after 5 minutes (10 attempts) +for i in $(seq 1 10); do + REVIEW_ID=$(gh api repos/NVIDIA/earth2studio/pulls/<PR_NUMBER>/reviews \ + --jq '.[] | select(.user.login == "greptile-apps[bot]") | .id' 2>/dev/null) + if [ -n "$REVIEW_ID" ]; then + echo "Greptile review found: $REVIEW_ID" + break + fi + echo "Attempt $i/10 — no review yet, waiting 30s..." + sleep 30 +done +``` + +If no review after 5 minutes, inform the user: + +> Greptile hasn't posted a review after 5 minutes. This can happen if +> the review bot is busy or the PR hasn't triggered it. You can: +> +> 1. Ask me to check again later +> 2. Skip this step and proceed without automated review +> 3. Manually request a review from Greptile on the PR page + +### 4b. Pull and parse review comments + +Once the review is posted, fetch all comments: + +```bash +# Get all review comments on the PR +gh api repos/NVIDIA/earth2studio/pulls/<PR_NUMBER>/comments \ + --jq '.[] | select(.user.login == "greptile-apps[bot]") | + {path: .path, line: .diff_hunk, body: .body}' +``` + +Also fetch the top-level review body: + +```bash +gh api repos/NVIDIA/earth2studio/pulls/<PR_NUMBER>/reviews \ + --jq '.[] | select(.user.login == "greptile-apps[bot]") | .body' +``` + +### 4c. 
Categorize and present to user + +Parse each comment and categorize it: + +| Category | Description | Default action | +|---|---|---| +| **Bug / correctness** | Logic errors, wrong behavior | Fix | +| **Style / convention** | Naming, formatting, patterns | Fix if valid | +| **Performance** | Inefficiency, resource waste | Evaluate | +| **Documentation** | Missing/wrong docs, docstrings | Fix | +| **Suggestion** | Alternative approach, nice-to-have | User decides | +| **False positive** | Incorrect or irrelevant feedback | Dismiss | + +### **[CONFIRM — Review Triage]** + +Present each comment to the user in a summary table: + +```markdown +| # | File | Line | Category | Summary | Proposed Action | +|---|------|------|----------|---------|-----------------| +| 1 | metop_amsua.py | 142 | Bug | Missing null check | Fix: add guard | +| 2 | metop_avhrr.py | 305 | Style | Use f-string | Fix: convert | +| 3 | metop.py | 45 | Suggestion | Add type alias | Skip: not needed | +| ... | ... | ... | ... | ... | ... | +``` + +For each comment, briefly explain: + +- What Greptile flagged +- Whether you agree or disagree (with reasoning) +- Your proposed fix (or why to skip) + +Ask the user to confirm which comments to address. The user may: + +- Accept all proposed fixes +- Select specific fixes +- Override your recommendation on any comment +- Add their own fixes + +### 4d. Implement fixes + +For each accepted fix: + +1. Make the code change +2. Run `make format && make lint` after all fixes +3. Run the relevant tests +4. Commit with a message like: + `fix: address code review feedback (Greptile)` + +### 4e. Respond to review comments + +For each Greptile comment, post a reply on the PR: + +**For fixed comments:** + +```bash +gh api repos/NVIDIA/earth2studio/pulls/<PR_NUMBER>/comments/<COMMENT_ID>/replies \ + -f body="Fixed in <commit_sha>. 
<brief description of fix>" +``` + +**For dismissed comments (false positives / won't fix):** + +```bash +gh api repos/NVIDIA/earth2studio/pulls/<PR_NUMBER>/comments/<COMMENT_ID>/replies \ + -f body="Won't fix — <brief justification>" +``` + +### 4f. Push and resolve + +```bash +git push origin <branch> +``` + +After pushing, resolve all addressed review threads if possible. + +Inform the user of the final state: + +- How many comments were fixed +- How many were dismissed (with reasons) +- Any remaining open threads + +--- + +## Reminders + +- **DO** use the repos local `uv` `.venv` to run python with `uv run python` +- **DO NOT** commit the sanity-check script or image to the repo +- **DO** use `loguru.logger` for logging, never `print()`, inside + `earth2studio/` +- **DO** ensure all public functions have full type hints (mypy-clean) +- **DO** maintain alphabetical order in `__init__.py` exports, + RST file entries, and CHANGELOG entries +- **DO** follow the canonical method ordering within the class +- **DO** use the async task dataclass pattern for parallel fetching +- **DO** use `prep_data_inputs` / `prep_forecast_inputs` to normalize + inputs +- **DO** use `nest_asyncio.apply()` in `__init__` for notebook compat +- **DO** use `datasource_cache_root()` for cache paths +- **DO** delete temporary cache when `cache=False` +- **DO** add all new dependencies to the `data` extras group in + `pyproject.toml` using `uv add --extra data <package>` +- **DO** include reference URLs in class docstrings and lexicon docs +- **DO** always update CHANGELOG.md under the current unreleased version +- **DO** use async utilities from `earth2studio/data/utils.py`: + `managed_session`, `gather_with_concurrency`, `async_retry` +- **DO** use pure async I/O (`fs._cat_file()`, async zarr, etc.) +- **PREFER** fsspec-compatible filesystems (s3fs, gcsfs, adlfs, etc.) + over dedicated client libraries +- **AVOID** `asyncio.to_thread` — pure async is ALWAYS preferred. 
+ Only use `cancellable_to_thread` as a last resort when no async + alternative exists. +- **AVOID** bare `tqdm.gather(*tasks)` — use `gather_with_concurrency` +- **AVOID** using xarray for loading data — prefer direct file I/O +- **AVOID** downloading full files — use byte-range / slicing +- **AVOID** over-complicating constructor parameters +- **NEVER** call `loop.set_default_executor()` — mutates global state +- **NEVER** commit, hardcode, or include API keys, secrets, tokens, + or credentials in source code, sample scripts, commit messages, + PR descriptions, or any file tracked by git. Always read + credentials from environment variables at runtime. If the user + provides test credentials during the session, use them only in + ephemeral shell commands — never persist them. diff --git a/CHANGELOG.md b/CHANGELOG.md index 02635595d..72cb489b1 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -20,6 +20,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 - Added AMSU-A (channels 1–14) and AVHRR channel variables to `E2STUDIO_VOCAB` - Added JPSS ATMS Level 1 BUFR brightness-temperature data source (`JPSS_ATMS`) - Added `JPSSATMSLexicon` for ATMS variable mappings +- Added MetOp IASI Level 1C infrared brightness temperature data source (`MetOpIASI`) +- Added `MetOpIASILexicon` for IASI variable mappings (8461 spectral channels) ### Changed diff --git a/CLAUDE.md b/CLAUDE.md index c66084066..657c36c5b 100644 --- a/CLAUDE.md +++ b/CLAUDE.md @@ -31,6 +31,17 @@ file(s) before writing or reviewing code: - **Formatting** — black; run `/format` or `make format`. - **Linting** — ruff + mypy; run `/lint` or `make lint` before opening a PR. +## Skills + +Custom skills live in `.claude/skills/`. 
Invoke the relevant skill before starting +a matching task: + +| Skill | Purpose | +|---|---| +| `create-data-source` | Create a new data source wrapper | +| `create-prognostic-wrapper` | Create a new prognostic model (px) wrapper | +| `validate-data-source` | Validate a newly implemented data source | + ## Custom Commands Use these slash commands (defined in `.claude/commands/`) to run common tasks: diff --git a/docs/modules/datasources_dataframe.rst b/docs/modules/datasources_dataframe.rst index 7630255d2..1e6b269df 100644 --- a/docs/modules/datasources_dataframe.rst +++ b/docs/modules/datasources_dataframe.rst @@ -24,6 +24,7 @@ Data sources that provide tabular data as DataFrames. data.JPSS_ATMS data.MetOpAMSUA data.MetOpAVHRR + data.MetOpIASI data.MetOpMHS data.RandomDataFrame data.UFSObsConv diff --git a/earth2studio/data/__init__.py b/earth2studio/data/__init__.py index 76adbf64e..be5e68d68 100644 --- a/earth2studio/data/__init__.py +++ b/earth2studio/data/__init__.py @@ -32,6 +32,7 @@ from .jpss_atms import JPSS_ATMS from .metop_amsua import MetOpAMSUA from .metop_avhrr import MetOpAVHRR +from .metop_iasi import MetOpIASI from .metop_mhs import MetOpMHS from .mrms import MRMS from .ncar import NCAR_ERA5 diff --git a/earth2studio/data/metop_iasi.py b/earth2studio/data/metop_iasi.py new file mode 100644 index 000000000..d8f9727bb --- /dev/null +++ b/earth2studio/data/metop_iasi.py @@ -0,0 +1,1182 @@ +# SPDX-FileCopyrightText: Copyright (c) 2024-2026 NVIDIA CORPORATION & AFFILIATES. +# SPDX-FileCopyrightText: All rights reserved. +# SPDX-License-Identifier: Apache-2.0 +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. 
+# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from __future__ import annotations + +import asyncio +import os +import pathlib +import shutil +import struct +import uuid +from datetime import datetime, timedelta + +import nest_asyncio +import numpy as np +import pandas as pd +import pyarrow as pa +from loguru import logger + +from earth2studio.data.utils import datasource_cache_root, prep_data_inputs +from earth2studio.lexicon import MetOpIASILexicon +from earth2studio.lexicon.base import E2STUDIO_SCHEMA +from earth2studio.utils.imports import ( + OptionalDependencyFailure, + check_optional_dependencies, +) +from earth2studio.utils.time import normalize_time_tolerance +from earth2studio.utils.type import TimeArray, TimeTolerance, VariableArray + +try: + import eumdac +except ImportError: + OptionalDependencyFailure("data") + eumdac = None # type: ignore[assignment] + +# --------------------------------------------------------------------------- +# IASI L1C EPS native binary format constants +# --------------------------------------------------------------------------- +_GRH_SIZE = 20 # Generic Record Header +_MDR_RECORD_CLASS = 8 +_MDR_INSTRUMENT_GROUP = 8 # IASI instrument group +_MDR_RECORD_SUBCLASS = 2 # Earth-view MDR +_MPHR_RECORD_CLASS = 1 +_GIADR_RECORD_CLASS = 5 +_GIADR_SF_SUBCLASS = 1 # Scale Factors subclass + +_NUM_EFOVS = 30 # EFOVs per scan line +_NUM_IFOVS = 4 # IFOVs per EFOV (2×2) +_NUM_CHANNELS_ALLOC = 8700 # Allocated spectral samples per IFOV +_NUM_CHANNELS = 8461 # Valid spectral channels (645–2760 cm⁻¹) + +# Planck constants for radiance → brightness temperature conversion +# C1 
in mW/(m²·sr·cm⁻⁴), C2 in cm·K +_C1 = 1.191042e-05 +_C2 = 1.4387752 + +# EPS epoch: 2000-01-01 00:00:00 UTC +_EPS_EPOCH = datetime(2000, 1, 1) + +# Spacecraft ID mapping (same as AMSU-A/MHS) +_SPACECRAFT_MAP = { + "1": "metop-b", + "2": "metop-a", + "3": "metop-c", + "4": "metop-sga1", + "M01": "metop-b", + "M02": "metop-a", + "M03": "metop-c", +} + +# Reverse map: user-facing lowercase → eumdac API display name +_EUMDAC_SAT_NAME = { + "metop-a": "Metop-A", + "metop-b": "Metop-B", + "metop-c": "Metop-C", + "metop-sga1": "Metop-SGA1", +} + + +# --------------------------------------------------------------------------- +# MDR field layout — computed offsets using known field sizes +# --------------------------------------------------------------------------- +# The IASI MDR has variable-size header fields at the start, making +# hardcoded absolute offsets unreliable across format versions. Instead, +# we compute offsets relative to the MDR payload start by scanning known +# field blocks sequentially. +# +# Strategy: We use the field sizes from the EPS PFS specification to build +# a cumulative offset table. Fields are listed in their MDR order. + + +def _decode_eps_datetime(data: bytes, offset: int) -> datetime: + """Decode a 6-byte EPS datetime to Python datetime. + + EPS datetime format: 2 bytes uint16 BE (days since 2000-01-01) + + 4 bytes uint32 BE (milliseconds of day). + + Parameters + ---------- + data : bytes + Raw data buffer + offset : int + Byte offset to start of the 6-byte field + + Returns + ------- + datetime + Decoded datetime (naive UTC) + """ + day = struct.unpack_from(">H", data, offset)[0] + ms = struct.unpack_from(">I", data, offset + 2)[0] + return _EPS_EPOCH + timedelta(days=day, milliseconds=ms) + + +def _radiance_to_bt(radiance: np.ndarray, wavenumber_cm: np.ndarray) -> np.ndarray: + """Convert calibrated radiance to brightness temperature. 
+ + Uses the inverse Planck function (no band correction for IASI): + T_B = C2 * ν / ln(1 + C1 * ν³ / L) + + Parameters + ---------- + radiance : np.ndarray + Calibrated radiance in mW/(m²·sr·cm⁻¹). Shape (n_obs, n_channels). + wavenumber_cm : np.ndarray + Wavenumbers in cm⁻¹. Shape (n_channels,). + + Returns + ------- + np.ndarray + Brightness temperature in Kelvin, same shape as radiance. + """ + nu = wavenumber_cm[np.newaxis, :] # (1, n_ch) + valid = radiance > 0 + bt = np.full_like(radiance, np.nan, dtype=np.float64) + r = radiance[valid] + # Use broadcast: nu values need to match the valid mask + nu_bc = np.broadcast_to(nu, radiance.shape)[valid] + bt[valid] = _C2 * nu_bc / np.log(1.0 + _C1 * nu_bc**3 / r) + return bt + + +def _parse_mphr(data: bytes) -> dict[str, str]: + """Parse the Main Product Header Record (ASCII key=value pairs). + + Parameters + ---------- + data : bytes + Raw MPHR record bytes (including GRH) + + Returns + ------- + dict[str, str] + Key-value pairs from the header + """ + text = data[_GRH_SIZE:].decode("ascii", errors="replace") + result: dict[str, str] = {} + for line in text.split("\n"): + line = line.strip() + if "=" in line: + key, _, val = line.partition("=") + result[key.strip()] = val.strip() + return result + + +def _parse_grh(data: bytes, offset: int = 0) -> tuple[int, int, int, int]: + """Parse a Generic Record Header at the given offset. + + Returns + ------- + tuple + (record_class, instrument_group, record_subclass, record_size) + """ + record_class = data[offset] + instrument_group = data[offset + 1] + record_subclass = data[offset + 2] + record_size = struct.unpack_from(">I", data, offset + 4)[0] + return record_class, instrument_group, record_subclass, record_size + + +def _parse_sensing_time(mphr: dict[str, str]) -> tuple[datetime, datetime]: + """Extract sensing start/end times from MPHR. 
+ + Parameters + ---------- + mphr : dict[str, str] + Parsed MPHR key-value pairs + + Returns + ------- + tuple[datetime, datetime] + (sensing_start, sensing_end) as naive UTC datetimes + """ + fmt = "%Y%m%d%H%M%S" + start_str = mphr.get("SENSING_START", "") + end_str = mphr.get("SENSING_END", "") + start = datetime.strptime(start_str[:14], fmt) + end = datetime.strptime(end_str[:14], fmt) + return start, end + + +def _parse_giadr_scale_factors( + data: bytes, offset: int +) -> tuple[np.ndarray, np.ndarray, np.ndarray, int]: + """Parse GIADR Scale Factors record for spectral radiance conversion. + + Extracts per-band scale factor information needed to convert int16 + spectral radiance values to physical units. The band_first/band_last + values are absolute spectral sample numbers (not 1-based IASI channel + numbers). The scale factor values are positive integers representing + the negative power of 10: physical = raw × 10^(-sf). + + Parameters + ---------- + data : bytes + Raw file data + offset : int + Byte offset to start of GIADR record (including GRH) + + Returns + ------- + tuple + (band_first_ch, band_last_ch, band_sf, nb_bands) where: + - band_first_ch: Absolute first sample number per band (shape nb_bands) + - band_last_ch: Absolute last sample number per band (shape nb_bands) + - band_sf: Positive scale factor per band (use as negative exponent) + - nb_bands: Number of scale factor bands + """ + # After GRH (20 bytes): + # IDefScaleSondNbScale: int16 (1) + # IDefScaleSondNsfirst: int16 (10) + # IDefScaleSondNslast: int16 (10) + # IDefScaleSondScaleFactor: int16 (10) + payload = offset + _GRH_SIZE + nb_bands = struct.unpack_from(">h", data, payload)[0] + + first_off = payload + 2 + first_ch = np.array(struct.unpack_from(f">{10}h", data, first_off), dtype=np.int32) + + last_off = first_off + 20 + last_ch = np.array(struct.unpack_from(f">{10}h", data, last_off), dtype=np.int32) + + sf_off = last_off + 20 + sf = np.array(struct.unpack_from(f">{10}h", data, 
sf_off), dtype=np.int32) + + return first_ch[:nb_bands], last_ch[:nb_bands], sf[:nb_bands], nb_bands + + +def _build_channel_scale_array( + band_first: np.ndarray, + band_last: np.ndarray, + band_sf: np.ndarray, + nsfirst: int = 2581, +) -> np.ndarray: + """Build per-channel scale factor array from per-band GIADR data. + + The GIADR band_first/band_last values are absolute sample numbers + (starting from nsfirst=2581 for standard IASI). These are converted + to 0-based IASI channel indices by subtracting nsfirst. + + The scale factor values are positive integers representing the + negative power of 10: physical = raw × 10^(-sf). + + Parameters + ---------- + band_first : np.ndarray + Absolute first sample number per band (from GIADR) + band_last : np.ndarray + Absolute last sample number per band (from GIADR) + band_sf : np.ndarray + Scale factor per band (positive integer; used as negative exponent) + nsfirst : int, optional + First sample number from IDefNsfirst1b, by default 2581 + + Returns + ------- + np.ndarray + Scale multiplier for each of the 8461 channels. Shape (8461,). + """ + scales = np.ones(_NUM_CHANNELS, dtype=np.float64) + for i in range(len(band_first)): + # Convert absolute sample numbers to 0-based channel indices + ch_start = int(band_first[i]) - nsfirst + ch_end = int(band_last[i]) - nsfirst + 1 # Inclusive → exclusive + ch_start = max(ch_start, 0) + ch_end = min(ch_end, _NUM_CHANNELS) + if ch_start < ch_end: + # Scale factor is positive int used as NEGATIVE exponent + scales[ch_start:ch_end] = 10.0 ** (-float(band_sf[i])) + return scales + + +def _compute_mdr_field_offsets(data: bytes, mdr_offset: int) -> dict[str, int]: + """Compute byte offsets for key fields within an IASI MDR. + + Because the IASI MDR has variable-size header fields, we walk the + known field structure sequentially to compute absolute byte offsets + for the fields we need. 
+ + References + ---------- + https://github.com/stcorp/codadef-eps/blob/master/types/IASI_MDR_L1C_v5.xml + + Parameters + ---------- + data : bytes + Raw file data + mdr_offset : int + Byte offset to start of this MDR record (including GRH) + + Returns + ------- + dict[str, int] + Mapping of field name to absolute byte offset in data + """ + # Parse record size from GRH to validate + _, _, _, rec_size = _parse_grh(data, mdr_offset) + + # Start after GRH + pos = mdr_offset + _GRH_SIZE + + # DEGRADED_INST_MDR: uint8 (1 byte) + pos += 1 + # DEGRADED_PROC_MDR: uint8 (1 byte) + pos += 1 + + # GEPSIasiMode: 4 bytes (uint8 + 3-byte bitfield) + pos += 4 + # GEPSOPSProcessingMode: 4 bytes + pos += 4 + # GEPSIdConf: 32 bytes (instrument configuration) + pos += 32 + + # GEPSLocIasiAvhrr_IASI: VSFInt[30][4][2] = 30*4*2*5 = 1200 bytes + pos += 1200 + # GEPSLocIasiAvhrr_IIS: VSFInt[30][25][2] = 30*25*2*5 = 7500 bytes + pos += 7500 + + # OBT: raw48[30] = 30*6 = 180 bytes + pos += 180 + # OnboardUTC: EPSdatetime[30] = 30*6 = 180 bytes + pos += 180 + + # GEPSDatIasi: EPSdatetime[30] = 30*6 = 180 bytes + offsets: dict[str, int] = {} + offsets["GEPSDatIasi"] = pos + pos += 180 + + # GIsfLinOrigin: int32[2] = 8 bytes + pos += 8 + # GIsfColOrigin: int32[2] = 8 bytes + pos += 8 + # GIsfPds1: int32[2] = 8 bytes + pos += 8 + # GIsfPds2: int32[2] = 8 bytes + pos += 8 + # GIsfPds3: int32[2] = 8 bytes + pos += 8 + # GIsfPds4: int32[2] = 8 bytes + pos += 8 + + # GEPS_CCD: uint8[30] = 30 bytes + pos += 30 + # GEPS_SP: int32[30] = 120 bytes + pos += 120 + + # GIrcImage: uint16[30][64][64] = 30*64*64*2 = 245760 bytes + pos += 245760 + + # Quality flags section + # GQisFlagQual: uint8[30][4][3] = 360 bytes + pos += 360 + # GQisFlagQualDetailed: uint16[30][4] = 240 bytes + offsets["GQisFlagQualDetailed"] = pos + pos += 240 + + # GQisQualIndex: VSFInt = 5 bytes + pos += 5 + # GQisQualIndexIIS: VSFInt = 5 bytes + pos += 5 + # GQisQualIndexLoc: VSFInt = 5 bytes + pos += 5 + # GQisQualIndexRad: 
VSFInt = 5 bytes + pos += 5 + # GQisQualIndexSpect: VSFInt = 5 bytes + pos += 5 + # GQisSysTecIISQual: uint32 = 4 bytes + pos += 4 + # GQisSysTecSondQual: uint32 = 4 bytes + pos += 4 + + # Geolocation section + # GGeoSondLoc: int32[30][4][2] = 960 bytes + offsets["GGeoSondLoc"] = pos + pos += 960 + # GGeoSondAnglesMETOP: int32[30][4][2] = 960 bytes + offsets["GGeoSondAnglesMETOP"] = pos + pos += 960 + # GGeoIISAnglesMETOP: int32[30][25][2] = 6000 bytes + pos += 6000 + # GGeoSondAnglesSUN: int32[30][4][2] = 960 bytes + offsets["GGeoSondAnglesSUN"] = pos + pos += 960 + # GGeoIISAnglesSUN: int32[30][25][2] = 6000 bytes + pos += 6000 + # GGeoIISLoc: int32[30][25][2] = 6000 bytes + pos += 6000 + # EARTH_SATELLITE_DISTANCE: uint32 = 4 bytes + pos += 4 + + # Spectral calibration fields + # IDefSpectDWn1b: VSFInt = 5 bytes + offsets["IDefSpectDWn1b"] = pos + pos += 5 + # IDefNsfirst1b: int32 = 4 bytes + offsets["IDefNsfirst1b"] = pos + pos += 4 + # IDefNslast1b: int32 = 4 bytes + pos += 4 + + # GS1cSpect: int16[30][4][8700] = 2,088,000 bytes + offsets["GS1cSpect"] = pos + + return offsets + + +def _parse_native_iasi( + data: bytes, + channel_indices: np.ndarray | None = None, +) -> pd.DataFrame: + """Parse an IASI Level 1C EPS native format file. + + Extracts MDR (Measurement Data Record) scan lines and converts + calibrated spectral radiances to brightness temperatures. + + Parameters + ---------- + data : bytes + Complete file contents of an IASI L1C .nat file + channel_indices : np.ndarray | None, optional + 0-based channel indices to extract (subset of 0..8460). + If None, all 8461 channels are extracted. + + Returns + ------- + pd.DataFrame + One row per (scan_line, EFOV, IFOV, channel) observation with + columns matching MetOpIASI.SCHEMA. 
+ """ + file_size = len(data) + offset = 0 + + # Step 1: Parse MPHR (first record) + if file_size < _GRH_SIZE: + return pd.DataFrame() + + rc, _, _, rec_size = _parse_grh(data, 0) + if rc != _MPHR_RECORD_CLASS or rec_size > file_size: + logger.warning("First record is not MPHR (class={})", rc) + return pd.DataFrame() + + mphr = _parse_mphr(data[:rec_size]) + spacecraft_id = mphr.get("SPACECRAFT_ID", "") + satellite = _SPACECRAFT_MAP.get(spacecraft_id, f"metop-{spacecraft_id.lower()}") + + # Step 2: Scan all records — collect GIADR scale factors and MDR offsets + offset = 0 + mdr_offsets: list[int] = [] + giadr_sf_offset: int | None = None + + while offset + _GRH_SIZE <= file_size: + rc, ig, sc, rec_size = _parse_grh(data, offset) + if rec_size < _GRH_SIZE or offset + rec_size > file_size: + break + if ( + rc == _MDR_RECORD_CLASS + and ig == _MDR_INSTRUMENT_GROUP + and sc == _MDR_RECORD_SUBCLASS + ): + mdr_offsets.append(offset) + elif rc == _GIADR_RECORD_CLASS and sc == _GIADR_SF_SUBCLASS: + giadr_sf_offset = offset + offset += rec_size + + n_scans = len(mdr_offsets) + if n_scans == 0: + logger.warning("No IASI MDR records found in file") + return pd.DataFrame() + + if giadr_sf_offset is None: + logger.warning("No GIADR Scale Factors record found — cannot decode radiances") + return pd.DataFrame() + + # Step 3: Parse GIADR scale factors + band_first, band_last, band_sf, _ = _parse_giadr_scale_factors( + data, giadr_sf_offset + ) + + # Read nsfirst from the first MDR to map GIADR absolute sample numbers + # to 0-based IASI channel indices + first_mdr_offsets = _compute_mdr_field_offsets(data, mdr_offsets[0]) + nsfirst = struct.unpack_from(">i", data, first_mdr_offsets["IDefNsfirst1b"])[0] + + channel_scales = _build_channel_scale_array( + band_first, band_last, band_sf, nsfirst=nsfirst + ) + + # Determine channels to extract + if channel_indices is None: + ch_idx = np.arange(_NUM_CHANNELS, dtype=np.int32) + else: + ch_idx = np.asarray(channel_indices, 
dtype=np.int32) + n_ch = len(ch_idx) + + # Step 4: Pre-allocate arrays for all scans × EFOVs × IFOVs + n_obs_per_scan = _NUM_EFOVS * _NUM_IFOVS # 30 × 4 = 120 + n_obs = n_scans * n_obs_per_scan + + lats = np.empty(n_obs, dtype=np.float32) + lons = np.empty(n_obs, dtype=np.float32) + solar_za = np.empty(n_obs, dtype=np.float32) + solar_azi = np.empty(n_obs, dtype=np.float32) + sat_za = np.empty(n_obs, dtype=np.float32) + sat_azi = np.empty(n_obs, dtype=np.float32) + quality = np.empty(n_obs, dtype=np.uint16) + scan_times = np.empty(n_obs, dtype="datetime64[ns]") + + # Radiances for selected channels: (n_obs, n_ch) + radiances = np.empty((n_obs, n_ch), dtype=np.float64) + + # We also need wavenumber info — extract from first MDR + wavenumber_cm: np.ndarray | None = None + + # Compute relative field offsets once from the first MDR and reuse + # (MDR layout is identical across all records in a conforming IASI L1C file) + rel_offsets = {k: v - mdr_offsets[0] for k, v in first_mdr_offsets.items()} + + for scan_idx, mdr_off in enumerate(mdr_offsets): + base = scan_idx * n_obs_per_scan + + # Reuse relative offsets — avoids recomputing per MDR + field_offsets = {k: mdr_off + rel for k, rel in rel_offsets.items()} + + # Extract wavenumber calibration from first MDR + if wavenumber_cm is None: + dwn_off = field_offsets["IDefSpectDWn1b"] + # VSFInt: 1 byte int8 scale, 4 byte int32 value + # Physical value = value × 10^(-scale) + dwn_scale = struct.unpack_from(">b", data, dwn_off)[0] + dwn_value = struct.unpack_from(">i", data, dwn_off + 1)[0] + dwn_m = dwn_value * (10.0 ** (-dwn_scale)) # in m⁻¹ (typically 25) + + nsfirst_off = field_offsets["IDefNsfirst1b"] + nsfirst = struct.unpack_from(">i", data, nsfirst_off)[0] + + # Wavenumber of channel n (0-based) = (nsfirst + n) * dwn in m⁻¹ + all_wn_m = (nsfirst + np.arange(_NUM_CHANNELS, dtype=np.float64)) * dwn_m + all_wn_cm = all_wn_m / 100.0 # Convert m⁻¹ to cm⁻¹ + wavenumber_cm = all_wn_cm[ch_idx] + + # GEPSDatIasi: 
EPSdatetime[30] — corrected UTC per EFOV + time_off = field_offsets["GEPSDatIasi"] + for efov in range(_NUM_EFOVS): + dt = _decode_eps_datetime(data, time_off + efov * 6) + dt_ns = np.datetime64(dt, "ns") + obs_base = base + efov * _NUM_IFOVS + scan_times[obs_base : obs_base + _NUM_IFOVS] = dt_ns + + # GGeoSondLoc: int32[30][4][2], SF=1e6 — [efov][ifov][lon=0,lat=1] + geo_off = field_offsets["GGeoSondLoc"] + raw_geo = np.frombuffer( + data, dtype=">i4", count=_NUM_EFOVS * _NUM_IFOVS * 2, offset=geo_off + ).reshape(_NUM_EFOVS, _NUM_IFOVS, 2) + raw_geo_f = raw_geo.astype(np.float64) / 1e6 + + # Flatten [30][4] → [120] and assign in one shot + flat_lats = raw_geo_f[:, :, 1].ravel().astype(np.float32) + flat_lons = raw_geo_f[:, :, 0].ravel() + # Convert longitude from [-180, 180] to [0, 360] + flat_lons = np.where(flat_lons < 0, flat_lons + 360.0, flat_lons).astype( + np.float32 + ) + lats[base : base + n_obs_per_scan] = flat_lats + lons[base : base + n_obs_per_scan] = flat_lons + + # GGeoSondAnglesMETOP: int32[30][4][2], SF=1e6 — [efov][ifov][zen=0,azi=1] + sat_ang_off = field_offsets["GGeoSondAnglesMETOP"] + raw_sat_ang = np.frombuffer( + data, + dtype=">i4", + count=_NUM_EFOVS * _NUM_IFOVS * 2, + offset=sat_ang_off, + ).reshape(_NUM_EFOVS, _NUM_IFOVS, 2) + raw_sat_ang_f = raw_sat_ang.astype(np.float64) / 1e6 + + sat_za[base : base + n_obs_per_scan] = ( + raw_sat_ang_f[:, :, 0].ravel().astype(np.float32) + ) + sat_azi[base : base + n_obs_per_scan] = ( + raw_sat_ang_f[:, :, 1].ravel().astype(np.float32) + ) + + # GGeoSondAnglesSUN: int32[30][4][2], SF=1e6 — [efov][ifov][zen=0,azi=1] + sun_ang_off = field_offsets["GGeoSondAnglesSUN"] + raw_sun_ang = np.frombuffer( + data, + dtype=">i4", + count=_NUM_EFOVS * _NUM_IFOVS * 2, + offset=sun_ang_off, + ).reshape(_NUM_EFOVS, _NUM_IFOVS, 2) + raw_sun_ang_f = raw_sun_ang.astype(np.float64) / 1e6 + + solar_za[base : base + n_obs_per_scan] = ( + raw_sun_ang_f[:, :, 0].ravel().astype(np.float32) + ) + solar_azi[base : base + 
n_obs_per_scan] = ( + raw_sun_ang_f[:, :, 1].ravel().astype(np.float32) + ) + + # GQisFlagQualDetailed: uint16[30][4] + qual_off = field_offsets["GQisFlagQualDetailed"] + raw_qual = np.frombuffer( + data, + dtype=">u2", + count=_NUM_EFOVS * _NUM_IFOVS, + offset=qual_off, + ).reshape(_NUM_EFOVS, _NUM_IFOVS) + + quality[base : base + n_obs_per_scan] = raw_qual.ravel() + + # GS1cSpect: int16[30][4][8700] — L1C calibrated spectra + spect_off = field_offsets["GS1cSpect"] + + # Vectorized extraction: read full spectral block and index channels + raw_spect = np.frombuffer( + data, + dtype=">i2", + count=_NUM_EFOVS * _NUM_IFOVS * _NUM_CHANNELS_ALLOC, + offset=spect_off, + ).reshape(_NUM_EFOVS, _NUM_IFOVS, _NUM_CHANNELS_ALLOC) + + # Select only requested channels and apply per-channel scale factors + selected = raw_spect[:, :, ch_idx].astype(np.float64) # (30, 4, n_ch) + selected *= channel_scales[ch_idx] + radiances[base : base + n_obs_per_scan, :] = selected.reshape( + n_obs_per_scan, n_ch + ) + + # Step 5: Convert radiance to mW/(m²·sr·cm⁻¹) for Planck function + # The GIADR-scaled values are in W/(m²·sr·m⁻¹). 
Convert: + # W/(m²·sr·m⁻¹) → mW/(m²·sr·cm⁻¹): + # × 1000 (W → mW) × 100 (m⁻¹ → cm⁻¹ spectral density) = × 1e5 + # Note: L[per cm⁻¹] = L[per m⁻¹] × 100 because 1 cm⁻¹ = 100 m⁻¹ + radiances *= 1e5 + + # Step 6: Convert radiances to brightness temperatures + if wavenumber_cm is None: + logger.warning("Could not determine wavenumber calibration") + return pd.DataFrame() + + bt = _radiance_to_bt(radiances, wavenumber_cm) + + # Step 7: Build long-format DataFrame (one row per IFOV × channel) + n_ch_total = n_ch + total_rows = n_obs * n_ch_total + + # Channel indices are 1-based in the output (1..8461) + ch_1based = ch_idx + 1 + + all_times = np.tile(scan_times, n_ch_total) + all_lats = np.tile(lats, n_ch_total) + all_lons = np.tile(lons, n_ch_total) + all_solza = np.tile(solar_za, n_ch_total) + all_solaza = np.tile(solar_azi, n_ch_total) + all_satza = np.tile(sat_za, n_ch_total) + all_sataza = np.tile(sat_azi, n_ch_total) + all_quality = np.tile(quality, n_ch_total) + all_scan_angle = np.tile(sat_za, n_ch_total) # scan angle ≈ sat zenith + + all_obs = np.empty(total_rows, dtype=np.float32) + all_channel_idx = np.empty(total_rows, dtype=np.uint16) + + for i in range(n_ch_total): + start = i * n_obs + end = start + n_obs + all_obs[start:end] = bt[:, i].astype(np.float32) + all_channel_idx[start:end] = ch_1based[i] + + df = pd.DataFrame( + { + "time": pd.to_datetime(all_times), + "class": "rad", + "lat": all_lats, + "lon": all_lons, + "elev": np.float32(0.0), # Satellite — no terrain elevation + "scan_angle": all_scan_angle, + "channel_index": all_channel_idx, + "solza": all_solza, + "solaza": all_solaza, + "satellite_za": all_satza, + "satellite_aza": all_sataza, + "quality": all_quality, + "satellite": satellite, + "observation": all_obs, + "variable": "iasi", + } + ) + + # Drop rows with invalid geolocation or NaN observations + df = df.dropna(subset=["observation", "lat", "lon"]) + df = df[(df["lat"] >= -90) & (df["lat"] <= 90)] + + return df + + 
+@check_optional_dependencies() +class MetOpIASI: + """EUMETSAT MetOp IASI Level 1C brightness temperature observations. + + The Infrared Atmospheric Sounding Interferometer (IASI) is a Fourier + transform infrared spectrometer aboard the MetOp series of polar-orbiting + satellites. It measures calibrated spectral radiances across 8461 channels + in the thermal infrared (645–2760 cm⁻¹, 3.6–15.5 µm), providing + atmospheric temperature and humidity profiles, trace gas columns, and + surface temperature at ~12 km spatial resolution. + + Each scan line contains 30 Extended Fields Of View (EFOVs), each + consisting of a 2×2 array of 4 Instantaneous Fields Of View (IFOVs), + yielding 120 spectra per scan line. A typical orbit pass contains + ~1400 scan lines (~168,000 IFOV spectra). + + To manage memory and processing time, a ``channel_indices`` parameter + allows selecting a subset of channels. By default only a representative + set of 100 channels spanning the three spectral bands is extracted. + + The returned :class:`~pandas.DataFrame` has one row per IFOV per channel, + following the same convention as :class:`~earth2studio.data.MetOpAMSUA`. + The ``channel_index`` column (1–8461) identifies each spectral channel. + + This data source downloads Level 1C products from the EUMETSAT Data Store + and parses the EPS native binary format to extract brightness temperatures, + geolocation, and viewing geometry. + + Parameters + ---------- + satellite : str, optional + Satellite platform filter for product search. One of "metop-a", + "metop-b", "metop-c", or None (all available). By default None. + channel_indices : list[int] | np.ndarray | None, optional + 0-based channel indices to extract (subset of 0..8460). If None, + a default set of 100 representative channels is used. Pass + ``np.arange(8461)`` to extract all channels (warning: very large + output). + time_tolerance : TimeTolerance, optional + Time tolerance window for filtering observations. 
Accepts a single + value (symmetric ± window) or a tuple (lower, upper) for asymmetric + windows, by default np.timedelta64(1, 'h') + cache : bool, optional + Cache data source on local memory, by default True + verbose : bool, optional + Print download progress and info, by default True + async_timeout : int, optional + Time in seconds after which download will be cancelled if not finished, + by default 600 + + Warning + ------- + This is a remote data source and can potentially download a large amount + of data to your local machine for large requests. IASI L1C files are + typically 100–200 MB each, with ~42–45 products per day across 3 + satellites. + + Note + ---- + Requires EUMETSAT Data Store credentials. Set the following environment + variables: + + - ``EUMETSAT_CONSUMER_KEY``: Your EUMETSAT API consumer key + - ``EUMETSAT_CONSUMER_SECRET``: Your EUMETSAT API consumer secret + + Register at https://eoportal.eumetsat.int/ to obtain credentials. + + Note + ---- + Additional information on the data repository: + + - https://data.eumetsat.int/product/EO:EUM:DAT:METOP:IASIL1C-ALL + - https://user.eumetsat.int/resources/user-guides/metop-iasi-level-1-data-guide + - Clerbaux et al. (2009), Atmos. Chem. 
Phys., 9, 6041–6054 + + Badges + ------ + region:global dataclass:observation product:sat + """ + + SOURCE_ID = "earth2studio.data.metop_iasi" + COLLECTION_ID = "EO:EUM:DAT:METOP:IASIL1C-ALL" + VALID_SATELLITES = frozenset(["metop-a", "metop-b", "metop-c"]) + + # Default representative channels: 100 channels spanning all 3 bands + # Band 1 (645–1144 cm⁻¹): channels 1–1997, 40 channels + # Band 2 (1210–1990 cm⁻¹): channels 1998–5116, 35 channels + # Band 3 (2000–2760 cm⁻¹): channels 5117–8461, 25 channels + DEFAULT_CHANNELS = np.concatenate( + [ + np.linspace(0, 1996, 40, dtype=np.int32), + np.linspace(1997, 5115, 35, dtype=np.int32), + np.linspace(5116, 8460, 25, dtype=np.int32), + ] + ) + + SCHEMA = pa.schema( + [ + E2STUDIO_SCHEMA.field("time"), + E2STUDIO_SCHEMA.field("class"), + E2STUDIO_SCHEMA.field("lat"), + E2STUDIO_SCHEMA.field("lon"), + E2STUDIO_SCHEMA.field("elev"), + E2STUDIO_SCHEMA.field("scan_angle"), + E2STUDIO_SCHEMA.field("channel_index"), + E2STUDIO_SCHEMA.field("solza"), + E2STUDIO_SCHEMA.field("solaza"), + E2STUDIO_SCHEMA.field("satellite_za"), + E2STUDIO_SCHEMA.field("satellite_aza"), + E2STUDIO_SCHEMA.field("quality"), + pa.field("satellite", pa.string()), + pa.field("observation", pa.float32()), + pa.field("variable", pa.string()), + ] + ) + + def __init__( + self, + satellite: str | None = None, + channel_indices: list[int] | np.ndarray | None = None, + time_tolerance: TimeTolerance = np.timedelta64(1, "h"), + cache: bool = True, + verbose: bool = True, + async_timeout: int = 600, + ) -> None: + if satellite is not None and satellite not in self.VALID_SATELLITES: + raise ValueError( + f"Invalid satellite '{satellite}'. 
" + f"Valid options: {sorted(self.VALID_SATELLITES)}" + ) + self._satellite = satellite + self._eumdac_satellite = _EUMDAC_SAT_NAME[satellite] if satellite else None + + # Channel selection + if channel_indices is not None: + self._channel_indices = np.asarray(channel_indices, dtype=np.int32) + else: + self._channel_indices = self.DEFAULT_CHANNELS.copy() + + self._cache = cache + self._verbose = verbose + self._tmp_cache_hash: str | None = None + self.async_timeout = async_timeout + + lower, upper = normalize_time_tolerance(time_tolerance) + self._tolerance_lower = pd.to_timedelta(lower).to_pytimedelta() + self._tolerance_upper = pd.to_timedelta(upper).to_pytimedelta() + + # Validate credentials early + self._consumer_key = os.environ.get("EUMETSAT_CONSUMER_KEY", "") + self._consumer_secret = os.environ.get("EUMETSAT_CONSUMER_SECRET", "") + if not self._consumer_key or not self._consumer_secret: + logger.warning( + "EUMETSAT_CONSUMER_KEY and/or EUMETSAT_CONSUMER_SECRET not set. " + "Data fetching will fail." + ) + + def __call__( + self, + time: datetime | list[datetime] | TimeArray, + variable: str | list[str] | VariableArray, + fields: str | list[str] | pa.Schema | None = None, + ) -> pd.DataFrame: + """Function to get IASI brightness temperature observations. + + Parameters + ---------- + time : datetime | list[datetime] | TimeArray + Timestamps to return data for (UTC). + variable : str | list[str] | VariableArray + Variables to return. Must be in MetOpIASILexicon + (e.g. ``["iasi"]``). + fields : str | list[str] | pa.Schema | None, optional + Fields to include in output, by default None (all fields). + + Returns + ------- + pd.DataFrame + IASI observation data in long format with one row per IFOV + per channel. 
+ """ + try: + nest_asyncio.apply() + loop = asyncio.get_event_loop() + except RuntimeError: + loop = asyncio.new_event_loop() + asyncio.set_event_loop(loop) + + try: + df = loop.run_until_complete( + asyncio.wait_for( + self.fetch(time, variable, fields), timeout=self.async_timeout + ) + ) + finally: + if not self._cache: + shutil.rmtree(self.cache, ignore_errors=True) + + return df + + async def fetch( + self, + time: datetime | list[datetime] | TimeArray, + variable: str | list[str] | VariableArray, + fields: str | list[str] | pa.Schema | None = None, + ) -> pd.DataFrame: + """Async function to get IASI data. + + Parameters + ---------- + time : datetime | list[datetime] | TimeArray + Timestamps to return data for (UTC). + variable : str | list[str] | VariableArray + Variables to return. Must be in MetOpIASILexicon + (e.g. ``["iasi"]``). + fields : str | list[str] | pa.Schema | None, optional + Fields to include in output, by default None (all fields). + + Returns + ------- + pd.DataFrame + IASI observation data in long format with one row per IFOV + per channel. + """ + time_list, variable_list = prep_data_inputs(time, variable) + schema = self.resolve_fields(fields) + + # Validate variables against lexicon + for v in variable_list: + if v not in MetOpIASILexicon.VOCAB: + raise KeyError( + f"Variable '{v}' not found in MetOpIASILexicon. 
" + f"Available: {list(MetOpIASILexicon.VOCAB.keys())}" + ) + + pathlib.Path(self.cache).mkdir(parents=True, exist_ok=True) + + # Compute overall time window for product search + all_times = [ + t.replace(tzinfo=None) if hasattr(t, "tzinfo") and t.tzinfo else t + for t in time_list + ] + dt_min = min(all_times) + self._tolerance_lower + dt_max = max(all_times) + self._tolerance_upper + + # Download products from EUMETSAT Data Store + product_files = await asyncio.to_thread(self._download_products, dt_min, dt_max) + + if not product_files: + logger.warning( + "No IASI products found for time range {} to {}", dt_min, dt_max + ) + return self._empty_dataframe(schema) + + # Parse each product file + frames: list[pd.DataFrame] = [] + for fpath in product_files: + with open(fpath, "rb") as f: + raw = f.read() + df = _parse_native_iasi(raw, channel_indices=self._channel_indices) + if not df.empty: + frames.append(df) + + if not frames: + return self._empty_dataframe(schema) + + result = pd.concat(frames, ignore_index=True) + + # Deduplicate rows that may appear from overlapping tolerance windows + result = result.drop_duplicates( + subset=[ + "time", + "lat", + "lon", + "channel_index", + "satellite", + "variable", + ] + ) + + # Filter by time window for each requested timestamp + time_masks = [] + for t in all_times: + t_min = t + self._tolerance_lower + t_max = t + self._tolerance_upper + mask = (result["time"] >= pd.Timestamp(t_min)) & ( + result["time"] <= pd.Timestamp(t_max) + ) + time_masks.append(mask) + + if time_masks: + combined_mask = time_masks[0] + for mask in time_masks[1:]: + combined_mask = combined_mask | mask + result = result[combined_mask] + + # Select requested fields + available_cols = [c for c in schema.names if c in result.columns] + result = result[available_cols].reset_index(drop=True) + + return result + + def _download_products(self, dt_start: datetime, dt_end: datetime) -> list[str]: + """Download IASI L1C products from EUMETSAT Data Store. 
+ + Parameters + ---------- + dt_start : datetime + Search window start (UTC) + dt_end : datetime + Search window end (UTC) + + Returns + ------- + list[str] + Paths to downloaded native format files + """ + token = eumdac.AccessToken( + credentials=(self._consumer_key, self._consumer_secret) + ) + datastore = eumdac.DataStore(token) + collection = datastore.get_collection(self.COLLECTION_ID) + + search_kwargs: dict = { + "dtstart": dt_start, + "dtend": dt_end, + } + if self._eumdac_satellite: + search_kwargs["sat"] = self._eumdac_satellite + + products = collection.search(**search_kwargs) + + downloaded: list[str] = [] + for product in products: + if self._verbose: + logger.info( + "Downloading IASI product: {} ({}–{})", + product, + getattr(product, "sensing_start", "?"), + getattr(product, "sensing_end", "?"), + ) + + # Find the .nat file entry + nat_entry = None + try: + entries = product.entries + for entry in entries: + if str(entry).endswith(".nat"): + nat_entry = entry + break + except Exception as exc: # noqa: S110 + logger.debug( + "Could not list entries for product {}: {} — using default entry", + product, + exc, + ) + + # Download to cache + cache_name = f"{product}.nat" + cache_path = os.path.join(self.cache, cache_name) + if os.path.isfile(cache_path): + downloaded.append(cache_path) + continue + + try: + with product.open(entry=nat_entry) as stream: + raw = stream.read() + with open(cache_path, "wb") as f: + f.write(raw) + downloaded.append(cache_path) + except Exception as exc: + logger.warning("Failed to download product {}: {}", product, exc) + + return downloaded + + def _empty_dataframe(self, schema: pa.Schema) -> pd.DataFrame: + """Create an empty DataFrame matching the schema. 
+ + Parameters + ---------- + schema : pa.Schema + Target schema + + Returns + ------- + pd.DataFrame + Empty DataFrame with correct columns + """ + return pd.DataFrame({name: pd.Series(dtype="object") for name in schema.names}) + + @classmethod + def resolve_fields(cls, fields: str | list[str] | pa.Schema | None) -> pa.Schema: + """Convert fields parameter into a validated PyArrow schema. + + Parameters + ---------- + fields : str | list[str] | pa.Schema | None + Field specification. None returns the full SCHEMA. + + Returns + ------- + pa.Schema + A PyArrow schema containing only the requested fields + + Raises + ------ + KeyError + If a requested field name is not in the SCHEMA + TypeError + If a field type doesn't match the SCHEMA + """ + if fields is None: + return cls.SCHEMA + + if isinstance(fields, str): + fields = [fields] + + if isinstance(fields, pa.Schema): + for field in fields: + if field.name not in cls.SCHEMA.names: + raise KeyError( + f"Field '{field.name}' not found in class SCHEMA. " + f"Available fields: {cls.SCHEMA.names}" + ) + expected_type = cls.SCHEMA.field(field.name).type + if field.type != expected_type: + raise TypeError( + f"Field '{field.name}' has type {field.type}, " + f"expected {expected_type} from class SCHEMA" + ) + return fields + + selected_fields = [] + for name in fields: + if name not in cls.SCHEMA.names: + raise KeyError( + f"Field '{name}' not found in class SCHEMA. 
" + f"Available fields: {cls.SCHEMA.names}" + ) + selected_fields.append(cls.SCHEMA.field(name)) + + return pa.schema(selected_fields) + + @property + def cache(self) -> str: + """Get the appropriate cache location.""" + cache_location = os.path.join(datasource_cache_root(), "metop_iasi") + if not self._cache: + if self._tmp_cache_hash is None: + self._tmp_cache_hash = uuid.uuid4().hex[:8] + cache_location = os.path.join( + cache_location, f"tmp_metop_iasi_{self._tmp_cache_hash}" + ) + return cache_location diff --git a/earth2studio/lexicon/__init__.py b/earth2studio/lexicon/__init__.py index e914d4802..c623f993a 100644 --- a/earth2studio/lexicon/__init__.py +++ b/earth2studio/lexicon/__init__.py @@ -28,6 +28,7 @@ from .isd import ISDLexicon from .jpss import JPSSATMSLexicon, JPSSLexicon from .metop import MetOpAMSUALexicon, MetOpAVHRRLexicon, MetOpMHSLexicon +from .metop_iasi import MetOpIASILexicon from .mrms import MRMSLexicon from .ncar import NCAR_ERA5Lexicon from .planetary_computer import ( diff --git a/earth2studio/lexicon/metop_iasi.py b/earth2studio/lexicon/metop_iasi.py new file mode 100644 index 000000000..8b057b284 --- /dev/null +++ b/earth2studio/lexicon/metop_iasi.py @@ -0,0 +1,107 @@ +# SPDX-FileCopyrightText: Copyright (c) 2024-2026 NVIDIA CORPORATION & AFFILIATES. +# SPDX-FileCopyrightText: All rights reserved. +# SPDX-License-Identifier: Apache-2.0 +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +from collections.abc import Callable +from typing import Any + +from earth2studio.lexicon.base import LexiconType + + +class MetOpIASILexicon(metaclass=LexiconType): + """Lexicon for MetOp IASI Level 1C brightness temperature data. + + This lexicon maps the ``iasi`` variable to an identity modifier for + brightness temperature observations in Kelvin. Individual channels (1-8461) + are distinguished by the ``channel_index`` column of the returned DataFrame, + following the same convention used by :class:`~earth2studio.data.MetOpAMSUA` + and :class:`~earth2studio.data.UFSObsSat`. + + IASI (Infrared Atmospheric Sounding Interferometer) is a Fourier-transform + infrared spectrometer on the MetOp satellite series providing 8461 spectral + channels across the thermal infrared (645--2760 cm-1). The instrument + measures calibrated spectral radiances that are converted to brightness + temperature (K) via the Planck function. + + The data source returns calibrated brightness temperatures (K) for a + user-selectable subset of the 8461 channels. Each IASI field of regard + (EFOV) comprises a 2x2 array of instantaneous fields of view (IFOVs), + each approximately 12 km diameter at nadir. + + Notes + ----- + IASI Spectral Band Specifications: + + ====== =============== =============== ================= + Band Channel Range Wavenumber (cm-1) Primary Application + ====== =============== =============== ================= + 1 1 -- 1997 645 -- 1210 CO2 / temperature sounding + 2 1998 -- 5116 1210 -- 1990 Water vapor / ozone + 3 5117 -- 8461 1990 -- 2760 Shortwave IR / trace gases + ====== =============== =============== ================= + + Spectral sampling is 0.25 cm-1 with 0.5 cm-1 apodised resolution. + + Each scan line contains 30 EFOV positions x 4 IFOV pixels = 120 spectra. + Spatial resolution is approximately 12 km at nadir per IFOV. + + Variable documentation: + https://data.eumetsat.int/product/EO:EUM:DAT:METOP:IASIL1C-ALL + + References + ---------- + - Clerbaux, C. 
et al. (2009). Monitoring of atmospheric composition using + the thermal infrared IASI/MetOp sounder. Atmos. Chem. Phys., 9, 6041-6054. + - EUMETSAT. IASI Level 1 Product Guide (EUM/OPS-EPS/MAN/04/0032). + """ + + # Number of IASI cross-track extended field-of-view positions per scan line + IASI_NUM_EFOVS = 30 + # Number of instantaneous fields of view per EFOV (2x2 array) + IASI_NUM_IFOVS = 4 + # Total number of IASI spectral channels + IASI_NUM_CHANNELS = 8461 + # Spectral sampling interval (cm-1) + IASI_SPECTRAL_SAMPLING = 0.25 + # IASI spectral band definitions: (first_channel, last_channel, start_wn, end_wn) + IASI_BANDS: dict[int, tuple[int, int, float, float]] = { + 1: (1, 1997, 645.0, 1144.0), + 2: (1998, 5116, 1210.0, 1989.75), + 3: (5117, 8461, 2000.0, 2760.0), + } + + VOCAB: dict[str, str] = { + "iasi": "brightnessTemperature", + } + + @classmethod + def get_item(cls, val: str) -> tuple[str, Callable[[Any], Any]]: + """Get IASI data key and modifier for a variable name. + + Parameters + ---------- + val : str + Standardized variable name (``iasi``) + + Returns + ------- + tuple[str, Callable] + Tuple containing: + - Data key for brightness temperature + - Modifier function (identity -- brightness temperature in K) + """ + if val not in cls.VOCAB: + raise KeyError(f"Variable {val} not found in MetOp IASI lexicon") + return cls.VOCAB[val], lambda x: x diff --git a/test/data/test_metop_iasi.py b/test/data/test_metop_iasi.py new file mode 100644 index 000000000..28dab50d4 --- /dev/null +++ b/test/data/test_metop_iasi.py @@ -0,0 +1,631 @@ +# SPDX-FileCopyrightText: Copyright (c) 2024-2026 NVIDIA CORPORATION & AFFILIATES. +# SPDX-FileCopyrightText: All rights reserved. +# SPDX-License-Identifier: Apache-2.0 +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. 
+# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import struct +from datetime import datetime, timedelta +from unittest.mock import patch + +import numpy as np +import pyarrow as pa +import pytest + +from earth2studio.data import MetOpIASI +from earth2studio.data.metop_iasi import ( + _C1, + _C2, + _GIADR_RECORD_CLASS, + _GIADR_SF_SUBCLASS, + _GRH_SIZE, + _MDR_INSTRUMENT_GROUP, + _MDR_RECORD_CLASS, + _MDR_RECORD_SUBCLASS, + _MPHR_RECORD_CLASS, + _NUM_CHANNELS, + _NUM_CHANNELS_ALLOC, + _NUM_EFOVS, + _NUM_IFOVS, + _build_channel_scale_array, + _compute_mdr_field_offsets, + _decode_eps_datetime, + _parse_giadr_scale_factors, + _parse_grh, + _parse_mphr, + _parse_native_iasi, + _radiance_to_bt, +) +from earth2studio.lexicon import MetOpIASILexicon + + +# --------------------------------------------------------------------------- +# Helpers to build synthetic EPS native binary data for IASI +# --------------------------------------------------------------------------- +def _build_grh( + record_class: int, + instrument_group: int, + record_subclass: int, + record_size: int, +) -> bytes: + header = bytearray(20) + header[0] = record_class + header[1] = instrument_group + header[2] = record_subclass + struct.pack_into(">I", header, 4, record_size) + return bytes(header) + + +def _build_mphr(fields: dict[str, str]) -> bytes: + text = "\n".join(f"{k}={v}" for k, v in fields.items()) + "\n" + payload = text.encode("ascii") + grh = _build_grh(_MPHR_RECORD_CLASS, 0, 0, _GRH_SIZE + len(payload)) + return grh + payload + + +def _build_giadr_sf( + nb_bands: int = 3, + first_ch: list[int] | None = None, + last_ch: 
list[int] | None = None, + scale_factors: list[int] | None = None, +) -> bytes: + """Build a GIADR Scale Factors record. + + Default bands use absolute sample numbers (matching nsfirst=2581): + Band 1: samples 2581-4577 (channels 0-1996), SF=5 + Band 2: samples 4578-6696 (channels 1997-4115), SF=5 + Band 3: samples 6697-11041 (channels 4116-8460), SF=5 + + Scale factors are positive integers used as negative exponents: + physical = raw × 10^(-sf). + """ + if first_ch is None: + first_ch = [2581, 4578, 6697] + if last_ch is None: + last_ch = [4577, 6696, 11041] + if scale_factors is None: + scale_factors = [5, 5, 5] + + # Payload: nb_bands(int16) + first[10](int16) + last[10](int16) + sf[10](int16) + iis_sf(int16) + payload_size = 2 + 20 + 20 + 20 + 2 + payload = bytearray(payload_size) + struct.pack_into(">h", payload, 0, nb_bands) + for i in range(10): + val = first_ch[i] if i < len(first_ch) else 0 + struct.pack_into(">h", payload, 2 + i * 2, val) + for i in range(10): + val = last_ch[i] if i < len(last_ch) else 0 + struct.pack_into(">h", payload, 22 + i * 2, val) + for i in range(10): + val = scale_factors[i] if i < len(scale_factors) else 0 + struct.pack_into(">h", payload, 42 + i * 2, val) + # IDefScaleIISScaleFactor: int16 + struct.pack_into(">h", payload, 62, -5) + + grh = _build_grh( + _GIADR_RECORD_CLASS, + _MDR_INSTRUMENT_GROUP, + _GIADR_SF_SUBCLASS, + _GRH_SIZE + payload_size, + ) + return grh + bytes(payload) + + +def _build_mdr( + lat_deg: float = 45.0, + lon_deg: float = 10.0, + quality_val: int = 0, + radiance_raw: int = 5000, +) -> bytes: + """Build a synthetic IASI MDR record. 
+ + The binary layout follows _compute_mdr_field_offsets exactly: + GRH(20) + DEGRADED(2) + GEPSIasiMode(4) + GEPSOPSProcessingMode(4) + + GEPSIdConf(32) + GEPSLocIasiAvhrr_IASI(1200) + GEPSLocIasiAvhrr_IIS(7500) + + OBT(180) + OnboardUTC(180) + GEPSDatIasi(180) + GIsfLin(8) + GIsfCol(8) + + GIsfPds1-4(32) + GEPS_CCD(30) + GEPS_SP(120) + GIrcImage(245760) + + GQisFlagQual(360) + GQisFlagQualDetailed(240) + QualIndices(5*5+4+4=33) + + GGeoSondLoc(960) + GGeoSondAnglesMETOP(960) + GGeoIISAnglesMETOP(6000) + + GGeoSondAnglesSUN(960) + GGeoIISAnglesSUN(6000) + GGeoIISLoc(6000) + + EARTH_SATELLITE_DISTANCE(4) + IDefSpectDWn1b(5) + IDefNsfirst1b(4) + + IDefNslast1b(4) + GS1cSpect(2088000) + trailing(...) + """ + # Compute payload size — must match _compute_mdr_field_offsets walk + # Payload after GRH: + payload_size = ( + 2 # DEGRADED_INST + DEGRADED_PROC + + 4 # GEPSIasiMode + + 4 # GEPSOPSProcessingMode + + 32 # GEPSIdConf + + 1200 # GEPSLocIasiAvhrr_IASI + + 7500 # GEPSLocIasiAvhrr_IIS + + 180 # OBT + + 180 # OnboardUTC + + 180 # GEPSDatIasi + + 8 # GIsfLinOrigin + + 8 # GIsfColOrigin + + 32 # GIsfPds1-4 + + 30 # GEPS_CCD + + 120 # GEPS_SP + + 245760 # GIrcImage + + 360 # GQisFlagQual + + 240 # GQisFlagQualDetailed + + 33 # Quality indices (5*5+4+4) + + 960 # GGeoSondLoc + + 960 # GGeoSondAnglesMETOP + + 6000 # GGeoIISAnglesMETOP + + 960 # GGeoSondAnglesSUN + + 6000 # GGeoIISAnglesSUN + + 6000 # GGeoIISLoc + + 4 # EARTH_SATELLITE_DISTANCE + + 5 # IDefSpectDWn1b + + 4 # IDefNsfirst1b + + 4 # IDefNslast1b + + _NUM_EFOVS * _NUM_IFOVS * _NUM_CHANNELS_ALLOC * 2 # GS1cSpect + ) + + record_size = _GRH_SIZE + payload_size + grh = _build_grh( + _MDR_RECORD_CLASS, + _MDR_INSTRUMENT_GROUP, + _MDR_RECORD_SUBCLASS, + record_size, + ) + payload = bytearray(payload_size) + + # Walk through payload filling key fields + pos = 0 + + # DEGRADED (2 bytes) + pos += 2 + # GEPSIasiMode (4), GEPSOPSProcessingMode (4), GEPSIdConf (32) + pos += 40 + # GEPSLocIasiAvhrr_IASI (1200), 
GEPSLocIasiAvhrr_IIS (7500) + pos += 8700 + # OBT (180), OnboardUTC (180) + pos += 360 + + # GEPSDatIasi: EPSdatetime[30] — fill with 2025-01-15T10:00:00Z + # days since 2000-01-01 = 9145 (2025-01-15), ms_of_day = 36_000_000 (10:00:00) + days_since_epoch = (datetime(2025, 1, 15) - datetime(2000, 1, 1)).days + ms_of_day = 10 * 3600 * 1000 # 10:00:00 UTC + for efov in range(_NUM_EFOVS): + off = pos + efov * 6 + struct.pack_into(">H", payload, off, days_since_epoch) + struct.pack_into(">I", payload, off + 2, ms_of_day) + pos += 180 + + # GIsf* fields (8+8+32=48) + GEPS_CCD (30) + GEPS_SP (120) = 198 + pos += 198 + # GIrcImage (245760) + pos += 245760 + # GQisFlagQual (360) + pos += 360 + + # GQisFlagQualDetailed: uint16[30][4] + for i in range(_NUM_EFOVS * _NUM_IFOVS): + struct.pack_into(">H", payload, pos + i * 2, quality_val) + pos += 240 + + # Quality indices (33 bytes) + pos += 33 + + # GGeoSondLoc: int32[30][4][2], SF=1e6 — [efov][ifov][lon=0, lat=1] + lat_raw = int(lat_deg * 1e6) + lon_raw = int(lon_deg * 1e6) + for efov in range(_NUM_EFOVS): + for ifov in range(_NUM_IFOVS): + idx = (efov * _NUM_IFOVS + ifov) * 2 + struct.pack_into(">i", payload, pos + idx * 4, lon_raw) + struct.pack_into(">i", payload, pos + (idx + 1) * 4, lat_raw) + pos += 960 + + # GGeoSondAnglesMETOP: int32[30][4][2], SF=1e6 — [zen=0, azi=1] + ang_raw = int(20.0 * 1e6) # 20 degrees + for i in range(_NUM_EFOVS * _NUM_IFOVS * 2): + struct.pack_into(">i", payload, pos + i * 4, ang_raw) + pos += 960 + + # GGeoIISAnglesMETOP (6000) + pos += 6000 + + # GGeoSondAnglesSUN: int32[30][4][2], SF=1e6 + for i in range(_NUM_EFOVS * _NUM_IFOVS * 2): + struct.pack_into(">i", payload, pos + i * 4, ang_raw) + pos += 960 + + # GGeoIISAnglesSUN (6000), GGeoIISLoc (6000) + pos += 12000 + # EARTH_SATELLITE_DISTANCE (4) + pos += 4 + + # IDefSpectDWn1b: VSFInt (1 byte scale + 4 byte value) + # Spectral sampling: 25 m⁻¹ = 0.25 cm⁻¹ + # Encode as scale=0, value=25 + struct.pack_into(">b", payload, pos, 0) + 
struct.pack_into(">i", payload, pos + 1, 25) + pos += 5 + + # IDefNsfirst1b: int32 = 2581 (645 cm⁻¹ / 0.25 cm⁻¹ ≈ 2580, rounded to 2581) + struct.pack_into(">i", payload, pos, 2581) + pos += 4 + + # IDefNslast1b: int32 = 11041 + struct.pack_into(">i", payload, pos, 11041) + pos += 4 + + # GS1cSpect: int16[30][4][8700] + # Fill selected channels with a known positive value + for efov in range(_NUM_EFOVS): + for ifov in range(_NUM_IFOVS): + spect_base = pos + (efov * _NUM_IFOVS + ifov) * _NUM_CHANNELS_ALLOC * 2 + # Fill first 8461 channels with radiance_raw + for ch in range(_NUM_CHANNELS): + struct.pack_into(">h", payload, spect_base + ch * 2, radiance_raw) + + return grh + bytes(payload) + + +def _build_native_file( + spacecraft_id: str = "M01", + n_scans: int = 1, + lat_deg: float = 45.0, + lon_deg: float = 10.0, + quality_val: int = 0, + radiance_raw: int = 5000, +) -> bytes: + mphr = _build_mphr( + { + "SPACECRAFT_ID": spacecraft_id, + "SENSING_START": "20250115100000Z", + "SENSING_END": "20250115100800Z", + } + ) + giadr = _build_giadr_sf() + mdrs = b"".join( + _build_mdr( + lat_deg=lat_deg, + lon_deg=lon_deg, + quality_val=quality_val, + radiance_raw=radiance_raw, + ) + for _ in range(n_scans) + ) + return mphr + giadr + mdrs + + +# --------------------------------------------------------------------------- +# Mock tests — exercises __call__ end-to-end without network +# --------------------------------------------------------------------------- +def test_metop_iasi_call_mock(tmp_path): + nat_data = _build_native_file(spacecraft_id="M01", n_scans=1) + nat_file = tmp_path / "mock_iasi.nat" + nat_file.write_bytes(nat_data) + + with patch.object(MetOpIASI, "_download_products") as mock_dl: + mock_dl.return_value = [str(nat_file)] + ds = MetOpIASI( + channel_indices=np.array([0, 100, 500]), + time_tolerance=timedelta(hours=24), + cache=False, + verbose=False, + ) + df = ds(datetime(2025, 1, 15, 10), ["iasi"]) + + assert list(df.columns) == ds.SCHEMA.names + assert 
not df.empty + assert set(df["variable"].unique()) == {"iasi"} + assert set(df["class"].unique()) == {"rad"} + assert set(df["satellite"].unique()) == {"metop-b"} + assert df["observation"].notna().all() + assert df["lat"].between(-90, 90).all() + assert df["lon"].between(0, 360).all() + mock_dl.assert_called_once() + + +def test_metop_iasi_call_mock_fields_subset(tmp_path): + nat_data = _build_native_file(n_scans=1) + nat_file = tmp_path / "mock_iasi.nat" + nat_file.write_bytes(nat_data) + + with patch.object(MetOpIASI, "_download_products") as mock_dl: + mock_dl.return_value = [str(nat_file)] + ds = MetOpIASI( + channel_indices=np.array([0, 1000]), + time_tolerance=timedelta(hours=24), + cache=False, + verbose=False, + ) + + subset = ["time", "lat", "lon", "observation", "variable"] + df = ds(datetime(2025, 1, 15, 10), ["iasi"], fields=subset) + assert list(df.columns) == subset + assert not df.empty + + +def test_metop_iasi_call_mock_empty(tmp_path): + with patch.object(MetOpIASI, "_download_products") as mock_dl: + mock_dl.return_value = [] + ds = MetOpIASI(time_tolerance=timedelta(minutes=5), cache=False, verbose=False) + df = ds(datetime(2025, 1, 15, 10), ["iasi"]) + assert df.empty + assert list(df.columns) == ds.SCHEMA.names + + +# --------------------------------------------------------------------------- +# Network integration tests (slow, xfail) +# --------------------------------------------------------------------------- +@pytest.mark.slow +@pytest.mark.xfail +@pytest.mark.timeout(120) +@pytest.mark.parametrize( + "time,variable,tol", + [ + ( + datetime(2025, 1, 15, 11), + ["iasi"], + timedelta(minutes=5), + ), + ( + [datetime(2025, 1, 15, 10), datetime(2025, 1, 15, 11)], + ["iasi"], + timedelta(minutes=5), + ), + ], +) +def test_metop_iasi_fetch(time, variable, tol): + ds = MetOpIASI(satellite="metop-c", time_tolerance=tol, cache=False, verbose=False) + df = ds(time, variable) + assert list(df.columns) == ds.SCHEMA.names + assert 
set(df["variable"].unique()).issubset(set(variable)) + + +@pytest.mark.slow +@pytest.mark.xfail +@pytest.mark.timeout(120) +@pytest.mark.parametrize("cache", [True, False]) +def test_metop_iasi_cache(cache): + ds = MetOpIASI( + satellite="metop-c", + time_tolerance=timedelta(minutes=5), + cache=cache, + verbose=False, + ) + df = ds(datetime(2025, 1, 15, 11), ["iasi"]) + assert list(df.columns) == ds.SCHEMA.names + + +# --------------------------------------------------------------------------- +# Unit tests — exceptions, resolve_fields, binary parsers +# --------------------------------------------------------------------------- +def test_metop_iasi_exceptions(): + ds = MetOpIASI(time_tolerance=timedelta(minutes=5), cache=False, verbose=False) + with pytest.raises(KeyError): + ds(datetime(2025, 1, 15, 11), ["invalid_variable"]) + + with pytest.raises(KeyError): + MetOpIASI.resolve_fields(["nonexistent"]) + + with pytest.raises(TypeError): + MetOpIASI.resolve_fields(pa.schema([pa.field("time", pa.string())])) + + +def test_metop_iasi_resolve_fields(): + assert MetOpIASI.resolve_fields(None) == MetOpIASI.SCHEMA + assert MetOpIASI.resolve_fields("time").names == ["time"] + assert MetOpIASI.resolve_fields(["time", "lat"]).names == ["time", "lat"] + + +def test_parse_grh(): + grh = _build_grh(8, 8, 2, 2723000) + rc, ig, sc, sz = _parse_grh(grh, 0) + assert (rc, ig, sc, sz) == (8, 8, 2, 2723000) + + +def test_parse_mphr(): + data = _build_mphr({"SPACECRAFT_ID": "M03", "SENSING_START": "20250115100000Z"}) + mphr = _parse_mphr(data) + assert mphr["SPACECRAFT_ID"] == "M03" + assert mphr["SENSING_START"] == "20250115100000Z" + + +def test_decode_eps_datetime(): + buf = bytearray(6) + # 2025-01-15 = 9145 days since 2000-01-01 + days = (datetime(2025, 1, 15) - datetime(2000, 1, 1)).days + struct.pack_into(">H", buf, 0, days) + # 10:00:00 = 36_000_000 ms + struct.pack_into(">I", buf, 2, 36_000_000) + dt = _decode_eps_datetime(bytes(buf), 0) + assert dt == datetime(2025, 1, 15, 
10, 0, 0) + + +def test_radiance_to_bt(): + # Known physical values: wavenumber 900 cm⁻¹ (~11 µm, window channel) + wn = np.array([900.0], dtype=np.float64) + # A typical radiance for 280K at 900 cm⁻¹ + # BT = C2 * nu / ln(1 + C1 * nu^3 / L) → solve for L + bt_expected = 280.0 + l_expected = _C1 * 900.0**3 / (np.exp(_C2 * 900.0 / bt_expected) - 1.0) + rad = np.array([[l_expected]], dtype=np.float64) + bt = _radiance_to_bt(rad, wn) + assert np.isfinite(bt[0, 0]) + np.testing.assert_allclose(bt[0, 0], bt_expected, rtol=1e-6) + + # Zero/negative radiance → NaN + bt_zero = _radiance_to_bt(np.array([[0.0, -1.0]], dtype=np.float64), wn) + assert np.all(np.isnan(bt_zero)) + + # Higher radiance → higher BT + rads = np.array([[0.01, 0.05, 0.1]], dtype=np.float64) + bt_multi = _radiance_to_bt(rads, wn) + assert np.all(np.isfinite(bt_multi)) + assert bt_multi[0, 0] < bt_multi[0, 1] < bt_multi[0, 2] + + +def test_parse_giadr_scale_factors(): + giadr = _build_giadr_sf( + nb_bands=3, + first_ch=[2581, 4578, 6697], + last_ch=[4577, 6696, 11041], + scale_factors=[5, 5, 5], + ) + # Parse starting at offset 0 (this is a standalone record) + first, last, sf, nb = _parse_giadr_scale_factors(giadr, 0) + assert nb == 3 + np.testing.assert_array_equal(first, [2581, 4578, 6697]) + np.testing.assert_array_equal(last, [4577, 6696, 11041]) + np.testing.assert_array_equal(sf, [5, 5, 5]) + + +def test_build_channel_scale_array(): + # Absolute sample numbers with nsfirst=2581 + first = np.array([2581, 4578, 6697]) + last = np.array([4577, 6696, 11041]) + sf = np.array([5, 5, 5]) # Positive: used as negative exponent → 10^(-5) + scales = _build_channel_scale_array(first, last, sf, nsfirst=2581) + assert scales.shape == (_NUM_CHANNELS,) + assert scales[0] == 1e-5 # Channel 0 → sample 2581 → Band 1 + assert scales[1996] == 1e-5 # Last channel of band 1 + assert scales[1997] == 1e-5 # First channel of band 2 + assert scales[8460] == 1e-5 # Last valid channel + + +def test_parse_native_iasi(): + # 1 
scan → 30 EFOVs × 4 IFOVs = 120 obs, extract 3 channels → 360 rows + ch_idx = np.array([0, 100, 500]) + data = _build_native_file(n_scans=1) + df = _parse_native_iasi(data, channel_indices=ch_idx) + expected_rows = _NUM_EFOVS * _NUM_IFOVS * len(ch_idx) + assert len(df) == expected_rows + assert set(df["satellite"].unique()) == {"metop-b"} + assert (df["variable"] == "iasi").all() + assert (df["class"] == "rad").all() + assert "quality" in df.columns + assert (df["quality"] == 0).all() + + # Channel indices should be 1-based + assert set(df["channel_index"].unique()) == {1, 101, 501} + + +def test_parse_native_iasi_multiple_scans(): + ch_idx = np.array([0, 1000, 4000]) + data = _build_native_file(spacecraft_id="M03", n_scans=3) + df = _parse_native_iasi(data, channel_indices=ch_idx) + expected_rows = 3 * _NUM_EFOVS * _NUM_IFOVS * len(ch_idx) + assert len(df) == expected_rows + assert set(df["satellite"].unique()) == {"metop-c"} + + +def test_parse_native_iasi_empty(): + assert _parse_native_iasi(b"").empty + # MPHR only (no GIADR or MDR) + assert _parse_native_iasi( + _build_mphr( + { + "SPACECRAFT_ID": "M01", + "SENSING_START": "20250115100000Z", + "SENSING_END": "20250115100800Z", + } + ) + ).empty + + +def test_parse_native_iasi_quality(): + ch_idx = np.array([0]) + data = _build_native_file(quality_val=42) + df = _parse_native_iasi(data, channel_indices=ch_idx) + assert not df.empty + assert (df["quality"] == 42).all() + + +def test_parse_native_iasi_geolocation(): + ch_idx = np.array([0]) + data = _build_native_file(lat_deg=45.0, lon_deg=10.0) + df = _parse_native_iasi(data, channel_indices=ch_idx) + # All observations should have lat=45, lon=10 + np.testing.assert_allclose(df["lat"].values, 45.0, atol=0.01) + np.testing.assert_allclose(df["lon"].values, 10.0, atol=0.01) + + +def test_parse_native_iasi_negative_longitude(): + ch_idx = np.array([0]) + # Negative longitude should be converted to [0, 360] + data = _build_native_file(lat_deg=45.0, lon_deg=-10.0) + 
df = _parse_native_iasi(data, channel_indices=ch_idx) + np.testing.assert_allclose(df["lon"].values, 350.0, atol=0.01) + + +def test_compute_mdr_field_offsets(): + data = _build_native_file(n_scans=1) + # Find MDR offset by scanning records + offset = 0 + mdr_offset = None + while offset + _GRH_SIZE <= len(data): + rc, ig, sc, sz = _parse_grh(data, offset) + if rc == _MDR_RECORD_CLASS and ig == _MDR_INSTRUMENT_GROUP: + mdr_offset = offset + break + offset += sz + assert mdr_offset is not None + + offsets = _compute_mdr_field_offsets(data, mdr_offset) + assert "GEPSDatIasi" in offsets + assert "GGeoSondLoc" in offsets + assert "GGeoSondAnglesMETOP" in offsets + assert "GGeoSondAnglesSUN" in offsets + assert "GQisFlagQualDetailed" in offsets + assert "IDefSpectDWn1b" in offsets + assert "IDefNsfirst1b" in offsets + assert "GS1cSpect" in offsets + # Offsets should be increasing + assert offsets["GEPSDatIasi"] < offsets["GGeoSondLoc"] + assert offsets["GGeoSondLoc"] < offsets["GS1cSpect"] + + +def test_parse_native_iasi_skip_non_mdr(): + # Insert an IPR record between GIADR and MDR — it should be skipped + mphr = _build_mphr( + { + "SPACECRAFT_ID": "M01", + "SENSING_START": "20250115100000Z", + "SENSING_END": "20250115100800Z", + } + ) + giadr = _build_giadr_sf() + ipr = _build_grh(3, 0, 1, 27) + bytes(7) # IPR record + mdr = _build_mdr() + data = mphr + giadr + ipr + mdr + ch_idx = np.array([0]) + df = _parse_native_iasi(data, channel_indices=ch_idx) + assert len(df) == _NUM_EFOVS * _NUM_IFOVS + + +def test_metop_iasi_default_channels(): + assert hasattr(MetOpIASI, "DEFAULT_CHANNELS") + assert len(MetOpIASI.DEFAULT_CHANNELS) == 100 + assert all(0 <= ch < _NUM_CHANNELS for ch in MetOpIASI.DEFAULT_CHANNELS) + + +def test_lexicon_vocab(): + assert "iasi" in MetOpIASILexicon.VOCAB + key, modifier = MetOpIASILexicon.get_item("iasi") + assert key == "brightnessTemperature" + # Identity modifier + val = np.array([1.0, 2.0]) + np.testing.assert_array_equal(modifier(val), val) 
+ + +def test_lexicon_invalid_variable(): + with pytest.raises(KeyError): + MetOpIASILexicon["nonexistent"]