diff --git a/pyproject.toml b/pyproject.toml index d451797..c231db4 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -10,12 +10,14 @@ repository = "https://github.com/PySATL/pysatl-expert" packages = [{include = "pysatl_expert"}] [tool.poetry.dependencies] -python = ">=3.10,<3.13" +python = ">=3.11,<3.13" numpy = ">=1.25.1" scipy = ">=1.11.2" pandas = ">=2.2.1" typing-extensions = ">=4.12.2" pysatl-criterion = { git = "https://github.com/PySATL/pysatl-criterion.git", branch = "main" } +tqdm = "^4.67.3" +scikit-learn = "^1.8.0" [tool.poetry.group.dev.dependencies] markdown = "3.7" diff --git a/pysatl_expert/criteria/calculate/generic.py b/pysatl_expert/criteria/calculate/generic.py index 0ea4147..a7d2e8f 100644 --- a/pysatl_expert/criteria/calculate/generic.py +++ b/pysatl_expert/criteria/calculate/generic.py @@ -1,34 +1,61 @@ +import inspect +import logging + from pysatl_expert.core.criterion import AbstractCriterion +logger = logging.getLogger(__name__) + + class GenericCriterion(AbstractCriterion): """ - Adapter class for external statistical engines (e.g., 'pysatl-criterion'). + Adapter for integrating 'pysatl-criterion' engines into the expert system. - Integrates third-party mathematical implementations into the system's - 'AbstractCriterion' interface, ensuring scalability without code duplication. + This class decouples the statistical calculation logic from the pipeline. + It performs: + 1. Parameter Normalization: Maps SciPy-style parameter names (e.g., 'shape') + to specific engine attributes (e.g., 'a', 's', 'df') using internal aliases. + 2. Dynamic Introspection: Uses Python's 'inspect' module to determine if the + target statistic requires a Cumulative Distribution Function (CDF). This + ensures lazy evaluation, calculating the CDF only when necessary. Attributes: - engine: Underlying statistic instance (KS, AD, etc.) from the external library. - name: Criterion identifier resolved from the engine or custom display name. 
+ PARAM_ALIASES (dict): A map used to resolve naming discrepancies between + distribution fitting results and GoF test requirements. """ - def __init__(self, statistic_instance, display_name: str | None = None): - """ - Wraps a concrete statistical engine. + PARAM_ALIASES = { + "shape": ["a", "s", "c", "k", "df"], + "lambda": ["lam"], + "mu": ["loc", "mean"], + "std": ["scale", "sigma"], + } - Args: - statistic_instance: Low-level engine implementing 'execute_statistic()'. - display_name: Optional override for the criterion's name. - """ + def __init__(self, statistic_instance, display_name: str | None = None): name = display_name or statistic_instance.code() super().__init__(name=name) self.engine = statistic_instance def calculate(self, data, dist, params): - """ - Computes the fit score by delegating math to the wrapped engine. - Uses the candidate distribution's CDF as the theoretical basis. - """ - cdf_vals = dist.cdf(data, params) - return self.engine.execute_statistic(rvs=data, cdf_vals=cdf_vals) + for p_name, p_value in params.items(): + potential_targets = [p_name] + self.PARAM_ALIASES.get(p_name, []) + for target in potential_targets: + if hasattr(self.engine, target): + setattr(self.engine, target, p_value) + break + + sig = inspect.signature(self.engine.execute_statistic) + params_in_method = sig.parameters + + needs_cdf = "cdf_vals" in params_in_method + has_kwargs = any(p.kind == inspect.Parameter.VAR_KEYWORD for p in params_in_method.values()) + + try: + if needs_cdf or has_kwargs: + cdf_vals = dist.cdf(data, params) + return self.engine.execute_statistic(rvs=data, cdf_vals=cdf_vals) + else: + return self.engine.execute_statistic(rvs=data) + except Exception as e: + logger.debug(f"Error execute {self.name}: {e}") + raise e diff --git a/pysatl_expert/criteria/selectors/dynamic_selector.py b/pysatl_expert/criteria/selectors/dynamic_selector.py new file mode 100644 index 0000000..599a66e --- /dev/null +++ 
b/pysatl_expert/criteria/selectors/dynamic_selector.py @@ -0,0 +1,71 @@ +import inspect +import logging + +from pysatl_criterion.util.distribution import DistributionType +from pysatl_criterion.util.statistic import get_available_criteria + +from pysatl_expert.core.criterion_selector import AbstractCriterionSelector +from pysatl_expert.criteria.calculate.generic import GenericCriterion + + +logger = logging.getLogger(__name__) + + +class DynamicCriterionSelector(AbstractCriterionSelector): + """ + Selector for automated statistical test discovery. + + Dynamically scans the 'pysatl-criterion' library to identify all applicable + Goodness-of-Fit tests for a given distribution. + + Features: + - Runtime Safety: Utilizes a 'blacklist' to skip computationally expensive + tests that might cause system timeouts. + """ + + def __init__(self): + super().__init__() + self._criteria_cache = {} + self.BLACKLIST = ["bhs", "kl_int", "kl_sup", "cq*", "rs", "ahs", "hp"] + + def get_applicable_criteria(self, data, distribution) -> list: + dist_name = distribution.name.lower() + + if dist_name in self._criteria_cache: + return self._criteria_cache[dist_name] + + criteria_list = [] + try: + dist_type = DistributionType(dist_name) + except ValueError: + logger.warning(f"'{distribution.name}' distribution not found in DistributionType.") + return [] + + available_short_codes = get_available_criteria(dist_type) + base_class = dist_type.base_class + + def get_all_concrete_subclasses(cls): + subclasses = set() + for subclass in cls.__subclasses__(): + if not inspect.isabstract(subclass) and not subclass.__name__.startswith("Abstract"): + subclasses.add(subclass) + subclasses.update(get_all_concrete_subclasses(subclass)) + return subclasses + + for stat_class in get_all_concrete_subclasses(base_class): + try: + if hasattr(stat_class, 'short_code') and stat_class.short_code() in available_short_codes: + criterion_name = stat_class.short_code().lower() + + if criterion_name in 
self.BLACKLIST: + continue + + instance = stat_class() + criterion = GenericCriterion(instance, display_name=criterion_name) + criteria_list.append(criterion) + available_short_codes.remove(stat_class.short_code()) + except Exception as e: + logger.debug(f"Initial error {stat_class.__name__}: {e}") + + self._criteria_cache[dist_name] = criteria_list + return criteria_list diff --git a/pysatl_expert/distributions/beta.py b/pysatl_expert/distributions/beta.py new file mode 100644 index 0000000..0036855 --- /dev/null +++ b/pysatl_expert/distributions/beta.py @@ -0,0 +1,30 @@ +import numpy as np +import scipy.stats as st + +from pysatl_expert.core.distribution import AbstractDistribution + + +class BetaDistribution(AbstractDistribution): + """ + Two-parameter implementation of the Beta probability distribution. + + Defined by two positive shape parameters (alpha, beta). Features strictly + bounded theoretical support of [0, 1], making it ideal for modeling + proportions, probabilities, or percentages. The pipeline will automatically + reject any data sample containing values outside this range. + + Mapping to SciPy: 'alpha' maps to 'a', 'beta' maps to 'b', with + location fixed to 0 and scale fixed to 1. 
+ """ + def __init__(self): + super().__init__(name="Beta", support=(0, 1)) + + def fit(self, data: np.ndarray) -> dict: + a, b, loc, scale = st.beta.fit(data, floc=0, fscale=1) + return {"alpha": a, "beta": b} + + def pdf(self, data: np.ndarray, params: dict) -> np.ndarray: + return st.beta.pdf(data, a=params["alpha"], b=params["beta"]) + + def cdf(self, data: np.ndarray, params: dict) -> np.ndarray: + return st.beta.cdf(data, a=params["alpha"], b=params["beta"]) diff --git a/pysatl_expert/distributions/gamma.py b/pysatl_expert/distributions/gamma.py new file mode 100644 index 0000000..944a687 --- /dev/null +++ b/pysatl_expert/distributions/gamma.py @@ -0,0 +1,29 @@ +import numpy as np +import scipy.stats as st + +from pysatl_expert.core.distribution import AbstractDistribution + + +class GammaDistribution(AbstractDistribution): + """ + Two-parameter implementation of the Gamma probability distribution. + + Defined by a shape parameter (a) and a scale parameter. Features [0, inf) + support, which allows for early-fail validation of samples containing + negative values. Frequently used to model waiting times or positively + skewed continuous variables. + + Mapping to SciPy: 'shape' maps to 'a', location is fixed to zero (floc=0). 
+ """ + def __init__(self): + super().__init__(name="Gamma", support=(0, np.inf)) + + def fit(self, data: np.ndarray) -> dict: + shape, loc, scale = st.gamma.fit(data, floc=0) + return {"shape": shape, "scale": scale} + + def pdf(self, data: np.ndarray, params: dict) -> np.ndarray: + return st.gamma.pdf(data, a=params["shape"], scale=params["scale"]) + + def cdf(self, data: np.ndarray, params: dict) -> np.ndarray: + return st.gamma.cdf(data, a=params["shape"], scale=params["scale"]) diff --git a/pysatl_expert/distributions/log_normal.py b/pysatl_expert/distributions/log_normal.py new file mode 100644 index 0000000..f85de47 --- /dev/null +++ b/pysatl_expert/distributions/log_normal.py @@ -0,0 +1,29 @@ +import numpy as np +import scipy.stats as st + +from pysatl_expert.core.distribution import AbstractDistribution + + +class LogNormalDistribution(AbstractDistribution): + """ + Two-parameter implementation of the Log-Normal probability distribution. + + Defined by a shape parameter (s) and scale. A variable X is log-normally + distributed if its natural logarithm is normally distributed. + Features strictly positive theoretical support (0, inf). + + Mapping to SciPy: 's' maps to shape, 'scale' is exp(mean) with + location fixed to zero. 
+ """ + def __init__(self): + super().__init__(name="LogNormal", support=(0, np.inf)) + + def fit(self, data: np.ndarray) -> dict: + shape, loc, scale = st.lognorm.fit(data, floc=0) + return {"s": shape, "scale": scale} + + def pdf(self, data: np.ndarray, params: dict) -> np.ndarray: + return st.lognorm.pdf(data, s=params["s"], scale=params["scale"]) + + def cdf(self, data: np.ndarray, params: dict) -> np.ndarray: + return st.lognorm.cdf(data, s=params["s"], scale=params["scale"]) diff --git a/pysatl_expert/distributions/student.py b/pysatl_expert/distributions/student.py new file mode 100644 index 0000000..12ee0cc --- /dev/null +++ b/pysatl_expert/distributions/student.py @@ -0,0 +1,28 @@ +import numpy as np +import scipy.stats as st + +from pysatl_expert.core.distribution import AbstractDistribution + + +class StudentDistribution(AbstractDistribution): + """ + Three-parameter implementation of the Student's t-distribution. + + Defined by degrees of freedom (df), location (loc), and scale. + Features universal theoretical support (-inf, inf). It is particularly + useful for modeling data with 'heavy tails' compared to the Normal distribution. + + Mapping to SciPy: 'df', 'loc', and 'scale' are fitted dynamically. 
+ """ + def __init__(self): + super().__init__(name="Student", support=(-np.inf, np.inf)) + + def fit(self, data: np.ndarray) -> dict: + df, loc, scale = st.t.fit(data) + return {"df": df, "loc": loc, "scale": scale} + + def pdf(self, data: np.ndarray, params: dict) -> np.ndarray: + return st.t.pdf(data, df=params["df"], loc=params["loc"], scale=params["scale"]) + + def cdf(self, data: np.ndarray, params: dict) -> np.ndarray: + return st.t.cdf(data, df=params["df"], loc=params["loc"], scale=params["scale"]) diff --git a/pysatl_expert/distributions/uniform.py b/pysatl_expert/distributions/uniform.py new file mode 100644 index 0000000..7d1824a --- /dev/null +++ b/pysatl_expert/distributions/uniform.py @@ -0,0 +1,28 @@ +import numpy as np +import scipy.stats as st + +from pysatl_expert.core.distribution import AbstractDistribution + + +class UniformDistribution(AbstractDistribution): + """ + Two-parameter implementation of the Continuous Uniform distribution. + + Defined by boundary parameters 'a' (minimum) and 'b' (maximum). + While its theoretical support is (-inf, inf) for the purpose of fitting, + its actual probability mass is strictly constrained within [a, b]. + + Mapping to SciPy: 'a' maps to 'loc', 'b' is derived as 'loc + scale'. 
+ """ + def __init__(self): + super().__init__(name="Uniform", support=(-np.inf, np.inf)) + + def fit(self, data: np.ndarray) -> dict: + loc, scale = st.uniform.fit(data) + return {"a": float(np.min(data)) - 1e-9, "b": float(np.max(data)) + 1e-9} + + def pdf(self, data: np.ndarray, params: dict) -> np.ndarray: + return st.uniform.pdf(data, loc=params["a"], scale=params["b"] - params["a"]) + + def cdf(self, data: np.ndarray, params: dict) -> np.ndarray: + return st.uniform.cdf(data, loc=params["a"], scale=params["b"] - params["a"]) diff --git a/pysatl_expert/models/feature_vector.py b/pysatl_expert/models/feature_vector.py index ceba18f..ee806ea 100644 --- a/pysatl_expert/models/feature_vector.py +++ b/pysatl_expert/models/feature_vector.py @@ -1,74 +1,62 @@ +from pysatl_criterion.util.distribution import DistributionType +from pysatl_criterion.util.statistic import get_available_criteria + + class FeatureVector: """ - Data Transfer Object (DTO) that standardizes the feature space for decision strategies. + Data Transfer Object defining the feature space for ML classifiers. + + Aggregates disparate statistical evidence into a high-dimensional, + fixed-length numerical array. - This class aggregates disparate data points—intrinsic sample statistics and - multi-distribution goodness-of-fit scores—into a unified structure. Its primary - purpose is to provide a consistent numerical representation of the statistical - evidence, suitable for both heuristic analysis and machine learning inference. + The vector is composed of: + 1. Sample Statistics: Fundamental shape and complexity metrics (skew, entropy). + 2. GoF Scores: Results from a dynamic array of criteria defined by the + global CRITERIA_SCHEMA. - Attributes: - STAT_KEYS (list): Standardized set of sample-profiling keys. - CRITERIA_KEYS (list): Predefined sequence of statistical criteria to - maintain a fixed-length feature vector. 
+ If a test is mathematically inapplicable or fails, its position in the + vector is preserved and filled with a 'missing_value' (-1.0), serving + as a categorical indicator for the decision tree nodes. """ STAT_KEYS = ["sample_size", "skew", "kurtosis", "coef_of_variation", "relative_iqr", "entropy"] - CRITERIA_KEYS = [ - "shapiro_wilk", - "anderson_darling", - "ks_test", - "jarque_bera", - "lilliefors", - "cramer_von_mises", - "gini_index", - "moran_test", - "ahs_test", - "msf_test", - "tiku_singh", - ] - def __init__(self, sample_stats: dict, candidates_scores: dict): - """ - Initializes the vector with filtered sample stats and candidate scores. - - Args: - sample_stats (dict): Metadata describing the raw data sample. - candidates_scores (dict): Nested mapping of distribution names to - their respective criterion scores. - """ - self.sample_stats = {k: v for k, v in sample_stats.items() if k in self.STAT_KEYS} - self.candidates_scores = candidates_scores + CRITERIA_SCHEMA = [] + BLACKLIST = ["bhs", "kl_int", "kl_sup", "cq*", "rs", "ahs", "hp"] - def as_flat_list(self) -> list[float]: - """ - Transforms structured statistical data into a flattened numerical array. + for dist in DistributionType: + dist_name = dist.value.lower() + available_tests = get_available_criteria(dist) - This method ensures a deterministic order of features, which is critical for - predictive models. It iterates through fixed keys and sorted distribution - names to produce a stable input vector for ML classifiers. + for crit_code in available_tests: + clean_code = crit_code.lower() + if clean_code not in BLACKLIST: + CRITERIA_SCHEMA.append((dist_name, clean_code)) - Returns: - list[float]: A flat list of features representing the entire state - of the identification experiment. 
- """ + CRITERIA_SCHEMA = sorted(CRITERIA_SCHEMA) + + def __init__(self, sample_stats: dict, candidates_scores: dict): + self.sample_stats = {k: v for k, v in sample_stats.items() if k in self.STAT_KEYS} + self.candidates_scores = { + k.lower(): {ck.lower(): cv for ck, cv in v.items()} + for k, v in candidates_scores.items() + } + + def as_flat_list(self, missing_value: float = -1.0) -> list[float]: flat_vector = [] for key in self.STAT_KEYS: - flat_vector.append(self.sample_stats.get(key, 0.0)) - - sorted_dist_names = sorted(self.candidates_scores.keys()) - for dist_name in sorted_dist_names: - dist_scores = self.candidates_scores[dist_name] + val = self.sample_stats.get(key, missing_value) + flat_vector.append(float(val)) - for crit_key in self.CRITERIA_KEYS: - val = dist_scores.get(crit_key, 0.0) + for dist_name, crit_key in self.CRITERIA_SCHEMA: + if dist_name in self.candidates_scores: + val = self.candidates_scores[dist_name].get(crit_key, missing_value) flat_vector.append(float(val)) + else: + flat_vector.append(float(missing_value)) return flat_vector def as_dict(self) -> dict: - """ - Returns a dictionary representation for logging or reporting purposes. - """ return {"stats": self.sample_stats, "scores": self.candidates_scores}