From 7822a73a1017b75eb36530c091d9e24fd6e5836f Mon Sep 17 00:00:00 2001 From: gaurav <721466+soodoku@users.noreply.github.com> Date: Sun, 17 Aug 2025 12:30:50 -0700 Subject: [PATCH] Add analytic Hessian bandwidth selection --- src/derivatives.py | 105 +++++++++++++++++++++++++++++++++++ src/kde_analytic_hessian.py | 93 +++++++++++++++++++++++++++++++ src/nw_analytic_hessian.py | 106 ++++++++++++++++++++++++++++++++++++ 3 files changed, 304 insertions(+) create mode 100644 src/derivatives.py create mode 100644 src/kde_analytic_hessian.py create mode 100644 src/nw_analytic_hessian.py diff --git a/src/derivatives.py b/src/derivatives.py new file mode 100644 index 0000000..9ea4f89 --- /dev/null +++ b/src/derivatives.py @@ -0,0 +1,105 @@ +import numpy as np + +SQRT_2PI = np.sqrt(2 * np.pi) + + +def _poly_mask(u: np.ndarray, mask: np.ndarray, expr: np.ndarray) -> np.ndarray: + """Return piecewise polynomial values for Epanechnikov kernels. + + Parameters + ---------- + u : np.ndarray + Evaluation points. + mask : np.ndarray + Boolean mask where the polynomial expression is valid. + expr : np.ndarray + Polynomial expression evaluated elementwise on ``u``. + """ + out = np.zeros_like(u, dtype=float) + if np.isscalar(u): + return expr if mask else 0.0 + out[mask] = expr[mask] + return out + + +# Gaussian kernel and its convolution ----------------------------------------- + +def gauss(u: np.ndarray) -> np.ndarray: + """Standard Gaussian kernel K(u).""" + return np.exp(-0.5 * u * u) / SQRT_2PI + + +def gauss_p(u: np.ndarray) -> np.ndarray: + """First derivative of the Gaussian kernel.""" + return -u * gauss(u) + + +def gauss_pp(u: np.ndarray) -> np.ndarray: + """Second derivative of the Gaussian kernel.""" + return (u * u - 1.0) * gauss(u) + + +def gauss_conv(u: np.ndarray) -> np.ndarray: + """Convolution K*K of the Gaussian kernel.""" + return np.exp(-0.25 * u * u) / np.sqrt(4 * np.pi) + + +def gauss_conv_p(u: np.ndarray) -> np.ndarray: + """First derivative of the Gaussian kernel convolution.""" + return -0.5 * u * gauss_conv(u) + + +def gauss_conv_pp(u: np.ndarray) -> np.ndarray: + """Second derivative of the Gaussian kernel convolution.""" + return (0.25 * u * u - 0.5) * gauss_conv(u) + + +# Epanechnikov kernel and its convolution ------------------------------------ + +def _abs(u: np.ndarray) -> np.ndarray: + return np.abs(u) + + +def epan(u: np.ndarray) -> np.ndarray: + """Epanechnikov kernel K(u).""" + return _poly_mask(u, _abs(u) <= 1, 0.75 * (1 - u * u)) + + +def epan_p(u: np.ndarray) -> np.ndarray: + """First derivative of the Epanechnikov kernel.""" + return _poly_mask(u, _abs(u) <= 1, -1.5 * u) + + +def epan_pp(u: np.ndarray) -> np.ndarray: + """Second derivative of the Epanechnikov kernel.""" + return _poly_mask(u, _abs(u) <= 1, -1.5 + 0.0 * u) + + +def epan_conv(u: np.ndarray) -> np.ndarray: + """Convolution K*K of the Epanechnikov kernel (valid for |u|≤2).""" + absu = _abs(u) + poly = 0.6 - 0.75 * absu**2 + 0.375 * absu**3 - 0.01875 * absu**5 + return _poly_mask(u, absu <= 2, poly) + + +def epan_conv_p(u: np.ndarray) -> np.ndarray: + """First derivative of the Epanechnikov kernel convolution.""" + absu = _abs(u) + poly = np.sign(u) * (-0.09375 * absu**4 + 1.125 * absu**2 - 1.5 * absu) + return _poly_mask(u, absu <= 2, poly) + + +def epan_conv_pp(u: np.ndarray) -> np.ndarray: + """Second derivative of the Epanechnikov kernel convolution.""" + absu = _abs(u) + poly = -0.375 * absu**3 + 2.25 * absu - 1.5 + return _poly_mask(u, absu <= 2, poly) + + +# Convenience dictionaries ---------------------------------------------------- + +KERNELS = { + "gauss": (gauss, gauss_p, gauss_pp, gauss_conv, gauss_conv_p, gauss_conv_pp), + "epan": (epan, epan_p, epan_pp, epan_conv, epan_conv_p, epan_conv_pp), +} + diff --git a/src/kde_analytic_hessian.py b/src/kde_analytic_hessian.py new file mode 100644 index 0000000..1355ef1 --- /dev/null +++ b/src/kde_analytic_hessian.py @@ -0,0 +1,93 @@ +"""Newton–Armijo bandwidth selection for univariate KDE.""" + +import argparse +from typing import Callable, Tuple + +import numpy as np + +from derivatives import KERNELS + + +def lscv_generic(x: np.ndarray, h: float, kernel: str) -> Tuple[float, float, float]: + """Return LSCV score, gradient and Hessian for bandwidth *h*. + + Parameters + ---------- + x : np.ndarray + Sample data. + h : float + Bandwidth. + kernel : str + Kernel name: ``"gauss"`` or ``"epan"``. + """ + K, Kp, Kpp, K2, K2p, K2pp = KERNELS[kernel] + n = len(x) + u = (x[:, None] - x[None, :]) / h + + # score + term1 = K2(u).sum() / (n ** 2 * h) + Ku = K(u) + term2 = (Ku.sum() - np.sum(np.diag(Ku))) / (n * (n - 1) * h) + score = term1 - 2 * term2 + + # gradient + S_F = (K2(u) + u * K2p(u)).sum() + S_K_matrix = Ku + u * Kp(u) + S_K = S_K_matrix.sum() - np.sum(np.diag(S_K_matrix)) + grad = -S_F / (n ** 2 * h ** 2) + 2 * S_K / (n * (n - 1) * h ** 2) + + # Hessian + S_F2 = (2 * K2p(u) + u * K2pp(u)).sum() + Kp_u = Kp(u) + Kpp_u = Kpp(u) + S_K2_matrix = 2 * Kp_u + u * Kpp_u + S_K2 = S_K2_matrix.sum() - np.sum(np.diag(S_K2_matrix)) + hess = 2 * S_F / (n ** 2 * h ** 3) - S_F2 / (n ** 2 * h ** 2) + hess += -4 * S_K / (n * (n - 1) * h ** 3) + 2 * S_K2 / (n * (n - 1) * h ** 2) + return score, grad, hess + + +def newton_armijo( + x: np.ndarray, + h0: float, + kernel: str = "gauss", + tol: float = 1e-5, + max_iter: int = 12, +) -> Tuple[float, int]: + """Run Newton–Armijo to minimise LSCV and return (h_opt, evaluations).""" + h = float(h0) + evals = 0 + for _ in range(max_iter): + f, g, H = lscv_generic(x, h, kernel) + evals += 1 + if abs(g) < tol: + break + step = -g / H if (H > 0 and np.isfinite(H)) else -0.25 * g + if abs(step) / h < 1e-3: + break + for _ in range(10): + h_new = max(h + step, 1e-6) + if lscv_generic(x, h_new, kernel)[0] < f: + h = h_new + break + step *= 0.5 + return h, evals + + +def main() -> None: + parser = argparse.ArgumentParser(description="Analytic-Hessian KDE bandwidth selection") + parser.add_argument("data", nargs="?", help="Path to 1D data (one value per line)") + parser.add_argument("--kernel", choices=["gauss", "epan"], default="gauss") + parser.add_argument("--h0", type=float, default=1.0, help="Initial bandwidth guess") + args = parser.parse_args() + + if args.data: + x = np.loadtxt(args.data, ndmin=1) + else: + x = np.random.randn(200) + h, evals = newton_armijo(x, args.h0, kernel=args.kernel) + print(f"Optimal h={h:.5f} after {evals} evaluations") + + +if __name__ == "__main__": + main() diff --git a/src/nw_analytic_hessian.py b/src/nw_analytic_hessian.py new file mode 100644 index 0000000..5209241 --- /dev/null +++ b/src/nw_analytic_hessian.py @@ -0,0 +1,106 @@ +"""Newton–Armijo bandwidth selection for Nadaraya–Watson regression.""" + +import argparse +from typing import Tuple + +import numpy as np + +SQRT_2PI = np.sqrt(2 * np.pi) + + +def _weights(u: np.ndarray, h: float, kernel: str) -> Tuple[np.ndarray, np.ndarray, np.ndarray]: + """Return weights w, w', w'' for given kernel.""" + if kernel == "gauss": + base = np.exp(-0.5 * u * u) / (h * SQRT_2PI) + w1 = base * (u * u - 1) / h + w2 = base * (u**4 - 3 * u * u + 1) / (h * h) + return base, w1, w2 + elif kernel == "epan": + mask = np.abs(u) <= 1 + w = np.zeros_like(u, dtype=float) + w1 = np.zeros_like(u, dtype=float) + w2 = np.zeros_like(u, dtype=float) + uu = u * u + w[mask] = 0.75 * (1 - uu[mask]) / h + w1[mask] = 0.75 * (-1 + 3 * uu[mask]) / (h * h) + w2[mask] = 1.5 * (1 - 6 * uu[mask]) / (h ** 3) + return w, w1, w2 + else: + raise ValueError("Unknown kernel") + + +def loocv_mse(x: np.ndarray, y: np.ndarray, h: float, kernel: str) -> Tuple[float, float, float]: + """Return LOOCV MSE, gradient and Hessian for bandwidth ``h``.""" + n = len(x) + u = (x[:, None] - x[None, :]) / h + w, w1, w2 = _weights(u, h, kernel) + np.fill_diagonal(w, 0.0) + np.fill_diagonal(w1, 0.0) + np.fill_diagonal(w2, 0.0) + + num = w @ y + den = w.sum(axis=1) + m = num / den + + num1 = w1 @ y + den1 = w1.sum(axis=1) + m1 = (num1 * den - num * den1) / (den ** 2) + + num2 = w2 @ y + den2 = w2.sum(axis=1) + m2 = (num2 * den - num * den2) / (den ** 2) - 2 * m1 * den1 / den + + resid = y - m + loss = np.mean(resid**2) + grad = (-2.0 / n) * np.sum(resid * m1) + hess = (2.0 / n) * np.sum(m1 * m1 - resid * m2) + return loss, grad, hess + + +def newton_armijo( + x: np.ndarray, + y: np.ndarray, + h0: float, + kernel: str = "gauss", + tol: float = 1e-5, + max_iter: int = 12, +) -> Tuple[float, int]: + """Run Newton–Armijo to minimise LOOCV MSE.""" + h = float(h0) + evals = 0 + for _ in range(max_iter): + f, g, H = loocv_mse(x, y, h, kernel) + evals += 1 + if abs(g) < tol: + break + step = -g / H if (H > 0 and np.isfinite(H)) else -0.25 * g + if abs(step) / h < 1e-3: + break + for _ in range(10): + h_new = max(h + step, 1e-6) + if loocv_mse(x, y, h_new, kernel)[0] < f: + h = h_new + break + step *= 0.5 + return h, evals + + +def main() -> None: + parser = argparse.ArgumentParser(description="Analytic-Hessian NW bandwidth selection") + parser.add_argument("data", nargs="?", help="Path to data with two columns x,y") + parser.add_argument("--kernel", choices=["gauss", "epan"], default="gauss") + parser.add_argument("--h0", type=float, default=1.0, help="Initial bandwidth guess") + args = parser.parse_args() + + if args.data: + arr = np.loadtxt(args.data) + x, y = arr[:, 0], arr[:, 1] + else: + x = np.linspace(-2, 2, 200) + y = np.sin(x) + 0.1 * np.random.randn(len(x)) + h, evals = newton_armijo(x, y, args.h0, kernel=args.kernel) + print(f"Optimal h={h:.5f} after {evals} evaluations") + + +if __name__ == "__main__": + main()