Source code for chemotools.outliers._base

from __future__ import annotations

from abc import ABC, abstractmethod
from typing import TYPE_CHECKING, Optional

import numpy as np
from sklearn.base import BaseEstimator, OutlierMixin
from sklearn.cross_decomposition._pls import _PLS
from sklearn.decomposition._base import _BasePCA
from sklearn.pipeline import Pipeline
from sklearn.utils._param_validation import Interval, Real
from sklearn.utils.validation import check_is_fitted, validate_data

if TYPE_CHECKING:
    from typing_extensions import Self

from chemotools._types import EstimatorType, ModelInput
from chemotools._validation import get_model_parameters, validate_and_extract_model

# Backward-compatible alias – existing consumers import this name.
ModelTypes = EstimatorType


class _ModelResidualsBase(ABC, BaseEstimator, OutlierMixin):
    """Base class for model outlier calculations.

    Implements statistical calculations for outlier detection in dimensionality
    reduction models like PCA and PLS.

    Parameters
    ----------
    model : Union[ModelTypes, Pipeline]
        A fitted _BasePCA or _PLS models or Pipeline ending with such a model
    confidence : float
        Confidence level for statistical calculations (between 0 and 1)

    Attributes
    ----------
    estimator_ : ModelTypes
        The fitted model of type _BasePCA or _PLS

    transformer_ : Optional[Pipeline]
        Preprocessing steps before the model

    n_features_in_ : int
        Number of features in the input data

    n_components_ : int
        Number of components in the model

    n_samples_ : int
        Number of samples used to train the model

    critical_value_ : float
        The calculated critical value for outlier detection
    """

    _parameter_constraints: dict = {
        "model": [Pipeline, _BasePCA, _PLS],
        "confidence": [Interval(Real, 0, 1, closed="neither")],
    }

    critical_value_: float

    def __init__(
        self,
        model: ModelInput,
        confidence: float,
    ) -> None:
        self.model = model
        self.confidence = confidence

    def fit(self, X: np.ndarray, y: Optional[np.ndarray] = None) -> Self:
        """Fit the model to the input data.

        Parameters
        ----------
        X : array-like of shape (n_samples, n_features)
            Input data.

        y : array-like of shape (n_samples,), default=None
            Target values.

        Returns
        -------
        self
            Fitted estimator.
        """
        self._validate_params()

        # Validate and extract the model and its parameters
        self.estimator_, self.transformer_ = validate_and_extract_model(self.model)
        self.n_features_in_, self.n_components_, self.n_samples_ = get_model_parameters(
            self.estimator_
        )
        self.confidence_ = self.confidence

        # Validate input data
        X = validate_data(
            self, X, y="no_validation", ensure_2d=True, reset=True, dtype=np.float64
        )

        # Transform through preprocessing pipeline if available
        X_transformed = self.transformer_.transform(X) if self.transformer_ else X

        # Subclass-specific fitting
        self._fit_residuals(X_transformed, y)

        return self

    def predict(self, X: np.ndarray, y: Optional[np.ndarray] = None) -> np.ndarray:
        """Identify outliers in the input data.

        Parameters
        ----------
        X : array-like of shape (n_samples, n_features)
            Input data.

        y : array-like of shape (n_samples,), default=None
            Target values. Required by some subclasses (e.g. StudentizedResiduals).

        Returns
        -------
        ndarray of shape (n_samples,)
            Returns -1 for outliers and 1 for inliers.
        """
        residuals = self.predict_residuals(X, y)
        return np.where(residuals > self.critical_value_, -1, 1)

    def predict_residuals(
        self, X: np.ndarray, y: Optional[np.ndarray] = None, validate: bool = True
    ) -> np.ndarray:
        """Calculate the residuals of the model.

        Parameters
        ----------
        X : array-like of shape (n_samples, n_features)
            Input data.

        y : array-like of shape (n_samples,), default=None
            Target values. Required by some subclasses (e.g. StudentizedResiduals).

        validate : bool, default=True
            Whether to validate the input data.

        Returns
        -------
        ndarray of shape (n_samples,)
            The residuals of the model.
        """
        check_is_fitted(self, ["critical_value_"])
        if validate:
            X = validate_data(
                self,
                X,
                y="no_validation",
                ensure_2d=True,
                reset=False,
                dtype=np.float64,
            )
        if self.transformer_:
            X = self.transformer_.transform(X)
        return self._compute_residuals(X, y)

    def score_samples(self, X: np.ndarray) -> np.ndarray:
        """Return anomaly scores for each sample.

        Provides an sklearn-compatible interface. Returns the residuals
        computed by ``predict_residuals``. Higher values indicate
        more anomalous samples.

        Parameters
        ----------
        X : array-like of shape (n_samples, n_features)
            Input data.

        Returns
        -------
        ndarray of shape (n_samples,)
            Anomaly scores for each sample.
        """
        return self.predict_residuals(X)

    def fit_predict_residuals(
        self, X: np.ndarray, y: Optional[np.ndarray] = None
    ) -> np.ndarray:
        """Fit the model to the input data and calculate the residuals.

        Parameters
        ----------
        X : array-like of shape (n_samples, n_features)
            Input data.

        y : array-like of shape (n_samples,), default=None
            Target values.

        Returns
        -------
        ndarray of shape (n_samples,)
            The residuals of the model.
        """
        self.fit(X, y)
        return self.predict_residuals(X, y)

    @abstractmethod
    def _fit_residuals(self, X: np.ndarray, y: Optional[np.ndarray]) -> None:
        """Compute subclass-specific training state and set ``critical_value_``.

        Called at the end of ``fit`` with validated and transformed data.

        Parameters
        ----------
        X : ndarray of shape (n_samples, n_features)
            Validated and transformed training data.

        y : ndarray or None
            Target values.
        """

    @abstractmethod
    def _compute_residuals(self, X: np.ndarray, y: Optional[np.ndarray]) -> np.ndarray:
        """Compute residuals from validated, transformed data.

        Called by ``predict_residuals`` after validation and transformation.

        Parameters
        ----------
        X : ndarray of shape (n_samples, n_features)
            Validated and transformed input data.

        y : ndarray or None
            Target values.

        Returns
        -------
        ndarray of shape (n_samples,)
            Computed residuals.
        """