Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
77 changes: 68 additions & 9 deletions dowhy/causal_estimators/linear_regression_estimator.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
import itertools
from typing import List, Optional, Union

import numpy as np
import pandas as pd
import scipy.stats
import statsmodels.api as sm

from dowhy.causal_estimator import CausalEstimator
Expand Down Expand Up @@ -105,16 +107,65 @@ def _build_model(self, data: pd.DataFrame):
model = sm.OLS(data[self._target_estimand.outcome_variable[0]], features).fit()
return (features, model)

def _ate_and_se_for_treatment(self, treatment_index: int):
"""Compute the unscaled ATE and its standard error for one treatment variable.

Uses the Delta method: for ATE = c'β, SE = sqrt(c' Σ c), where Σ is the
OLS parameter covariance matrix and c is the contrast vector.

The feature column order produced by ``_build_features`` (after ``sm.add_constant``) is:
[const, T_0, …, T_k, W_0, …, W_m, T_0·X_0, …, T_0·X_n, T_1·X_0, …]

:param treatment_index: 0-based index into the treatment variable list.
:returns: Tuple of (ate_unscaled, se_unscaled).
"""
n_treatments = len(self._target_estimand.treatment_variable)
# Use the actual number of encoded columns, not the number of variable names.
# Categorical variables are one-hot encoded (drop_first=True), so a variable
# with k levels produces k-1 columns — len(names) would be wrong.
n_common_causes = self._observed_common_causes.shape[1] if self._observed_common_causes is not None else 0
em_means = self._effect_modifiers.mean(axis=0).to_numpy()
n_effect_modifiers = len(em_means)

params = self.model.params.to_numpy()
cov = self.model.cov_params().to_numpy()

n_params = len(params)
expected_params = 1 + n_treatments + n_common_causes + n_treatments * n_effect_modifiers
assert n_params == expected_params, (
f"Model has {n_params} params but expected {expected_params}. "
"Column ordering assumption in _ate_and_se_for_treatment may be broken "
"(check that encoded column counts are used, not variable name counts)."
)
c = np.zeros(n_params)
# Direct treatment coefficient (offset by 1 for the intercept)
c[1 + treatment_index] = 1.0
# Interaction coefficients T_i · X_j start at:
# 1 (const) + n_treatments + n_common_causes + treatment_index * n_effect_modifiers
interaction_start = 1 + n_treatments + n_common_causes + treatment_index * n_effect_modifiers
c[interaction_start : interaction_start + n_effect_modifiers] = em_means

ate = float(c @ params)
var_ate = float(c @ cov @ c)
se = float(np.sqrt(max(var_ate, 0.0)))
return ate, se

def _estimate_confidence_intervals(self, confidence_level, method=None):
if self._effect_modifier_names:
# The average treatment effect is a combination of different
# regression coefficients. Complicated to compute the confidence
# interval analytically. For example, if y=a + b1.t + b2.tx, then
# the average treatment effect is b1+b2.mean(x).
# Refer Gelman, Hill. ARM Book. Chapter 9
# http://www.stat.columbia.edu/~gelman/arm/chap9.pdf
# TODO: Looking for contributions
raise NotImplementedError
# Use the Delta method to compute asymptotic confidence intervals for the
# ATE when effect modifiers are present. The ATE is a linear combination
# of the OLS coefficients: ATE = b_T + b_{TX_1}*E[X_1] + …
# Reference: Gelman & Hill, ARM Book, Chapter 9
n_treatments = len(self._target_estimand.treatment_variable)
scale = self._treatment_value - self._control_value
t_score = scipy.stats.t.ppf((1.0 + confidence_level) / 2.0, df=self.model.df_resid)
rows = []
for i in range(n_treatments):
ate_unscaled, se_unscaled = self._ate_and_se_for_treatment(i)
ate_scaled = scale * ate_unscaled
margin = abs(scale) * t_score * se_unscaled
rows.append([ate_scaled - margin, ate_scaled + margin])
return np.array(rows)
else:
conf_ints = self.model.conf_int(alpha=1 - confidence_level)
# For a linear regression model, the causal effect of a variable is equal to the coefficient corresponding to the
Expand All @@ -126,7 +177,15 @@ def _estimate_confidence_intervals(self, confidence_level, method=None):

def _estimate_std_error(self, method=None):
if self._effect_modifier_names:
raise NotImplementedError
# Delta method: SE(scale * ATE) = |scale| * sqrt(c' Σ c)
scale = self._treatment_value - self._control_value
ses = np.array(
[
abs(scale) * self._ate_and_se_for_treatment(i)[1]
for i in range(len(self._target_estimand.treatment_variable))
]
)
return ses
else:
std_error = self.model.bse[1 : (len(self._target_estimand.treatment_variable) + 1)]

Expand Down
185 changes: 185 additions & 0 deletions tests/causal_estimators/test_linear_regression_estimator.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
import numpy as np
from pytest import mark

import dowhy.datasets
from dowhy import EstimandType, identify_effect_auto
from dowhy.causal_estimators.linear_regression_estimator import LinearRegressionEstimator
from dowhy.graph import build_graph_from_str

from .base import SimpleEstimator, TestGraphObject, example_graph

Expand Down Expand Up @@ -187,3 +190,185 @@ def test_general_adjustment_estimation_on_example_graphs(self, example_graph: Te
data["df"] = data["df"][example_graph.observed_nodes]
estimator_tester = SimpleEstimator(0.1, LinearRegressionEstimator, identifier_method="general_adjustment")
estimator_tester.custom_data_average_treatment_effect_test(data)


class TestLinearRegressionAsymptoticCI:
    """Tests for the Delta-method asymptotic CI/SE with effect modifiers (issue #336).

    Before this change, requesting confidence intervals or standard errors from
    ``LinearRegressionEstimator`` with effect modifiers raised ``NotImplementedError``;
    these tests exercise the new Delta-method path end to end.
    """

    def _make_dataset_and_estimand(self, num_effect_modifiers=1, num_common_causes=1, num_treatments=1, seed=42):
        """Generate a linear synthetic dataset and an identified backdoor estimand.

        :param num_effect_modifiers: number of effect-modifier variables in the dataset.
        :param num_common_causes: number of common-cause (confounder) variables.
        :param num_treatments: number of treatment variables.
        :param seed: seed for NumPy's global RNG; must be set before dataset generation.
        :returns: Tuple of (dataset dict from ``linear_dataset``, identified estimand).
        """
        # Seeding the *global* NumPy RNG immediately before generation makes the
        # dataset reproducible — do not reorder these two statements.
        np.random.seed(seed)
        data = dowhy.datasets.linear_dataset(
            beta=5,
            num_common_causes=num_common_causes,
            num_instruments=0,
            num_effect_modifiers=num_effect_modifiers,
            num_treatments=num_treatments,
            num_samples=2000,
            treatment_is_binary=False,
        )
        gml_graph = data["gml_graph"]
        df = data["df"]
        target_estimand = identify_effect_auto(
            build_graph_from_str(gml_graph),
            observed_nodes=list(df.columns),
            action_nodes=data["treatment_name"],
            outcome_nodes=data["outcome_name"],
            estimand_type=EstimandType.NONPARAMETRIC_ATE,
        )
        # The Delta-method code path under test sits behind the backdoor identifier.
        target_estimand.set_identifier_method("backdoor")
        return data, target_estimand

    def test_ci_returned_not_raises_single_treatment_single_em(self):
        """No NotImplementedError for single treatment + single effect modifier."""
        data, estimand = self._make_dataset_and_estimand(num_effect_modifiers=1)
        estimator = LinearRegressionEstimator(
            identified_estimand=estimand,
            confidence_intervals=True,
        )
        estimator.fit(data["df"], effect_modifier_names=data["effect_modifier_names"])
        estimate = estimator.estimate_effect(
            data["df"],
            treatment_value=1,
            control_value=0,
            confidence_intervals=True,
        )
        ci = estimate.get_confidence_intervals()
        assert ci is not None
        # One treatment variable -> one (lower, upper) row.
        assert ci.shape == (1, 2), f"Expected shape (1,2), got {ci.shape}"
        lower, upper = ci[0]
        assert lower < upper, "CI lower bound must be less than upper bound"

    def test_ci_contains_true_ate_with_high_probability(self):
        """95% CI should bracket the true ATE on a large sample."""
        # Two effect modifiers exercise the multi-interaction contrast vector.
        data, estimand = self._make_dataset_and_estimand(num_effect_modifiers=2, num_common_causes=1, seed=0)
        estimator = LinearRegressionEstimator(
            identified_estimand=estimand,
            confidence_intervals=True,
            confidence_level=0.95,
        )
        estimator.fit(data["df"], effect_modifier_names=data["effect_modifier_names"])
        estimate = estimator.estimate_effect(
            data["df"],
            treatment_value=1,
            control_value=0,
            confidence_intervals=True,
        )
        ci = estimate.get_confidence_intervals()
        lower, upper = ci[0]
        # linear_dataset reports the population ATE under key "ate".
        true_ate = data["ate"]
        assert lower <= true_ate <= upper, f"True ATE {true_ate:.4f} not inside 95% CI [{lower:.4f}, {upper:.4f}]"

    def test_std_error_positive_with_effect_modifier(self):
        """Standard error should be positive and finite when effect modifiers are present."""
        data, estimand = self._make_dataset_and_estimand(num_effect_modifiers=1)
        estimator = LinearRegressionEstimator(
            identified_estimand=estimand,
            test_significance=True,
            confidence_intervals=True,
        )
        estimator.fit(data["df"], effect_modifier_names=data["effect_modifier_names"])
        estimate = estimator.estimate_effect(
            data["df"],
            treatment_value=1,
            control_value=0,
            confidence_intervals=True,
        )
        se = estimate.get_standard_error()
        assert se is not None
        assert np.all(np.isfinite(se)), "SE should be finite"
        assert np.all(se > 0), "SE should be positive"

    def test_ci_consistent_with_no_effect_modifier(self):
        """With no effect modifiers the estimator takes the direct statsmodels branch;
        verify a valid, properly ordered CI is still returned (no regression from the
        Delta-method change).
        """
        data, estimand = self._make_dataset_and_estimand(num_effect_modifiers=0)
        estimator = LinearRegressionEstimator(
            identified_estimand=estimand,
            confidence_intervals=True,
            confidence_level=0.95,
        )
        # Explicitly pass an empty modifier list to force the non-Delta branch.
        estimator.fit(data["df"], effect_modifier_names=[])
        estimate = estimator.estimate_effect(
            data["df"],
            treatment_value=1,
            control_value=0,
            confidence_intervals=True,
        )
        ci = estimate.get_confidence_intervals()
        assert ci is not None
        lower, upper = ci[0]
        assert lower < upper

    def _make_dataset_with_categorical_common_cause(self, n_levels=3, seed=42):
        """Build a simple synthetic dataset with one continuous effect modifier and
        one categorical common cause (n_levels levels), for testing categorical encoding.

        :param n_levels: number of levels of the categorical common cause W.
        :param seed: seed for the local ``default_rng`` generator.
        :returns: Tuple of (DataFrame, identified estimand, true treatment coefficient).
        """
        # Local imports keep these test-only dependencies out of the module namespace.
        import pandas as pd
        from dowhy import CausalModel

        rng = np.random.default_rng(seed)
        n = 500
        # Categorical common cause W with n_levels levels
        w = rng.integers(0, n_levels, size=n).astype(str)
        # Continuous effect modifier X
        x = rng.standard_normal(n)
        # Treatment T (continuous, affected by W)
        t = (w == "0").astype(float) + rng.standard_normal(n) * 0.5
        # Outcome Y depends on T, T*X interaction, and W (true ATE ~2)
        beta_t = 2.0
        y = beta_t * t + 0.5 * t * x + 0.3 * (w == "1").astype(float) + rng.standard_normal(n) * 0.2

        df = pd.DataFrame({"T": t, "Y": y, "W": pd.Categorical(w), "X": x})
        graph_str = "digraph { W -> T; W -> Y; T -> Y; X -> Y }"
        model = CausalModel(data=df, treatment="T", outcome="Y", graph=graph_str)
        estimand = model.identify_effect(proceed_when_unidentifiable=True)
        estimand.set_identifier_method("backdoor")
        return df, estimand, beta_t

    def test_ci_no_error_with_categorical_common_cause(self):
        """Delta-method CI should work when a common cause is categorical (multi-level)."""
        df, estimand, _ = self._make_dataset_with_categorical_common_cause(n_levels=3)
        estimator = LinearRegressionEstimator(
            identified_estimand=estimand,
            confidence_intervals=True,
        )
        estimator.fit(df, effect_modifier_names=["X"])
        estimate = estimator.estimate_effect(
            df,
            treatment_value=1,
            control_value=0,
            confidence_intervals=True,
        )
        ci = estimate.get_confidence_intervals()
        assert ci is not None
        lower, upper = ci[0]
        assert lower < upper, "CI lower bound must be less than upper bound"

    def test_ci_uses_encoded_column_count_not_name_count(self):
        """Regression test: interaction_start must use encoded column width, not len(names).

        A 3-level categorical common cause W encodes to 2 columns (drop_first=True).
        If we use len(observed_common_causes_names)==1 instead of shape[1]==2, the
        interaction_start index is off-by-one and we silently pick the wrong coefficient.
        This test verifies the CI is finite and the assertion inside
        _ate_and_se_for_treatment does not fire.
        """
        # 4 levels -> 3 encoded columns, widening the off-by-N window the check guards.
        df, estimand, true_ate = self._make_dataset_with_categorical_common_cause(n_levels=4, seed=7)
        estimator = LinearRegressionEstimator(
            identified_estimand=estimand,
            confidence_intervals=True,
            confidence_level=0.95,
        )
        estimator.fit(df, effect_modifier_names=["X"])
        estimate = estimator.estimate_effect(
            df,
            treatment_value=1,
            control_value=0,
            confidence_intervals=True,
        )
        ci = estimate.get_confidence_intervals()
        assert ci is not None
        lower, upper = ci[0]
        assert np.isfinite(lower) and np.isfinite(upper), "CI bounds must be finite"
        assert lower < upper, "CI lower bound must be less than upper bound"
        se = estimate.get_standard_error()
        assert se is not None
        assert np.all(np.isfinite(se)) and np.all(se > 0), "SE must be positive and finite"
Loading