Update patch_steradians function for better accuracy#25
Update patch_steradians function for better accuracy#25bbrangeo wants to merge 1 commit intoUMEP-dev:mainfrom
Conversation
Refactor steradian calculation to use scalar values and improve clarity.
PR #25 Review Summary:
|
| Aspect | Original (main) | PR #25 | This Review |
|---|---|---|---|
| Lookup method | skyalt_c[skyalt == x] |
np.any() + [0] |
dict |
patch_alt_0 |
Inline | Inside loop | Outside loop |
| Error handling | Errors on bad data | Fallback to 1.0 |
KeyError |
float() calls |
None | Added | Removed |
| Docstring | '''' (typo) |
Unchanged | Fixed |
| Tests | None | None | Added |
What PR #25 Did
patch_alt_i = float(patch_altitude[i])
match_idx = skyalt == patch_alt_i
if np.any(match_idx):
skyalt_count = float(skyalt_c[match_idx][0])
else:
skyalt_count = 1.0 # Fallback if no matchProposed Edits
Changes to umep/functions/SOLWEIGpython/patch_radiation.py:
1. Dict Lookup
alt_to_count = dict(zip(skyalt, skyalt_c))
skyalt_count = alt_to_count[patch_alt_i]2. Move Constant Outside Loop
patch_alt_0 = patch_altitude[0] # Computed once3. Remove Silent Fallback
The else: skyalt_count = 1.0 fallback would silently produce wrong results if data was corrupted. The dict lookup raises KeyError instead.
4. Remove float() Calls
Not needed - dict values are already scalars.
5. Docstring
Added parameter and return documentation.
6. Tests
Added tests for all 4 patch configurations and edge cases. See Suggested Code below for the full test file.
Proposed Code
See the Suggested Code section at the bottom of this comment for copy-paste ready versions.
def patch_steradians(L_patches):
"""Calculate the steradian (solid angle) for each sky patch.
Parameters
----------
L_patches : ndarray
Array of shape (n_patches, 3) where column 0 contains patch altitudes.
Patches must be sorted by altitude with each band grouped together.
Returns
-------
steradian : ndarray
Solid angle for each patch in steradians.
skyalt : ndarray
Unique altitude values.
patch_altitude : ndarray
Altitude of each patch.
"""
deg2rad = np.pi / 180
# Map each altitude to its patch count using a dict for O(1) lookup.
# This replaces the original loop logic that used np.any(skyalt == patch_alt_i)
# followed by skyalt_c[match_idx][0] for each iteration.
skyalt, skyalt_c = np.unique(L_patches[:, 0], return_counts=True)
alt_to_count = dict(zip(skyalt, skyalt_c))
patch_altitude = L_patches[:, 0]
patch_alt_0 = patch_altitude[0] # Computed once, was inside loop before
steradian = np.zeros(patch_altitude.shape[0])
for i in range(patch_altitude.shape[0]):
patch_alt_i = patch_altitude[i]
skyalt_count = alt_to_count[patch_alt_i] # O(1) dict lookup
if skyalt_count > 1:
# Multiple patches in this altitude band
steradian[i] = ((360 / skyalt_count) * deg2rad) * (np.sin((patch_alt_i + patch_alt_0) * deg2rad) \
- np.sin((patch_alt_i - patch_alt_0) * deg2rad))
else:
# Single patch in band (zenith)
patch_alt_prev = patch_altitude[i - 1]
steradian[i] = ((360 / skyalt_count) * deg2rad) * (np.sin((patch_alt_i) * deg2rad) \
- np.sin((patch_alt_prev + patch_alt_0) * deg2rad))
return steradian, skyalt, patch_altitudeKnown Limitation
The function requires patches to be sorted by altitude with each band grouped together. This matches create_patches() output.
Next Steps
If you're happy with these changes, I can either:
- Push a commit to your branch with the updated function and tests, or
- You can copy the code from the collapsible sections below.
Let me know which works best. It would also be good to get a review from @biglimp.
Suggested Code
patch_steradians function (click to expand)
def patch_steradians(L_patches):
"""Calculate the steradian (solid angle) for each sky patch.
Parameters
----------
L_patches : ndarray
Array of shape (n_patches, 3) where column 0 contains patch altitudes.
Patches must be sorted by altitude with each band grouped together.
Returns
-------
steradian : ndarray
Solid angle for each patch in steradians.
skyalt : ndarray
Unique altitude values.
patch_altitude : ndarray
Altitude of each patch.
"""
deg2rad = np.pi / 180
# Map each altitude to its patch count using a dict for O(1) lookup.
# This replaces the original loop logic that used np.any(skyalt == patch_alt_i)
# followed by skyalt_c[match_idx][0] for each iteration.
skyalt, skyalt_c = np.unique(L_patches[:, 0], return_counts=True)
alt_to_count = dict(zip(skyalt, skyalt_c))
patch_altitude = L_patches[:, 0]
patch_alt_0 = patch_altitude[0] # Computed once, was inside loop before
steradian = np.zeros(patch_altitude.shape[0])
for i in range(patch_altitude.shape[0]):
patch_alt_i = patch_altitude[i]
skyalt_count = alt_to_count[patch_alt_i] # O(1) dict lookup
if skyalt_count > 1:
# Multiple patches in this altitude band
steradian[i] = ((360 / skyalt_count) * deg2rad) * (np.sin((patch_alt_i + patch_alt_0) * deg2rad) \
- np.sin((patch_alt_i - patch_alt_0) * deg2rad))
else:
# Single patch in band (zenith)
patch_alt_prev = patch_altitude[i - 1]
steradian[i] = ((360 / skyalt_count) * deg2rad) * (np.sin((patch_alt_i) * deg2rad) \
- np.sin((patch_alt_prev + patch_alt_0) * deg2rad))
return steradian, skyalt, patch_altitudeTest file: tests/test_patch_steradians.py (click to expand)
import numpy as np
import pytest
from umep.functions.SOLWEIGpython.patch_radiation import patch_steradians
from umep.util.SEBESOLWEIGCommonFiles.create_patches import create_patches
def _build_L_patches(patch_option):
"""Helper to build L_patches array from a patch configuration."""
skyvaultalt, skyvaultazi, *_ = create_patches(patch_option=patch_option)
return np.column_stack([skyvaultalt.flatten(), skyvaultazi.flatten(), np.zeros(skyvaultalt.size)])
# =============================================================================
# Patch count tests
# =============================================================================
def test_patch_count_option1_robinson_stone():
"""Test 145-patch Robinson & Stone (2004) configuration."""
L_patches = _build_L_patches(patch_option=1)
steradian, _, patch_altitude = patch_steradians(L_patches)
assert len(steradian) == 145
assert len(patch_altitude) == 145
def test_patch_count_option2_wallenberg():
"""Test 153-patch Wallenberg et al. (2022) configuration."""
L_patches = _build_L_patches(patch_option=2)
steradian, _, patch_altitude = patch_steradians(L_patches)
assert len(steradian) == 153
assert len(patch_altitude) == 153
def test_patch_count_option3():
"""Test 305-patch configuration."""
L_patches = _build_L_patches(patch_option=3)
steradian, _, patch_altitude = patch_steradians(L_patches)
assert len(steradian) == 305
assert len(patch_altitude) == 305
def test_patch_count_option4():
"""Test 609-patch configuration."""
L_patches = _build_L_patches(patch_option=4)
steradian, _, patch_altitude = patch_steradians(L_patches)
assert len(steradian) == 609
assert len(patch_altitude) == 609
# =============================================================================
# Positive values tests
# =============================================================================
def test_positive_values_option1():
"""Test all steradians are positive for option 1."""
L_patches = _build_L_patches(patch_option=1)
steradian, _, _ = patch_steradians(L_patches)
assert (steradian > 0).all()
assert not np.isnan(steradian).any()
def test_positive_values_option2():
"""Test all steradians are positive for option 2."""
L_patches = _build_L_patches(patch_option=2)
steradian, _, _ = patch_steradians(L_patches)
assert (steradian > 0).all()
assert not np.isnan(steradian).any()
def test_positive_values_option3():
"""Test all steradians are positive for option 3."""
L_patches = _build_L_patches(patch_option=3)
steradian, _, _ = patch_steradians(L_patches)
assert (steradian > 0).all()
assert not np.isnan(steradian).any()
def test_positive_values_option4():
"""Test all steradians are positive for option 4."""
L_patches = _build_L_patches(patch_option=4)
steradian, _, _ = patch_steradians(L_patches)
assert (steradian > 0).all()
assert not np.isnan(steradian).any()
# =============================================================================
# Hemisphere sum tests (total solid angle ≈ 2π)
# =============================================================================
def test_hemisphere_sum_option1():
"""Test total solid angle approximates 2*pi for option 1."""
L_patches = _build_L_patches(patch_option=1)
steradian, _, _ = patch_steradians(L_patches)
total = steradian.sum()
assert np.isclose(total, 2 * np.pi, rtol=0.05), f"Total {total:.4f} not close to 2π"
def test_hemisphere_sum_option2():
"""Test total solid angle approximates 2*pi for option 2."""
L_patches = _build_L_patches(patch_option=2)
steradian, _, _ = patch_steradians(L_patches)
total = steradian.sum()
assert np.isclose(total, 2 * np.pi, rtol=0.05), f"Total {total:.4f} not close to 2π"
def test_hemisphere_sum_option3():
"""Test total solid angle approximates 2*pi for option 3."""
L_patches = _build_L_patches(patch_option=3)
steradian, _, _ = patch_steradians(L_patches)
total = steradian.sum()
assert np.isclose(total, 2 * np.pi, rtol=0.05), f"Total {total:.4f} not close to 2π"
def test_hemisphere_sum_option4():
"""Test total solid angle approximates 2*pi for option 4."""
L_patches = _build_L_patches(patch_option=4)
steradian, _, _ = patch_steradians(L_patches)
total = steradian.sum()
assert np.isclose(total, 2 * np.pi, rtol=0.05), f"Total {total:.4f} not close to 2π"
# =============================================================================
# Zenith patch tests
# =============================================================================
def test_zenith_patch_smallest_option1():
"""Test zenith patch (90°) has smallest steradian for option 1."""
L_patches = _build_L_patches(patch_option=1)
steradian, _, patch_altitude = patch_steradians(L_patches)
zenith_idx = np.where(patch_altitude == 90)[0]
assert len(zenith_idx) == 1
assert steradian[zenith_idx[0]] == steradian.min()
def test_zenith_patch_smallest_option2():
"""Test zenith patch (90°) has smallest steradian for option 2."""
L_patches = _build_L_patches(patch_option=2)
steradian, _, patch_altitude = patch_steradians(L_patches)
zenith_idx = np.where(patch_altitude == 90)[0]
assert len(zenith_idx) == 1
assert steradian[zenith_idx[0]] == steradian.min()
def test_zenith_patch_exists_option1():
"""Test exactly one zenith patch exists for option 1."""
L_patches = _build_L_patches(patch_option=1)
steradian, _, patch_altitude = patch_steradians(L_patches)
zenith_idx = np.where(patch_altitude == 90)[0]
assert len(zenith_idx) == 1
assert steradian[zenith_idx[0]] > 0
def test_zenith_patch_exists_option2():
"""Test exactly one zenith patch exists for option 2."""
L_patches = _build_L_patches(patch_option=2)
steradian, _, patch_altitude = patch_steradians(L_patches)
zenith_idx = np.where(patch_altitude == 90)[0]
assert len(zenith_idx) == 1
assert steradian[zenith_idx[0]] > 0
def test_zenith_patch_exists_option3():
"""Test exactly one zenith patch exists for option 3."""
L_patches = _build_L_patches(patch_option=3)
steradian, _, patch_altitude = patch_steradians(L_patches)
zenith_idx = np.where(patch_altitude == 90)[0]
assert len(zenith_idx) == 1
assert steradian[zenith_idx[0]] > 0
def test_zenith_patch_exists_option4():
"""Test exactly one zenith patch exists for option 4."""
L_patches = _build_L_patches(patch_option=4)
steradian, _, patch_altitude = patch_steradians(L_patches)
zenith_idx = np.where(patch_altitude == 90)[0]
assert len(zenith_idx) == 1
assert steradian[zenith_idx[0]] > 0
# =============================================================================
# Return type and basic behavior tests
# =============================================================================
def test_returns_correct_types():
"""Test that return types are correct numpy arrays."""
L_patches = _build_L_patches(patch_option=1)
steradian, skyalt, patch_altitude = patch_steradians(L_patches)
assert isinstance(steradian, np.ndarray)
assert isinstance(skyalt, np.ndarray)
assert isinstance(patch_altitude, np.ndarray)
def test_empty_input_raises_error():
"""Test that empty input raises an appropriate error."""
L_patches_empty = np.zeros((0, 3))
with pytest.raises(IndexError):
patch_steradians(L_patches_empty)
def test_single_patch():
"""Test behavior with a single patch (edge case)."""
L_patches = np.array([[90.0, 0.0, 0.0]])
steradian, skyalt, patch_altitude = patch_steradians(L_patches)
assert len(steradian) == 1
assert steradian[0] > 0
assert skyalt[0] == 90.0
def test_no_inf_values():
"""Test that no infinite values are produced for any configuration."""
for patch_option in [1, 2, 3, 4]:
L_patches = _build_L_patches(patch_option=patch_option)
steradian, _, _ = patch_steradians(L_patches)
assert not np.isinf(steradian).any(), f"Option {patch_option}: found infinite values"
def test_deterministic():
"""Test that results are deterministic across multiple calls."""
L_patches = _build_L_patches(patch_option=1)
result1 = patch_steradians(L_patches)
result2 = patch_steradians(L_patches)
np.testing.assert_array_equal(result1[0], result2[0])
np.testing.assert_array_equal(result1[1], result2[1])
np.testing.assert_array_equal(result1[2], result2[2])
def test_unique_altitudes_match():
"""Test that returned skyalt matches unique values from input."""
for patch_option in [1, 2, 3, 4]:
L_patches = _build_L_patches(patch_option=patch_option)
_, skyalt, _ = patch_steradians(L_patches)
expected_unique = np.unique(L_patches[:, 0])
np.testing.assert_array_equal(skyalt, expected_unique)
def test_altitude_preserved():
"""Test that patch_altitude output matches input column."""
L_patches = _build_L_patches(patch_option=1)
_, _, patch_altitude = patch_steradians(L_patches)
np.testing.assert_array_equal(patch_altitude, L_patches[:, 0])
# =============================================================================
# Band consistency tests
# =============================================================================
def test_band_consistency_option1():
"""Test patches in same altitude band have equal steradians for option 1."""
L_patches = _build_L_patches(patch_option=1)
steradian, skyalt, patch_altitude = patch_steradians(L_patches)
for alt in skyalt:
mask = patch_altitude == alt
steradians_at_alt = steradian[mask]
assert np.allclose(steradians_at_alt, steradians_at_alt[0])
def test_band_consistency_option2():
"""Test patches in same altitude band have equal steradians for option 2."""
L_patches = _build_L_patches(patch_option=2)
steradian, skyalt, patch_altitude = patch_steradians(L_patches)
for alt in skyalt:
mask = patch_altitude == alt
steradians_at_alt = steradian[mask]
assert np.allclose(steradians_at_alt, steradians_at_alt[0])
def test_band_consistency_option3():
"""Test patches in same altitude band have equal steradians for option 3."""
L_patches = _build_L_patches(patch_option=3)
steradian, skyalt, patch_altitude = patch_steradians(L_patches)
for alt in skyalt:
mask = patch_altitude == alt
steradians_at_alt = steradian[mask]
assert np.allclose(steradians_at_alt, steradians_at_alt[0])
def test_band_consistency_option4():
"""Test patches in same altitude band have equal steradians for option 4."""
L_patches = _build_L_patches(patch_option=4)
steradian, skyalt, patch_altitude = patch_steradians(L_patches)
for alt in skyalt:
mask = patch_altitude == alt
steradians_at_alt = steradian[mask]
assert np.allclose(steradians_at_alt, steradians_at_alt[0])
def test_lower_bands_larger():
"""Test that lower altitude bands have larger steradians (more sky area near horizon)."""
L_patches = _build_L_patches(patch_option=1)
steradian, skyalt, patch_altitude = patch_steradians(L_patches)
# Get mean steradian per band (excluding zenith which is special case)
band_steradians = []
for alt in sorted(skyalt)[:-1]: # Exclude zenith (90 degrees)
mask = patch_altitude == alt
band_steradians.append((alt, steradian[mask].mean()))
# Higher altitude bands shouldn't be drastically larger than lower bands
for i in range(len(band_steradians) - 1):
alt_low, sr_low = band_steradians[i]
alt_high, sr_high = band_steradians[i + 1]
assert sr_high <= sr_low * 2.5, f"Band at {alt_high}° has unexpectedly large steradian compared to {alt_low}°"
# =============================================================================
# Additional edge case tests
# =============================================================================
def test_two_patches_same_altitude():
"""Test with exactly two patches at the same altitude (skyalt_count > 1 branch)."""
# Two patches at 45 degrees
L_patches = np.array([
[45.0, 0.0, 0.0],
[45.0, 180.0, 0.0],
])
steradian, skyalt, patch_altitude = patch_steradians(L_patches)
assert len(steradian) == 2
assert (steradian > 0).all()
# Both patches at same altitude should have equal steradians
assert steradian[0] == steradian[1]
def test_multiple_altitude_bands_sorted():
"""Test with multiple altitude bands, each with multiple patches.
NOTE: The function assumes patches are sorted by altitude in ascending order,
with each altitude band's patches grouped together. This matches the output
of create_patches(). The altitude values must follow the expected spacing
pattern (first band at 6° with 12° spacing) for correct steradian calculation.
"""
# Use realistic altitude values matching create_patches pattern:
# 6°, 18°, 30°, ... with 12° spacing
# 3 patches at 6°, 2 patches at 18°, 1 patch at 90° (sorted, grouped)
L_patches = np.array([
[6.0, 0.0, 0.0],
[6.0, 120.0, 0.0],
[6.0, 240.0, 0.0],
[18.0, 0.0, 0.0],
[18.0, 180.0, 0.0],
[90.0, 0.0, 0.0],
])
steradian, skyalt, patch_altitude = patch_steradians(L_patches)
assert len(steradian) == 6
assert len(skyalt) == 3 # Three unique altitudes
assert (steradian > 0).all()
# Check band consistency
assert steradian[0] == steradian[1] == steradian[2] # 6° band
assert steradian[3] == steradian[4] # 18° band
def test_input_not_mutated():
"""Test that the input array is not modified by the function."""
L_patches = _build_L_patches(patch_option=1)
L_patches_copy = L_patches.copy()
patch_steradians(L_patches)
np.testing.assert_array_equal(L_patches, L_patches_copy)
def test_horizon_altitude():
"""Test with patches near the horizon (0 degrees altitude)."""
# Patches near horizon - this tests the sin calculation at low angles
L_patches = np.array([
[6.0, 0.0, 0.0],
[6.0, 90.0, 0.0],
[6.0, 180.0, 0.0],
[6.0, 270.0, 0.0],
])
steradian, _, _ = patch_steradians(L_patches)
assert len(steradian) == 4
assert (steradian > 0).all()
assert not np.isnan(steradian).any()
def test_unsorted_altitudes_known_limitation():
"""Document that the function requires sorted, grouped altitude input.
NOTE: This test documents a known limitation. The function assumes patches
are sorted by altitude in ascending order with each band grouped together.
Unsorted input produces incorrect results. This matches how create_patches()
generates data, so it's not a problem in practice.
"""
# Deliberately unsorted: 60°, 30°, 90°, 30°
L_patches = np.array([
[60.0, 0.0, 0.0],
[30.0, 0.0, 0.0],
[90.0, 0.0, 0.0],
[30.0, 180.0, 0.0],
])
steradian, skyalt, patch_altitude = patch_steradians(L_patches)
# Function runs but produces incorrect results for unsorted input
# This documents the limitation rather than asserting correct behavior
assert len(steradian) == 4
# Note: steradians may be negative or zero for unsorted input - this is expected
# The function is designed for sorted input from create_patches()
def test_float_precision():
"""Test that near-identical float altitudes are handled correctly."""
L_patches = _build_L_patches(patch_option=1)
# Verify no floating point comparison issues
steradian, skyalt, patch_altitude = patch_steradians(L_patches)
# Each unique altitude should map to consistent count
for alt in skyalt:
count = (patch_altitude == alt).sum()
assert count > 0, f"Altitude {alt} has no matching patches"
Refactor steradian calculation to use scalar values and improve clarity.