diff --git a/src/glide/cli.py b/src/glide/cli.py index 7a7a633..cbc35b5 100644 --- a/src/glide/cli.py +++ b/src/glide/cli.py @@ -228,7 +228,9 @@ def l2( merged = process_l1.calculate_thermodynamics(merged, conf) - out = profiles.get_profiles(merged, shallowest_profile, min_surface_time) + out = profiles.get_profiles( + merged, shallowest_profile, min_surface_time, flt=flt_raw + ) out = profiles.assign_surface_state(out, flt=flt_raw) diff --git a/src/glide/process_l2.py b/src/glide/process_l2.py index 490a0de..20b7b81 100644 --- a/src/glide/process_l2.py +++ b/src/glide/process_l2.py @@ -231,6 +231,9 @@ def bin_l2( for row in profile_indexes: profile = ds.isel(i=slice(row[0], row[1])) + if not np.isfinite(profile.depth.values).any(): + _log.debug("Skipping profile [%d:%d] with no finite depth", row[0], row[1]) + continue time = profile.time.values idx_mid = np.searchsorted(time, 0.5 * (time[0] + time[-1])) diff --git a/src/glide/profiles.py b/src/glide/profiles.py index 18e8e8b..baf6813 100644 --- a/src/glide/profiles.py +++ b/src/glide/profiles.py @@ -6,24 +6,256 @@ _log = logging.getLogger(__name__) +_STATE_ATTRS = dict( + long_name="Glider state", + flag_values=np.array([-1, 0, 1, 2, 3], "b"), + flag_meanings="unknown surface dive climb drift", + valid_max=np.int8(3), + valid_min=np.int8(-1), +) + + +def _detect_drift( + pressure: np.ndarray, + time: np.ndarray, + dt_s: float, + flt: xr.Dataset | None = None, + min_drift_duration: float = 600.0, + pressure_std_threshold: float = 2.0, + surface_pressure: float = 2.0, +) -> np.ndarray: + """Return boolean mask True where glider is drifting at depth. + + Uses x_hover_active from raw flight data when present (1=on, 0=off, + -127=missing; forward-filled to science timestamps). Otherwise applies a + rolling-variance detector: rolling pressure std < pressure_std_threshold + and pressure > surface_pressure sustained for min_drift_duration seconds. + """ + n = len(pressure) + drift = np.zeros(n, dtype=bool) + + if flt is not None and "x_hover_active" in flt: + ft = flt.m_present_time.values + ha = flt.x_hover_active.values + keep = np.isfinite(ha) & np.isfinite(ft) & (ha != -127) + if keep.any(): + order = np.argsort(ft[keep]) + ft, ha = ft[keep][order], ha[keep][order] + pos = np.searchsorted(ft, time, side="right") - 1 + in_range = pos >= 0 + drift[in_range] = ha[pos[in_range]] == 1 + _log.debug("Drift: %d points from x_hover_active", int(drift.sum())) + return drift + _log.debug("x_hover_active has no finite values; falling back to pressure") + + valid = np.isfinite(pressure) + if not valid.any(): + return drift + + # Window = min_drift_duration / 4 (smaller reduces edge miss). np.convolve + # mode='same' returns max(n, window) elements, so cap so window < n. + half = min((n - 1) // 2, max(2, round(min_drift_duration / 4 / dt_s))) + ones = np.ones(2 * half + 1) + p_fill = np.where(valid, pressure, 0.0) + cnt = np.convolve(valid.astype(float), ones, mode="same") + sm = np.convolve(p_fill, ones, mode="same") + sq = np.convolve(p_fill**2, ones, mode="same") + safe = np.where(cnt >= 3, cnt, 1.0) + var = sq / safe - (sm / safe) ** 2 + rolling_std = np.where(cnt >= 3, np.sqrt(np.maximum(0.0, var)), np.inf) + + candidate = ( + valid & (pressure > surface_pressure) & (rolling_std < pressure_std_threshold) + ) + + # Keep only contiguous runs spanning at least min_drift_duration seconds. + i = 0 + while i < n: + if not candidate[i]: + i += 1 + continue + j = i + 1 + while j < n and candidate[j]: + j += 1 + if (j - i) * dt_s >= min_drift_duration: + drift[i:j] = True + i = j + + # The rolling-variance window misses the settling phase (overshoot + # recovery before, slow ascent start after). Expand each drift run to + # absorb adjacent samples whose pressure is close to the drift mean. + # A real climb/dive moves through drift depth quickly so the expansion + # stops shortly after crossing it. + if drift.any(): + diff_d = np.diff(drift.astype(int), prepend=0, append=0) + run_starts, run_ends = np.where(diff_d == 1)[0], np.where(diff_d == -1)[0] + tol = 2 * pressure_std_threshold + for s, e in zip(run_starts, run_ends): + mean_p = float(np.nanmean(pressure[s:e])) + while ( + s > 0 + and np.isfinite(pressure[s - 1]) + and abs(pressure[s - 1] - mean_p) < tol + ): + s -= 1 + while ( + e < n and np.isfinite(pressure[e]) and abs(pressure[e] - mean_p) < tol + ): + e += 1 + drift[s:e] = True + + _log.debug("Drift: %d points from pressure detector", int(drift.sum())) + return drift + + +def _absorb_apex_unknowns( + state: np.ndarray, + dive_id: np.ndarray, + climb_id: np.ndarray, + profile_id: np.ndarray, + pressure: np.ndarray, + dt_s: float, + max_gap_duration: float, + surface_pressure: float = 2.0, +) -> None: + """Split short underwater unknown gaps between adjacent profiles at the + pressure extremum. Handles dive→climb (split at max: pre→dive, post→climb) + and climb→dive yo-tops (split at min: pre→climb, post→dive). Skips gaps + that go shallow (left for the surface classifier) or are too long.""" + n = len(state) + unknown = state == -1 + if not unknown.any(): + return + diff = np.diff(unknown.astype(int), prepend=0, append=0) + starts, ends = np.where(diff == 1)[0], np.where(diff == -1)[0] + max_samples = max(2, round(max_gap_duration / dt_s)) + + for s, e in zip(starts, ends): + if (e - s) > max_samples or s == 0 or e >= n: + continue + prev, nxt = int(state[s - 1]), int(state[e]) + if {prev, nxt} != {1, 2}: + continue + p = pressure[s:e] + finite = np.isfinite(p) + if not finite.any() or float(np.min(p[finite])) < surface_pressure: + continue + # dive→climb splits at max (extremum is positive); climb→dive at min. + fill, extreme = (-np.inf, np.argmax) if prev == 1 else (np.inf, np.argmin) + split = s + int(extreme(np.where(finite, p, fill))) + 1 + pre_id = dive_id if prev == 1 else climb_id + post_id = climb_id if nxt == 2 else dive_id + state[s:split] = prev + state[split:e] = nxt + pre_id[s:split] = pre_id[s - 1] + post_id[split:e] = post_id[e] + profile_id[s:split] = profile_id[s - 1] + profile_id[split:e] = profile_id[e] + + +def _absorb_pre_drift_transients( + state: np.ndarray, + dive_id: np.ndarray, + climb_id: np.ndarray, + profile_id: np.ndarray, + drift_mask: np.ndarray, + dt_s: float, + max_transient_duration: float, +) -> None: + """Fold brief climb/unknown segments between a dive end and a drift start + into that drift. This is the dive overshoot recovery: the glider arrives + past target, briefly recovers upward, then settles into hover. Without + this pass the recovery becomes a tiny climb profile in L3.""" + n = len(state) + if not drift_mask.any(): + return + diff = np.diff(drift_mask.astype(int), prepend=0, append=0) + drift_starts = np.where(diff == 1)[0] + window = max(2, round(max_transient_duration / dt_s)) + + for ds_ in drift_starts: + if ds_ == 0: + continue + start = max(0, ds_ - window) + i = ds_ - 1 + while i >= start and (state[i] == 2 or state[i] == -1): + i -= 1 + # Only absorb when bounded on the left by a dive within the window + if i < start or state[i] != 1: + continue + state[i + 1 : ds_] = 3 + dive_id[i + 1 : ds_] = -1 + climb_id[i + 1 : ds_] = -1 + profile_id[i + 1 : ds_] = -1 + + +def _absorb_post_drift_transients( + state: np.ndarray, + dive_id: np.ndarray, + climb_id: np.ndarray, + profile_id: np.ndarray, + drift_mask: np.ndarray, + dt_s: float, + max_transient_duration: float, +) -> None: + """Fold brief dive/unknown segments between a drift end and the next climb + into that climb. The drift mask gap can cause profinder to detect a + spurious tiny peak; this extends the climb backward to absorb it.""" + n = len(state) + if not drift_mask.any(): + return + diff = np.diff(drift_mask.astype(int), prepend=0, append=0) + drift_ends = np.where(diff == -1)[0] + window = max(2, round(max_transient_duration / dt_s)) + + for de in drift_ends: + if de >= n: + continue + in_window = state[de : min(de + window, n)] + climb_local = np.where(in_window == 2)[0] + if len(climb_local) == 0: + continue + first_climb = de + int(climb_local[0]) + if first_climb == de: + continue + intermediate = state[de:first_climb] + if not np.all((intermediate == 1) | (intermediate == -1)): + continue + state[de:first_climb] = 2 + climb_id[de:first_climb] = climb_id[first_climb] + profile_id[de:first_climb] = profile_id[first_climb] + dive_id[de:first_climb] = -1 + def get_profiles( ds: xr.Dataset, shallowest_profile: float, min_surface_time: float = 180.0, + flt: xr.Dataset | None = None, + min_drift_duration: float = 600.0, + drift_pressure_std: float = 2.0, ) -> xr.Dataset: - """Identify dive and climb profiles from a pressure time series. + """Identify dive, climb, and drift profiles from a pressure time series. Parameters ---------- ds : xr.Dataset Merged L2 dataset with 'pressure' and 'time' variables. shallowest_profile : float - Minimum peak pressure (dbar) for a profile to be recognised. + Minimum peak pressure (dbar) for a profile to be recognised. Also + used as the inter-profile trough prominence threshold so small stalls + within a climb are not mistaken for surfacings. min_surface_time : float - Minimum time (seconds) between consecutive dive apexes. Used to - compute sample-count thresholds so the detector adapts to the data - sampling rate. A value of ~180 s works for most Slocum deployments. + Minimum time (seconds) between consecutive dive apexes (default 180). + flt : xr.Dataset, optional + Raw flight data. Used to detect drift via x_hover_active when + present; otherwise the rolling-variance fallback runs. + min_drift_duration : float + Minimum sustained-low-variance duration (s) for the pressure-based + drift detector (default 600 s). + drift_pressure_std : float + Rolling pressure std threshold (dbar) for the pressure-based drift + detector (default 2.0). """ raw_diff = np.diff(ds.time.values) if np.issubdtype(raw_diff.dtype, np.timedelta64): @@ -32,6 +264,20 @@ def get_profiles( dt_s = float(np.nanmedian(raw_diff)) fs = 1.0 / dt_s + drift_mask = _detect_drift( + ds.pressure.values, + ds.time.values, + dt_s, + flt=flt, + min_drift_duration=min_drift_duration, + pressure_std_threshold=drift_pressure_std, + ) + + # Mask drift before profile finding so profinder pairs the post-drift + # climb with the pre-drift dive across the gap (missing="drop"). + pressure_masked = ds.pressure.values.copy().astype(float) + pressure_masked[drift_mask] = np.nan + peaks_kwargs = { "height": shallowest_profile, "prominence": shallowest_profile, @@ -39,25 +285,26 @@ def get_profiles( "width": max(2, round(20 * fs)), # ≥20 s half-width detects ~10 m dives } troughs_kwargs = { - "prominence": 2, + # Real inter-profile troughs are at least as deep as a real dive's + # prominence; smaller "troughs" are stalls within a profile. + "prominence": shallowest_profile, "distance": max(2, round(30 * fs)), "width": max(1, round(5 * fs)), } - _log.debug( - "fs=%.3f Hz; peaks_kwargs=%s; troughs_kwargs=%s", - fs, - peaks_kwargs, - troughs_kwargs, - ) + _log.debug("fs=%.3f Hz; peaks=%s; troughs=%s", fs, peaks_kwargs, troughs_kwargs) profiles = find_profiles( - ds.pressure.values, + pressure_masked, peaks_kwargs=peaks_kwargs, troughs_kwargs=troughs_kwargs, + # Real glider pressure has per-second sensor bobbing (positive Δp every + # ~5 samples) that breaks profinder's default 4-sample-all-ascent + # confirmation, terminating the climb tens of dbar above the surface. + # 2 samples is loose enough to find ascent windows through the bobbing. + run_length=2, missing="drop", ) - _log.debug("Found %i profiles", len(profiles)) n = ds.time.size @@ -66,40 +313,32 @@ def get_profiles( profile_id = np.full(n, -1, dtype="i4") state = np.full(n, -1, dtype="b") - dive_counter = 1 - climb_counter = 1 - profile_counter = 1 - - for prof in profiles: - dive_start, dive_end, climb_start, climb_end = prof - - dive_id[dive_start:dive_end] = dive_counter - profile_id[dive_start:dive_end] = profile_counter - state[dive_start:dive_end] = 1 - dive_counter += 1 - profile_counter += 1 - - climb_id[climb_start:climb_end] = climb_counter - profile_id[climb_start:climb_end] = profile_counter - state[climb_start:climb_end] = 2 - climb_counter += 1 - profile_counter += 1 - - ds["dive_id"] = ("time", dive_id, dict(_FillValue=np.int32(-1))) - ds["climb_id"] = ("time", climb_id, dict(_FillValue=np.int32(-1))) - ds["profile_id"] = ("time", profile_id, dict(_FillValue=np.int32(-1))) - ds["state"] = ( - "time", - state, - dict( - long_name="Glider state", - flag_values=np.array([-1, 0, 1, 2], "b"), - flag_meanings="unknown surface dive climb", - valid_max=np.int8(2), - valid_min=np.int8(-1), - ), - ) - + pid = 1 + for k, (ds_, de, cs, ce) in enumerate(profiles, start=1): + dive_id[ds_:de] = k + profile_id[ds_:de] = pid + state[ds_:de] = 1 + climb_id[cs:ce] = k + profile_id[cs:ce] = pid + 1 + state[cs:ce] = 2 + pid += 2 + + # Drift overrides any profile membership profinder may have spanned across + state[drift_mask] = 3 + dive_id[drift_mask] = -1 + climb_id[drift_mask] = -1 + profile_id[drift_mask] = -1 + + ids = (state, dive_id, climb_id, profile_id) + _absorb_pre_drift_transients(*ids, drift_mask, dt_s, min_surface_time) + _absorb_post_drift_transients(*ids, drift_mask, dt_s, min_surface_time) + _absorb_apex_unknowns(*ids, ds.pressure.values, dt_s, min_surface_time) + + fill = dict(_FillValue=np.int32(-1)) + ds["dive_id"] = ("time", dive_id, fill) + ds["climb_id"] = ("time", climb_id, fill) + ds["profile_id"] = ("time", profile_id, fill) + ds["state"] = ("time", state, _STATE_ATTRS) return ds @@ -109,61 +348,63 @@ def assign_surface_state( dt: float = 300.0, surface_pressure: float = 2.0, ) -> xr.Dataset: - """Assign surface state (0) to unknown points near GPS fixes. - - A point with state == -1 (unknown) is marked as surface (0) only if it is - within `dt` seconds of a valid GPS fix AND shallower than `surface_pressure` - dbar. The pressure gate prevents the broad GPS time window from reaching - into adjacent dives. - - Parameters - ---------- - ds : xr.Dataset - Dataset with state variable from get_profiles. - flt : xr.Dataset, optional - Raw flight data containing m_gps_lat/lon with valid GPS fix times. - dt : float - Time threshold in seconds for matching to GPS fixes (default 300). - surface_pressure : float - Maximum pressure (dbar) for a point to be considered at the surface - (default 2.0). + """Assign surface state (0) to unknown points. + + Two passes: (1) GPS-proximity — unknowns within `dt` seconds of a valid + GPS fix AND shallower than `surface_pressure`; (2) whole-segment — any + contiguous unknown run with no finite pressure ≥ `surface_pressure`. + Missing pressure is the expected state at the surface (sensor out of + water), so all-NaN runs are treated as surface — except when bounded by + dive→climb (the underwater apex), which is left unknown. """ - if flt is None or "m_gps_lat" not in flt: - _log.warning("No flight data with GPS, cannot assign surface state") - return ds - - gps_valid = np.isfinite(flt.m_gps_lat.values) - if not gps_valid.any(): - _log.warning("No valid GPS fixes found") - return ds - - gps_times = np.sort(flt.m_present_time.values[gps_valid]) - state = ds.state.values.copy() + pressure = ds.pressure.values time_l2 = ds.time.values - unknown_mask = state == -1 - - if unknown_mask.any(): - unknown_times = time_l2[unknown_mask] - pos = np.searchsorted(gps_times, unknown_times) - dist_left = np.where( - pos > 0, - unknown_times - gps_times[np.maximum(pos - 1, 0)], - np.inf, - ) - dist_right = np.where( - pos < len(gps_times), - gps_times[np.minimum(pos, len(gps_times) - 1)] - unknown_times, - np.inf, - ) - is_near = np.minimum(dist_left, dist_right) <= dt - is_shallow = np.isfinite(ds.pressure.values[unknown_mask]) & ( - ds.pressure.values[unknown_mask] < surface_pressure - ) - is_near = is_near & is_shallow - state[unknown_mask] = np.where(is_near, np.int8(0), state[unknown_mask]) - _log.debug("Assigned %d points to surface state", int(is_near.sum())) + if flt is not None and "m_gps_lat" in flt: + gps_valid = np.isfinite(flt.m_gps_lat.values) + if gps_valid.any(): + gps_times = np.sort(flt.m_present_time.values[gps_valid]) + unk = state == -1 + if unk.any(): + ut = time_l2[unk] + pos = np.searchsorted(gps_times, ut) + left = np.where(pos > 0, ut - gps_times[np.maximum(pos - 1, 0)], np.inf) + right = np.where( + pos < len(gps_times), + gps_times[np.minimum(pos, len(gps_times) - 1)] - ut, + np.inf, + ) + near = np.minimum(left, right) <= dt + shallow = np.isfinite(pressure[unk]) & ( + pressure[unk] < surface_pressure + ) + state[unk] = np.where(near & shallow, np.int8(0), state[unk]) + else: + _log.warning("No valid GPS fixes found") + else: + _log.warning("No flight data with GPS, skipping GPS-proximity surface check") + + unknown = state == -1 + if unknown.any(): + n = len(state) + diff = np.diff(unknown.astype(int), prepend=0, append=0) + starts, ends = np.where(diff == 1)[0], np.where(diff == -1)[0] + for s, e in zip(starts, ends): + p = pressure[s:e] + finite = np.isfinite(p) + if finite.any() and float(np.max(p[finite])) >= surface_pressure: + continue + # All-NaN gap bounded by dive→climb is the underwater apex; leave it + if ( + not finite.any() + and 0 < s + and e < n + and state[s - 1] == 1 + and state[e] == 2 + ): + continue + state[s:e] = 0 ds["state"] = ("time", state, ds.state.attrs) return ds diff --git a/tests/test_process_l1.py b/tests/test_process_l1.py index 09a555b..57b0472 100644 --- a/tests/test_process_l1.py +++ b/tests/test_process_l1.py @@ -2,6 +2,7 @@ import numpy as np import pandas as pd +import pytest import xarray as xr import glide.gliderdac as gd @@ -374,19 +375,124 @@ def test_assign_surface_state_pressure_gate() -> None: def test_assign_surface_state_no_flight_data() -> None: - """Test that assign_surface_state handles missing flight data gracefully.""" + """Without flight data the GPS-proximity check is skipped, but shallow + unknown segments are still classified as surface by the pressure-only pass.""" n = 20 state = np.full(n, -1, dtype="b") state[5:15] = 1 + pressure = np.full(n, 0.5) + pressure[5:15] = 50.0 # dive at depth ds = xr.Dataset( - {"state": ("time", state)}, + {"state": ("time", state), "pressure": ("time", pressure)}, coords={"time": np.arange(n, dtype=float)}, ) - # No flight data - should return unchanged - result = prof.assign_surface_state(ds, flt=None) - assert np.array_equal(result.state.values, state) + result = prof.assign_surface_state(ds, flt=None, surface_pressure=2.0) + + assert np.all(result.state.values[5:15] == 1), "Dive states unchanged" + assert np.all(result.state.values[:5] == 0), "Pre-dive shallow → surface" + assert np.all(result.state.values[15:] == 0), "Post-dive shallow → surface" + + +def test_assign_surface_state_pre_first_dive_and_post_last_climb() -> None: + """Time before the first dive and after the last climb is classified as + surface when pressure stays below surface_pressure.""" + n = 50 + state = np.full(n, -1, dtype="b") + state[10:20] = 1 # dive + state[20:30] = 2 # climb + pressure = np.full(n, 0.3) + pressure[10:20] = np.linspace(0.3, 80, 10) + pressure[20:30] = np.linspace(80, 0.3, 10) + + ds = xr.Dataset( + {"state": ("time", state), "pressure": ("time", pressure)}, + coords={"time": np.arange(n, dtype=float)}, + ) + + result = prof.assign_surface_state(ds, flt=None, surface_pressure=2.0) + + assert np.all(result.state.values[:10] == 0), "Pre-first-dive → surface" + assert np.all(result.state.values[30:] == 0), "Post-last-climb → surface" + assert np.all(result.state.values[10:20] == 1) + assert np.all(result.state.values[20:30] == 2) + + +def test_assign_surface_state_all_nan_pressure_is_surface() -> None: + """Surface segments where the science pressure sensor is out of water + (all NaN) must still be classified as surface.""" + n = 30 + state = np.full(n, -1, dtype="b") + state[5:15] = 1 # dive + state[15:20] = 2 # climb + pressure = np.full(n, np.nan) + pressure[5:15] = np.linspace(2, 80, 10) + pressure[15:20] = np.linspace(80, 2, 5) + # Indices 0–4 and 20–29 have NaN pressure (surface, sensor out of water) + + ds = xr.Dataset( + {"state": ("time", state), "pressure": ("time", pressure)}, + coords={"time": np.arange(n, dtype=float)}, + ) + + result = prof.assign_surface_state(ds, flt=None, surface_pressure=2.0) + + assert np.all(result.state.values[:5] == 0), "Pre-dive all-NaN → surface" + assert np.all(result.state.values[20:] == 0), "Post-climb all-NaN → surface" + assert np.all(result.state.values[5:15] == 1) + assert np.all(result.state.values[15:20] == 2) + + +def test_assign_surface_state_protects_apex_nan_gap() -> None: + """An all-NaN unknown gap sandwiched between a dive and a climb is the + underwater apex (the apex absorber couldn't split it because pressure + was missing). It must NOT be reclassified as surface.""" + n = 30 + state = np.full(n, -1, dtype="b") + state[5:13] = 1 # dive + state[16:25] = 2 # climb + pressure = np.full(n, np.nan) + pressure[5:13] = np.linspace(2, 80, 8) + pressure[16:25] = np.linspace(80, 2, 9) + # Indices 13–15 NaN (sensor briefly missing at the apex) + + ds = xr.Dataset( + {"state": ("time", state), "pressure": ("time", pressure)}, + coords={"time": np.arange(n, dtype=float)}, + ) + + result = prof.assign_surface_state(ds, flt=None, surface_pressure=2.0) + + assert np.all(result.state.values[13:16] == -1), ( + "Apex NaN gap must stay unknown, not flip to surface" + ) + assert np.all(result.state.values[5:13] == 1) + assert np.all(result.state.values[16:25] == 2) + + +def test_assign_surface_state_keeps_aborted_dive_unknown() -> None: + """An unknown segment that contains a partial dive (pressure exceeds + surface_pressure) must not be reclassified as surface.""" + n = 30 + state = np.full(n, -1, dtype="b") + state[20:25] = 2 # climb + pressure = np.full(n, 0.3) + # Aborted dive between samples 5–15: pressure briefly reaches 4 dbar + pressure[5:15] = np.array([1.0, 2.0, 3.0, 4.0, 4.0, 4.0, 3.0, 2.0, 1.0, 0.5]) + pressure[20:25] = np.linspace(40, 0.3, 5) + + ds = xr.Dataset( + {"state": ("time", state), "pressure": ("time", pressure)}, + coords={"time": np.arange(n, dtype=float)}, + ) + + result = prof.assign_surface_state(ds, flt=None, surface_pressure=2.0) + + # The whole pre-climb segment contains a sample > surface_pressure → stays unknown + assert np.all(result.state.values[:20] == -1) + # Post-climb stays surface (pressure shallow throughout) + assert np.all(result.state.values[25:] == 0) def test_add_velocity_groups_by_velocity_reports() -> None: @@ -911,3 +1017,252 @@ def test_emit_ioos_profiles_yo_yo_shares_time_uv(tmp_path) -> None: # All four profiles share the same scalar time_uv (= the segment's value). assert all(t == time_uvs[0] for t in time_uvs) assert time_uvs[0] == 25.0 + + +def _make_drift_at_depth_dataset(dt_s: float) -> tuple[xr.Dataset, float, float, float]: + """Synthetic surface→dive(105 overshoot)→correction→drift(100±2)→climb→surface + cycle. Returns (ds, t_dive_start, t_drift_start, t_climb_start) in seconds.""" + t_surface, t_dive, t_overshoot, t_drift, t_climb = ( + 120.0, + 1500.0, + 120.0, + 3600.0, + 1500.0, + ) + t0_dive = t_surface + t0_overshoot = t0_dive + t_dive + t0_drift = t0_overshoot + t_overshoot + t0_climb = t0_drift + t_drift + t = np.arange(0.0, t0_climb + t_climb + t_surface, dt_s) + p = np.full(len(t), 0.5) + m = (t >= t0_dive) & (t < t0_overshoot) + p[m] = 105.0 * (t[m] - t0_dive) / t_dive + m = (t >= t0_overshoot) & (t < t0_drift) + p[m] = 105.0 - 5.0 * (t[m] - t0_overshoot) / t_overshoot + m = (t >= t0_drift) & (t < t0_climb) + p[m] = 100.0 + 2.0 * np.sin(2.0 * np.pi * (t[m] - t0_drift) / 300.0) + m = (t >= t0_climb) & (t < t0_climb + t_climb) + p[m] = 100.0 * (1.0 - (t[m] - t0_climb) / t_climb) + return ( + xr.Dataset({"pressure": ("time", p)}, coords={"time": t}), + t0_dive, + t0_drift, + t0_climb, + ) + + +def _make_state_arrays( + n: int, +) -> tuple[np.ndarray, np.ndarray, np.ndarray, np.ndarray]: + """Allocate (state, dive_id, climb_id, profile_id) all initialised to -1.""" + return ( + np.full(n, -1, dtype="b"), + np.full(n, -1, dtype="i4"), + np.full(n, -1, dtype="i4"), + np.full(n, -1, dtype="i4"), + ) + + +def _set_profile(arrs, slc, state_val, id_val, pid_val): + """Set state, the matching dive_id or climb_id, and profile_id over slc.""" + state, dive_id, climb_id, profile_id = arrs + state[slc] = state_val + (dive_id if state_val == 1 else climb_id)[slc] = id_val + profile_id[slc] = pid_val + + +@pytest.mark.parametrize("dt_s", [1.0, 15.0], ids=["post_recovery_1s", "realtime_15s"]) +def test_get_profiles_drift_at_depth(dt_s: float) -> None: + """Drift labelled state=3; post-drift climb spans the full water column + rather than being mis-attributed to the brief overshoot correction.""" + ds, _, t_drift, t_climb = _make_drift_at_depth_dataset(dt_s) + result = prof.get_profiles(ds, shallowest_profile=5.0, min_surface_time=180.0) + state, time = result.state.values, ds.time.values + + drift_state = state[(time >= t_drift) & (time < t_climb)] + assert (drift_state == 3).mean() > 0.7 + assert (state[time >= t_climb] == 2).mean() > 0.7 + + climb_p = ds.pressure.values[state == 2] + assert climb_p.max() > 90.0 and climb_p.min() < 5.0 + assert 3 in result.state.attrs["flag_values"] + + +@pytest.mark.parametrize("dt_s", [1.0, 15.0], ids=["post_recovery_1s", "realtime_15s"]) +def test_get_profiles_climb_with_stall(dt_s: float) -> None: + """A 2-minute stall (within the 3-minute tolerance) within the climb must + not split the climb at either sampling rate.""" + t_surf, t_dive, t_pre, t_stall, t_post = 60.0, 800.0, 600.0, 120.0, 400.0 + t0_dive = t_surf + t0_pre = t0_dive + t_dive + t0_stall = t0_pre + t_pre + t0_post = t0_stall + t_stall + t0_end = t0_post + t_post + t = np.arange(0.0, t0_end + t_surf, dt_s) + p = np.full(len(t), 0.5) + m = (t >= t0_dive) & (t < t0_pre) + p[m] = 100.0 * (t[m] - t0_dive) / t_dive + m = (t >= t0_pre) & (t < t0_stall) + p[m] = 100.0 - 80.0 * (t[m] - t0_pre) / t_pre + m = (t >= t0_stall) & (t < t0_post) # stall: 20 → 22 dbar over 2 min + p[m] = 20.0 + 2.0 * (t[m] - t0_stall) / t_stall + m = (t >= t0_post) & (t < t0_end) + p[m] = 22.0 * (1.0 - (t[m] - t0_post) / t_post) + p[0] = np.nan # profinder needs NaN to avoid valid_idx edge case + ds = xr.Dataset({"pressure": ("time", p)}, coords={"time": t}) + + result = prof.get_profiles(ds, shallowest_profile=5.0, min_surface_time=180.0) + assert (result.state.values[(t >= t0_post) & (t < t0_end)] == 2).mean() > 0.7 + cids = np.unique(result.climb_id.values[result.climb_id.values >= 0]) + assert len(cids) == 1 + + +def test_get_profiles_drift_x_hover_active() -> None: + """x_hover_active takes precedence over pressure-based detection. Sparse + transition reports must be forward-filled and -127 sentinel filtered.""" + ds, _, t_drift, t_climb = _make_drift_at_depth_dataset(15.0) + time = ds.time.values + flt = xr.Dataset( + { + "m_present_time": ("i", np.array([0.0, 60.0, t_drift, t_climb])), + "x_hover_active": ("i", np.array([0.0, -127.0, 1.0, 0.0])), + } + ) + result = prof.get_profiles( + ds, shallowest_profile=5.0, min_surface_time=180.0, flt=flt + ) + state = result.state.values + assert (state[(time >= t_drift) & (time < t_climb)] == 3).mean() > 0.9 + assert (state[time >= t_climb] == 2).mean() > 0.7 + + +def test_absorb_post_drift_transients() -> None: + """Brief dive/unknown segments between drift and climb are folded into climb.""" + arrs = _make_state_arrays(50) + state, dive_id, climb_id, profile_id = arrs + drift_mask = np.zeros(50, dtype=bool) + _set_profile(arrs, slice(5, 15), 1, 1, 1) # main dive + state[15:30] = 3 + drift_mask[15:30] = True # drift + _set_profile(arrs, slice(30, 32), 1, 2, 2) # spurious dive + # state[32:34] left as -1 (unknown gap) + _set_profile(arrs, slice(34, 48), 2, 1, 3) # real climb + + prof._absorb_post_drift_transients(*arrs, drift_mask, 10.0, 180.0) + + assert np.all(state[30:34] == 2) and np.all(climb_id[30:34] == 1) + assert np.all(profile_id[30:34] == 3) and np.all(dive_id[30:34] == -1) + assert ( + np.all(state[5:15] == 1) + and np.all(state[15:30] == 3) + and np.all(state[34:48] == 2) + ) + + +def test_absorb_apex_unknowns_splits_at_max_pressure() -> None: + """Short unknown gap between dive and climb is split at the pressure max.""" + arrs = _make_state_arrays(30) + state, dive_id, climb_id, profile_id = arrs + pressure = np.zeros(30) + _set_profile(arrs, slice(5, 13), 1, 1, 1) + pressure[5:13] = np.linspace(10, 80, 8) + pressure[13:17] = [85, 88, 90, 87] # apex at index 15 + _set_profile(arrs, slice(17, 25), 2, 1, 2) + pressure[17:25] = np.linspace(85, 5, 8) + + prof._absorb_apex_unknowns(*arrs, pressure, 10.0, 180.0, 2.0) + + assert np.all(state[13:16] == 1) and np.all(dive_id[13:16] == 1) + assert np.all(profile_id[13:16] == 1) + assert state[16] == 2 and climb_id[16] == 1 and profile_id[16] == 2 + + +def test_absorb_apex_unknowns_climb_to_dive_yo_top() -> None: + """Short underwater gap between climb→dive (yo-top) is split at the + pressure minimum: pre-min → climb, post-min → dive.""" + arrs = _make_state_arrays(30) + state, dive_id, climb_id, profile_id = arrs + pressure = np.zeros(30) + _set_profile(arrs, slice(5, 13), 2, 1, 1) + pressure[5:13] = np.linspace(80, 50, 8) + pressure[13:17] = [48, 45, 47, 50] # yo-top min at index 14 + _set_profile(arrs, slice(17, 25), 1, 2, 2) + pressure[17:25] = np.linspace(55, 200, 8) + + prof._absorb_apex_unknowns(*arrs, pressure, 10.0, 180.0, 2.0) + + assert np.all(state[13:15] == 2) and np.all(climb_id[13:15] == 1) + assert np.all(profile_id[13:15] == 1) + assert np.all(state[15:17] == 1) and np.all(dive_id[15:17] == 2) + assert np.all(profile_id[15:17] == 2) + + +def test_absorb_apex_unknowns_skips_surface_gap() -> None: + """A gap that goes shallow (glider surfaced) is left for the surface classifier.""" + arrs = _make_state_arrays(30) + state, _, _, _ = arrs + pressure = np.zeros(30) + state[5:10] = 1 + pressure[5:10] = np.linspace(10, 80, 5) + state[10:13] = 2 + pressure[10:13] = np.linspace(80, 1, 3) + pressure[13:17] = [0.3, 0.5, 0.4, 0.6] # shallow unknown gap + state[17:22] = 1 + pressure[17:22] = np.linspace(1, 60, 5) + before = state.copy() + + prof._absorb_apex_unknowns(*arrs, pressure, 10.0, 180.0, 2.0) + assert np.array_equal(state, before) + + +def test_absorb_pre_drift_transients() -> None: + """Brief climb/unknown samples between a dive and a drift (overshoot + recovery) are folded into the drift.""" + arrs = _make_state_arrays(50) + state, dive_id, climb_id, profile_id = arrs + drift_mask = np.zeros(50, dtype=bool) + _set_profile(arrs, slice(5, 15), 1, 1, 1) # dive (peak overshoot) + _set_profile(arrs, slice(15, 18), 2, 1, 2) # brief recovery climb + state[18:20] = -1 # brief unknown + state[20:35] = 3 + drift_mask[20:35] = True # drift + _set_profile(arrs, slice(35, 45), 2, 2, 3) # post-drift climb + + prof._absorb_pre_drift_transients(*arrs, drift_mask, 10.0, 180.0) + + # The recovery climb and unknown gap are now part of the drift + assert np.all(state[15:20] == 3) + assert np.all(climb_id[15:20] == -1) and np.all(dive_id[15:20] == -1) + assert np.all(profile_id[15:20] == -1) + # Surrounding dive and post-drift climb are unchanged + assert np.all(state[5:15] == 1) and np.all(state[35:45] == 2) + + +def test_absorb_pre_drift_transients_does_not_swallow_long_climb() -> None: + """A long climb (>max_transient_duration) before a drift is not absorbed.""" + arrs = _make_state_arrays(60) + state, _, _, _ = arrs + drift_mask = np.zeros(60, dtype=bool) + _set_profile(arrs, slice(0, 10), 1, 1, 1) + _set_profile(arrs, slice(10, 35), 2, 1, 2) # 250 s > 180 s + state[35:50] = 3 + drift_mask[35:50] = True + before = state.copy() + + prof._absorb_pre_drift_transients(*arrs, drift_mask, 10.0, 180.0) + assert np.array_equal(state, before) + + +def test_absorb_post_drift_transients_does_not_swallow_real_dive() -> None: + """A long dive (>max_transient_duration) between drift and climb is not absorbed.""" + arrs = _make_state_arrays(60) + state, _, _, _ = arrs + drift_mask = np.zeros(60, dtype=bool) + state[5:15] = 3 + drift_mask[5:15] = True + _set_profile(arrs, slice(15, 35), 1, 1, 1) # 200 s > 180 s + _set_profile(arrs, slice(40, 55), 2, 1, 2) + before = state.copy() + + prof._absorb_post_drift_transients(*arrs, drift_mask, 10.0, 180.0) + assert np.array_equal(state, before)