diff --git a/cfinversion/continuous/fft_based/fft_inverter.py b/cfinversion/continuous/fft_based/fft_inverter.py new file mode 100644 index 0000000..0e753d2 --- /dev/null +++ b/cfinversion/continuous/fft_based/fft_inverter.py @@ -0,0 +1,47 @@ +from typing import Callable, Union +import numpy as np +from scipy.interpolate import interp1d + +from cfinversion.continuous import ContinuousInverter + + +class FFTInverter(ContinuousInverter): + + def __init__(self, N: float = 2 ** 8, A: float = -6, B: float = 6) -> None: + super().__init__() + self.N: int = int(N) + self.A: float = A + self.B: float = B + self.dist = B - A + self.dy = self.dist / N + self.dt = (2 * np.pi) / self.dist + self.T = (N / 2) * self.dt + self.k = np.arange(N) + self.j = np.arange(N) + self.Y = A + self.k * self.dy + self.t = -self.T + self.j * self.dt + self.phi = None #type: Callable[[np.ndarray], np.ndarray] | None + + def fit(self, phi: Callable[[np.ndarray], np.ndarray]) -> None: + """phi = characteristic function""" + self.phi = phi + + f = np.exp(-1j * self.j * self.dt * self.A) * self.phi(self.t) + C = (self.dt / (2 * np.pi)) * np.exp(1j * self.T * self.Y) + + self.pdf_values = np.real(C * np.fft.fft(f)) #type: np.ndarray + self.pdf_interp = interp1d(self.Y, self.pdf_values, kind='linear') + + self.cdf_values = np.cumsum(self.pdf_values) * self.dy #type: np.ndarray + self.cdf_interp = interp1d(self.Y, self.cdf_values, kind='linear') + + def cdf(self, x: np.ndarray) -> Union[float, np.ndarray]: + if self.phi is None: + raise ValueError("Characteristic function (phi) is not set. Call fit() first.") + return self.cdf_interp(x) + + def pdf(self, x: np.ndarray) -> Union[float, np.ndarray]: + if self.phi is None: + raise ValueError("Characteristic function (phi) is not set. Call fit() first.") + + return self.pdf_interp(x) diff --git a/tests/test_fft.py b/tests/test_fft.py new file mode 100644 index 0000000..a14b956 --- /dev/null +++ b/tests/test_fft.py @@ -0,0 +1,61 @@ +import pytest +import numpy as np +from scipy.stats import norm +from typing import Callable + +from cfinversion.continuous.fft_based.fft_inverter import FFTInverter + + +@pytest.fixture +def normal_cf() -> Callable[[np.ndarray], np.ndarray]: + """Характеристическая функция для N(0,1)""" + return lambda t: np.exp(-t ** 2 / 2) + + +@pytest.fixture +def inverter() -> FFTInverter: + """Фикстура с инициализированным инвертором""" + return FFTInverter(N=2 ** 10, A=-6, B=6) + + +def test_initialization(inverter): + """Проверка корректности инициализации параметров""" + assert inverter.N == 1024 + assert inverter.A == -6 + assert inverter.B == 6 + assert inverter.dist == 12 + assert len(inverter.Y) == 1024 + + +def test_pdf_normal_distribution(inverter, normal_cf): + """Сравнение PDF с аналитическим решением для N(0,1)""" + inverter.fit(normal_cf) + x = np.linspace(-3, 3, 100) + pdf_values = inverter.pdf(x) + exact_values = norm.pdf(x) + + assert np.allclose(pdf_values, exact_values, atol=0.1) + + +def test_cdf_normal_distribution(inverter, normal_cf): + """Сравнение CDF с аналитическим решением для N(0,1)""" + inverter.fit(normal_cf) + x = np.linspace(-3, 3, 100) + cdf_values = inverter.cdf(x) + exact_values = norm.cdf(x) + + # Проверка монотонности и примерной формы + assert np.all(np.diff(cdf_values) >= 0) # Монотонность + assert np.allclose(cdf_values[50], 0.5, atol=0.1) # Медиана ~0.5 + + +def test_edge_cases(inverter): + """Проверка обработки граничных случаев""" + # Характеристическая функция константы (вырожденное распределение) + inverter.fit(lambda t: np.ones_like(t)) + pdf = inverter.pdf(np.array([0])) + assert not np.isnan(pdf).any() + + # Проверка на нулевых частотах + inverter.fit(lambda t: np.where(t == 0, 1.0, np.sin(t) / t)) + assert not np.isnan(inverter.pdf(np.array([0]))).any() \ No newline at end of file