Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
58 changes: 58 additions & 0 deletions docs/debiasconvex_nonnegative_guardrails.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
# DebiasConvex Non-Negative Guardrails

This document describes local branch changes for experimental non-negative
support in `DebiasConvex`. These notes are branch-local and are not package
release notes.

## Local Branch Changelog

- Marked `method_non_neg` in `DebiasConvex` as an experimental
point-estimation heuristic.
- Limited the supported upstream-facing non-negative method to
`method_non_neg="svd"`.
- Rejected `method_non_neg="nnmf"` with a clear top-level `ValueError` because
it changes the model family and is not currently compatible with the
debiasing and standard-error formulas.
- Emitted a `RuntimeWarning` whenever `method_non_neg` is used.
- Returned `std=None`, `inference_valid=False`, and `non_negative_method="svd"`
for non-negative fits.
- Added diagnostics for baseline negativity, rank, stable rank, projected
treatment-design conditioning, and residual norm.
- Kept raw DebiasConvex mode backward compatible: `std` is still returned and
`inference_valid=True`.
- Added tests for invalid methods, `nnmf` policy, raw-mode inference outputs,
non-negative baseline invariants, wrapper behavior, and diagnostics.

## Rationale

The non-negative option should not be presented as inference-valid
DebiasConvex estimation. The standard-error calculation depends on the
low-rank geometry used by the debiasing step, and non-negative projections
modify that geometry. The implementation therefore treats the option as a
transparent point-estimation heuristic:

```python
from causaltensor.cauest.DebiasConvex import DCPanelSolver

solver = DCPanelSolver(Z=Z, O=O)
res = solver.fit(suggest_r=2, method="non-convex", method_non_neg="svd")

print(res.tau)
print(res.std) # None
print(res.inference_valid) # False
print(res.non_negative_method) # "svd"
print(res.diagnostics)
```

The `nnmf` path remains research-only for now. It is intentionally rejected in
this branch because it is a different non-negative factorization family and was
not consistently compatible with the convex debiasing continuation path.

## Issue Context

This follows the non-negativity discussion from
https://github.com/TianyiPeng/causaltensor/issues/12. In particular, hard
clipping inside the iterative DebiasConvex update should not be treated as a
safe default: preliminary experiments ran but produced extremely large
standard errors, which is why this implementation disables standard-error
outputs when the non-negative heuristic is requested.
124 changes: 115 additions & 9 deletions src/causaltensor/cauest/DebiasConvex.py
Original file line number Diff line number Diff line change
@@ -1,15 +1,20 @@
import numpy as np
import warnings
import causaltensor.matlib.util as util
from causaltensor.matlib.util import transform_to_3D
from causaltensor.cauest.result import Result
from causaltensor.cauest.panel_solver import PanelSolver
from causaltensor.cauest.panel_solver import FixedEffectPanelSolver

class DCResult(Result):
def __init__(self, baseline = None, tau=None, std=None, return_tau_scalar=False):
def __init__(self, baseline = None, tau=None, std=None, return_tau_scalar=False,
inference_valid=True, non_negative_method=None, diagnostics=None):
super().__init__(baseline = baseline, tau = tau, return_tau_scalar = return_tau_scalar)
self.std = std
self.M = baseline # for backward compatability
self.inference_valid = inference_valid
self.non_negative_method = non_negative_method
self.diagnostics = {} if diagnostics is None else diagnostics

class DCPanelSolver(PanelSolver):
def __init__(self, Z=None, O=None, suggest_r=None):
Expand All @@ -28,6 +33,8 @@ def __init__(self, Z=None, O=None, suggest_r=None):
self.Z = transform_to_3D(Z) ## Z is (n1 x n2 x num_treat) numpy array
self.suggest_R = suggest_r
self.small_index, self.X, self.Xinv = self.prepare_OLS()
self._non_negative_warning_emitted = False
self.last_non_negative_diagnostics = None


def prepare_OLS(self):
Expand All @@ -38,15 +45,55 @@ def prepare_OLS(self):
Xinv = np.linalg.inv(X.T @ X)
return small_index, X, Xinv

def _validate_method_non_neg(self, method_non_neg):
if method_non_neg is None:
return None
if method_non_neg != "svd":
raise ValueError(
"method_non_neg must be None or 'svd'. The 'nnmf' option is "
"not supported for DebiasConvex because it changes the model "
"family and is not currently compatible with the debiasing and "
"standard-error formulas."
)
return method_non_neg

def _warn_if_non_negative_method(self, method_non_neg):
if method_non_neg is None or self._non_negative_warning_emitted:
return
warnings.warn(
"DebiasConvex non-negative modes are experimental point-estimation "
"heuristics. They modify the low-rank geometry used by the debiasing "
"and standard-error formulas, so standard errors are not reported as "
"inference-valid when method_non_neg is used.",
RuntimeWarning,
stacklevel=3,
)
self._non_negative_warning_emitted = True

def fit(self, suggest_r=None, auto_rank=True, spectrum_cut=0.002, method='auto', method_non_neg=None):
method_non_neg = self._validate_method_non_neg(method_non_neg)
self._warn_if_non_negative_method(method_non_neg)
if suggest_r is not None:
M, tau, std = self.DC_PR_with_suggested_rank(suggest_r=suggest_r, method=method, method_non_neg=method_non_neg)
elif auto_rank:
M, tau, std = self.DC_PR_auto_rank(spectrum_cut=spectrum_cut, method=method, method_non_neg=method_non_neg)
else:
raise ValueError("Either suggest_r or auto_rank must be provided")

res = DCResult(baseline=M, tau=tau, std=std)
inference_valid = method_non_neg is None
diagnostics = None
if method_non_neg is not None:
std = None
diagnostics = self.last_non_negative_diagnostics

res = DCResult(
baseline=M,
tau=tau,
std=std,
inference_valid=inference_valid,
non_negative_method=method_non_neg,
diagnostics=diagnostics,
)
return res

def solve_tau(self, O):
Expand All @@ -55,6 +102,8 @@ def solve_tau(self, O):
return tau

def als(self, tau, eps, l=None, r=None, method_non_neg=None):
method_non_neg = self._validate_method_non_neg(method_non_neg)
self._warn_if_non_negative_method(method_non_neg)
for T in range(2000):
M_new = self.O - np.tensordot(self.Z, tau, axes=([2], [0]))
#### SVD to find low-rank M
Expand All @@ -74,9 +123,48 @@ def als(self, tau, eps, l=None, r=None, method_non_neg=None):

#### Check convergence
if (np.linalg.norm(tau_new - tau) < eps * np.linalg.norm(tau)):
return M, tau
return M, tau_new
tau = tau_new
return M, tau

def _projected_treatment_design(self, u, vh):
X = np.zeros((self.Z.shape[0]*self.Z.shape[1], self.Z.shape[2]))
for k in np.arange(self.Z.shape[2]):
X[:, k] = util.remove_tangent_space_component(u, vh, self.Z[:, :, k]).reshape(-1)
return X

def non_negative_diagnostics(self, M, tau, E, method_non_neg):
u, s, vh = util.svd_fast(M)
rank = int(np.linalg.matrix_rank(M))
stable_rank = 0.0
if len(s) > 0 and s[0] > 0:
stable_rank = float(np.sum(s**2) / (s[0]**2))

X = self._projected_treatment_design(u[:, :rank], vh[:rank, :])
XtX = X.T @ X
eigvals = np.linalg.eigvalsh((XtX + XtX.T) / 2)
max_eig = float(np.max(eigvals)) if eigvals.size else np.nan
min_eig = float(np.min(eigvals)) if eigvals.size else np.nan
if eigvals.size == 0 or min_eig <= 1e-12:
condition_number = np.inf
else:
condition_number = float(max_eig / min_eig)

negative_part = np.minimum(M, 0)
return {
"method_non_neg": method_non_neg,
"inference_valid": False,
"std_available": False,
"baseline_min": float(np.min(M)),
"negative_fraction": float(np.mean(M < 0)),
"negative_mass": float(np.sum(-negative_part)),
"max_negative_magnitude": float(np.max(-negative_part)),
"rank": rank,
"stable_rank": stable_rank,
"projected_design_min_eigenvalue": min_eig,
"projected_design_condition_number": condition_number,
"residual_frobenius_norm": float(np.linalg.norm(E)),
}

def debias(self, M, tau, l):
u, s, vh = util.svd_fast(M)
Expand Down Expand Up @@ -131,6 +219,8 @@ def DC_PR_with_l(self, l, initial_tau = None, eps = 1e-6, method_non_neg=None):
tau : (num_treat,) float numpy array
Estimated treatment effects.
"""
method_non_neg = self._validate_method_non_neg(method_non_neg)
self._warn_if_non_negative_method(method_non_neg)
if initial_tau is None:
tau = np.zeros(self.Z.shape[2])
else:
Expand Down Expand Up @@ -160,6 +250,8 @@ def non_convex_PR(self, r, initial_tau = None, eps = 1e-6, method_non_neg=None):
tau : (num_treat,) float numpy array
Estimated treatment effects.
"""
method_non_neg = self._validate_method_non_neg(method_non_neg)
self._warn_if_non_negative_method(method_non_neg)
if initial_tau is None:
tau = np.zeros(self.Z.shape[2])
else:
Expand All @@ -177,6 +269,8 @@ def DC_PR_with_suggested_rank(self, suggest_r = 1, method = 'non-convex', method
:param Z: intervention matrix

"""
method_non_neg = self._validate_method_non_neg(method_non_neg)
self._warn_if_non_negative_method(method_non_neg)
## determine pre_tau
pre_tau = self.solve_tau(self.O)

Expand Down Expand Up @@ -213,15 +307,29 @@ def DC_PR_with_suggested_rank(self, suggest_r = 1, method = 'non-convex', method
M = M1
tau = tau1

CI = self.panel_regression_CI(M, self.O-M-np.tensordot(self.Z, tau, axes=([2], [0])))
standard_deviation = np.sqrt(np.diag(CI))
E = self.O-M-np.tensordot(self.Z, tau, axes=([2], [0]))
if method_non_neg is not None:
standard_deviation = None
self.last_non_negative_diagnostics = self.non_negative_diagnostics(
M=M,
tau=tau,
E=E,
method_non_neg=method_non_neg,
)
else:
CI = self.panel_regression_CI(M, E)
standard_deviation = np.sqrt(np.diag(CI))
if len(tau) == 1:
if standard_deviation is None:
return M, tau[0], None
return M, tau[0], standard_deviation[0]
else:
return M, tau, standard_deviation


def DC_PR_auto_rank(self, spectrum_cut = 0.002, method='convex', method_non_neg=None):
method_non_neg = self._validate_method_non_neg(method_non_neg)
self._warn_if_non_negative_method(method_non_neg)
s = np.linalg.svd(self.O, full_matrices = False, compute_uv=False)
suggest_r = np.sum(np.cumsum(s**2) / np.sum(s**2) <= 1-spectrum_cut)
return self.DC_PR_with_suggested_rank(suggest_r = suggest_r, method=method, method_non_neg=method_non_neg)
Expand All @@ -247,9 +355,7 @@ def panel_regression_CI(self, M, E):
u = u[:, :r]
vh = vh[:r, :]

X = np.zeros((self.Z.shape[0]*self.Z.shape[1], self.Z.shape[2]))
for k in np.arange(self.Z.shape[2]):
X[:, k] = util.remove_tangent_space_component(u, vh, self.Z[:, :, k]).reshape(-1)
X = self._projected_treatment_design(u, vh)

A = (np.linalg.inv(X.T@X)@X.T)
CI = (A * np.reshape(E**2, -1)) @ A.T
Expand All @@ -266,4 +372,4 @@ def DC_PR_auto_rank(O, Z, spectrum_cut = 0.002, method='convex', method_non_neg=
def DC_PR_with_suggested_rank(O, Z, suggest_r = 1, method = 'convex', method_non_neg=None):
solver = DCPanelSolver(Z, O)
res = solver.fit(auto_rank=False, suggest_r=suggest_r, method=method, method_non_neg=method_non_neg)
return res.M, res.tau, res.std
return res.M, res.tau, res.std
93 changes: 93 additions & 0 deletions tests/test_debiasconvex_nonnegative_guardrails.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
import warnings

import numpy as np
import pytest

from causaltensor.cauest.DebiasConvex import (
DCPanelSolver,
DC_PR_with_suggested_rank,
)


def make_positive_panel(shape=(6, 6)):
M0 = np.outer(
np.linspace(1.0, 2.0, shape[0]),
np.linspace(0.5, 1.5, shape[1]),
)
Z = np.zeros_like(M0)
Z[shape[0] // 2 :, shape[1] // 2 :] = 1
O = M0 + 0.25 * Z
return O, Z


def test_nonnegative_fit_warns_disables_std_and_adds_diagnostics():
O, Z = make_positive_panel()

solver = DCPanelSolver(Z=Z, O=O)
with pytest.warns(RuntimeWarning, match="experimental"):
res = solver.fit(suggest_r=1, method="non-convex", method_non_neg="svd")

assert res.std is None
assert res.inference_valid is False
assert res.non_negative_method == "svd"
assert res.diagnostics["std_available"] is False
assert res.diagnostics["inference_valid"] is False
assert res.diagnostics["method_non_neg"] == "svd"
assert "projected_design_condition_number" in res.diagnostics
assert "residual_frobenius_norm" in res.diagnostics


def test_raw_mode_keeps_inference_outputs():
O, Z = make_positive_panel()

solver = DCPanelSolver(Z=Z, O=O)
with warnings.catch_warnings(record=True) as caught:
warnings.simplefilter("always")
res = solver.fit(suggest_r=1, method="non-convex")

runtime_warnings = [item for item in caught if issubclass(item.category, RuntimeWarning)]
assert runtime_warnings == []
assert res.std is not None
assert np.isfinite(res.std)
assert res.inference_valid is True
assert res.non_negative_method is None
assert res.diagnostics == {}


def test_nonnegative_fit_returns_nonnegative_baseline():
O, Z = make_positive_panel()

solver = DCPanelSolver(Z=Z, O=O)
with pytest.warns(RuntimeWarning, match="experimental"):
res = solver.fit(suggest_r=1, method="non-convex", method_non_neg="svd")

assert np.min(res.baseline) >= -1e-10
assert res.diagnostics["baseline_min"] >= -1e-10
assert res.diagnostics["negative_fraction"] == 0


def test_nonnegative_wrapper_returns_none_std():
O, Z = make_positive_panel()

with pytest.warns(RuntimeWarning, match="experimental"):
M, tau, std = DC_PR_with_suggested_rank(
O,
Z,
suggest_r=1,
method="non-convex",
method_non_neg="svd",
)

assert M.shape == O.shape
assert np.min(M) >= -1e-10
assert np.isfinite(tau)
assert std is None


@pytest.mark.parametrize("method_non_neg", ["clip", "nnmf"])
def test_rejects_unsupported_nonnegative_methods(method_non_neg):
O, Z = make_positive_panel()

solver = DCPanelSolver(Z=Z, O=O)
with pytest.raises(ValueError, match="method_non_neg must be None or 'svd'"):
solver.fit(suggest_r=1, method="non-convex", method_non_neg=method_non_neg)
Loading