diff --git a/pyproject.toml b/pyproject.toml index 3d59f0e7..7cfef2d0 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -36,7 +36,7 @@ dependencies = [ "astropy>=6.0", "attrs>=25.3.0", "numpy>=2", - "powerbox>=0.8.2", + "powerbox>=0.9.0", "scipy>=1.15.2", "deprecation", ] diff --git a/src/tuesday/core/summaries/powerspectra.py b/src/tuesday/core/summaries/powerspectra.py index c4ae8dee..481ccf9a 100644 --- a/src/tuesday/core/summaries/powerspectra.py +++ b/src/tuesday/core/summaries/powerspectra.py @@ -2,6 +2,7 @@ import warnings from collections.abc import Callable +from typing import Literal import astropy.units as un import numpy as np @@ -19,6 +20,8 @@ from ..units import validate from .psclasses import CylindricalPS, SphericalPS +interp_type = Literal["linear", "nan-aware"] | None + def get_chunk_indices( lc_redshifts: np.ndarray, @@ -81,7 +84,7 @@ def calculate_ps( k_bins: int | None = None, k_weights_1d: Callable | None = ignore_zero_ki, bin_ave: bool | None = True, - interp: bool | None = None, + interp: interp_type = None, prefactor_fnc: Callable | None = power2delta, interp_points_generator: Callable | None = None, get_variance: bool | None = False, @@ -121,9 +124,12 @@ def calculate_ps( If False, return the left edge of each bin i.e. len(kperp) = ps_2d.shape[0] + 1. interp : str, optional - If True, use linear interpolation to calculate the PS + If 'linear', use linear interpolation to calculate the PS at the points specified by interp_points_generator. - Note that this significantly slows down the calculation. + If 'nan-aware', use linear interpolation that ignores NaN values in + the input PS. + Note that interpolating significantly slows down the calculation. + Default is None, which means no interpolation. prefactor_fnc : callable, optional A function that takes a frequency tuple and returns the prefactor to multiply the PS with. @@ -148,13 +154,14 @@ def calculate_ps( if not calc_1d and not calc_2d: raise ValueError("At least one of calc_1d or calc_2d must be True.") - if not interp: - interp = None if not isinstance(chunk, un.Quantity): raise TypeError("chunk should be a Quantity.") if not isinstance(box_length, un.Quantity): raise TypeError("box_length should be a Quantity.") + + if interp is not None and interp not in ["linear", "nan-aware"]: + raise ValueError("interp should be either 'linear', 'nan-aware', or None.") # Split the lightcone into chunks for each redshift bin # Infer HII_DIM from lc side shape box_side_shape = chunk.shape[0] @@ -167,9 +174,6 @@ def calculate_ps( if calc_2d: out["ps_2d"] = {} - if interp: - interp = "linear" - if prefactor_fnc is None: ps_unit = chunk.unit**2 * box_length.unit**3 elif prefactor_fnc == power2delta: @@ -199,15 +203,10 @@ def calculate_ps( k_weights=k_weights_2d, prefactor_fnc=prefactor_fnc, interpolation_method=interp, - return_sumweights=True, get_variance=get_variance, bins_upto_boxlen=True, ) - if get_variance: - ps_2d, kperp, variance, nmodes, kpar = results - lc_var_2d = variance - else: - ps_2d, kperp, nmodes, kpar = results + ps_2d, kperp, lc_var_2d, nmodes, kpar = results kpar = np.array(kpar).squeeze() lc_ps_2d = ps_2d[..., kpar > 0] @@ -226,7 +225,7 @@ def calculate_ps( if calc_1d: results = get_power( - chunk, + chunk.value, ( box_length.value, box_length.value, @@ -239,16 +238,10 @@ def calculate_ps( prefactor_fnc=prefactor_fnc, interpolation_method=interp, interp_points_generator=interp_points_generator, - return_sumweights=True, get_variance=get_variance, bins_upto_boxlen=True, ) - if get_variance: - ps_1d, k, var_1d, nmodes_1d = results - lc_var_1d = var_1d - else: - ps_1d, k, nmodes_1d = results - lc_ps_1d = ps_1d + lc_ps_1d, k, lc_var_1d, nmodes_1d = results ps1d = SphericalPS( ps=lc_ps_1d * ps_unit, @@ -280,7 +273,7 @@ def calculate_ps_lc( k_bins: int | None = None, mu_min: float | None = None, bin_ave: bool = True, - interp: bool | None = None, + interp: str | None = None, deltasq: bool = True, interp_points_generator: Callable | None = None, get_variance: bool = False, @@ -314,6 +307,10 @@ def calculate_ps_lc( the power any k_i = 0 mode. Typically, only the central zero mode |k| = 0 is excluded, in which case use powerbox.tools.ignore_zero_absk. + log_bins : bool, optional + If True, use logarithmic binning for kperp and kpar. + Note that if the bins are already provided with kperp_bins or k_bins, + this argument has no effect. calc_1d : bool, optional If True, calculate the 1D power spectrum. k_bins : int, optional @@ -329,9 +326,12 @@ def calculate_ps_lc( If False, return the left edge of each bin i.e. len(kperp) = ps_2d.shape[0] + 1. interp : str, optional - If True, use linear interpolation to calculate the PS + If 'linear', use linear interpolation to calculate the PS at the points specified by interp_points_generator. - Note that this significantly slows down the calculation. + If 'nan-aware', use linear interpolation that ignores NaN values in + the input PS. + Note that interpolating significantly slows down the calculation. + Default is None, which means no interpolation. delta : bool, optional Whether to convert the power P [mK^2 Mpc^{-3}] to the dimensionless power :math:`\\delta^2` [mK^2]. @@ -449,7 +449,7 @@ def calculate_ps_coeval( k_bins: int | None = None, mu_min: float | None = None, bin_ave: bool | None = True, - interp: bool | None = None, + interp: interp_type = None, deltasq: bool | None = True, interp_points_generator: Callable | None = None, get_variance: bool | None = False, @@ -482,6 +482,10 @@ def calculate_ps_coeval( the power any k_i = 0 mode. Typically, only the central zero mode |k| = 0 is excluded, in which case use powerbox.tools.ignore_zero_absk. + log_bins : bool, optional + If True, use logarithmic binning for kperp and kpar. + Note that if the bins are already provided with kperp_bins or k_bins, + this argument has no effect. calc_1d : bool, optional If True, calculate the 1D power spectrum. k_bins : int, optional @@ -497,9 +501,9 @@ def calculate_ps_coeval( If False, return the left edge of each bin i.e. len(kperp) = ps_2d.shape[0] + 1. interp : str, optional - If True, use linear interpolation to calculate the PS - at the points specified by interp_points_generator. - Note that this significantly slows down the calculation. + Supports 'linear', 'nan-aware', and None. + Check powerbox.tools.get_power documentation for more details + on interpolation options. delta : bool, optional Whether to convert the power P [mK^2 Mpc^{-3}] to the dimensionless power :math:`\\delta^2` [mK^2]. @@ -562,11 +566,8 @@ def mask_fnc(freq, absk): k_weights_1d = mask_fnc if interp is not None: - k_weights_1d = ignore_zero_ki - interp_points_generator = above_mu_min_angular_generator(mu=mu_min) else: - k_weights_1d = ignore_zero_ki if interp is not None: interp_points_generator = regular_angular_generator() prefactor_fnc = power2delta if deltasq else None @@ -600,7 +601,7 @@ def mask_fnc(freq, absk): def bin_kpar( bins_kpar: int | un.Quantity | None = None, log_kpar: bool | None = False, - interp_kpar: bool | None = False, + interp_kpar: Literal["linear"] | None = None, crop_kperp: tuple[int, int] | None = None, crop_kpar: tuple[int, int] | None = None, ): @@ -616,9 +617,9 @@ def bin_kpar( log_kpar : bool or None, optional If True, use logarithmic binning for kpar. If False or None, use linear binning. Default is False. - interp_kpar : bool or None, optional - If True, interpolate the power spectrum onto the new kpar bins. - If False or None, aggregate using bin means. Default is False. + interp_kpar : str or None, optional + If 'linear', interpolate the power spectrum onto the new kpar bins. + If None, aggregate using bin means. Default is None. crop_kperp : tuple of int or None, optional Tuple specifying the (start, end) indices to crop the kperp axis after binning. If None, no cropping is applied. Default is None. @@ -639,9 +640,9 @@ def bin_kpar( Notes ----- - - If `interp_kpar` is True, the power spectrum and its variance (if present) are + - If `interp_kpar` is 'linear', the power spectrum and its variance (if present) are interpolated onto the new kpar bins. - - If `interp_kpar` is False, the power spectrum and its variance are aggregated + - If `interp_kpar` is None, the power spectrum and its variance are aggregated using the mean within each bin. - Cropping is applied after binning/interpolation. """ @@ -677,7 +678,7 @@ def transform_ps(ps: CylindricalPS): if not isinstance(bins_kpar, np.ndarray): raise ValueError("bins_kpar must be an array of bin edges or centres.") final_bins_kpar = bins_kpar - if interp_kpar: + if interp_kpar == "linear": mask = np.isnan(np.nanmean(ps.ps, axis=-1)) interp_fnc = RegularGridInterpolator( (ps.kperp.value[~mask], ps.kpar.value), @@ -754,11 +755,17 @@ def transform_ps(ps: CylindricalPS): if crop_kpar is not None else final_nmodes ) - kpar_grid, kperp_grid = np.meshgrid( + + kpar_nmodes_grid, kperp_nmodes_grid = np.meshgrid( final_kperp_modes, final_kpar_modes, indexing="ij" ) - final_nmodes = np.sqrt(kperp_grid**2 + kpar_grid**2) + # In a log kperp and linear kpar binning, + # the number of modes in each bin = the number of modes in each kperp bin + # (since there is one mode in each kpar bin) + # In a log-log binning, the number of modes in each bin is + # the number of modes in each kperp bin * the number of modes in each kpar bin + final_nmodes = kperp_nmodes_grid * kpar_nmodes_grid return CylindricalPS( ps=final_ps, @@ -778,15 +785,15 @@ def transform_ps(ps: CylindricalPS): def cylindrical_to_spherical( - ps, - kperp, - kpar, - nbins=16, - weights=1, - interp=False, - mu_min=None, - generator=None, - bin_ave=True, + ps: np.ndarray, + kperp: np.ndarray, + kpar: np.ndarray, + nbins: int = 16, + weights: float | np.ndarray = 1.0, + interp: interp_type = None, + mu_min: float | None = None, + generator: Callable | None = None, + bin_ave: bool = True, ): r""" Angularly average 2D PS to 1D PS. @@ -806,8 +813,11 @@ def cylindrical_to_spherical( Note that to obtain a 1D PS from the 2D PS that is consistent with the 1D PS obtained directly from the 3D PS, the weights should be the number of modes in each bin of the 2D PS (`Nmodes`). - interp : bool, optional - If True, use linear interpolation to calculate the 1D PS. + interp : str | None, optional + If 'linear', use linear interpolation to calculate the 1D PS. + If 'nan-aware', use linear interpolation that ignores NaN values in + the input PS. + If None, no interpolation is used. mu_min : float, optional The minimum value of :math:`\\cos(\theta), \theta = \arctan (k_\\perp/k_\\parallel)` @@ -822,24 +832,23 @@ def cylindrical_to_spherical( If False, return the left edge of each bin i.e. len(k) = ps_1d.shape[0] + 1. """ - if mu_min is not None and interp and generator is None: + if mu_min is not None and interp is not None and generator is None: generator = above_mu_min_angular_generator(mu=mu_min) - if mu_min is not None and not interp: + if mu_min is not None and interp is None: kpar_mesh, kperp_mesh = np.meshgrid(kpar, kperp) theta = np.arctan(kperp_mesh / kpar_mesh) mu_mesh = np.cos(theta) weights = mu_mesh >= mu_min - ps_1d, k, sws = angular_average( + ps_1d, k, _, sws = angular_average( ps, coords=[kperp, kpar], bins=nbins, weights=weights, bin_ave=bin_ave, log_bins=True, - return_sumweights=True, - interpolation_method="linear" if interp else None, + interpolation_method=interp, interp_points_generator=generator, ) return ps_1d, k, sws diff --git a/tests/test_plotting_ps.py b/tests/test_plotting_ps.py index 42936e4e..491fdd6b 100644 --- a/tests/test_plotting_ps.py +++ b/tests/test_plotting_ps.py @@ -26,7 +26,7 @@ def _psboth(): box_length=200 * un.Mpc, calc_2d=True, calc_1d=True, - interp=True, + interp="linear", ) return ps1d, ps2d diff --git a/tests/test_ps.py b/tests/test_ps.py index a58d86fe..8a04a839 100644 --- a/tests/test_ps.py +++ b/tests/test_ps.py @@ -96,10 +96,10 @@ def test_calculate_ps_lc(log_bins, test_lc, test_redshifts): test_redshifts, ps_redshifts=6.8, calc_1d=False, - interp=True, + interp="nan-aware", mu_min=0.5, log_bins=log_bins, - transform_ps2d=bin_kpar(bins_kpar=10, log_kpar=True, interp_kpar=True), + transform_ps2d=bin_kpar(bins_kpar=10, log_kpar=True, interp_kpar="linear"), ) def transform1d(ps): @@ -115,7 +115,7 @@ def transform1d(ps): transform_ps2d=bin_kpar( bins_kpar=None, log_kpar=False, - interp_kpar=False, + interp_kpar=None, crop_kpar=(0, 3), crop_kperp=(0, 8), ), @@ -124,13 +124,28 @@ def transform1d(ps): def test_calculate_ps_coeval(test_coeval): - with np.testing.assert_raises(TypeError): + with pytest.raises( + TypeError, + match=r"calculate_ps_coeval\(\) got an unexpected keyword" + r" argument 'ps_redshifts'", + ): calculate_ps_coeval( test_coeval * un.mK, box_length=200 * un.Mpc, ps_redshifts=6.8, calc_1d=False, - interp=True, + interp="nan-aware", + mu_min=0.5, + ) + + with pytest.raises( + ValueError, match=r"interp should be either 'linear', 'nan-aware', or None." + ): + calculate_ps_coeval( + test_coeval * un.mK, + box_length=200 * un.Mpc, + calc_1d=False, + interp="ultimate-interp", mu_min=0.5, ) @@ -141,12 +156,12 @@ def transform1d(ps): test_coeval * un.dimensionless_unscaled, box_length=200 * un.Mpc, calc_1d=False, - interp=True, + interp="linear", mu_min=0.5, transform_ps2d=bin_kpar( bins_kpar=np.array([0.1, 0.5, 1]) / un.Mpc, log_kpar=True, - interp_kpar=True, + interp_kpar="linear", crop_kpar=(0, 3), crop_kperp=(0, 8), ), @@ -174,7 +189,7 @@ def test_calculate_ps_corner_cases(test_lc, test_redshifts): 200 * un.Mpc, test_redshifts, calc_1d=True, - interp=True, + interp="linear", mu_min=0.5, deltasq=True, ) @@ -184,7 +199,7 @@ def test_calculate_ps_corner_cases(test_lc, test_redshifts): 200 * un.Mpc, test_redshifts, calc_1d=True, - interp=True, + interp="linear", mu_min=0.5, deltasq=False, ) @@ -235,6 +250,11 @@ def test_ps_avg(): mask = mu_mesh >= 0.9 ps_2d[mask] = 1000 ps, k, sws = cylindrical_to_spherical( - ps_2d, x, x, nbins=32, interp=True, mu_min=0.98 + ps_2d, x, x, nbins=32, interp="nan-aware", mu_min=0.98 + ) + assert np.nanmean(ps[-20:]) == 1000.0 + + ps, k, sws = cylindrical_to_spherical( + ps_2d, x, x, nbins=32, interp=None, mu_min=0.98 ) assert np.nanmean(ps[-20:]) == 1000.0