Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
47 changes: 47 additions & 0 deletions cfinversion/continuous/fft_based/fft_inverter.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
from typing import Callable, Union
import numpy as np
from scipy.interpolate import interp1d

from cfinversion.continuous import ContinuousInverter


class FFTInverter(ContinuousInverter):

def __init__(self, N: float = 2 ** 8, A: float = -6, B: float = 6) -> None:
super().__init__()
self.N: int = int(N)
self.A: float = A
self.B: float = B
self.dist = B - A
self.dy = self.dist / N
self.dt = (2 * np.pi) / self.dist
self.T = (N / 2) * self.dt
self.k = np.arange(N)
self.j = np.arange(N)
self.Y = A + self.k * self.dy
self.t = -self.T + self.j * self.dt
self.phi = None #type: Callable[[np.ndarray], np.ndarray] | None

def fit(self, phi: Callable[[np.ndarray], np.ndarray]) -> None:
"""phi = characteristic function"""
self.phi = phi

f = np.exp(-1j * self.j * self.dt * self.A) * self.phi(self.t)
C = (self.dt / (2 * np.pi)) * np.exp(1j * self.T * self.Y)

self.pdf_values = np.real(C * np.fft.fft(f)) #type: np.ndarray
self.pdf_interp = interp1d(self.Y, self.pdf_values, kind='linear')

self.cdf_values = np.cumsum(self.pdf_values) * self.dy #type: np.ndarray
self.cdf_interp = interp1d(self.Y, self.cdf_values, kind='linear')

def cdf(self, x: np.ndarray) -> Union[float, np.ndarray]:
if self.phi is None:
raise ValueError("Characteristic function (phi) is not set. Call fit() first.")
return self.cdf_interp(x)

def pdf(self, x: np.ndarray) -> Union[float, np.ndarray]:
if self.phi is None:
raise ValueError("Characteristic function (phi) is not set. Call fit() first.")

return self.pdf_interp(x)
61 changes: 61 additions & 0 deletions tests/test_fft.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
import pytest
import numpy as np
from scipy.stats import norm
from typing import Callable

from cfinversion.continuous.fft_based.fft_inverter import FFTInverter


@pytest.fixture
def normal_cf() -> Callable[[np.ndarray], np.ndarray]:
"""Характеристическая функция для N(0,1)"""
return lambda t: np.exp(-t ** 2 / 2)


@pytest.fixture
def inverter() -> FFTInverter:
"""Фикстура с инициализированным инвертором"""
return FFTInverter(N=2 ** 10, A=-6, B=6)


def test_initialization(inverter):
"""Проверка корректности инициализации параметров"""
assert inverter.N == 1024
assert inverter.A == -6
assert inverter.B == 6
assert inverter.dist == 12
assert len(inverter.Y) == 1024


def test_pdf_normal_distribution(inverter, normal_cf):
"""Сравнение PDF с аналитическим решением для N(0,1)"""
inverter.fit(normal_cf)
x = np.linspace(-3, 3, 100)
pdf_values = inverter.pdf(x)
exact_values = norm.pdf(x)

assert np.allclose(pdf_values, exact_values, atol=0.1)


def test_cdf_normal_distribution(inverter, normal_cf):
"""Сравнение CDF с аналитическим решением для N(0,1)"""
inverter.fit(normal_cf)
x = np.linspace(-3, 3, 100)
cdf_values = inverter.cdf(x)
exact_values = norm.cdf(x)

# Проверка монотонности и примерной формы
assert np.all(np.diff(cdf_values) >= 0) # Монотонность
assert np.allclose(cdf_values[50], 0.5, atol=0.1) # Медиана ~0.5


def test_edge_cases(inverter):
"""Проверка обработки граничных случаев"""
# Характеристическая функция константы (вырожденное распределение)
inverter.fit(lambda t: np.ones_like(t))
pdf = inverter.pdf(np.array([0]))
assert not np.isnan(pdf).any()

# Проверка на нулевых частотах
inverter.fit(lambda t: np.where(t == 0, 1.0, np.sin(t) / t))
assert not np.isnan(inverter.pdf(np.array([0]))).any()