diff --git a/echopop/ingest/sv.py b/echopop/ingest/sv.py index 2de99c18..cdb85df7 100644 --- a/echopop/ingest/sv.py +++ b/echopop/ingest/sv.py @@ -1,8 +1,9 @@ from pathlib import Path -from typing import Any, Dict, Literal, Optional, Tuple +from typing import Any, Dict, Literal, Optional, Tuple, Union import numpy as np import pandas as pd +import xarray as xr from . import nasc @@ -400,12 +401,19 @@ def aggregate_transects( "thickness_mean" ].transform("sum") + # Define safe log10 function that handles zero/negative values + def safe_log10_sum(x): + sum_val = x.sum() + if sum_val <= 0: + return -999.0 # Use standard missing data indicator + return 10 * np.log10(sum_val) + # Aggregate the values over each interval data_pvt = data.groupby(["frequency", "transect_num"]).agg( { "longitude_weight": "sum", "latitude_weight": "sum", - "sv_L": lambda x: 10 * np.log10(x.sum()), + "sv_L": safe_log10_sum, "nasc": "sum", "thickness_interval": "mean", } @@ -531,13 +539,114 @@ def integrate_measurements( return sv_indexed, sv_coordinates +def df_to_xarray(df: pd.DataFrame) -> Union[xr.DataArray, xr.Dataset]: + """ + Convert a pandas DataFrame with MultiIndex index and columns to an xarray Dataset. + Handles both dense and sparse structures, and removes duplicate index level names. + """ + + # Return None if empty + if df is None or df.empty: + return None + + # Get index names + if isinstance(df.index, pd.MultiIndex): + index_names = list(df.index.names) + else: + index_names = [df.index.name] if df.index.name else ["index"] + + # Check for duplicate names in index + if len(index_names) != len(set(index_names)): + # ---- Keep only first occurrence of each name + seen = set() + unique_indices = [] + for i, name in enumerate(index_names): + if name not in seen: + seen.add(name) + unique_indices.append(i) + # ---- Drop duplicate columns that match index level names + df = df.copy() + df.index = df.index.droplevel( + [i for i in range(len(index_names)) if i not in unique_indices] + ) + index_names = [ + index_names[i] + for i in range(len(index_names)) + if i in [idx for idx, _ in enumerate(index_names) if index_names.index(_) == idx] + ] + + # For MultiIndex columns (dense array) + if isinstance(df.columns, pd.MultiIndex): + # ---- Get MultiIndex column names + column_names = list(df.columns.names) + # ---- Filter out non-numeric columns + numeric_vars = [ + var + for var in df.columns.get_level_values(0).unique() + if not all(df.columns[df.columns.get_level_values(0) == var].get_level_values(1) == "") + ] + # ---- Check if index is sparse with non-unique combinations + if len(index_names) > 1: + # ---- For sparse data, use a single point dimension + mindex_coords = xr.Coordinates.from_pandas_multiindex(df.index, "point") + ds = xr.Dataset( + {var: (["point", column_names[1]], df[var].values) for var in numeric_vars}, + coords={**mindex_coords, column_names[1]: df[numeric_vars[0]].columns.values}, + ) + else: + # ---- For dense data with unique indices, use separate dimensions + ds = xr.Dataset( + {var: (index_names + [column_names[1]], df[var].values) for var in numeric_vars}, + coords={ + **{name: df.index.get_level_values(name).unique() for name in index_names}, + column_names[1]: df[numeric_vars[0]].columns.values, + }, + ) + # ---- Add the uneven multiindexed columns as coordinates/attributes + to_add_vars = [ + var for var in df.columns.get_level_values(0).unique() if var not in numeric_vars + ] + # ---- Add as a coordinate along the index dimension + for var in to_add_vars: + # ---- Get column data + var_data = df[var] + # ---- If it's a DataFrame, get the first column + if isinstance(var_data, pd.DataFrame): + var_values = var_data.iloc[:, 0].values + # ---- If it is already a series + else: + var_values = var_data.values + # ---- Assign the coordinates using the extracted values + if len(index_names) > 1: + # ---- Sparse case + ds = ds.assign_coords({var: ("point", var_values)}) + else: + # ---- Dense case + ds = ds.assign_coords({var: (index_names, var_values)}) + return ds + # For MultiIndex index but single-level columns (sparse array) + elif isinstance(df.index, pd.MultiIndex): + # ---- Get unique column names + column_name = df.columns.name if df.columns.name else "column" + # ---- Create the DataArray + da = xr.DataArray( + df.values, + dims=["point", column_name], + coords={ + "point": pd.MultiIndex.from_tuples(df.index.to_list(), names=index_names), + column_name: df.columns.values, + }, + ) + return da + + def ingest_echoview_sv( sv_path: Path, center_frequencies: Optional[Dict[str, float]] = None, transect_pattern: Optional[str] = None, aggregate_method: Literal["cells", "interval", "transect"] = "cells", impute_coordinates: bool = True, -) -> Tuple[pd.DataFrame, pd.DataFrame]: +) -> Tuple[Union[xr.DataArray, xr.Dataset], Union[xr.DataArray, xr.Dataset]]: r""" Complete ingestion pipeline for Echoview Sv export data. @@ -563,11 +672,13 @@ def ingest_echoview_sv( Returns ------- - tuple[|pd.DataFrame|, |pd.DataFrame| or None] - - ``sv_integrated``: Spatially aggregated acoustic data with :class:`pandas.MultiIndex` - columns organized by measurement type and frequency. - - ``sv_coordinates``: Coordinate reference data for spatial analysis, or None if - coordinates unavailable. + tuple[xr.Dataset or xr.DataArray, xr.DataArray or None] + - ``sv_integrated``: Spatially aggregated acoustic data as xarray Dataset with data + variables for each measurement type (nasc, sv_mean, thickness_mean) organized by + frequency dimension. For sparse data (cells, intervals), uses a "point" dimension with + MultiIndex coordinates. + - ``sv_coordinates``: Coordinate reference data as xarray DataArray with "point" dimension + and frequency dimension, or None if coordinates unavailable. Raises ------ @@ -597,7 +708,18 @@ def ingest_echoview_sv( >>> sv_data, coords = ingest_echoview_sv( ... sv_path, frequencies, r"transect_(\d+)", "interval" ... ) + >>> print(sv_data) + + Dimensions: (point: 11608, frequency: 5) + Coordinates: + * point (point) MultiIndex + * frequency (frequency) float64 18000.0 38000.0 70000.0 120000.0 200000.0 + Data variables: + nasc (point, frequency) float64 ... + sv_mean (point, frequency) float64 ... + thickness_mean (point, frequency) float64 ... """ + # Validate directory existence if not sv_path.exists(): raise FileNotFoundError(f"The export file directory ({sv_path.as_posix()}) not found!") @@ -611,7 +733,11 @@ def ingest_echoview_sv( # Update the units for `center_frequencies` to match expected values from Echoview # ---- Hz -> kHz if center_frequencies is not None: - center_frequencies = {freq * 1e-3: value for freq, value in center_frequencies.items()} + center_frequencies_thresh = { + freq * 1e-3: value for freq, value in center_frequencies.items() + } + else: + center_frequencies_thresh = {} # Get the target Sv files sv_filepaths = {"cells": [p for p in sv_path.rglob("*.csv") if p.is_file()]} @@ -630,7 +756,7 @@ def ingest_echoview_sv( ) # Concatenate the files - sv = pd.concat( + sv_output = pd.concat( [ read_echoview_sv(row["file_path"], impute_coordinates, row["transect_num"]) for _, row in transect_num_df.iterrows() @@ -638,21 +764,21 @@ def ingest_echoview_sv( ) # Sort - nasc.sort_echoview_export_df(sv, inplace=True) + nasc.sort_echoview_export_df(sv_output, inplace=True) # Set min/max threshold for each frequency - if center_frequencies: - sv_subset = sv[sv["frequency"].isin(center_frequencies)] + if center_frequencies_thresh: + sv_subset = sv_output[sv_output["frequency"].isin(center_frequencies_thresh)] else: - sv_subset = sv.copy() - center_frequencies = { + sv_subset = sv_output.copy() + center_frequencies_thresh = { freq: {"min": -999.0, "max": 999.0} for freq in sv_subset["frequency"].unique() } # Integrate the backscatter based on the defined aggregation method sv_integrated, sv_coordinates = integrate_measurements( - data=sv_subset, method=aggregate_method, sv_thresholds=center_frequencies + data=sv_subset, method=aggregate_method, sv_thresholds=center_frequencies_thresh ) # Return - return sv_integrated, sv_coordinates + return df_to_xarray(sv_integrated), df_to_xarray(sv_coordinates) diff --git a/echopop/inversion/inversion_matrix.py b/echopop/inversion/inversion_matrix.py index 2778ed36..936056c6 100644 --- a/echopop/inversion/inversion_matrix.py +++ b/echopop/inversion/inversion_matrix.py @@ -1,12 +1,14 @@ import time import warnings -from typing import Any, Dict +from typing import Any, Dict, Union import numpy as np import pandas as pd +import xarray as xr from lmfit import Minimizer, Parameters from pydantic import ValidationError +from ..ingest.sv import df_to_xarray from ..validators.scattering_models import SCATTERING_MODEL_PARAMETERS from .inversion_base import InversionBase, InvParameters @@ -135,7 +137,7 @@ def prepare_minimizer( simulation_settings: Dict[str, Any], verbose: bool, **kwargs, -): +) -> Union[Minimizer, None]: """ Prepare lmfit.Minimizer objects for Monte Carlo optimization. @@ -192,36 +194,39 @@ def prepare_minimizer( Sv_measured = Sv_measured.to_numpy()[valid_idx].astype(float) # Initialize with Monte Carlo sampling - if simulation_settings["monte_carlo"]: - parameters_lmfit = monte_carlo_initialize( - parameters_lmfit, center_frequencies, Sv_measured, scattering_params, model_settings - ) - else: - parameters_lmfit = parameters_lmfit[0] - - # Set `iter_cb` if verbose - if verbose: - if "iter_cb" in simulation_settings and simulation_settings["iter_cb"]: - iter_cb_arg = simulation_settings["iter_cb"] + if len(Sv_measured) > 0: + if simulation_settings["monte_carlo"]: + parameters_lmfit = monte_carlo_initialize( + parameters_lmfit, center_frequencies, Sv_measured, scattering_params, model_settings + ) + else: + parameters_lmfit = parameters_lmfit[0] + + # Set `iter_cb` if verbose + if verbose: + if "iter_cb" in simulation_settings and simulation_settings["iter_cb"]: + iter_cb_arg = simulation_settings["iter_cb"] + else: + iter_cb_arg = mininizer_print_cb else: - iter_cb_arg = mininizer_print_cb + iter_cb_arg = None + # Generate `Minimizer` function class required for bounded optimization + return [ + Minimizer( + fit_Sv, + parameters_lmfit, + fcn_args=( + Sv_measured, + center_frequencies, + scattering_params, + model_settings, + ), + iter_cb=iter_cb_arg, + nan_policy="omit", + ) + ] else: - iter_cb_arg = None - # Generate `Minimizer` function class required for bounded optimization - return [ - Minimizer( - fit_Sv, - parameters_lmfit, - fcn_args=( - Sv_measured, - center_frequencies, - scattering_params, - model_settings, - ), - iter_cb=iter_cb_arg, - nan_policy="omit", - ) - ] + return None def fit_Sv( @@ -648,7 +653,7 @@ class InversionMatrix(InversionBase): def __new__( cls, - data: pd.DataFrame, + data: xr.Dataset, simulation_settings: Dict[str, Any], verbose: bool = True, ): @@ -668,7 +673,16 @@ def __new__( self = super().__new__(cls) # Update attributes - self.measurements = valid_args["data"].copy() + # ---- Format dataset to dataframe + data_fmt = valid_args["data"].copy().to_dataframe().reset_index() + # ---- Check for cases where a coordinate a singleton + unmapped_idx = list( + set(data_fmt.columns).intersection( + set(["transect_num", "interval", "layer", "longitude", "latitude", "frequency"]) + ) + ) + # ---- Set the index and rotate frequency + self.measurements = data_fmt.set_index(unmapped_idx).unstack("frequency") self.simulation_settings = valid_args["simulation_settings"] self.verbose = verbose @@ -677,7 +691,7 @@ def __new__( def __init__( self, - data: pd.DataFrame, + data: xr.Dataset, simulation_settings: Dict[str, Any], verbose: bool = True, ): @@ -690,9 +704,9 @@ def __init__( Parameters ---------- - data : pd.DataFrame - MultiIndex DataFrame containing acoustic measurements with required columns 'sv_mean', - 'nasc', and 'thickness_mean' at top level, and 'frequency' as nested index level. + data : xr.Dataset + Dataset containing acoustic measurements with required variables 'sv_mean', + 'nasc', and 'thickness_mean' at top level, and at least 'frequency' as a coordinate. simulation_settings : Dict[str, Any] Dictionary containing simulation configuration including: - environment: Environmental parameters (sound speed, density) @@ -871,7 +885,7 @@ def build_scattering_model( # Add the `lmfit.Minimizer` objects to the dataset self._set_minimizers() - def invert(self, optimization_kwargs: Dict[str, Any]): + def invert(self, optimization_kwargs: Dict[str, Any]) -> xr.Dataset: """ Execute acoustic scattering parameter inversion for all measurements. @@ -896,10 +910,11 @@ def invert(self, optimization_kwargs: Dict[str, Any]): Returns ------- - pd.DataFrame - DataFrame containing original acoustic measurements plus inversion - results in 'parameters' column. Each entry is an InvParameters - object with optimized parameter values and metadata. + xarray.Dataset + Dataset containing the original acoustic measurements (mean Sv and NASC), weighted + latitudes and longitudes, and layer thickness means as data variables. The increment(s) + used for integration (i.e., cells, interval, transect), frequency, and inversion + parameters are assigned as the coordinates. Notes ----- @@ -931,7 +946,7 @@ def invert(self, optimization_kwargs: Dict[str, Any]): ... } >>> results = inv_matrix.invert(optimization_config) >>> # Access optimized parameters for first measurement - >>> optimal_params = results.iloc[0]["parameters"] + >>> optimal_params = results.sel(transect_num=1)["parameters"].item() """ # Run optimization @@ -955,7 +970,9 @@ def invert(self, optimization_kwargs: Dict[str, Any]): # Bundle the parameter results and add to the output output["parameters"] = [ - InvParameters.from_series(row.drop("Q")) for _, row in inversion_results.iterrows() + InvParameters.from_series(row.drop("Q")) if not row.isna().all() else None + for _, row in inversion_results.iterrows() ] - return output + # Convert to xarray + return df_to_xarray(output) diff --git a/echopop/inversion/post_inversion.py b/echopop/inversion/post_inversion.py index 3b2b8ffe..32f4bb08 100644 --- a/echopop/inversion/post_inversion.py +++ b/echopop/inversion/post_inversion.py @@ -1,64 +1,70 @@ from typing import Literal import numpy as np -import pandas as pd +import xarray as xr from .utils import _extract_parameters_optimized def estimate_population( - inverted_data: pd.DataFrame, - nasc_data: pd.DataFrame, + inverted_data: xr.Dataset, + nasc_data: xr.DataArray, density_sw: float, reference_frequency: float, aggregate_method: Literal["cells", "interval", "transect"], -): +) -> xr.Dataset: """ Estimate population characteristics from inverted acoustic data. - This function computes population-level estimates including body size, - density, and biomass from inverted acoustic scattering parameters. - It combines acoustic data with biological parameter estimates to - provide comprehensive population assessments. + This function computes population-level estimates including body size, density, and biomass + from inverted acoustic scattering parameters. It combines acoustic data with biological + parameter estimates to provide comprehensive population assessments. Parameters ---------- - inverted_data : pd.DataFrame - DataFrame containing inverted scattering parameters with columns - including 'parameters' containing biological model parameters - nasc_data : pd.DataFrame - DataFrame containing Nautical Area Scattering Coefficient (NASC) - values with frequency as column level + inverted_data : xr.Dataset + Dataset containing inverted scattering parameters with 'parameters' coordinate containing + biological model parameters. + nasc_data : xr.DataArray + DataArray containing Nautical Area Scattering Coefficient (NASC) values with frequency + dimension. density_sw : float Seawater density in kg/m³, typically ~1026 kg/m³ reference_frequency : float - Reference acoustic frequency in Hz for population estimation + Reference acoustic frequency in Hz for population estimation. aggregate_method : Literal["cells", "interval", "transect"] Method for spatial aggregation of population estimates: + - "cells": Individual acoustic cells + - "interval": Depth/range intervals + - "transect": Survey transect lines Returns ------- - pd.DataFrame - Combined DataFrame containing: + xr.Dataset + Dataset containing: + - nasc: NASC values at reference frequency + - number_density: Estimated areal number density (individuals/m²) + - biomass_density: Estimated areal biomass density (kg/m²) - - Biological parameters: length_mean, radius_mean, body_volume, - body_density, body_weight, etc. + + - Biological parameters: length_mean, radius_mean, body_volume, body_density, + body_weight, etc. Notes ----- - The function assumes organisms can be modeled as uniformly bent cylinders - for volume estimation. Body weight calculation uses: + The function assumes organisms can be modeled as uniformly bent cylinders for volume + estimation. Body weight calculation uses: .. math:: W = \\rho_{sw} \\cdot g \\cdot \\pi r^2 L - where W is body weight, ρ_sw is seawater density, g is the relative - density factor, r is mean radius, and L is mean length. + where W is body weight, ρ_sw is seawater density, g is the relative density factor, r is mean + radius, and L is mean length. """ # Unpackage the parameters using optimized extraction @@ -76,82 +82,97 @@ def estimate_population( # Compute average body weight [single animal] parameters["body_weight"] = parameters["body_volume"] * parameters["body_density"] - # Subset the dataset further to the single frequency - acoustic_data = ( - inverted_data.loc[ - :, inverted_data.columns.get_level_values("frequency") == reference_frequency - ] - ).droplevel("frequency", axis=1) - - # Repeat the same for the coordinates - reference_nasc = ( - nasc_data.loc[:, nasc_data.columns.get_level_values("frequency") == reference_frequency] - )[reference_frequency].to_frame("nasc") + # Subset the datasets further to the single frequency + acoustic_data = inverted_data.sel(frequency=reference_frequency) + reference_nasc = nasc_data.sel(frequency=reference_frequency) # Calculate the areal number density if aggregate_method == "transect": - reference_nasc.loc[:, "number_density"] = calculate_transect_number_density( + number_density = calculate_transect_number_density( acoustic_data, reference_nasc, parameters - ).to_numpy() - else: - reference_nasc["number_density"] = calculate_intervals_number_density( - acoustic_data, parameters ) + else: + number_density = calculate_intervals_number_density(acoustic_data, parameters) + + # Convert NASC to drop 'point' coordinate + nasc_cnv = reference_nasc.to_series().to_frame("nasc").reset_index() + + # Convert the number densities and parameter sets and drop the point coordinate + number_density_cnv = number_density.to_series().to_frame("number_density").reset_index() + + # Get the shared column names (excluding 'point') + shared_cols = [ + col for col in number_density_cnv.columns if col in nasc_cnv.columns and col != "point" + ] + + # Do a left-join to broadcast density values + merged_dens = nasc_cnv.merge(number_density_cnv, on=shared_cols, how="left").fillna(0.0) + + # Reformat to Dataset structure + # ---- Get overlapping coordinates + overlapping_coords = [c for c in reference_nasc.coords if c in merged_dens.columns] + # ---- Find valid ordering + coord_order = [ + c + for c in ["transect_num", "longitude", "latitude", "interval", "layer"] + if c in overlapping_coords + ] + # ---- Set the index + output_indexed = merged_dens.set_index(coord_order) + parameters_indexed = parameters.reset_index().set_index( + [c for c in parameters.index.names if c in coord_order] + ) - # Compute areal biomass density - reference_nasc["biomass_density"] = ( - reference_nasc["number_density"] * parameters["body_weight"] + # Calculate biomass + output_indexed["biomass_density"] = ( + output_indexed["number_density"] * parameters_indexed["body_weight"] ).fillna(0.0) - # Find shared indices - idx = list(set(reference_nasc.index.names).intersection(parameters.index.names)) - - # Reset indices - reference_nasc = reference_nasc.reset_index().set_index(idx) + # Create Dataset by converting each column separately + data_vars = {} + for col in ["nasc", "number_density", "biomass_density", "parameters"]: + if col in output_indexed.columns: + data_vars[col] = (["point"], output_indexed[col].values) + # ---- Create coordinates from the MultiIndex using from_pandas_multiindex + coords = xr.Coordinates.from_pandas_multiindex(output_indexed.index, "point") - # Concatenate the parameters - return pd.concat( - [ - reference_nasc, - parameters.reset_index() - .drop("number_density", axis=1) - .set_index(idx) - .reindex(reference_nasc.index), - ], - axis=1, - ) + return xr.Dataset(data_vars, coords=coords) def calculate_transect_number_density( - acoustic_data: pd.DataFrame, - reference_nasc: pd.DataFrame, - parameters: pd.DataFrame, + acoustic_data: xr.Dataset, + reference_nasc: xr.DataArray, + parameters: xr.Dataset, ): """ Compute areal number density aggregated by survey transect. - This function estimates the areal number density of organisms by - weighting volumetric densities by NASC contributions and aggregating - across survey transects. The method accounts for varying layer - thickness and spatial sampling intensity. + This function estimates the areal number density of organisms by weighting volumetric densities + by NASC contributions and aggregating across survey transects. The method accounts for varying + layer thickness and spatial sampling intensity. Parameters ---------- - acoustic_data : pd.DataFrame - DataFrame containing acoustic measurements including: + acoustic_data : xr.Dataset + Dataset containing acoustic measurements including: + - nasc: Nautical Area Scattering Coefficient values + - thickness_mean: Mean layer thickness in meters - reference_nasc : pd.DataFrame - DataFrame with NASC values at reference frequency for weighting - parameters : pd.DataFrame - DataFrame containing biological parameters including: + + reference_nasc : xr.DataArray + DataArray with NASC values at reference frequency for weighting. + + parameters : xr.Dataset + Dataset containing biological parameters including: + - number_density: Volumetric number density (individuals/m³) Returns ------- - pd.Series - Areal number density in individuals/m² aggregated by transect, - converted from nautical miles to meters (factor of 1852²) + xr.DataArray + Areal number density in individuals/m² aggregated by transect, converted from nautical + miles to meters (factor of 1852²). Notes ----- @@ -170,60 +191,79 @@ def calculate_transect_number_density( w = \\frac{\\text{NASC}_{ref}}{\\text{NASC}_{acoustic}} """ - # Create copies - acoustic_df = acoustic_data.copy() - reference_df = reference_nasc.copy() + # Get the transects that exist in acoustic_data + valid_transects = acoustic_data["transect_num"].values - # Find shared indices - idx = list(set(reference_nasc.index.names).intersection(acoustic_data.index.names)) + # Filter reference_nasc to only include valid transects + reference_filtered = reference_nasc.where( + reference_nasc["transect_num"].isin(valid_transects), drop=True + ) - # Reset the indices for each - acoustic_df = acoustic_df.reset_index().set_index(idx) - reference_df = reference_df.reset_index().set_index(idx) + # Reindex acoustic data to match reference_nasc structure + acoustic_aligned = acoustic_data.sel(transect_num=reference_filtered["transect_num"]) + parameters_aligned = parameters.to_xarray().sel(transect_num=reference_filtered["transect_num"]) # Compute the NASC weights - reference_df["nasc_weight"] = reference_df["nasc"] / acoustic_df["nasc"] + nasc_weight = reference_filtered.values / acoustic_aligned["nasc"].values - # Get the counts of each transect - interval_counts = reference_df.groupby(level="transect_num")["nasc"].count() + # Count intervals per transect + interval_counts = reference_filtered.groupby("transect_num").count() - # Compute the areal number density - areal_number_density = ( - parameters["number_density"] - * reference_df["nasc_weight"] - * acoustic_df["thickness_mean"] - * interval_counts - ).fillna(0.0) * 1852**2 + # Broadcast interval counts back to reference dimensions + interval_counts_broadcast = xr.DataArray( + [ + interval_counts.sel(transect_num=t).values + for t in reference_filtered["transect_num"].values + ], + coords=reference_filtered.coords, + dims=reference_filtered.dims, + ) + + # Compute the areal number density using values + areal_number_density_vals = ( + parameters_aligned["number_density"].values + * nasc_weight + * acoustic_aligned["thickness_mean"].values + * interval_counts_broadcast.values + ) * 1852**2 + + # Create output DataArray with reference_filtered structure + areal_number_density = xr.DataArray( + areal_number_density_vals, coords=reference_filtered.coords, dims=reference_filtered.dims + ) # Return return areal_number_density def calculate_intervals_number_density( - acoustic_data: pd.DataFrame, - parameters: pd.DataFrame, -): + acoustic_data: xr.Dataset, + parameters: xr.Dataset, +) -> xr.DataArray: """ Compute areal number density for individual depth/range intervals. - This function calculates areal number density by converting volumetric - density estimates to areal density using layer thickness information. - Unlike transect aggregation, this preserves interval-level resolution. + This function calculates areal number density by converting volumetric density estimates to + areal density using layer thickness information. Unlike transect aggregation, this preserves + interval-level resolution. Parameters ---------- - acoustic_data : pd.DataFrame - DataFrame containing acoustic measurements with: + acoustic_data : xr.Dataset + Dataset containing acoustic measurements with: + - thickness_mean: Mean layer thickness in meters for each interval - parameters : pd.DataFrame - DataFrame containing biological parameters with: + + parameters : xr.Dataset + Dataset containing biological parameters with: + - number_density: Volumetric number density (individuals/m³) Returns ------- - pd.Series - Areal number density in individuals/m² for each interval, - converted from nautical miles to meters (factor of 1852²) + xr.DataArray + Areal number density in individuals/m² for each interval, converted from nautical miles to + meters (factor of 1852²). Notes ----- @@ -232,16 +272,22 @@ def calculate_intervals_number_density( .. math:: N_A = N_V \\cdot h \\cdot 1852^2 - where N_A is areal density (individuals/m²), N_V is volumetric density - (individuals/m³), h is layer thickness (m), and 1852² converts from - nautical miles² to meters². + where N_A is areal density (individuals/m²), N_V is volumetric density (individuals/m³), h is + layer thickness (m), and 1852² converts from nautical miles² to meters². - This method maintains spatial resolution at the interval level, making - it suitable for detailed vertical distribution analysis. + This method maintains spatial resolution at the interval level, making it suitable for + detailed vertical distribution analysis. """ - # Calculate the layer density - layer_density = parameters["number_density"] * acoustic_data["thickness_mean"] + # Calculate the layer density using values to avoid index conflict + layer_density_vals = ( + parameters["number_density"].values * acoustic_data["thickness_mean"].values + ) * 1852**2 + + # Create output DataArray with acoustic_data structure + layer_density = xr.DataArray( + layer_density_vals, coords=acoustic_data.coords, dims=acoustic_data.dims + ) # Return - return layer_density * 1852**2 + return layer_density diff --git a/echopop/inversion/utils.py b/echopop/inversion/utils.py index c38d2d2a..658bad1e 100644 --- a/echopop/inversion/utils.py +++ b/echopop/inversion/utils.py @@ -3,6 +3,7 @@ import numpy as np import pandas as pd +import xarray as xr def impute_missing_sigma_bs( @@ -649,25 +650,41 @@ def generate_frequency_interval( return _generate_frequency_interval_cached_key(key) -def _extract_parameters_optimized(inverted_data: pd.DataFrame) -> pd.DataFrame: +def _extract_parameters_optimized(inverted_data: xr.Dataset) -> pd.DataFrame: """ Extract parameters more efficiently than using .apply(). - This optimized version avoids the overhead of pandas .apply() - by using direct iteration and batch DataFrame construction. + This function extracts InvParameters objects from the Dataset's 'parameters' coordinate and + converts them to a DataFrame with parameter values as columns. Parameters ---------- - inverted_data : pd.DataFrame - DataFrame with 'parameters' column containing InvParameters objects + inverted_data : xr.Dataset + Dataset with 'parameters' coordinate containing InvParameters objects. Returns ------- pd.DataFrame - DataFrame with parameter values as columns + DataFrame with parameter values as columns, indexed by the appropriate dimensions + excluding 'frequency'. """ - # Extract all parameter dictionaries at once - param_dicts = [obj.values for obj in inverted_data["parameters"]] - # Construct DataFrame in one operation - return pd.DataFrame(param_dicts, index=inverted_data.index) + # Extract parameter objects from coordinates + param_objects = inverted_data.coords["parameters"].values + + # Extract all parameter dictionaries + param_dicts = [obj.values for obj in param_objects] + + # Convert to DataFrame + params = pd.DataFrame(param_dicts) + + # Get the index from inverted_data - handle both MultiIndex and regular index + if "point" in inverted_data.indexes and hasattr(inverted_data.indexes["point"], "names"): + params.index = inverted_data.indexes["point"] + else: + dim_names = [d for d in inverted_data.dims if d != "frequency"] + if dim_names: + params.index = inverted_data.indexes[dim_names[0]] + + # Return + return params diff --git a/echopop/survey/proportions.py b/echopop/survey/proportions.py index bab67219..6740d98c 100644 --- a/echopop/survey/proportions.py +++ b/echopop/survey/proportions.py @@ -401,7 +401,13 @@ def binned_weights( ) # Return the grouped sums - return result.groupby(group_columns, observed=False)["weight"].sum().to_xarray().fillna(0.0) + return ( + result.groupby(group_columns, observed=False)["weight"] + .sum() + .to_xarray() + .fillna(0.0) + .astype(float) + ) def calculate_adjusted_proportions( diff --git a/echopop/tests/ingest/test_inversion_sv_ingestion.py b/echopop/tests/ingest/test_inversion_sv_ingestion.py index ce49da1c..eb6f3c71 100644 --- a/echopop/tests/ingest/test_inversion_sv_ingestion.py +++ b/echopop/tests/ingest/test_inversion_sv_ingestion.py @@ -3,6 +3,7 @@ import numpy as np import pandas as pd import pytest +import xarray as xr from echopop.ingest import sv as ingest_sv @@ -296,8 +297,8 @@ def test_ingest_echoview_sv_basic_functionality(sv_directory_with_files): impute_coordinates=True, ) - assert isinstance(result_data, pd.DataFrame) - assert result_coords is None or isinstance(result_coords, pd.DataFrame) + assert isinstance(result_data, xr.Dataset) + assert result_coords is None or isinstance(result_coords, xr.DataArray) def test_ingest_echoview_sv_with_transect_pattern(sv_directory_with_files): @@ -311,8 +312,8 @@ def test_ingest_echoview_sv_with_transect_pattern(sv_directory_with_files): aggregate_method="interval", ) - assert isinstance(result_data, pd.DataFrame) - assert isinstance(result_coords, pd.DataFrame) + assert isinstance(result_data, xr.Dataset) + assert isinstance(result_coords, xr.DataArray) def test_ingest_echoview_sv_no_center_frequencies(sv_directory_with_files): @@ -321,8 +322,8 @@ def test_ingest_echoview_sv_no_center_frequencies(sv_directory_with_files): sv_directory_with_files, transect_pattern=r"x(\d+)", aggregate_method="transect" ) - assert isinstance(result_data, pd.DataFrame) - assert isinstance(result_coords, pd.DataFrame) + assert isinstance(result_data, xr.Dataset) + assert isinstance(result_coords, xr.DataArray) def test_ingest_echoview_sv_directory_not_found(): diff --git a/echopop/tests/inversion/test_inversion_matrix.py b/echopop/tests/inversion/test_inversion_matrix.py index 1f9daadf..ce3a9ec8 100644 --- a/echopop/tests/inversion/test_inversion_matrix.py +++ b/echopop/tests/inversion/test_inversion_matrix.py @@ -5,6 +5,7 @@ import echopop.inversion.inversion_matrix as im from echopop import inversion +from echopop.ingest import sv as ingest_sv from echopop.inversion import InversionMatrix, InvParameters, estimate_population @@ -243,64 +244,52 @@ def test_estimate_population(inv_transect_info, inv_interval_info, inv_cells_inf # Test transect-index transect_result = estimate_population( - inv_transect_info["inverted"], - inv_transect_info["coords"], + ingest_sv.df_to_xarray(inv_transect_info["inverted"]), + ingest_sv.df_to_xarray(inv_transect_info["coords"]), density_sw=1026.0, reference_frequency=120e3, aggregate_method="transect", ) + # ---- Check: shape - assert transect_result.shape == (5, 17) - # ---- Get keys: - params = list(inv_transect_info["inverted"]["parameters"].loc[1].values.keys()) - # ---- Check: columns - expected_cols = ["longitude", "latitude", "nasc", "number_density", "biomass_density"] + params - assert all([col in transect_result.columns for col in expected_cols]) + assert transect_result.sizes == {"point": 5} + # ---- Check: coords + expected_coords = ["longitude", "latitude", "transect_num"] + assert all([c in transect_result.coords for c in expected_coords]) # ---- Check: index - assert list(transect_result.index.names) == ["transect_num"] + assert set(transect_result.data_vars) == {"biomass_density", "number_density", "nasc"} # Test interval-index interval_result = estimate_population( - inv_interval_info["inverted"], - inv_interval_info["coords"], + ingest_sv.df_to_xarray(inv_interval_info["inverted"]), + ingest_sv.df_to_xarray(inv_interval_info["coords"]), density_sw=1026.0, reference_frequency=120e3, aggregate_method="interval", ) # ---- Check: shape - assert interval_result.shape == (20, 15) - # ---- Get keys: - params = list(inv_interval_info["inverted"]["parameters"].loc[1, 1, 1].values.keys()) - # ---- Check: columns - expected_cols = ["nasc", "number_density", "biomass_density"] + params - assert all([col in interval_result.columns for col in expected_cols]) + assert interval_result.sizes == {"point": 20} + # ---- Check: coords + expected_coords = ["longitude", "latitude", "interval"] + assert all([c in interval_result.coords for c in expected_coords]) # ---- Check: index - assert all( - [col in ["longitude", "latitude", "interval"] for col in list(interval_result.index.names)] - ) + assert set(interval_result.data_vars) == {"biomass_density", "number_density", "nasc"} # Test cells-index cells_result = estimate_population( - inv_cells_info["inverted"], - inv_cells_info["coords"], + ingest_sv.df_to_xarray(inv_cells_info["inverted"]), + ingest_sv.df_to_xarray(inv_cells_info["coords"]), density_sw=1026.0, reference_frequency=120e3, aggregate_method="cells", ) # ---- Check: shape - assert cells_result.shape == (10, 15) - # ---- Get keys: - params = list(inv_cells_info["inverted"]["parameters"].loc[1, 1, -1, -1].values.keys()) - # ---- Check: columns - expected_cols = ["nasc", "number_density", "biomass_density"] + params - assert all([col in cells_result.columns for col in expected_cols]) + assert cells_result.sizes == {"point": 10} + # ---- Check: coords + expected_coords = ["longitude", "latitude", "interval", "layer"] + assert all([c in cells_result.coords for c in expected_coords]) # ---- Check: index - assert all( - [ - col in ["longitude", "latitude", "interval", "layer"] - for col in list(cells_result.index.names) - ] - ) + assert set(cells_result.data_vars) == {"biomass_density", "number_density", "nasc"} # Test functions for scattering operations @@ -601,17 +590,27 @@ def test_inversion_matrix_creation(): """Test InversionMatrix creation and basic functionality.""" # Create mock data frequencies = [38e3, 120e3] - n_obs = 5 + n_obs = 4 # Create MultiIndex columns - cols = pd.MultiIndex.from_product( - [["sv_mean", "nasc", "thickness_mean"], frequencies], names=[None, "frequency"] - ) # Generate random data np.random.seed(131) - data = np.random.randn(n_obs, len(cols)) - df = pd.DataFrame(data, columns=cols) + data = ( + pd.DataFrame( + { + "transect_num": np.repeat([1, 2], n_obs), + "frequency": np.tile(frequencies, n_obs), + "longitude": np.random.randn(n_obs * len(frequencies)), + "sv_mean": np.random.randn(n_obs * len(frequencies)), + "nasc": np.random.randn(n_obs * len(frequencies)), + "thickness_mean": np.random.randn(n_obs * len(frequencies)), + } + ) + .set_index(["transect_num", "longitude", "frequency"]) + .unstack("frequency") + .fillna(0.0) + ) # Simulation settings sim_settings = { @@ -622,7 +621,7 @@ def test_inversion_matrix_creation(): } # Create InversionMatrix - inv_matrix = InversionMatrix(df, sim_settings, verbose=False) + inv_matrix = InversionMatrix(ingest_sv.df_to_xarray(data), sim_settings, verbose=False) assert inv_matrix is not None assert inv_matrix.inversion_method == "scattering_model" diff --git a/echopop/validators/__init__.py b/echopop/validators/__init__.py index aa806234..fbf895eb 100644 --- a/echopop/validators/__init__.py +++ b/echopop/validators/__init__.py @@ -12,6 +12,19 @@ ValidateVariogramClass, ) + +# Rebuild ValidateBuildModelArgs after InvParameters is available +def _rebuild_on_import(): + try: + from ..inversion import InvParameters # noqa: F401 + + ValidateBuildModelArgs.model_rebuild() + except ImportError: + pass + + +_rebuild_on_import() + __all__ = [ # Inversion "ValidateBuildModelArgs", diff --git a/echopop/validators/inversion.py b/echopop/validators/inversion.py index 2c7d4f1d..1125133b 100644 --- a/echopop/validators/inversion.py +++ b/echopop/validators/inversion.py @@ -3,10 +3,10 @@ from typing import TYPE_CHECKING, Any, Callable, Dict, List, Literal, Optional import numpy as np -import pandas as pd +import xarray as xr from pydantic import ConfigDict, Field, RootModel, ValidationError, field_validator, model_validator -from ..core.validators import BaseDataFrame, BaseDictionary +from ..core.validators import BaseDictionary if TYPE_CHECKING: from ..inversion import InvParameters @@ -90,54 +90,6 @@ def validate_expected_strata(cls, v): return v -class ScatterDF(BaseDataFrame): - """ - Validates MultiIndex DataFrame structure with required top-level columns and nested frequency - index. - - Ensures the top-level column names include 'sv_mean', 'nasc', and 'thickness_mean', and that - the second-level column names ('frequency') are numeric. All values under each frequency must - be floats. - """ - - class Config(BaseDataFrame.Config): - title = "acoustic backscatter export DataFrame" - multiindex_strict = True - multiindex_coerce = True - unique_column_names = False - - @classmethod - def pre_validate(cls, df: pd.DataFrame) -> pd.DataFrame: - # Raise Error if not a MultiIndex DataFrame - if not isinstance(df.columns, pd.MultiIndex): - raise TypeError("Expected MultiIndex columns.") - - # Check for required top-level columns - # ---- Get the top level indices - top_level = set(df.columns.get_level_values(0)) - # ---- Identify missing columns - missing_columns = {"sv_mean", "nasc", "thickness_mean"} - top_level - # ---- Raise Error, if needed - if missing_columns: - # ---- Format - missing_str = ", ".join(f"'{col}'" for col in missing_columns) - raise KeyError(f"Missing required top-level columns: {missing_str}.") - - # Check for nested column 'frequency' - if "frequency" not in df.columns.names: - # ---- Get current column names - current_names = list(df.columns.names) - # ---- Format - current_str = ", ".join(f"'{col}'" for col in current_names if col) - # ---- Raise Error - raise KeyError( - f"Missing required nested column index 'frequency'. Current column indices are: " - f"{current_str}." - ) - - return df - - class EnvironmentParameters(BaseDictionary): """ Environmental and medium parameters for acoustic scattering models. @@ -181,14 +133,26 @@ class ValidateInversionMatrix(BaseDictionary): Validation model for InversionMatrix class configuration. """ - data: pd.DataFrame + data: xr.Dataset simulation_settings: SimulationParameters = Field(default_factory=SimulationParameters) model_config = ConfigDict(title="scattering model inversion analysis") @field_validator("data", mode="after") def validate_data(cls, v): - # Validate with pandera - return ScatterDF.validate(v) + + # Validate coordinates + if "frequency" not in v.coords: + raise KeyError("Required coordinate 'frequency' missing from input xarray.Dataset.") + + # Validate data variables + missing_vars = {"sv_mean", "nasc", "thickness_mean"} - set(v.data_vars) + # ---- Raise Error, if needed + if missing_vars: + # ---- Format + missing_str = ", ".join(f"'{col}'" for col in missing_vars) + raise KeyError(f"Missing required data variables from xarray.Dataset: {missing_str}.") + + return v class ModelSettingsParameters(BaseDictionary): @@ -355,3 +319,15 @@ def create(cls, **kwargs): Factory creation method """ return cls.judge(**kwargs).model_dump(exclude_none=True) + + +# Rebuild models with forward references after all classes are defined +_model_rebuilt = False + + +def ensure_model_rebuilt(): + """Ensure model is rebuilt with forward references resolved.""" + global _model_rebuilt + if not _model_rebuilt: + ValidateBuildModelArgs.model_rebuild() + _model_rebuilt = True diff --git a/echopop/validators/scattering.py b/echopop/validators/scattering.py index a537a086..f5e8412a 100644 --- a/echopop/validators/scattering.py +++ b/echopop/validators/scattering.py @@ -48,4 +48,4 @@ class ValidatePCDWBASettings(BaseDictionary): orientation_distribution: DistributionParameters = Field(default_factory=DistributionParameters) taper_order: float = Field(default=10.0, gt=0.0, allow_inf_nan=False) type: str - model_config = ConfigDict(title="PCDWBA model settings and additional arguments") + model_config = ConfigDict(title="PCDWBA model settings and additional arguments", extra="allow") diff --git a/echopop/workflows/nwfsc_feat/workflows/feat_krill.py b/echopop/workflows/nwfsc_feat/workflows/feat_krill.py index 4970678c..734a7f03 100644 --- a/echopop/workflows/nwfsc_feat/workflows/feat_krill.py +++ b/echopop/workflows/nwfsc_feat/workflows/feat_krill.py @@ -1,6 +1,5 @@ from pathlib import Path -from echopop import utils from echopop.ingest import sv from echopop.inversion import InversionMatrix, InvParameters, estimate_population @@ -40,8 +39,16 @@ # DATA SUBSET # ================================================================================================== -sv_data_sub = utils.apply_filters(sv_data, include_filter={"transect_num": [1, 2, 3]}) - +# sv_data_sub = sv_data.sel(transect_num=[1,2,3]) +if PROCESSING_METHOD == "transect": + sv_data_sub = sv_data.where(sv_data["transect_num"].isin([1, 2]), drop=True) +elif PROCESSING_METHOD == "interval": + sv_data_sub = sv_data.where((sv_data["transect_num"].isin([1])) & + sv_data["interval"].isin([13, 29, 30, 31]), + drop=True) +else: + out = sv_data.sel(transect_num=2, interval=17) + sv_data_sub = out.where(out["layer"] < 15, drop=True) # ================================================================================================== # ================================================================================================== # DEFINE PARAMETERS @@ -60,6 +67,12 @@ } # Model-specific settings, including distributions +ENVIRONMENT = { + "sound_speed_sw": 1500.0, + "density_sw": 1026.9, +} + + MODEL_SETTINGS = { "type": "pcdwba", "taper_order": 10.0, @@ -68,15 +81,10 @@ "n_wavelength": 10, "orientation_distribution": {"family": "gaussian", "bins": 60}, "length_distribution": {"family": "gaussian", "bins": 100}, -} - -ENVIRONMENT = { - "sound_speed_sw": 1500.0, - "density_sw": 1026.9, + "environment": ENVIRONMENT, } SIMULATION_SETTINGS = { - "environment": ENVIRONMENT, "monte_carlo": True, "mc_realizations": 20, "minimum_frequency_count": 2, @@ -102,7 +110,6 @@ }, } - def run_krill_inversion_workflow(): # ================================================================================================== # ================================================================================================== @@ -117,7 +124,6 @@ def run_krill_inversion_workflow(): # ================================================================================================== # INITIALIZE # ================================================================================================== - inversion_krill = InversionMatrix(sv_data_sub, SIMULATION_SETTINGS) # ================================================================================================== @@ -131,7 +137,7 @@ def run_krill_inversion_workflow(): # ================================================================================================== # BUILD AND FORMAT SCATTERING MODEL OPTIMIZERS # ================================================================================================== - df_inversion_results = inversion_krill.invert(optimization_kwargs=OPTIMIZATION_KWARGS) + inversion_results = inversion_krill.invert(optimization_kwargs=OPTIMIZATION_KWARGS) # ================================================================================================== # ================================================================================================== @@ -139,9 +145,12 @@ def run_krill_inversion_workflow(): # ================================================================================================== return estimate_population( - df_inversion_results, + inversion_results, nasc_coordinates, - aggregate_method="transect", + aggregate_method=PROCESSING_METHOD, density_sw=ENVIRONMENT["density_sw"], reference_frequency=120e3, ) + +run_krill_inversion_workflow() +