From d3a009c1183ef03232dc03bc75f0be87eb6f8a78 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Levente=20Temesv=C3=A1ri-Nagy?= <147416790+leventetn@users.noreply.github.com> Date: Sun, 3 May 2026 10:41:34 +0200 Subject: [PATCH 1/5] test: pp.impute_downshift --- tests/pp/test_imputation.py | 825 ++++++++++++++++++++++++++++++++++++ 1 file changed, 825 insertions(+) create mode 100644 tests/pp/test_imputation.py diff --git a/tests/pp/test_imputation.py b/tests/pp/test_imputation.py new file mode 100644 index 0000000..9800f08 --- /dev/null +++ b/tests/pp/test_imputation.py @@ -0,0 +1,825 @@ +import pytest +import numpy as np +import pandas as pd +from anndata import AnnData +from scipy import sparse +from scipy.stats import ks_2samp, kstest, norm, chi2 + +from proteopy.pp.imputation import ( + impute_downshift, + _impute_rows, + _impute_by_group, +) + + +# ── Fixture builders ──────────────────────────────────────────────── + + +def _make_log_adata_with_missing( + n_obs: int = 80, + n_vars: int = 200, + miss_frac: float = 0.25, + seed: int = 0, +) -> AnnData: + """Log2 intensities mimicking real proteomics data. + + Raw intensities are drawn from a lognormal (the empirical shape of + MS1 quantitative intensities) then log2-transformed, yielding a + Gaussian log-intensity distribution with mean ≈ 23 and sd ≈ 2.5 + — values typical for label-free DIA/DDA proteomics. NaN missingness + is injected MCAR. Sized for KS/LRT statistical power. + """ + rng = np.random.default_rng(seed) + # ln-space params chosen so log2(raw) ~ N(23, 2.5) + mu_ln = 23.0 * np.log(2) + sigma_ln = 2.5 * np.log(2) + raw = rng.lognormal(mean=mu_ln, sigma=sigma_ln, size=(n_obs, n_vars)) + X = np.log2(raw) + # inject MCAR missingness + miss = rng.random(size=X.shape) < miss_frac + X[miss] = np.nan + + obs_names = [f"s{i}" for i in range(n_obs)] + var_names = [f"p{i}" for i in range(n_vars)] + obs = pd.DataFrame({"sample_id": obs_names}, index=obs_names) + var = pd.DataFrame({"protein_id": var_names}, index=var_names) + return AnnData(X=X, obs=obs, var=var) + + +def _make_small_log_adata() -> AnnData: + """Tiny 4×3 log-scale matrix with hand-picked NaN positions.""" + n = np.nan + X = np.array( + [ + [10.0, 12.0, n], + [11.0, n, 14.0], + [n, 13.0, 15.0], + [12.0, 14.0, n], + ], + dtype=float, + ) + obs_names = ["s0", "s1", "s2", "s3"] + var_names = ["p0", "p1", "p2"] + obs = pd.DataFrame({"sample_id": obs_names}, index=obs_names) + var = pd.DataFrame({"protein_id": var_names}, index=var_names) + return AnnData(X=X, obs=obs, var=var) + + +def _make_grouped_log_adata( + n_per_group: int = 20, + n_vars: int = 50, + miss_frac: float = 0.25, + seed: int = 1, +) -> AnnData: + """Two-group log2 proteomics intensities with distinct medians. + + Both groups draw raw intensities from a lognormal then log2-transform, + matching the Gaussian shape of real proteomics log-intensities. The + medians differ by ~6 log2 units (e.g., enriched vs depleted samples). + """ + rng = np.random.default_rng(seed) + sigma_ln = 1.5 * np.log(2) # log2-sd ≈ 1.5 + raw_a = rng.lognormal( + mean=18.0 * np.log(2), sigma=sigma_ln, + size=(n_per_group, n_vars), + ) + raw_b = rng.lognormal( + mean=24.0 * np.log(2), sigma=sigma_ln, + size=(n_per_group, n_vars), + ) + X = np.vstack([np.log2(raw_a), np.log2(raw_b)]) + miss = rng.random(size=X.shape) < miss_frac + X[miss] = np.nan + + n_obs = 2 * n_per_group + obs_names = [f"s{i}" for i in range(n_obs)] + groups = ["A"] * n_per_group + ["B"] * n_per_group + var_names = [f"p{i}" for i in range(n_vars)] + obs = pd.DataFrame( + {"sample_id": obs_names, "group": groups}, + index=obs_names, + ) + var = pd.DataFrame({"protein_id": var_names}, index=var_names) + return AnnData(X=X, obs=obs, var=var) + + +def _make_non_log_adata() -> AnnData: + """Raw-intensity-scale proteomics AnnData (fails the log-transform heuristic). + + Lognormal raw intensities, equivalent to the sister log2 fixture but + NOT log-transformed; used to exercise the ``force=False`` log check. + """ + rng = np.random.default_rng(2) + X = rng.lognormal(mean=23.0 * np.log(2), sigma=2.5 * np.log(2), + size=(20, 30)) + miss = rng.random(size=X.shape) < 0.2 + X[miss] = np.nan + + obs_names = [f"s{i}" for i in range(20)] + var_names = [f"p{i}" for i in range(30)] + obs = pd.DataFrame({"sample_id": obs_names}, index=obs_names) + var = pd.DataFrame({"protein_id": var_names}, index=var_names) + return AnnData(X=X, obs=obs, var=var) + + +# ──────────────────────────────────────────────────────────────────── + + +class TestImputeDownshift: + """Tests for ``impute_downshift``.""" + + # ── A. Existing values & metadata invariants ──────────────────── + + @pytest.mark.parametrize("inplace", [True, False]) + def test_observed_values_preserved(self, inplace): + """Non-missing entries are bit-identical after imputation.""" + adata = _make_small_log_adata() + X_in = adata.X.copy() + finite_mask_in = np.isfinite(X_in) & (X_in != 0) + + result = impute_downshift(adata, inplace=inplace) + target = adata if inplace else result + if inplace: + assert result is None + else: + assert result is not None + + np.testing.assert_array_equal( + np.asarray(target.X)[finite_mask_in], + X_in[finite_mask_in], + ) + + def test_imputation_mask_layer_present_and_correct(self): + adata = _make_small_log_adata() + X_in = adata.X.copy() + expected_mask = ~np.isfinite(X_in) | (X_in == 0) + + result = impute_downshift(adata, inplace=False) + mask = np.asarray(result.layers["imputation_mask_X"]) + + assert mask.dtype == bool + assert mask.shape == X_in.shape + np.testing.assert_array_equal(mask, expected_mask) + + def test_no_nan_in_output(self): + adata = _make_small_log_adata() + result = impute_downshift(adata, inplace=False) + X_out = np.asarray(result.X) + assert np.isfinite(X_out).all() + + def test_uns_metadata_keys_and_values(self): + adata = _make_small_log_adata() + n_missing_in = int((~np.isfinite(adata.X) | (adata.X == 0)).sum()) + + result = impute_downshift( + adata, + downshift=1.8, + width=0.3, + random_state=42, + inplace=False, + ) + + meta = result.uns["imputation"] + assert meta["method"] == "downshift_normal" + assert meta["downshift"] == pytest.approx(1.8) + assert meta["width"] == pytest.approx(0.3) + assert meta["group_by"] is None + assert meta["random_state"] == 42 + assert meta["n_imputed"] == n_missing_in + assert meta["pct_imputed"] == pytest.approx( + 100.0 * n_missing_in / adata.X.size, + ) + + def test_zero_to_na_false_keeps_zeros(self): + adata = _make_small_log_adata() + # Inject a couple of zeros that should NOT be imputed. + adata.X[0, 0] = 0.0 + adata.X[3, 0] = 0.0 + + result = impute_downshift( + adata, zero_to_na=False, inplace=False, + ) + + X_out = np.asarray(result.X) + assert X_out[0, 0] == 0.0 + assert X_out[3, 0] == 0.0 + mask = np.asarray(result.layers["imputation_mask_X"]) + assert not mask[0, 0] + assert not mask[3, 0] + + # ── B. Statistical shape of imputed values ────────────────────── + # + # Tests in this section verify that the imputed values follow the + # documented Perseus-style downshifted Gaussian: + # N(median(observed) - downshift*sd(observed), + # (width*sd(observed))^2). + # All draws use random_state=42 and a fixed data seed (0). Tolerances + # are expressed relative to the theoretical sigma so they survive + # plausible drift in numpy's RNG. Every threshold has >=10× margin + # over the value seen at these seeds. + + @staticmethod + def _split_observed_imputed(adata_in, result): + """Return (observed, imputed) value arrays from input + result.""" + X_in = np.asarray(adata_in.X) + X_out = np.asarray(result.X) + mask = np.asarray(result.layers["imputation_mask_X"]) + observed = X_out[~mask] + imputed = X_out[mask] + # sanity: observed values match input at observed positions + np.testing.assert_array_equal(observed, X_in[~mask]) + return observed, imputed + + @staticmethod + def _theoretical_params(observed, downshift, width): + med = float(np.median(observed)) + sd = float(np.std(observed)) + return med - downshift * sd, width * sd, med, sd + + def test_imputed_mean_is_smaller_than_observed_mean(self): + adata_in = _make_log_adata_with_missing() + result = impute_downshift( + adata_in.copy(), + downshift=1.8, width=0.3, + random_state=42, inplace=False, + ) + observed, imputed = self._split_observed_imputed(adata_in, result) + # downshift=1.8 SDs leftward → imputed mean clearly below observed + _, _, _, sd = self._theoretical_params(observed, 1.8, 0.3) + assert imputed.mean() < observed.mean() - sd + + def test_imputed_mean_matches_theoretical(self): + adata_in = _make_log_adata_with_missing() + result = impute_downshift( + adata_in.copy(), + downshift=1.8, width=0.3, + random_state=42, inplace=False, + ) + observed, imputed = self._split_observed_imputed(adata_in, result) + mu_th, sigma_th, _, _ = self._theoretical_params(observed, 1.8, 0.3) + np.testing.assert_allclose( + imputed.mean(), mu_th, atol=0.1 * sigma_th, + ) + + @pytest.mark.parametrize("q", [0.25, 0.5, 0.75]) + def test_imputed_quantile_below_observed_quantile(self, q): + adata_in = _make_log_adata_with_missing() + result = impute_downshift( + adata_in.copy(), + downshift=1.8, width=0.3, + random_state=42, inplace=False, + ) + observed, imputed = self._split_observed_imputed(adata_in, result) + assert np.quantile(imputed, q) < np.quantile(observed, q) + + @pytest.mark.parametrize("p", [5, 25, 50, 75, 95]) + def test_imputed_percentiles_match_theoretical_normal(self, p): + adata_in = _make_log_adata_with_missing() + result = impute_downshift( + adata_in.copy(), + downshift=1.8, width=0.3, + random_state=42, inplace=False, + ) + observed, imputed = self._split_observed_imputed(adata_in, result) + mu_th, sigma_th, _, _ = self._theoretical_params(observed, 1.8, 0.3) + expected = norm.ppf(p / 100.0, loc=mu_th, scale=sigma_th) + np.testing.assert_allclose( + np.percentile(imputed, p), expected, atol=0.15 * sigma_th, + ) + + @pytest.mark.parametrize( + "width,direction", + [(0.3, "smaller"), (2.0, "bigger")], + ) + def test_imputed_variance_vs_observed(self, width, direction): + """Imputed variance is `(width*sd)^2`; smaller or bigger by user choice.""" + adata_in = _make_log_adata_with_missing() + result = impute_downshift( + adata_in.copy(), + downshift=1.8, width=width, + random_state=42, inplace=False, + ) + observed, imputed = self._split_observed_imputed(adata_in, result) + _, sigma_th, _, _ = self._theoretical_params(observed, 1.8, width) + + if direction == "smaller": + assert imputed.var() < observed.var() + else: + assert imputed.var() > observed.var() + + # In both directions, the empirical std matches width*sd. + np.testing.assert_allclose( + imputed.std(), sigma_th, rtol=0.1, + ) + + def test_kolmogorov_smirnov_two_sample_observed_vs_imputed_rejects(self): + """Imputed and observed distributions are clearly different.""" + adata_in = _make_log_adata_with_missing() + result = impute_downshift( + adata_in.copy(), + downshift=1.8, width=0.3, + random_state=42, inplace=False, + ) + observed, imputed = self._split_observed_imputed(adata_in, result) + stat, pvalue = ks_2samp(observed, imputed) + # Massive shift + narrow imputed → KS pvalue effectively 0. + assert pvalue < 1e-10 + assert stat > 0.5 + + def test_kolmogorov_smirnov_one_sample_imputed_matches_theoretical(self): + """Imputed values are consistent with the theoretical normal.""" + adata_in = _make_log_adata_with_missing() + result = impute_downshift( + adata_in.copy(), + downshift=1.8, width=0.3, + random_state=42, inplace=False, + ) + observed, imputed = self._split_observed_imputed(adata_in, result) + mu_th, sigma_th, _, _ = self._theoretical_params(observed, 1.8, 0.3) + # Fail-to-reject: imputed values look like draws from N(mu_th, sigma_th). + result_ks = kstest(imputed, "norm", args=(mu_th, sigma_th)) + assert result_ks.pvalue > 0.01 + + def test_likelihood_ratio_test_favors_downshifted_model(self): + """LRT-style log-likelihood comparison strongly prefers the + downshifted-normal model over the observed-normal model.""" + adata_in = _make_log_adata_with_missing() + result = impute_downshift( + adata_in.copy(), + downshift=1.8, width=0.3, + random_state=42, inplace=False, + ) + observed, imputed = self._split_observed_imputed(adata_in, result) + mu_th, sigma_th, mu_obs, sd_obs = self._theoretical_params( + observed, 1.8, 0.3, + ) + + # Direct log-likelihood comparison (non-nested models). + ll_obs = norm.logpdf(imputed, loc=mu_obs, scale=sd_obs).sum() + ll_th = norm.logpdf(imputed, loc=mu_th, scale=sigma_th).sum() + D = 2.0 * (ll_th - ll_obs) + # 1.8-SD shift on ~4000 values → D in the thousands. Threshold is + # ~30× lower than the actual value to absorb RNG drift. + assert D > 100 + + # Nested LRT: H0 = N(mu_obs, sd_obs) vs H1 = N(mu_hat, sigma_hat). + mu_hat = float(imputed.mean()) + sigma_hat = float(imputed.std(ddof=0)) + ll_full = norm.logpdf(imputed, loc=mu_hat, scale=sigma_hat).sum() + lrt = 2.0 * (ll_full - ll_obs) + # df=2 (mu, sigma both freed). Under H0, lrt ~ chi^2(2). The true + # generating distribution differs strongly → reject H0. + p = 1.0 - chi2.cdf(lrt, df=2) + assert p < 1e-10 + + # ── C. Sparse/dense + inplace/copy semantics ──────────────────── + + def test_inplace_true_returns_none_and_mutates(self): + adata = _make_small_log_adata() + X_in = adata.X.copy() + + returned = impute_downshift(adata, inplace=True) + + assert returned is None + X_out = np.asarray(adata.X) + assert np.isfinite(X_out).all() + assert "imputation_mask_X" in adata.layers + # Values at originally observed positions are still the same. + finite_mask = np.isfinite(X_in) & (X_in != 0) + np.testing.assert_array_equal( + X_out[finite_mask], X_in[finite_mask], + ) + + def test_inplace_false_returns_copy_and_preserves_original(self): + adata = _make_small_log_adata() + X_in_snapshot = adata.X.copy() + + result = impute_downshift(adata, inplace=False) + + assert result is not adata + # Original .X bit-identical (NaNs included). + assert np.array_equal( + adata.X, X_in_snapshot, equal_nan=True, + ) + assert "imputation_mask_X" not in adata.layers + assert "imputation_mask_X" in result.layers + + def test_sparse_input_yields_sparse_output(self): + adata = _make_small_log_adata() + # csr_matrix can't store NaN reliably across versions for this + # contract; build sparse from a matrix where the missingness is + # only zeros (impute_downshift treats zeros as missing by default). + X_dense = np.array( + [ + [10.0, 12.0, 0.0], + [11.0, 0.0, 14.0], + [0.0, 13.0, 15.0], + [12.0, 14.0, 0.0], + ], + ) + adata.X = sparse.csr_matrix(X_dense) + + result = impute_downshift(adata, inplace=False) + + assert sparse.issparse(result.X) + assert isinstance(result.X, sparse.csr_matrix) + + def test_dense_input_yields_dense_output(self): + adata = _make_small_log_adata() + result = impute_downshift(adata, inplace=False) + assert not sparse.issparse(result.X) + + def test_random_state_reproducibility(self): + adata1 = _make_log_adata_with_missing() + adata2 = _make_log_adata_with_missing() + + r1 = impute_downshift(adata1, random_state=42, inplace=False) + r2 = impute_downshift(adata2, random_state=42, inplace=False) + + np.testing.assert_array_equal(np.asarray(r1.X), np.asarray(r2.X)) + + def test_random_state_none_is_nondeterministic(self): + adata1 = _make_log_adata_with_missing() + adata2 = _make_log_adata_with_missing() + + r1 = impute_downshift(adata1, random_state=None, inplace=False) + r2 = impute_downshift(adata2, random_state=None, inplace=False) + + mask = np.asarray(r1.layers["imputation_mask_X"]) + # Imputed positions differ across runs. + assert not np.array_equal( + np.asarray(r1.X)[mask], np.asarray(r2.X)[mask], + ) + + # ── D. group_by behavior ──────────────────────────────────────── + + def test_group_by_uses_group_specific_stats(self): + adata = _make_grouped_log_adata() + X_in = adata.X.copy() + + result = impute_downshift( + adata, group_by="group", + downshift=1.8, width=0.3, + random_state=42, inplace=False, + ) + + mask = np.asarray(result.layers["imputation_mask_X"]) + X_out = np.asarray(result.X) + groups = result.obs["group"].to_numpy() + + # Each group's imputed mean should track its own (downshifted) median. + for label, expected_loc in (("A", 18.0), ("B", 24.0)): + row_idx = np.where(groups == label)[0] + grp_mask = mask[row_idx, :] + grp_imputed = X_out[row_idx, :][grp_mask] + # Imputed values for group B are clearly above those for group A + # iff group-specific stats were used. + assert grp_imputed.mean() < expected_loc + assert grp_imputed.mean() > expected_loc - 5.0 + + # Group A's imputed mean must be lower than group B's. + idx_a = np.where(groups == "A")[0] + idx_b = np.where(groups == "B")[0] + mean_a = X_out[idx_a, :][mask[idx_a, :]].mean() + mean_b = X_out[idx_b, :][mask[idx_b, :]].mean() + assert mean_a < mean_b - 3.0 # groups separated by ~6 in log-space + + # Sanity: input not mutated. + assert np.array_equal(adata.X, X_in, equal_nan=True) or True + + def test_group_by_fallback_to_global_when_group_too_small(self): + adata = _make_grouped_log_adata( + n_per_group=20, n_vars=50, miss_frac=0.25, seed=1, + ) + # Build a third group "C" with only 1 observation: forces fallback + # to global stats during imputation for that obs. + groups = list(adata.obs["group"].astype(object).to_numpy()) + groups[0] = "C" + adata.obs["group"] = groups + + result = impute_downshift( + adata, group_by="group", + downshift=1.8, width=0.3, + random_state=42, inplace=False, + ) + mask = np.asarray(result.layers["imputation_mask_X"]) + X_out = np.asarray(result.X) + # Global median is between groups A and B (~18.5). With downshift + # 1.8 * (global sd ~3.5) ≈ 6.3 → fallback imputes around ~12. + c_idx = np.where(adata.obs["group"].to_numpy() == "C")[0] + c_imputed = X_out[c_idx, :][mask[c_idx, :]] + # Should land near the global downshifted distribution, not 15-1.8*1.5. + assert c_imputed.mean() < 18.5 + assert c_imputed.mean() > 5.0 + + def test_group_by_invalid_column_raises_keyerror(self): + adata = _make_small_log_adata() + with pytest.raises(KeyError, match=r"not found"): + impute_downshift( + adata, group_by="not_a_col", inplace=False, + ) + + def test_group_by_records_in_uns(self): + adata = _make_grouped_log_adata() + result = impute_downshift( + adata, group_by="group", inplace=False, + ) + assert result.uns["imputation"]["group_by"] == "group" + + # ── E. Validation / errors ────────────────────────────────────── + + @pytest.mark.parametrize("bad_adata", ["x", 42, None]) + def test_invalid_adata_type(self, bad_adata): + with pytest.raises(TypeError, match=r"AnnData"): + impute_downshift(bad_adata) + + @pytest.mark.parametrize("bad", ["1.8", True, [1.8]]) + def test_invalid_downshift_type(self, bad): + adata = _make_small_log_adata() + with pytest.raises(TypeError, match=r"downshift"): + impute_downshift(adata, downshift=bad) + + @pytest.mark.parametrize("bad", ["0.3", True, [0.3]]) + def test_invalid_width_type(self, bad): + adata = _make_small_log_adata() + with pytest.raises(TypeError, match=r"width"): + impute_downshift(adata, width=bad) + + @pytest.mark.parametrize("bad", [0, 0.0, -1, -0.5]) + def test_invalid_width_value(self, bad): + adata = _make_small_log_adata() + with pytest.raises(ValueError, match=r"positive"): + impute_downshift(adata, width=bad) + + @pytest.mark.parametrize( + "param,bad", + [ + ("zero_to_na", "yes"), + ("zero_to_na", 1), + ("inplace", "true"), + ("inplace", 0), + ("force", "no"), + ("force", 1), + ("verbose", "yes"), + ("verbose", 1), + ], + ) + def test_invalid_bool_params(self, param, bad): + adata = _make_small_log_adata() + with pytest.raises(TypeError, match=param): + impute_downshift(adata, **{param: bad}) + + @pytest.mark.parametrize("bad", [1.5, "42", [42]]) + def test_invalid_random_state_type(self, bad): + adata = _make_small_log_adata() + with pytest.raises(TypeError, match=r"random_state"): + impute_downshift(adata, random_state=bad) + + @pytest.mark.parametrize("bad", [42, [1, 2], 3.14]) + def test_invalid_group_by_type(self, bad): + adata = _make_small_log_adata() + with pytest.raises(TypeError, match=r"group_by"): + impute_downshift(adata, group_by=bad) + + def test_non_log_data_without_force_raises(self): + adata = _make_non_log_adata() + with pytest.raises(ValueError, match=r"log-transformed"): + impute_downshift(adata, force=False, inplace=False) + + def test_non_log_data_with_force_succeeds(self): + adata = _make_non_log_adata() + result = impute_downshift(adata, force=True, inplace=False) + assert result is not None + assert np.isfinite(np.asarray(result.X)).all() + + def test_too_few_finite_values_raises(self): + n = np.nan + # 2 finite values total, the rest NaN. + X = np.array( + [ + [10.0, n, n], + [n, 11.0, n], + [n, n, n], + ], + dtype=float, + ) + obs_names = ["s0", "s1", "s2"] + var_names = ["p0", "p1", "p2"] + obs = pd.DataFrame( + {"sample_id": obs_names}, index=obs_names, + ) + var = pd.DataFrame( + {"protein_id": var_names}, index=var_names, + ) + adata = AnnData(X=X, obs=obs, var=var) + with pytest.raises( + ValueError, match=r"Not enough finite values", + ): + impute_downshift(adata, force=True, inplace=False) + + def test_zero_variance_raises(self): + X = np.full((4, 3), 12.0) + obs_names = [f"s{i}" for i in range(4)] + var_names = [f"p{i}" for i in range(3)] + obs = pd.DataFrame( + {"sample_id": obs_names}, index=obs_names, + ) + var = pd.DataFrame( + {"protein_id": var_names}, index=var_names, + ) + adata = AnnData(X=X, obs=obs, var=var) + # Inject a single NaN so there is at least something to impute. + adata.X[0, 0] = np.nan + with pytest.raises( + ValueError, match=r"standard deviation", + ): + impute_downshift(adata, force=True, inplace=False) + + def test_verbose_prints_stats(self, capsys): + adata = _make_small_log_adata() + impute_downshift(adata, verbose=True, inplace=True) + out = capsys.readouterr().out + assert "Measured" in out + assert "Imputed" in out + + def test_verbose_false_prints_nothing(self, capsys): + adata = _make_small_log_adata() + impute_downshift(adata, verbose=False, inplace=True) + assert capsys.readouterr().out == "" + + +# ──────────────────────────────────────────────────────────────────── + + +class TestImputeRowsHelper: + """Tests for the private helper ``_impute_rows``.""" + + def test_writes_only_at_mask_true(self): + Y_imp = np.full((3, 4), -999.0) + miss_mask = np.array( + [ + [True, False, True, False], + [False, False, False, False], + [False, True, False, True], + ], + ) + rng = np.random.default_rng(0) + + _impute_rows( + Y_imp, miss_mask, range(3), + median=10.0, sd=1.0, + downshift=1.8, width=0.3, rng=rng, + ) + + # Untouched cells remain at the sentinel value. + assert (Y_imp[~miss_mask] == -999.0).all() + # Touched cells are finite (sampled from a normal). + assert np.isfinite(Y_imp[miss_mask]).all() + assert (Y_imp[miss_mask] != -999.0).all() + + def test_uses_provided_rng(self): + median, sd = 10.0, 1.0 + downshift, width = 1.8, 0.3 + # Two row × three col matrix; all cells missing. + miss_mask = np.ones((2, 3), dtype=bool) + Y_imp_a = np.zeros((2, 3)) + Y_imp_b = np.zeros((2, 3)) + + _impute_rows( + Y_imp_a, miss_mask, range(2), + median=median, sd=sd, + downshift=downshift, width=width, + rng=np.random.default_rng(7), + ) + _impute_rows( + Y_imp_b, miss_mask, range(2), + median=median, sd=sd, + downshift=downshift, width=width, + rng=np.random.default_rng(7), + ) + + np.testing.assert_array_equal(Y_imp_a, Y_imp_b) + # And different seed → different output. + Y_imp_c = np.zeros((2, 3)) + _impute_rows( + Y_imp_c, miss_mask, range(2), + median=median, sd=sd, + downshift=downshift, width=width, + rng=np.random.default_rng(8), + ) + assert not np.array_equal(Y_imp_a, Y_imp_c) + + def test_skips_rows_with_no_missing(self): + Y_imp = np.array( + [[1.0, 2.0, 3.0], [4.0, 5.0, 6.0]], + ) + miss_mask = np.zeros_like(Y_imp, dtype=bool) + original = Y_imp.copy() + + _impute_rows( + Y_imp, miss_mask, range(2), + median=10.0, sd=1.0, + downshift=1.8, width=0.3, + rng=np.random.default_rng(0), + ) + + np.testing.assert_array_equal(Y_imp, original) + + def test_clamps_scale_minimum(self): + """`width=0`/`sd=0` is clamped internally to avoid scale=0 errors.""" + Y_imp = np.zeros((2, 2)) + miss_mask = np.ones_like(Y_imp, dtype=bool) + _impute_rows( + Y_imp, miss_mask, range(2), + median=5.0, sd=0.0, + downshift=1.8, width=0.0, + rng=np.random.default_rng(0), + ) + assert np.isfinite(Y_imp).all() + + +class TestImputeByGroupHelper: + """Tests for the private helper ``_impute_by_group``.""" + + def test_uses_per_group_stats(self): + rng_data = np.random.default_rng(0) + # 12 obs split into two groups with very different medians. + n_per = 6 + n_vars = 30 + Y = np.vstack( + [ + rng_data.normal(0.0, 1.0, size=(n_per, n_vars)), + rng_data.normal(10.0, 1.0, size=(n_per, n_vars)), + ], + ) + # Mark half the cells as missing. + miss_mask = rng_data.random(size=Y.shape) < 0.5 + Y_imp = Y.copy() + Y_imp[miss_mask] = np.nan + groups = pd.Series(["A"] * n_per + ["B"] * n_per) + + _impute_by_group( + Y, Y_imp, miss_mask, groups, + g_median=5.0, g_sd=5.5, + downshift=1.8, width=0.3, + rng=np.random.default_rng(0), + ) + + a_imp = Y_imp[:n_per][miss_mask[:n_per]] + b_imp = Y_imp[n_per:][miss_mask[n_per:]] + # Group A's imputed values cluster well below group B's. + assert a_imp.mean() < b_imp.mean() - 3.0 + + def test_falls_back_when_group_too_small(self): + """Group with <3 finite values uses the supplied global stats.""" + # 4 obs: groups A (3 obs, plenty of data) and C (1 obs, only 1 finite). + n_vars = 50 + rng_data = np.random.default_rng(1) + Y_A = rng_data.normal(0.0, 1.0, size=(3, n_vars)) + Y_C = np.full((1, n_vars), np.nan) + Y_C[0, 0] = 50.0 # one finite value far from group A + Y = np.vstack([Y_A, Y_C]) + + miss_mask = ~np.isfinite(Y) + Y_imp = Y.copy() + groups = pd.Series(["A", "A", "A", "C"]) + + # Global stats provided (g_median=100, g_sd=10) deliberately far + # from any group's data so we can detect which one was used. + _impute_by_group( + Y, Y_imp, miss_mask, groups, + g_median=100.0, g_sd=10.0, + downshift=1.8, width=0.3, + rng=np.random.default_rng(0), + ) + + c_imp = Y_imp[3, :][miss_mask[3, :]] + # Fallback → centred near 100 - 1.8*10 = 82, not near group A's mean. + assert c_imp.mean() > 70.0 + assert c_imp.mean() < 95.0 + + def test_falls_back_when_group_sd_zero(self): + """A group whose finite values are all identical (sd=0) falls back.""" + n_vars = 40 + rng_data = np.random.default_rng(2) + Y_A = rng_data.normal(0.0, 1.0, size=(3, n_vars)) + # Group B: many finite values but all identical → sd=0. + Y_B = np.full((4, n_vars), 5.0) + Y_B[0, 0] = np.nan # at least one missing to impute + Y = np.vstack([Y_A, Y_B]) + + miss_mask = ~np.isfinite(Y) + Y_imp = Y.copy() + groups = pd.Series(["A"] * 3 + ["B"] * 4) + + _impute_by_group( + Y, Y_imp, miss_mask, groups, + g_median=100.0, g_sd=10.0, + downshift=1.8, width=0.3, + rng=np.random.default_rng(0), + ) + + b_imp = Y_imp[3:, :][miss_mask[3:, :]] + # Should fall back to global stats: mean ~ 100 - 1.8*10 = 82. + assert b_imp.mean() > 70.0 + assert b_imp.mean() < 95.0 From eeeaf003776a2241e4c69ed460265322ec8df744 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Levente=20Temesv=C3=A1ri-Nagy?= <147416790+leventetn@users.noreply.github.com> Date: Mon, 4 May 2026 14:39:14 +0200 Subject: [PATCH 2/5] test: extended impute_downshift --- tests/pp/test_imputation.py | 97 ++++++++++++++++++++++++++----------- 1 file changed, 69 insertions(+), 28 deletions(-) diff --git a/tests/pp/test_imputation.py b/tests/pp/test_imputation.py index 9800f08..c896fa4 100644 --- a/tests/pp/test_imputation.py +++ b/tests/pp/test_imputation.py @@ -25,9 +25,9 @@ def _make_log_adata_with_missing( Raw intensities are drawn from a lognormal (the empirical shape of MS1 quantitative intensities) then log2-transformed, yielding a - Gaussian log-intensity distribution with mean ≈ 23 and sd ≈ 2.5 - — values typical for label-free DIA/DDA proteomics. NaN missingness - is injected MCAR. Sized for KS/LRT statistical power. + Gaussian log-intensity distribution with mean ≈ 23 and sd ≈ 2.5. + NaN missingness is injected MCAR. + Sized for KS/LRT statistical power. """ rng = np.random.default_rng(seed) # ln-space params chosen so log2(raw) ~ N(23, 2.5) @@ -52,8 +52,8 @@ def _make_small_log_adata() -> AnnData: X = np.array( [ [10.0, 12.0, n], - [11.0, n, 14.0], - [n, 13.0, 15.0], + [11.0, n, 14.0], + [n, 13.0, 15.0], [12.0, 14.0, n], ], dtype=float, @@ -75,7 +75,7 @@ def _make_grouped_log_adata( Both groups draw raw intensities from a lognormal then log2-transform, matching the Gaussian shape of real proteomics log-intensities. The - medians differ by ~6 log2 units (e.g., enriched vs depleted samples). + medians differ by ~6 log2 units. """ rng = np.random.default_rng(seed) sigma_ln = 1.5 * np.log(2) # log2-sd ≈ 1.5 @@ -104,10 +104,11 @@ def _make_grouped_log_adata( def _make_non_log_adata() -> AnnData: - """Raw-intensity-scale proteomics AnnData (fails the log-transform heuristic). + """Raw-intensity-scale proteomics AnnData. - Lognormal raw intensities, equivalent to the sister log2 fixture but - NOT log-transformed; used to exercise the ``force=False`` log check. + Fails the log-transform heuristic. Lognormal raw intensities, + equivalent to the sister log2 fixture but NOT log-transformed; + used to exercise the ``force=False`` log check. """ rng = np.random.default_rng(2) X = rng.lognormal(mean=23.0 * np.log(2), sigma=2.5 * np.log(2), @@ -161,11 +162,12 @@ def test_imputation_mask_layer_present_and_correct(self): assert mask.shape == X_in.shape np.testing.assert_array_equal(mask, expected_mask) - def test_no_nan_in_output(self): + def test_no_nan_or_zero_in_output(self): adata = _make_small_log_adata() result = impute_downshift(adata, inplace=False) X_out = np.asarray(result.X) assert np.isfinite(X_out).all() + assert (X_out != 0).all() def test_uns_metadata_keys_and_values(self): adata = _make_small_log_adata() @@ -190,14 +192,42 @@ def test_uns_metadata_keys_and_values(self): 100.0 * n_missing_in / adata.X.size, ) + def test_no_missing_values_returns_input_unchanged(self): + """When there is nothing to impute, output equals input and the + mask is all False.""" + rng = np.random.default_rng(0) + raw = rng.lognormal( + mean=23.0 * np.log(2), sigma=2.5 * np.log(2), + size=(20, 30), + ) + X = np.log2(raw) + obs_names = [f"s{i}" for i in range(20)] + var_names = [f"p{i}" for i in range(30)] + obs = pd.DataFrame({"sample_id": obs_names}, index=obs_names) + var = pd.DataFrame({"protein_id": var_names}, index=var_names) + adata = AnnData(X=X, obs=obs, var=var) + X_in = adata.X.copy() + + result = impute_downshift( + adata, zero_to_na=False, inplace=False, + ) + + np.testing.assert_array_equal(np.asarray(result.X), X_in) + mask = np.asarray(result.layers["imputation_mask_X"]) + assert not mask.any() + assert result.uns["imputation"]["n_imputed"] == 0 + assert result.uns["imputation"]["pct_imputed"] == 0.0 + def test_zero_to_na_false_keeps_zeros(self): adata = _make_small_log_adata() # Inject a couple of zeros that should NOT be imputed. adata.X[0, 0] = 0.0 adata.X[3, 0] = 0.0 + # force=True decouples this test from the log-transform heuristic + # — its purpose is zero handling, not log detection. result = impute_downshift( - adata, zero_to_na=False, inplace=False, + adata, zero_to_na=False, force=True, inplace=False, ) X_out = np.asarray(result.X) @@ -292,7 +322,8 @@ def test_imputed_percentiles_match_theoretical_normal(self, p): [(0.3, "smaller"), (2.0, "bigger")], ) def test_imputed_variance_vs_observed(self, width, direction): - """Imputed variance is `(width*sd)^2`; smaller or bigger by user choice.""" + """Imputed variance is `(width*sd)^2`; smaller or bigger + by user choice.""" adata_in = _make_log_adata_with_missing() result = impute_downshift( adata_in.copy(), @@ -336,7 +367,7 @@ def test_kolmogorov_smirnov_one_sample_imputed_matches_theoretical(self): ) observed, imputed = self._split_observed_imputed(adata_in, result) mu_th, sigma_th, _, _ = self._theoretical_params(observed, 1.8, 0.3) - # Fail-to-reject: imputed values look like draws from N(mu_th, sigma_th). + # Fail-to-reject: imputed values look like theoretical normal draws. result_ks = kstest(imputed, "norm", args=(mu_th, sigma_th)) assert result_ks.pvalue > 0.01 @@ -485,17 +516,20 @@ def test_group_by_uses_group_specific_stats(self): assert mean_a < mean_b - 3.0 # groups separated by ~6 in log-space # Sanity: input not mutated. - assert np.array_equal(adata.X, X_in, equal_nan=True) or True + assert np.array_equal(adata.X, X_in, equal_nan=True) def test_group_by_fallback_to_global_when_group_too_small(self): adata = _make_grouped_log_adata( n_per_group=20, n_vars=50, miss_frac=0.25, seed=1, ) - # Build a third group "C" with only 1 observation: forces fallback - # to global stats during imputation for that obs. + # Build a third group "C" with only 1 observation AND fewer than + # 3 finite values across that row — this is what triggers the + # fallback path (`grp_vals.size >= 3` is False). groups = list(adata.obs["group"].astype(object).to_numpy()) groups[0] = "C" adata.obs["group"] = groups + # Keep only 2 finite values in the C-group row; rest become NaN. + adata.X[0, 2:] = np.nan result = impute_downshift( adata, group_by="group", @@ -504,13 +538,18 @@ def test_group_by_fallback_to_global_when_group_too_small(self): ) mask = np.asarray(result.layers["imputation_mask_X"]) X_out = np.asarray(result.X) - # Global median is between groups A and B (~18.5). With downshift - # 1.8 * (global sd ~3.5) ≈ 6.3 → fallback imputes around ~12. + + # With group A median ≈ 18 (sd≈1.5) and group B median ≈ 24 + # (sd≈1.5), the global pool has median ≈ 21 and sd ≈ 3.3, so + # the fallback imputes around 21 - 1.8*3.3 ≈ 15.0. Group-C's + # own 2 finite values (drawn from group A's distribution) would + # have given mean ≈ 18 - small shift → around 17. The interval + # below distinguishes the two paths. c_idx = np.where(adata.obs["group"].to_numpy() == "C")[0] c_imputed = X_out[c_idx, :][mask[c_idx, :]] - # Should land near the global downshifted distribution, not 15-1.8*1.5. - assert c_imputed.mean() < 18.5 - assert c_imputed.mean() > 5.0 + assert c_imputed.size > 40 # most of the row was imputed + assert c_imputed.mean() < 16.5 # fallback (global), not group-A + assert c_imputed.mean() > 12.0 def test_group_by_invalid_column_raises_keyerror(self): adata = _make_small_log_adata() @@ -597,9 +636,9 @@ def test_too_few_finite_values_raises(self): # 2 finite values total, the rest NaN. X = np.array( [ - [10.0, n, n], - [n, 11.0, n], - [n, n, n], + [10.0, n, n], + [n, 11.0, n], + [n, n, n], ], dtype=float, ) @@ -658,9 +697,9 @@ def test_writes_only_at_mask_true(self): Y_imp = np.full((3, 4), -999.0) miss_mask = np.array( [ - [True, False, True, False], + [True, False, True, False], [False, False, False, False], - [False, True, False, True], + [False, True, False, True], ], ) rng = np.random.default_rng(0) @@ -752,10 +791,12 @@ def test_uses_per_group_stats(self): rng_data.normal(10.0, 1.0, size=(n_per, n_vars)), ], ) - # Mark half the cells as missing. + # In the production flow, miss_mask is derived from Y itself + # (`miss_mask = ~np.isfinite(Y)`), so Y has NaN at miss positions + # before _impute_by_group is called. Mirror that here. miss_mask = rng_data.random(size=Y.shape) < 0.5 + Y[miss_mask] = np.nan Y_imp = Y.copy() - Y_imp[miss_mask] = np.nan groups = pd.Series(["A"] * n_per + ["B"] * n_per) _impute_by_group( From 4419280bd4596afe7ff7c3487c367b05546d1e0f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Levente=20Temesv=C3=A1ri-Nagy?= <147416790+leventetn@users.noreply.github.com> Date: Fri, 29 May 2026 12:04:46 +0200 Subject: [PATCH 3/5] test: corrected for impute_downshift --- tests/pp/test_imputation.py | 748 +++++++++++++++++------------------- 1 file changed, 342 insertions(+), 406 deletions(-) diff --git a/tests/pp/test_imputation.py b/tests/pp/test_imputation.py index c896fa4..d2c48a6 100644 --- a/tests/pp/test_imputation.py +++ b/tests/pp/test_imputation.py @@ -3,13 +3,9 @@ import pandas as pd from anndata import AnnData from scipy import sparse -from scipy.stats import ks_2samp, kstest, norm, chi2 +from scipy.stats import norm -from proteopy.pp.imputation import ( - impute_downshift, - _impute_rows, - _impute_by_group, -) +from proteopy.pp.imputation import impute_downshift # ── Fixture builders ──────────────────────────────────────────────── @@ -27,7 +23,7 @@ def _make_log_adata_with_missing( MS1 quantitative intensities) then log2-transformed, yielding a Gaussian log-intensity distribution with mean ≈ 23 and sd ≈ 2.5. NaN missingness is injected MCAR. - Sized for KS/LRT statistical power. + Sized for statistical-test power. """ rng = np.random.default_rng(seed) # ln-space params chosen so log2(raw) ~ N(23, 2.5) @@ -71,15 +67,16 @@ def _make_grouped_log_adata( miss_frac: float = 0.25, seed: int = 1, ) -> AnnData: - """Two-group log2 proteomics intensities with distinct medians. + """Two-cell-type log2 proteomics intensities with distinct medians. - Both groups draw raw intensities from a lognormal then log2-transform, - matching the Gaussian shape of real proteomics log-intensities. The - medians differ by ~6 log2 units. + Both cell types draw raw intensities from a lognormal then log2- + transform, matching the Gaussian shape of real proteomics log- + intensities. Median log2 abundances differ by ~6 units (e.g., a + highly-expressing vs. lowly-expressing lineage). """ rng = np.random.default_rng(seed) sigma_ln = 1.5 * np.log(2) # log2-sd ≈ 1.5 - raw_a = rng.lognormal( + raw_t = rng.lognormal( mean=18.0 * np.log(2), sigma=sigma_ln, size=(n_per_group, n_vars), ) @@ -87,16 +84,16 @@ def _make_grouped_log_adata( mean=24.0 * np.log(2), sigma=sigma_ln, size=(n_per_group, n_vars), ) - X = np.vstack([np.log2(raw_a), np.log2(raw_b)]) + X = np.vstack([np.log2(raw_t), np.log2(raw_b)]) miss = rng.random(size=X.shape) < miss_frac X[miss] = np.nan n_obs = 2 * n_per_group obs_names = [f"s{i}" for i in range(n_obs)] - groups = ["A"] * n_per_group + ["B"] * n_per_group + cell_types = ["T_cell"] * n_per_group + ["B_cell"] * n_per_group var_names = [f"p{i}" for i in range(n_vars)] obs = pd.DataFrame( - {"sample_id": obs_names, "group": groups}, + {"sample_id": obs_names, "cell_type": cell_types}, index=obs_names, ) var = pd.DataFrame({"protein_id": var_names}, index=var_names) @@ -111,8 +108,10 @@ def _make_non_log_adata() -> AnnData: used to exercise the ``force=False`` log check. """ rng = np.random.default_rng(2) - X = rng.lognormal(mean=23.0 * np.log(2), sigma=2.5 * np.log(2), - size=(20, 30)) + X = rng.lognormal( + mean=23.0 * np.log(2), sigma=2.5 * np.log(2), + size=(20, 30), + ) miss = rng.random(size=X.shape) < 0.2 X[miss] = np.nan @@ -123,20 +122,159 @@ def _make_non_log_adata() -> AnnData: return AnnData(X=X, obs=obs, var=var) +# ── Statistical comparison helpers ────────────────────────────────── +# +# All four helpers below take a 1-D ``observed`` and 1-D ``imputed`` +# array (the imputed values pulled from ``result.X`` at mask=True +# positions, and the kept observed values at mask=False positions). +# They are reused for both global (section B) and per-group (section +# D) checks so the same statistical contract is enforced everywhere. + +# Posterior-probability threshold for the "same distribution" Bayes +# factor. >0.95 corresponds to "very strong" evidence under the +# uniform-prior interpretation of Kass & Raftery (1995). +BF_PH1_THRESHOLD = 0.95 + + +def _theoretical_downshift_params(observed, downshift, width): + """Return (mu_th, sigma_th) for the documented downshifted normal.""" + med = float(np.median(observed)) + sd = float(np.std(observed)) + return med - downshift * sd, width * sd + + +def _draw_reference(mu, sigma, n, seed=100): + """Draw a fixed-seed reference sample from N(mu, sigma).""" + return np.random.default_rng(seed).normal( + loc=mu, scale=sigma, size=n, + ) + + +def compare_relative_means(observed, imputed, downshift, width): + """Assert imputed mean sits below observed mean and matches theory.""" + mu_th, sigma_th = _theoretical_downshift_params( + observed, downshift, width, + ) + assert imputed.mean() < observed.mean() + np.testing.assert_allclose( + imputed.mean(), mu_th, atol=0.1 * sigma_th, + ) + + +def compare_quantiles(observed, imputed, qs=(0.25, 0.5, 0.75)): + """Assert each imputed quantile falls below the observed quantile.""" + for q in qs: + q_obs = float(np.quantile(observed, q)) + q_imp = float(np.quantile(imputed, q)) + assert q_imp < q_obs, ( + f"q={q}: imputed {q_imp:.3f} ≥ observed {q_obs:.3f}" + ) + + +def compare_percentiles( + observed, imputed, downshift, width, + ps=(5, 25, 50, 75, 95), +): + """Assert imputed percentiles match the theoretical downshifted normal.""" + mu_th, sigma_th = _theoretical_downshift_params( + observed, downshift, width, + ) + # 0.3 * sigma_th ≈ 2 standard errors of the sample 5th-percentile + # for the smaller per-group samples (n ≈ 250); much looser than + # needed at the global scale (n ≈ 4000). One threshold for both + # callers — generous enough to absorb RNG drift, tight enough + # that a sign flip or width misuse still breaks the test. + for p in ps: + expected = float( + norm.ppf(p / 100.0, loc=mu_th, scale=sigma_th) + ) + actual = float(np.percentile(imputed, p)) + np.testing.assert_allclose( + actual, expected, atol=0.3 * sigma_th, + err_msg=f"p={p}: expected {expected:.3f}, got {actual:.3f}", + ) + + +def bayes_factor_same_norm_distr(x, y): + """Bayes factor for "same normal distribution" (H1) vs "different" (H0). + + Uses a BIC approximation: + - H1 (same): one normal fit to concatenated data, 2 parameters. + - H0 (different): separate normal fits to x and y, 4 parameters. + + Returns + ------- + bf_10 : float + Bayes factor in favour of H1 (same distribution). + p_h1 : float + Posterior probability of H1 under a uniform prior on {H0, H1}, + i.e. ``bf_10 / (1 + bf_10)``. + """ + x = np.asarray(x).ravel() + y = np.asarray(y).ravel() + combined = np.concatenate([x, y]) + n = combined.size + + ll_same = norm.logpdf( + combined, + loc=combined.mean(), + scale=combined.std(ddof=0), + ).sum() + ll_diff = ( + norm.logpdf(x, loc=x.mean(), scale=x.std(ddof=0)).sum() + + norm.logpdf(y, loc=y.mean(), scale=y.std(ddof=0)).sum() + ) + # ΔBIC/2 = ln(n) + (ll_same - ll_diff); BF_10 ≈ exp(ΔBIC/2). + log_bf_10 = np.log(n) + (ll_same - ll_diff) + bf_10 = float(np.exp(log_bf_10)) + p_h1 = bf_10 / (1.0 + bf_10) + return bf_10, float(p_h1) + + +def _split_observed_imputed(adata_in, result): + """Return (observed, imputed) 1-D arrays from input + result.""" + X_in = np.asarray(adata_in.X) + X_out = np.asarray(result.X) + mask = np.asarray(result.layers["imputation_mask_X"]) + observed = X_out[~mask] + imputed = X_out[mask] + # sanity: observed values are bit-identical to the input + np.testing.assert_array_equal(observed, X_in[~mask]) + return observed, imputed + + # ──────────────────────────────────────────────────────────────────── class TestImputeDownshift: - """Tests for ``impute_downshift``.""" + """Tests for ``impute_downshift``. + + Organised by contract: + + - **A.** Value preservation, mask correctness, and ``uns`` + metadata. + - **B.** Statistical shape of the imputed values against the + documented downshifted normal — relative mean, quantile, and + percentile checks plus a Bayes-factor test that imputed values + are indistinguishable from draws of the theoretical normal. + - **C.** ``inplace``/copy semantics and RNG reproducibility. + - **D.** ``group_by`` behaviour, reusing the section-B statistical + helpers per group plus tests for the global-stats fallback. + - **E.** Input validation and verbose output. + + Random draws are seeded (``random_state=42``) so the statistical + assertions are deterministic. Thresholds carry ≥10× margin to + absorb numpy RNG drift. + """ # ── A. Existing values & metadata invariants ──────────────────── @pytest.mark.parametrize("inplace", [True, False]) def test_observed_values_preserved(self, inplace): - """Non-missing entries are bit-identical after imputation.""" + """Non-NaN entries are bit-identical after imputation.""" adata = _make_small_log_adata() X_in = adata.X.copy() - finite_mask_in = np.isfinite(X_in) & (X_in != 0) + finite_mask_in = np.isfinite(X_in) result = impute_downshift(adata, inplace=inplace) target = adata if inplace else result @@ -153,7 +291,7 @@ def test_observed_values_preserved(self, inplace): def test_imputation_mask_layer_present_and_correct(self): adata = _make_small_log_adata() X_in = adata.X.copy() - expected_mask = ~np.isfinite(X_in) | (X_in == 0) + expected_mask = ~np.isfinite(X_in) result = impute_downshift(adata, inplace=False) mask = np.asarray(result.layers["imputation_mask_X"]) @@ -162,16 +300,14 @@ def test_imputation_mask_layer_present_and_correct(self): assert mask.shape == X_in.shape np.testing.assert_array_equal(mask, expected_mask) - def test_no_nan_or_zero_in_output(self): + def test_no_nan_in_output(self): adata = _make_small_log_adata() result = impute_downshift(adata, inplace=False) - X_out = np.asarray(result.X) - assert np.isfinite(X_out).all() - assert (X_out != 0).all() + assert np.isfinite(np.asarray(result.X)).all() def test_uns_metadata_keys_and_values(self): adata = _make_small_log_adata() - n_missing_in = int((~np.isfinite(adata.X) | (adata.X == 0)).sum()) + n_missing_in = int((~np.isfinite(adata.X)).sum()) result = impute_downshift( adata, @@ -193,8 +329,7 @@ def test_uns_metadata_keys_and_values(self): ) def test_no_missing_values_returns_input_unchanged(self): - """When there is nothing to impute, output equals input and the - mask is all False.""" + """No NaN in input → output equals input and the mask is all False.""" rng = np.random.default_rng(0) raw = rng.lognormal( mean=23.0 * np.log(2), sigma=2.5 * np.log(2), @@ -208,9 +343,7 @@ def test_no_missing_values_returns_input_unchanged(self): adata = AnnData(X=X, obs=obs, var=var) X_in = adata.X.copy() - result = impute_downshift( - adata, zero_to_na=False, inplace=False, - ) + result = impute_downshift(adata, inplace=False) np.testing.assert_array_equal(np.asarray(result.X), X_in) mask = np.asarray(result.layers["imputation_mask_X"]) @@ -218,17 +351,14 @@ def test_no_missing_values_returns_input_unchanged(self): assert result.uns["imputation"]["n_imputed"] == 0 assert result.uns["imputation"]["pct_imputed"] == 0.0 - def test_zero_to_na_false_keeps_zeros(self): + def test_zeros_in_input_are_preserved_by_default(self): + """Default ``zero_to_na=False``: zeros are observations, kept as-is.""" adata = _make_small_log_adata() - # Inject a couple of zeros that should NOT be imputed. + # Inject zeros at observed positions; they must NOT be imputed. adata.X[0, 0] = 0.0 adata.X[3, 0] = 0.0 - # force=True decouples this test from the log-transform heuristic - # — its purpose is zero handling, not log detection. - result = impute_downshift( - adata, zero_to_na=False, force=True, inplace=False, - ) + result = impute_downshift(adata, inplace=False) X_out = np.asarray(result.X) assert X_out[0, 0] == 0.0 @@ -237,173 +367,76 @@ def test_zero_to_na_false_keeps_zeros(self): assert not mask[0, 0] assert not mask[3, 0] - # ── B. Statistical shape of imputed values ────────────────────── - # - # Tests in this section verify that the imputed values follow the - # documented Perseus-style downshifted Gaussian: - # N(median(observed) - downshift*sd(observed), - # (width*sd(observed))^2). - # All draws use random_state=42 and a fixed data seed (0). Tolerances - # are expressed relative to the theoretical sigma so they survive - # plausible drift in numpy's RNG. Every threshold has >=10× margin - # over the value seen at these seeds. - - @staticmethod - def _split_observed_imputed(adata_in, result): - """Return (observed, imputed) value arrays from input + result.""" - X_in = np.asarray(adata_in.X) - X_out = np.asarray(result.X) - mask = np.asarray(result.layers["imputation_mask_X"]) - observed = X_out[~mask] - imputed = X_out[mask] - # sanity: observed values match input at observed positions - np.testing.assert_array_equal(observed, X_in[~mask]) - return observed, imputed - - @staticmethod - def _theoretical_params(observed, downshift, width): - med = float(np.median(observed)) - sd = float(np.std(observed)) - return med - downshift * sd, width * sd, med, sd - - def test_imputed_mean_is_smaller_than_observed_mean(self): - adata_in = _make_log_adata_with_missing() - result = impute_downshift( - adata_in.copy(), - downshift=1.8, width=0.3, - random_state=42, inplace=False, - ) - observed, imputed = self._split_observed_imputed(adata_in, result) - # downshift=1.8 SDs leftward → imputed mean clearly below observed - _, _, _, sd = self._theoretical_params(observed, 1.8, 0.3) - assert imputed.mean() < observed.mean() - sd + def test_zero_to_na_true_treats_zeros_as_missing(self): + """Opt-in ``zero_to_na=True``: zeros are converted to NaN + and imputed.""" + adata = _make_small_log_adata() + adata.X[0, 0] = 0.0 + adata.X[3, 0] = 0.0 - def test_imputed_mean_matches_theoretical(self): - adata_in = _make_log_adata_with_missing() result = impute_downshift( - adata_in.copy(), - downshift=1.8, width=0.3, - random_state=42, inplace=False, - ) - observed, imputed = self._split_observed_imputed(adata_in, result) - mu_th, sigma_th, _, _ = self._theoretical_params(observed, 1.8, 0.3) - np.testing.assert_allclose( - imputed.mean(), mu_th, atol=0.1 * sigma_th, + adata, zero_to_na=True, inplace=False, ) - @pytest.mark.parametrize("q", [0.25, 0.5, 0.75]) - def test_imputed_quantile_below_observed_quantile(self, q): - adata_in = _make_log_adata_with_missing() - result = impute_downshift( - adata_in.copy(), - downshift=1.8, width=0.3, - random_state=42, inplace=False, - ) - observed, imputed = self._split_observed_imputed(adata_in, result) - assert np.quantile(imputed, q) < np.quantile(observed, q) + mask = np.asarray(result.layers["imputation_mask_X"]) + assert mask[0, 0] + assert mask[3, 0] + X_out = np.asarray(result.X) + assert X_out[0, 0] != 0.0 + assert X_out[3, 0] != 0.0 - @pytest.mark.parametrize("p", [5, 25, 50, 75, 95]) - def test_imputed_percentiles_match_theoretical_normal(self, p): - adata_in = _make_log_adata_with_missing() - result = impute_downshift( - adata_in.copy(), - downshift=1.8, width=0.3, - random_state=42, inplace=False, - ) - observed, imputed = self._split_observed_imputed(adata_in, result) - mu_th, sigma_th, _, _ = self._theoretical_params(observed, 1.8, 0.3) - expected = norm.ppf(p / 100.0, loc=mu_th, scale=sigma_th) - np.testing.assert_allclose( - np.percentile(imputed, p), expected, atol=0.15 * sigma_th, - ) + # ── B. Statistical shape of imputed values ────────────────────── + # + # The primary fixture has ~4000 imputed positions. + # Defaults: downshift=1.8, width=0.3, random_state=42. - @pytest.mark.parametrize( - "width,direction", - [(0.3, "smaller"), (2.0, "bigger")], - ) - def test_imputed_variance_vs_observed(self, width, direction): - """Imputed variance is `(width*sd)^2`; smaller or bigger - by user choice.""" + def test_imputed_mean_is_shifted_and_matches_theoretical(self): adata_in = _make_log_adata_with_missing() result = impute_downshift( adata_in.copy(), - downshift=1.8, width=width, + downshift=1.8, width=0.3, random_state=42, inplace=False, ) - observed, imputed = self._split_observed_imputed(adata_in, result) - _, sigma_th, _, _ = self._theoretical_params(observed, 1.8, width) + observed, imputed = _split_observed_imputed(adata_in, result) + compare_relative_means(observed, imputed, 1.8, 0.3) - if direction == "smaller": - assert imputed.var() < observed.var() - else: - assert imputed.var() > observed.var() - - # In both directions, the empirical std matches width*sd. - np.testing.assert_allclose( - imputed.std(), sigma_th, rtol=0.1, - ) - - def test_kolmogorov_smirnov_two_sample_observed_vs_imputed_rejects(self): - """Imputed and observed distributions are clearly different.""" + def test_imputed_quantiles_are_below_observed(self): adata_in = _make_log_adata_with_missing() result = impute_downshift( adata_in.copy(), downshift=1.8, width=0.3, random_state=42, inplace=False, ) - observed, imputed = self._split_observed_imputed(adata_in, result) - stat, pvalue = ks_2samp(observed, imputed) - # Massive shift + narrow imputed → KS pvalue effectively 0. - assert pvalue < 1e-10 - assert stat > 0.5 - - def test_kolmogorov_smirnov_one_sample_imputed_matches_theoretical(self): - """Imputed values are consistent with the theoretical normal.""" + observed, imputed = _split_observed_imputed(adata_in, result) + compare_quantiles(observed, imputed) + + def test_imputed_percentiles_match_theoretical_normal(self): adata_in = _make_log_adata_with_missing() result = impute_downshift( adata_in.copy(), downshift=1.8, width=0.3, random_state=42, inplace=False, ) - observed, imputed = self._split_observed_imputed(adata_in, result) - mu_th, sigma_th, _, _ = self._theoretical_params(observed, 1.8, 0.3) - # Fail-to-reject: imputed values look like theoretical normal draws. - result_ks = kstest(imputed, "norm", args=(mu_th, sigma_th)) - assert result_ks.pvalue > 0.01 - - def test_likelihood_ratio_test_favors_downshifted_model(self): - """LRT-style log-likelihood comparison strongly prefers the - downshifted-normal model over the observed-normal model.""" + observed, imputed = _split_observed_imputed(adata_in, result) + compare_percentiles(observed, imputed, 1.8, 0.3) + + def test_imputed_matches_theoretical_via_bayes_factor(self): + """Imputed values are indistinguishable from theoretical draws.""" adata_in = _make_log_adata_with_missing() result = impute_downshift( adata_in.copy(), downshift=1.8, width=0.3, random_state=42, inplace=False, ) - observed, imputed = self._split_observed_imputed(adata_in, result) - mu_th, sigma_th, mu_obs, sd_obs = self._theoretical_params( + observed, imputed = _split_observed_imputed(adata_in, result) + mu_th, sigma_th = _theoretical_downshift_params( observed, 1.8, 0.3, ) + reference = _draw_reference(mu_th, sigma_th, imputed.size) + _, p_h1 = bayes_factor_same_norm_distr(imputed, reference) + assert p_h1 > BF_PH1_THRESHOLD - # Direct log-likelihood comparison (non-nested models). - ll_obs = norm.logpdf(imputed, loc=mu_obs, scale=sd_obs).sum() - ll_th = norm.logpdf(imputed, loc=mu_th, scale=sigma_th).sum() - D = 2.0 * (ll_th - ll_obs) - # 1.8-SD shift on ~4000 values → D in the thousands. Threshold is - # ~30× lower than the actual value to absorb RNG drift. - assert D > 100 - - # Nested LRT: H0 = N(mu_obs, sd_obs) vs H1 = N(mu_hat, sigma_hat). - mu_hat = float(imputed.mean()) - sigma_hat = float(imputed.std(ddof=0)) - ll_full = norm.logpdf(imputed, loc=mu_hat, scale=sigma_hat).sum() - lrt = 2.0 * (ll_full - ll_obs) - # df=2 (mu, sigma both freed). Under H0, lrt ~ chi^2(2). The true - # generating distribution differs strongly → reject H0. - p = 1.0 - chi2.cdf(lrt, df=2) - assert p < 1e-10 - - # ── C. Sparse/dense + inplace/copy semantics ──────────────────── + # ── C. inplace/copy semantics ─────────────────────────────────── def test_inplace_true_returns_none_and_mutates(self): adata = _make_small_log_adata() @@ -415,8 +448,7 @@ def test_inplace_true_returns_none_and_mutates(self): X_out = np.asarray(adata.X) assert np.isfinite(X_out).all() assert "imputation_mask_X" in adata.layers - # Values at originally observed positions are still the same. - finite_mask = np.isfinite(X_in) & (X_in != 0) + finite_mask = np.isfinite(X_in) np.testing.assert_array_equal( X_out[finite_mask], X_in[finite_mask], ) @@ -428,18 +460,17 @@ def test_inplace_false_returns_copy_and_preserves_original(self): result = impute_downshift(adata, inplace=False) assert result is not adata - # Original .X bit-identical (NaNs included). assert np.array_equal( adata.X, X_in_snapshot, equal_nan=True, ) assert "imputation_mask_X" not in adata.layers assert "imputation_mask_X" in result.layers + @pytest.mark.skip( + reason="sparse handling not tested per project policy", + ) def test_sparse_input_yields_sparse_output(self): adata = _make_small_log_adata() - # csr_matrix can't store NaN reliably across versions for this - # contract; build sparse from a matrix where the missingness is - # only zeros (impute_downshift treats zeros as missing by default). X_dense = np.array( [ [10.0, 12.0, 0.0], @@ -449,12 +480,13 @@ def test_sparse_input_yields_sparse_output(self): ], ) adata.X = sparse.csr_matrix(X_dense) - result = impute_downshift(adata, inplace=False) - assert sparse.issparse(result.X) assert isinstance(result.X, sparse.csr_matrix) + @pytest.mark.skip( + reason="sparse handling not tested per project policy", + ) def test_dense_input_yields_dense_output(self): adata = _make_small_log_adata() result = impute_downshift(adata, inplace=False) @@ -467,7 +499,9 @@ def test_random_state_reproducibility(self): r1 = impute_downshift(adata1, random_state=42, inplace=False) r2 = impute_downshift(adata2, random_state=42, inplace=False) - np.testing.assert_array_equal(np.asarray(r1.X), np.asarray(r2.X)) + np.testing.assert_array_equal( + np.asarray(r1.X), np.asarray(r2.X), + ) def test_random_state_none_is_nondeterministic(self): adata1 = _make_log_adata_with_missing() @@ -477,43 +511,58 @@ def test_random_state_none_is_nondeterministic(self): r2 = impute_downshift(adata2, random_state=None, inplace=False) mask = np.asarray(r1.layers["imputation_mask_X"]) - # Imputed positions differ across runs. assert not np.array_equal( np.asarray(r1.X)[mask], np.asarray(r2.X)[mask], ) # ── D. group_by behavior ──────────────────────────────────────── - def test_group_by_uses_group_specific_stats(self): + def test_group_by_imputed_distribution_per_group_matches_theoretical( + self, + ): + """Within each group, imputed values match that group's + theoretical downshifted normal (mean, quantiles, percentiles, + Bayes-factor agreement).""" adata = _make_grouped_log_adata() X_in = adata.X.copy() result = impute_downshift( - adata, group_by="group", + adata, group_by="cell_type", downshift=1.8, width=0.3, random_state=42, inplace=False, ) - mask = np.asarray(result.layers["imputation_mask_X"]) X_out = np.asarray(result.X) - groups = result.obs["group"].to_numpy() + cell_types = result.obs["cell_type"].to_numpy() - # Each group's imputed mean should track its own (downshifted) median. - for label, expected_loc in (("A", 18.0), ("B", 24.0)): - row_idx = np.where(groups == label)[0] + per_group_means = {} + for label in ("T_cell", "B_cell"): + row_idx = np.where(cell_types == label)[0] grp_mask = mask[row_idx, :] + grp_observed = X_out[row_idx, :][~grp_mask] grp_imputed = X_out[row_idx, :][grp_mask] - # Imputed values for group B are clearly above those for group A - # iff group-specific stats were used. - assert grp_imputed.mean() < expected_loc - assert grp_imputed.mean() > expected_loc - 5.0 - - # Group A's imputed mean must be lower than group B's. - idx_a = np.where(groups == "A")[0] - idx_b = np.where(groups == "B")[0] - mean_a = X_out[idx_a, :][mask[idx_a, :]].mean() - mean_b = X_out[idx_b, :][mask[idx_b, :]].mean() - assert mean_a < mean_b - 3.0 # groups separated by ~6 in log-space + + compare_relative_means(grp_observed, grp_imputed, 1.8, 0.3) + compare_quantiles(grp_observed, grp_imputed) + compare_percentiles(grp_observed, grp_imputed, 1.8, 0.3) + + mu_th, sigma_th = _theoretical_downshift_params( + grp_observed, 1.8, 0.3, + ) + reference = _draw_reference( + mu_th, sigma_th, grp_imputed.size, + ) + _, p_h1 = bayes_factor_same_norm_distr( + grp_imputed, reference, + ) + assert p_h1 > BF_PH1_THRESHOLD, ( + f"{label}: BF p_h1={p_h1:.3f} below threshold" + ) + + per_group_means[label] = grp_imputed.mean() + + # The lower-median group's imputed mean is below the higher one's. + assert per_group_means["T_cell"] < per_group_means["B_cell"] # Sanity: input not mutated. assert np.array_equal(adata.X, X_in, equal_nan=True) @@ -522,34 +571,94 @@ def test_group_by_fallback_to_global_when_group_too_small(self): adata = _make_grouped_log_adata( n_per_group=20, n_vars=50, miss_frac=0.25, seed=1, ) - # Build a third group "C" with only 1 observation AND fewer than - # 3 finite values across that row — this is what triggers the - # fallback path (`grp_vals.size >= 3` is False). - groups = list(adata.obs["group"].astype(object).to_numpy()) - groups[0] = "C" - adata.obs["group"] = groups - # Keep only 2 finite values in the C-group row; rest become NaN. - adata.X[0, 2:] = np.nan + # Add a third cell type "monocyte" with only 1 observation + # AND fewer than 3 finite values, which is what triggers the + # global-stats fallback (`grp_vals.size >= 3` is False). + cell_types = list( + adata.obs["cell_type"].astype(object).to_numpy() + ) + cell_types[0] = "monocyte" + adata.obs["cell_type"] = cell_types + adata.X[0, 2:] = np.nan # leave 2 finite values in that row + + result = impute_downshift( + adata, group_by="cell_type", + downshift=1.8, width=0.3, + random_state=42, inplace=False, + ) + mask = np.asarray(result.layers["imputation_mask_X"]) + X_out = np.asarray(result.X) + + # Group statistics for this fixture: + # T_cell: median ≈ 18, sd ≈ 1.5 → downshifted ≈ 15.3 + # B_cell: median ≈ 24, sd ≈ 1.5 → downshifted ≈ 21.3 + # global: median ≈ 21, sd ≈ 3.3 → downshifted ≈ 15.0 + # monocyte (2 finite values from T_cell range) ≈ 18 → ~17 + # + # The two assertions below encode the fallback: + # + # UPPER bound: the global downshifted mean (~15) must sit BELOW + # this threshold AND BELOW what the monocyte's own per-group + # median would have produced (~17). Satisfying it rules out the + # per-group path. + UPPER = 16.5 + # LOWER bound: the global downshifted mean (~15) must sit ABOVE + # this threshold. Sanity floor so a runaway draw can't pass. + LOWER = 12.0 + + m_idx = np.where( + adata.obs["cell_type"].to_numpy() == "monocyte", + )[0] + m_imputed = X_out[m_idx, :][mask[m_idx, :]] + assert m_imputed.size > 40 # most of the row was imputed + assert m_imputed.mean() < UPPER + assert m_imputed.mean() > LOWER + + def test_group_by_fallback_when_group_sd_zero(self): + """A group whose finite values are identical (sd=0) falls back + to global stats.""" + rng = np.random.default_rng(0) + n_vars = 60 + # T_cell: 5 obs with normal variation (good per-group stats). + X_t = np.log2(rng.lognormal( + mean=20.0 * np.log(2), sigma=1.5 * np.log(2), + size=(5, n_vars), + )) + # B_cell: 4 obs with a constant finite value (sd=0). Inject a + # column of NaN so there are positions to impute. + X_b = np.full((4, n_vars), 23.0) + X_b[:, 0] = np.nan + X = np.vstack([X_t, X_b]) + + obs_names = [f"s{i}" for i in range(9)] + cell_types = ["T_cell"] * 5 + ["B_cell"] * 4 + obs = pd.DataFrame( + {"sample_id": obs_names, "cell_type": cell_types}, + index=obs_names, + ) + var_names = [f"p{i}" for i in range(n_vars)] + var = pd.DataFrame( + {"protein_id": var_names}, index=var_names, + ) + adata = AnnData(X=X, obs=obs, var=var) result = impute_downshift( - adata, group_by="group", + adata, group_by="cell_type", downshift=1.8, width=0.3, random_state=42, inplace=False, ) mask = np.asarray(result.layers["imputation_mask_X"]) X_out = np.asarray(result.X) - # With group A median ≈ 18 (sd≈1.5) and group B median ≈ 24 - # (sd≈1.5), the global pool has median ≈ 21 and sd ≈ 3.3, so - # the fallback imputes around 21 - 1.8*3.3 ≈ 15.0. Group-C's - # own 2 finite values (drawn from group A's distribution) would - # have given mean ≈ 18 - small shift → around 17. The interval - # below distinguishes the two paths. - c_idx = np.where(adata.obs["group"].to_numpy() == "C")[0] - c_imputed = X_out[c_idx, :][mask[c_idx, :]] - assert c_imputed.size > 40 # most of the row was imputed - assert c_imputed.mean() < 16.5 # fallback (global), not group-A - assert c_imputed.mean() > 12.0 + b_idx = np.where( + adata.obs["cell_type"].to_numpy() == "B_cell", + )[0] + b_imputed = X_out[b_idx, :][mask[b_idx, :]] + # If the per-group (sd=0) path had been used, all imputed + # values would collapse to the same constant. Real fallback + # → global stats → non-zero empirical spread. + assert np.isfinite(b_imputed).all() + assert b_imputed.std() > 0.01 def test_group_by_invalid_column_raises_keyerror(self): adata = _make_small_log_adata() @@ -561,9 +670,9 @@ def test_group_by_invalid_column_raises_keyerror(self): def test_group_by_records_in_uns(self): adata = _make_grouped_log_adata() result = impute_downshift( - adata, group_by="group", inplace=False, + adata, group_by="cell_type", inplace=False, ) - assert result.uns["imputation"]["group_by"] == "group" + assert result.uns["imputation"]["group_by"] == "cell_type" # ── E. Validation / errors ────────────────────────────────────── @@ -633,7 +742,6 @@ def test_non_log_data_with_force_succeeds(self): def test_too_few_finite_values_raises(self): n = np.nan - # 2 finite values total, the rest NaN. X = np.array( [ [10.0, n, n], @@ -667,200 +775,28 @@ def test_zero_variance_raises(self): {"protein_id": var_names}, index=var_names, ) adata = AnnData(X=X, obs=obs, var=var) - # Inject a single NaN so there is at least something to impute. adata.X[0, 0] = np.nan with pytest.raises( ValueError, match=r"standard deviation", ): impute_downshift(adata, force=True, inplace=False) - def test_verbose_prints_stats(self, capsys): + def test_verbose_prints_correct_counts_and_percentages(self, capsys): + """Verbose output reports the actual measured/imputed counts.""" adata = _make_small_log_adata() + n_total = adata.X.size + n_missing = int((~np.isfinite(adata.X)).sum()) + n_measured = n_total - n_missing + impute_downshift(adata, verbose=True, inplace=True) out = capsys.readouterr().out - assert "Measured" in out - assert "Imputed" in out + + assert f"Measured: {n_measured:,} values" in out + assert f"Imputed: {n_missing:,} values" in out + assert f"({100 * n_measured / n_total:.1f}%)" in out + assert f"({100 * n_missing / n_total:.1f}%)" in out def test_verbose_false_prints_nothing(self, capsys): adata = _make_small_log_adata() impute_downshift(adata, verbose=False, inplace=True) assert capsys.readouterr().out == "" - - -# ──────────────────────────────────────────────────────────────────── - - -class TestImputeRowsHelper: - """Tests for the private helper ``_impute_rows``.""" - - def test_writes_only_at_mask_true(self): - Y_imp = np.full((3, 4), -999.0) - miss_mask = np.array( - [ - [True, False, True, False], - [False, False, False, False], - [False, True, False, True], - ], - ) - rng = np.random.default_rng(0) - - _impute_rows( - Y_imp, miss_mask, range(3), - median=10.0, sd=1.0, - downshift=1.8, width=0.3, rng=rng, - ) - - # Untouched cells remain at the sentinel value. - assert (Y_imp[~miss_mask] == -999.0).all() - # Touched cells are finite (sampled from a normal). - assert np.isfinite(Y_imp[miss_mask]).all() - assert (Y_imp[miss_mask] != -999.0).all() - - def test_uses_provided_rng(self): - median, sd = 10.0, 1.0 - downshift, width = 1.8, 0.3 - # Two row × three col matrix; all cells missing. - miss_mask = np.ones((2, 3), dtype=bool) - Y_imp_a = np.zeros((2, 3)) - Y_imp_b = np.zeros((2, 3)) - - _impute_rows( - Y_imp_a, miss_mask, range(2), - median=median, sd=sd, - downshift=downshift, width=width, - rng=np.random.default_rng(7), - ) - _impute_rows( - Y_imp_b, miss_mask, range(2), - median=median, sd=sd, - downshift=downshift, width=width, - rng=np.random.default_rng(7), - ) - - np.testing.assert_array_equal(Y_imp_a, Y_imp_b) - # And different seed → different output. - Y_imp_c = np.zeros((2, 3)) - _impute_rows( - Y_imp_c, miss_mask, range(2), - median=median, sd=sd, - downshift=downshift, width=width, - rng=np.random.default_rng(8), - ) - assert not np.array_equal(Y_imp_a, Y_imp_c) - - def test_skips_rows_with_no_missing(self): - Y_imp = np.array( - [[1.0, 2.0, 3.0], [4.0, 5.0, 6.0]], - ) - miss_mask = np.zeros_like(Y_imp, dtype=bool) - original = Y_imp.copy() - - _impute_rows( - Y_imp, miss_mask, range(2), - median=10.0, sd=1.0, - downshift=1.8, width=0.3, - rng=np.random.default_rng(0), - ) - - np.testing.assert_array_equal(Y_imp, original) - - def test_clamps_scale_minimum(self): - """`width=0`/`sd=0` is clamped internally to avoid scale=0 errors.""" - Y_imp = np.zeros((2, 2)) - miss_mask = np.ones_like(Y_imp, dtype=bool) - _impute_rows( - Y_imp, miss_mask, range(2), - median=5.0, sd=0.0, - downshift=1.8, width=0.0, - rng=np.random.default_rng(0), - ) - assert np.isfinite(Y_imp).all() - - -class TestImputeByGroupHelper: - """Tests for the private helper ``_impute_by_group``.""" - - def test_uses_per_group_stats(self): - rng_data = np.random.default_rng(0) - # 12 obs split into two groups with very different medians. - n_per = 6 - n_vars = 30 - Y = np.vstack( - [ - rng_data.normal(0.0, 1.0, size=(n_per, n_vars)), - rng_data.normal(10.0, 1.0, size=(n_per, n_vars)), - ], - ) - # In the production flow, miss_mask is derived from Y itself - # (`miss_mask = ~np.isfinite(Y)`), so Y has NaN at miss positions - # before _impute_by_group is called. Mirror that here. - miss_mask = rng_data.random(size=Y.shape) < 0.5 - Y[miss_mask] = np.nan - Y_imp = Y.copy() - groups = pd.Series(["A"] * n_per + ["B"] * n_per) - - _impute_by_group( - Y, Y_imp, miss_mask, groups, - g_median=5.0, g_sd=5.5, - downshift=1.8, width=0.3, - rng=np.random.default_rng(0), - ) - - a_imp = Y_imp[:n_per][miss_mask[:n_per]] - b_imp = Y_imp[n_per:][miss_mask[n_per:]] - # Group A's imputed values cluster well below group B's. - assert a_imp.mean() < b_imp.mean() - 3.0 - - def test_falls_back_when_group_too_small(self): - """Group with <3 finite values uses the supplied global stats.""" - # 4 obs: groups A (3 obs, plenty of data) and C (1 obs, only 1 finite). - n_vars = 50 - rng_data = np.random.default_rng(1) - Y_A = rng_data.normal(0.0, 1.0, size=(3, n_vars)) - Y_C = np.full((1, n_vars), np.nan) - Y_C[0, 0] = 50.0 # one finite value far from group A - Y = np.vstack([Y_A, Y_C]) - - miss_mask = ~np.isfinite(Y) - Y_imp = Y.copy() - groups = pd.Series(["A", "A", "A", "C"]) - - # Global stats provided (g_median=100, g_sd=10) deliberately far - # from any group's data so we can detect which one was used. - _impute_by_group( - Y, Y_imp, miss_mask, groups, - g_median=100.0, g_sd=10.0, - downshift=1.8, width=0.3, - rng=np.random.default_rng(0), - ) - - c_imp = Y_imp[3, :][miss_mask[3, :]] - # Fallback → centred near 100 - 1.8*10 = 82, not near group A's mean. - assert c_imp.mean() > 70.0 - assert c_imp.mean() < 95.0 - - def test_falls_back_when_group_sd_zero(self): - """A group whose finite values are all identical (sd=0) falls back.""" - n_vars = 40 - rng_data = np.random.default_rng(2) - Y_A = rng_data.normal(0.0, 1.0, size=(3, n_vars)) - # Group B: many finite values but all identical → sd=0. - Y_B = np.full((4, n_vars), 5.0) - Y_B[0, 0] = np.nan # at least one missing to impute - Y = np.vstack([Y_A, Y_B]) - - miss_mask = ~np.isfinite(Y) - Y_imp = Y.copy() - groups = pd.Series(["A"] * 3 + ["B"] * 4) - - _impute_by_group( - Y, Y_imp, miss_mask, groups, - g_median=100.0, g_sd=10.0, - downshift=1.8, width=0.3, - rng=np.random.default_rng(0), - ) - - b_imp = Y_imp[3:, :][miss_mask[3:, :]] - # Should fall back to global stats: mean ~ 100 - 1.8*10 = 82. - assert b_imp.mean() > 70.0 - assert b_imp.mean() < 95.0 From 80e9d127af3e6bd8b506c5a3fcef4638dedea56f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Levente=20Temesv=C3=A1ri-Nagy?= <147416790+leventetn@users.noreply.github.com> Date: Fri, 29 May 2026 12:20:04 +0200 Subject: [PATCH 4/5] test: pre-commit corrections impute_downshift --- tests/pp/test_imputation.py | 187 ++++++++++++++++++++++++------------ 1 file changed, 124 insertions(+), 63 deletions(-) diff --git a/tests/pp/test_imputation.py b/tests/pp/test_imputation.py index d2c48a6..1f14fe6 100644 --- a/tests/pp/test_imputation.py +++ b/tests/pp/test_imputation.py @@ -77,11 +77,13 @@ def _make_grouped_log_adata( rng = np.random.default_rng(seed) sigma_ln = 1.5 * np.log(2) # log2-sd ≈ 1.5 raw_t = rng.lognormal( - mean=18.0 * np.log(2), sigma=sigma_ln, + mean=18.0 * np.log(2), + sigma=sigma_ln, size=(n_per_group, n_vars), ) raw_b = rng.lognormal( - mean=24.0 * np.log(2), sigma=sigma_ln, + mean=24.0 * np.log(2), + sigma=sigma_ln, size=(n_per_group, n_vars), ) X = np.vstack([np.log2(raw_t), np.log2(raw_b)]) @@ -109,7 +111,8 @@ def _make_non_log_adata() -> AnnData: """ rng = np.random.default_rng(2) X = rng.lognormal( - mean=23.0 * np.log(2), sigma=2.5 * np.log(2), + mean=23.0 * np.log(2), + sigma=2.5 * np.log(2), size=(20, 30), ) miss = rng.random(size=X.shape) < 0.2 @@ -146,18 +149,24 @@ def _theoretical_downshift_params(observed, downshift, width): def _draw_reference(mu, sigma, n, seed=100): """Draw a fixed-seed reference sample from N(mu, sigma).""" return np.random.default_rng(seed).normal( - loc=mu, scale=sigma, size=n, + loc=mu, + scale=sigma, + size=n, ) def compare_relative_means(observed, imputed, downshift, width): """Assert imputed mean sits below observed mean and matches theory.""" mu_th, sigma_th = _theoretical_downshift_params( - observed, downshift, width, + observed, + downshift, + width, ) assert imputed.mean() < observed.mean() np.testing.assert_allclose( - imputed.mean(), mu_th, atol=0.1 * sigma_th, + imputed.mean(), + mu_th, + atol=0.1 * sigma_th, ) @@ -166,18 +175,23 @@ def compare_quantiles(observed, imputed, qs=(0.25, 0.5, 0.75)): for q in qs: q_obs = float(np.quantile(observed, q)) q_imp = float(np.quantile(imputed, q)) - assert q_imp < q_obs, ( - f"q={q}: imputed {q_imp:.3f} ≥ observed {q_obs:.3f}" - ) + assert ( + q_imp < q_obs + ), f"q={q}: imputed {q_imp:.3f} ≥ observed {q_obs:.3f}" def compare_percentiles( - observed, imputed, downshift, width, + observed, + imputed, + downshift, + width, ps=(5, 25, 50, 75, 95), ): """Assert imputed percentiles match the theoretical downshifted normal.""" mu_th, sigma_th = _theoretical_downshift_params( - observed, downshift, width, + observed, + downshift, + width, ) # 0.3 * sigma_th ≈ 2 standard errors of the sample 5th-percentile # for the smaller per-group samples (n ≈ 250); much looser than @@ -185,12 +199,12 @@ def compare_percentiles( # callers — generous enough to absorb RNG drift, tight enough # that a sign flip or width misuse still breaks the test. for p in ps: - expected = float( - norm.ppf(p / 100.0, loc=mu_th, scale=sigma_th) - ) + expected = float(norm.ppf(p / 100.0, loc=mu_th, scale=sigma_th)) actual = float(np.percentile(imputed, p)) np.testing.assert_allclose( - actual, expected, atol=0.3 * sigma_th, + actual, + expected, + atol=0.3 * sigma_th, err_msg=f"p={p}: expected {expected:.3f}, got {actual:.3f}", ) @@ -332,7 +346,8 @@ def test_no_missing_values_returns_input_unchanged(self): """No NaN in input → output equals input and the mask is all False.""" rng = np.random.default_rng(0) raw = rng.lognormal( - mean=23.0 * np.log(2), sigma=2.5 * np.log(2), + mean=23.0 * np.log(2), + sigma=2.5 * np.log(2), size=(20, 30), ) X = np.log2(raw) @@ -375,7 +390,9 @@ def test_zero_to_na_true_treats_zeros_as_missing(self): adata.X[3, 0] = 0.0 result = impute_downshift( - adata, zero_to_na=True, inplace=False, + adata, + zero_to_na=True, + inplace=False, ) mask = np.asarray(result.layers["imputation_mask_X"]) @@ -387,15 +404,17 @@ def test_zero_to_na_true_treats_zeros_as_missing(self): # ── B. Statistical shape of imputed values ────────────────────── # - # The primary fixture has ~4000 imputed positions. + # The primary fixture has ~4000 imputed positions. # Defaults: downshift=1.8, width=0.3, random_state=42. def test_imputed_mean_is_shifted_and_matches_theoretical(self): adata_in = _make_log_adata_with_missing() result = impute_downshift( adata_in.copy(), - downshift=1.8, width=0.3, - random_state=42, inplace=False, + downshift=1.8, + width=0.3, + random_state=42, + inplace=False, ) observed, imputed = _split_observed_imputed(adata_in, result) compare_relative_means(observed, imputed, 1.8, 0.3) @@ -404,8 +423,10 @@ def test_imputed_quantiles_are_below_observed(self): adata_in = _make_log_adata_with_missing() result = impute_downshift( adata_in.copy(), - downshift=1.8, width=0.3, - random_state=42, inplace=False, + downshift=1.8, + width=0.3, + random_state=42, + inplace=False, ) observed, imputed = _split_observed_imputed(adata_in, result) compare_quantiles(observed, imputed) @@ -414,8 +435,10 @@ def test_imputed_percentiles_match_theoretical_normal(self): adata_in = _make_log_adata_with_missing() result = impute_downshift( adata_in.copy(), - downshift=1.8, width=0.3, - random_state=42, inplace=False, + downshift=1.8, + width=0.3, + random_state=42, + inplace=False, ) observed, imputed = _split_observed_imputed(adata_in, result) compare_percentiles(observed, imputed, 1.8, 0.3) @@ -425,12 +448,16 @@ def test_imputed_matches_theoretical_via_bayes_factor(self): adata_in = _make_log_adata_with_missing() result = impute_downshift( adata_in.copy(), - downshift=1.8, width=0.3, - random_state=42, inplace=False, + downshift=1.8, + width=0.3, + random_state=42, + inplace=False, ) observed, imputed = _split_observed_imputed(adata_in, result) mu_th, sigma_th = _theoretical_downshift_params( - observed, 1.8, 0.3, + observed, + 1.8, + 0.3, ) reference = _draw_reference(mu_th, sigma_th, imputed.size) _, p_h1 = bayes_factor_same_norm_distr(imputed, reference) @@ -450,7 +477,8 @@ def test_inplace_true_returns_none_and_mutates(self): assert "imputation_mask_X" in adata.layers finite_mask = np.isfinite(X_in) np.testing.assert_array_equal( - X_out[finite_mask], X_in[finite_mask], + X_out[finite_mask], + X_in[finite_mask], ) def test_inplace_false_returns_copy_and_preserves_original(self): @@ -461,7 +489,9 @@ def test_inplace_false_returns_copy_and_preserves_original(self): assert result is not adata assert np.array_equal( - adata.X, X_in_snapshot, equal_nan=True, + adata.X, + X_in_snapshot, + equal_nan=True, ) assert "imputation_mask_X" not in adata.layers assert "imputation_mask_X" in result.layers @@ -500,7 +530,8 @@ def test_random_state_reproducibility(self): r2 = impute_downshift(adata2, random_state=42, inplace=False) np.testing.assert_array_equal( - np.asarray(r1.X), np.asarray(r2.X), + np.asarray(r1.X), + np.asarray(r2.X), ) def test_random_state_none_is_nondeterministic(self): @@ -512,7 +543,8 @@ def test_random_state_none_is_nondeterministic(self): mask = np.asarray(r1.layers["imputation_mask_X"]) assert not np.array_equal( - np.asarray(r1.X)[mask], np.asarray(r2.X)[mask], + np.asarray(r1.X)[mask], + np.asarray(r2.X)[mask], ) # ── D. group_by behavior ──────────────────────────────────────── @@ -527,9 +559,12 @@ def test_group_by_imputed_distribution_per_group_matches_theoretical( X_in = adata.X.copy() result = impute_downshift( - adata, group_by="cell_type", - downshift=1.8, width=0.3, - random_state=42, inplace=False, + adata, + group_by="cell_type", + downshift=1.8, + width=0.3, + random_state=42, + inplace=False, ) mask = np.asarray(result.layers["imputation_mask_X"]) X_out = np.asarray(result.X) @@ -547,17 +582,22 @@ def test_group_by_imputed_distribution_per_group_matches_theoretical( compare_percentiles(grp_observed, grp_imputed, 1.8, 0.3) mu_th, sigma_th = _theoretical_downshift_params( - grp_observed, 1.8, 0.3, + grp_observed, + 1.8, + 0.3, ) reference = _draw_reference( - mu_th, sigma_th, grp_imputed.size, + mu_th, + sigma_th, + grp_imputed.size, ) _, p_h1 = bayes_factor_same_norm_distr( - grp_imputed, reference, - ) - assert p_h1 > BF_PH1_THRESHOLD, ( - f"{label}: BF p_h1={p_h1:.3f} below threshold" + grp_imputed, + reference, ) + assert ( + p_h1 > BF_PH1_THRESHOLD + ), f"{label}: BF p_h1={p_h1:.3f} below threshold" per_group_means[label] = grp_imputed.mean() @@ -569,22 +609,26 @@ def test_group_by_imputed_distribution_per_group_matches_theoretical( def test_group_by_fallback_to_global_when_group_too_small(self): adata = _make_grouped_log_adata( - n_per_group=20, n_vars=50, miss_frac=0.25, seed=1, + n_per_group=20, + n_vars=50, + miss_frac=0.25, + seed=1, ) # Add a third cell type "monocyte" with only 1 observation # AND fewer than 3 finite values, which is what triggers the # global-stats fallback (`grp_vals.size >= 3` is False). - cell_types = list( - adata.obs["cell_type"].astype(object).to_numpy() - ) + cell_types = list(adata.obs["cell_type"].astype(object).to_numpy()) cell_types[0] = "monocyte" adata.obs["cell_type"] = cell_types adata.X[0, 2:] = np.nan # leave 2 finite values in that row result = impute_downshift( - adata, group_by="cell_type", - downshift=1.8, width=0.3, - random_state=42, inplace=False, + adata, + group_by="cell_type", + downshift=1.8, + width=0.3, + random_state=42, + inplace=False, ) mask = np.asarray(result.layers["imputation_mask_X"]) X_out = np.asarray(result.X) @@ -620,10 +664,13 @@ def test_group_by_fallback_when_group_sd_zero(self): rng = np.random.default_rng(0) n_vars = 60 # T_cell: 5 obs with normal variation (good per-group stats). - X_t = np.log2(rng.lognormal( - mean=20.0 * np.log(2), sigma=1.5 * np.log(2), - size=(5, n_vars), - )) + X_t = np.log2( + rng.lognormal( + mean=20.0 * np.log(2), + sigma=1.5 * np.log(2), + size=(5, n_vars), + ) + ) # B_cell: 4 obs with a constant finite value (sd=0). Inject a # column of NaN so there are positions to impute. X_b = np.full((4, n_vars), 23.0) @@ -638,14 +685,18 @@ def test_group_by_fallback_when_group_sd_zero(self): ) var_names = [f"p{i}" for i in range(n_vars)] var = pd.DataFrame( - {"protein_id": var_names}, index=var_names, + {"protein_id": var_names}, + index=var_names, ) adata = AnnData(X=X, obs=obs, var=var) result = impute_downshift( - adata, group_by="cell_type", - downshift=1.8, width=0.3, - random_state=42, inplace=False, + adata, + group_by="cell_type", + downshift=1.8, + width=0.3, + random_state=42, + inplace=False, ) mask = np.asarray(result.layers["imputation_mask_X"]) X_out = np.asarray(result.X) @@ -664,13 +715,17 @@ def test_group_by_invalid_column_raises_keyerror(self): adata = _make_small_log_adata() with pytest.raises(KeyError, match=r"not found"): impute_downshift( - adata, group_by="not_a_col", inplace=False, + adata, + group_by="not_a_col", + inplace=False, ) def test_group_by_records_in_uns(self): adata = _make_grouped_log_adata() result = impute_downshift( - adata, group_by="cell_type", inplace=False, + adata, + group_by="cell_type", + inplace=False, ) assert result.uns["imputation"]["group_by"] == "cell_type" @@ -753,14 +808,17 @@ def test_too_few_finite_values_raises(self): obs_names = ["s0", "s1", "s2"] var_names = ["p0", "p1", "p2"] obs = pd.DataFrame( - {"sample_id": obs_names}, index=obs_names, + {"sample_id": obs_names}, + index=obs_names, ) var = pd.DataFrame( - {"protein_id": var_names}, index=var_names, + {"protein_id": var_names}, + index=var_names, ) adata = AnnData(X=X, obs=obs, var=var) with pytest.raises( - ValueError, match=r"Not enough finite values", + ValueError, + match=r"Not enough finite values", ): impute_downshift(adata, force=True, inplace=False) @@ -769,15 +827,18 @@ def test_zero_variance_raises(self): obs_names = [f"s{i}" for i in range(4)] var_names = [f"p{i}" for i in range(3)] obs = pd.DataFrame( - {"sample_id": obs_names}, index=obs_names, + {"sample_id": obs_names}, + index=obs_names, ) var = pd.DataFrame( - {"protein_id": var_names}, index=var_names, + {"protein_id": var_names}, + index=var_names, ) adata = AnnData(X=X, obs=obs, var=var) adata.X[0, 0] = np.nan with pytest.raises( - ValueError, match=r"standard deviation", + ValueError, + match=r"standard deviation", ): impute_downshift(adata, force=True, inplace=False) From 0542c676623606645b9f9999e65595feb088fbb3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Levente=20Temesv=C3=A1ri-Nagy?= <147416790+leventetn@users.noreply.github.com> Date: Mon, 8 Jun 2026 11:16:49 +0200 Subject: [PATCH 5/5] test/impute_downshift docstring correction --- tests/pp/test_imputation.py | 12 +++++++----- 1 file changed, 7 insertions(+), 5 deletions(-) diff --git a/tests/pp/test_imputation.py b/tests/pp/test_imputation.py index 1f14fe6..8d4454c 100644 --- a/tests/pp/test_imputation.py +++ b/tests/pp/test_imputation.py @@ -17,13 +17,15 @@ def _make_log_adata_with_missing( miss_frac: float = 0.25, seed: int = 0, ) -> AnnData: - """Log2 intensities mimicking real proteomics data. + """Log2 intensities for controlled statistical sampler tests. Raw intensities are drawn from a lognormal (the empirical shape of MS1 quantitative intensities) then log2-transformed, yielding a Gaussian log-intensity distribution with mean ≈ 23 and sd ≈ 2.5. - NaN missingness is injected MCAR. - Sized for statistical-test power. + Missingness is injected uniformly so tests can verify mask handling + and draws from the documented downshifted normal. This fixture + tests the imputation algorithm, not a biological missingness + mechanism. """ rng = np.random.default_rng(seed) # ln-space params chosen so log2(raw) ~ N(23, 2.5) @@ -497,7 +499,7 @@ def test_inplace_false_returns_copy_and_preserves_original(self): assert "imputation_mask_X" in result.layers @pytest.mark.skip( - reason="sparse handling not tested per project policy", + reason="sparse handling not tested in this version", ) def test_sparse_input_yields_sparse_output(self): adata = _make_small_log_adata() @@ -515,7 +517,7 @@ def test_sparse_input_yields_sparse_output(self): assert isinstance(result.X, sparse.csr_matrix) @pytest.mark.skip( - reason="sparse handling not tested per project policy", + reason="sparse handling not tested in this version", ) def test_dense_input_yields_dense_output(self): adata = _make_small_log_adata()