Source code for zensols.dataset.outlier

"""A simple outlier detection class.

"""
__author__ = 'Paul Landes'

from typing import List, Union, Iterable
from dataclasses import dataclass, field
import itertools as it
import pandas as pd
import numpy as np
from sklearn.covariance import MinCovDet
from scipy import stats
from zensols.persist import persisted
from . import DatasetError


@dataclass
class OutlierDetector(object):
    """Simple outlier detection utility that provides a few different methods
    of calculation.  These include :meth:`z_score`, :meth:`mahalanobis` and
    :meth:`robust_mahalanobis`.  Outliers are selected either using a method
    specific :obj:`threshold` or by a :obj:`proportion` of the data set.

    """
    DETECTION_METHODS = frozenset({
        'z_score', 'mahalanobis', 'robust_mahalanobis'})

    data: Union[np.ndarray, pd.DataFrame] = field()
    """The dataframe or array on which to find outliers.  Data points are rows
    and the feature vectors are columns.

    """
    default_method: str = field(default='mahalanobis')
    """The method used when invoking this instance as a callable with the
    :meth:`__call__` method.  This must be one of :obj:`DETECTION_METHODS`.

    """
    threshold: float = field(default=None)
    """The outlier threshold, which is method dependent.  This is ignored if
    :obj:`proportion` is set.

    """
    proportion: float = field(default=None)
    """The proportion of the dataset to use for outliers.  The higher the
    number, the more outliers.

    :see: :obj:`threshold`

    """
    return_indicators: bool = field(default=None)
    """Whether to return a list of ``False`` (not outlier) or ``True``
    (outlier) instead of indexes into the input matrix/dataframe
    (:obj:`data`).

    """
    def __post_init__(self):
        if self.default_method not in self.DETECTION_METHODS:
            raise DatasetError(
                f'No such detection method: {self.default_method}')

    @property
    @persisted('_numpy')
    def numpy(self) -> np.ndarray:
        """The numpy form of :obj:`data`.  If :obj:`data` is a dataframe, it
        is converted to a numpy array.

        """
        return self._get_arr()

    def _get_arr(self) -> np.ndarray:
        data = self.data
        if isinstance(data, pd.DataFrame):
            data = self.data.to_numpy()
        return data

    def _to_indicators(self, indicies: np.ndarray) -> np.ndarray:
        """Convert row indexes into a mask usable in :meth:`numpy.where`.

        :param indicies: row indexes into :obj:`numpy`

        """
        # shape: (R, C)
        arr: np.ndarray = self.numpy
        mask: np.ndarray = np.repeat(False, arr.shape[0])
        for oix in indicies:
            mask[oix] = True
        return mask

    def _select_indicies(self, dists: Iterable[Union[int, float]],
                         threshold: Union[int, float]) -> np.ndarray:
        """Find outliers."""
        if self.proportion is None:
            threshold = threshold if self.threshold is None else self.threshold
            outliers: List[int] = []
            for i, v in enumerate(dists):
                if v > threshold:
                    outliers.append(i)
        else:
            drs = sorted(zip(dists, it.count()), key=lambda x: x[0])
            take = 1 - int(self.proportion * len(drs))
            outliers = sorted(map(lambda x: x[1], drs[take:]))
        if self.return_indicators:
            outliers = self._to_indicators(outliers)
        return outliers
    def z_score(self, column: Union[int, str]) -> np.ndarray:
        """Use a Z-score to detect anomalies.  A data point is an outlier when
        its absolute z-score exceeds 3, unless :obj:`threshold` or
        :obj:`proportion` is set.

        :param column: the column to use for the z-score analysis

        :return: indexes into :obj:`data` rows (indexes of a dataframe) of the
                 outliers

        """
        if isinstance(column, str):
            if not isinstance(self.data, pd.DataFrame):
                raise DatasetError(
                    f'Can not index numpy arrays as string column: {column}')
            column = self.data.columns.get_loc(column)
        # shape: (R, C)
        arr: np.ndarray = self.numpy
        z = np.abs(stats.zscore(arr[:, column]))
        return self._select_indicies(z, 3.)
    def _set_chi_threshold(self, sig: float) -> float:
        # shape: (R, C)
        arr: np.ndarray = self.numpy
        # degrees of freedom (df parameter) are the number of variables
        C = np.sqrt(stats.chi2.ppf((1. - sig), df=arr.shape[1]))
        return C
    def mahalanobis(self, significance: float = 0.001) -> np.ndarray:
        """Detect outliers using the Mahalanobis distance in high dimensions.

        Assuming a multivariate normal distribution of the data with K
        variables, the Mahalanobis distance follows a chi-squared distribution
        with K degrees of freedom.  For this reason, the cut-off is defined by
        the square root of the Chi^2 percent point function.

        :param significance: 1 - the Chi^2 percent point function (inverse of
                             the CDF / percentiles) outlier threshold;
                             reasonable values include 2.5%, 1% and 0.01%; if
                             ``None``, use :obj:`threshold` or
                             :obj:`proportion`

        :return: indexes into :obj:`data` rows (indexes of a dataframe) of the
                 outliers

        """
        # shape: (R, C)
        arr: np.ndarray = self.numpy
        # M-Distance: center the data, shape: (R, C)
        x_minus_mu: np.ndarray = arr - np.mean(arr, axis=0)
        # covariance, shape: (C, C)
        cov: np.ndarray = np.cov(arr.T)
        # inverse covariance, shape: (C, C)
        inv_cov: np.ndarray = np.linalg.inv(cov)
        # shape: (R, C)
        left_term: np.ndarray = np.dot(x_minus_mu, inv_cov)
        # shape: (R, R)
        dist: np.ndarray = np.dot(left_term, x_minus_mu.T)
        # shape: (R,)
        md: np.ndarray = np.sqrt(dist.diagonal())
        C = self._set_chi_threshold(significance)
        return self._select_indicies(md, C)
    def robust_mahalanobis(self, significance: float = 0.001,
                           random_state: int = 0) -> np.ndarray:
        """Like :meth:`mahalanobis` but use a robust mean and covariance
        matrix by sampling the dataset.

        :param significance: 1 - the Chi^2 percent point function (inverse of
                             the CDF / percentiles) outlier threshold;
                             reasonable values include 2.5%, 1% and 0.01%; if
                             ``None``, use :obj:`threshold` or
                             :obj:`proportion`

        :return: indexes into :obj:`data` rows (indexes of a dataframe) of the
                 outliers

        """
        arr: np.ndarray = self.numpy
        # minimum covariance determinant
        rng = np.random.RandomState(random_state)
        # random sample of data, shape: (R, C)
        X: np.ndarray = rng.multivariate_normal(
            mean=np.mean(arr, axis=0), cov=np.cov(arr.T), size=arr.shape[0])
        # get robust estimates for the mean and covariance
        cov = MinCovDet(random_state=random_state).fit(X)
        # robust covariance matrix, shape: (C, C)
        mcd: np.ndarray = cov.covariance_
        # robust mean, shape: (C,)
        rmean: np.ndarray = cov.location_
        # inverse covariance matrix, shape: (C, C)
        inv_cov: np.ndarray = np.linalg.inv(mcd)
        # center the data with the robust mean, shape: (R, C)
        x_minus_mu: np.ndarray = arr - rmean
        # shape: (R, C)
        left_term: np.ndarray = np.dot(x_minus_mu, inv_cov)
        # M-distance, shape: (R, R)
        dist: np.ndarray = np.dot(left_term, x_minus_mu.T)
        # distances, shape: (R,)
        md: np.ndarray = np.sqrt(dist.diagonal())
        C = self._set_chi_threshold(significance)
        return self._select_indicies(md, C)
    def __call__(self, *args, **kwargs) -> np.ndarray:
        """Return the output of the method provided by :obj:`default_method`.
        All (keyword) arguments are passed on to the respective method.

        :return: indexes into :obj:`data` rows (indexes of a dataframe) of the
                 outliers

        """
        meth = getattr(self, self.default_method)
        return meth(*args, **kwargs)
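
Usage sketch (editorial, not part of the module above).  The following is a minimal, illustrative example of the class API: the synthetic data, column names and variable names are hypothetical, and the exact indexes returned depend on the random data.

    import numpy as np
    import pandas as pd
    from zensols.dataset.outlier import OutlierDetector

    rng = np.random.RandomState(0)
    # 100 normal points in 3 dimensions plus 3 obvious outliers
    normal = rng.normal(0., 1., size=(100, 3))
    extremes = np.array([[8., 8., 8.], [-9., 7., 10.], [10., -10., 9.]])
    df = pd.DataFrame(np.vstack([normal, extremes]), columns=list('abc'))

    # default_method='mahalanobis', so calling the instance runs mahalanobis()
    detector = OutlierDetector(df)
    print(detector())                   # row indexes of the detected outliers

    # per-column z-score detection (|z| > 3 by default)
    print(detector.z_score('a'))

    # select a fixed proportion of the data instead of a threshold and return
    # a boolean mask aligned with the dataframe rows
    masked = OutlierDetector(df, proportion=0.05, return_indicators=True)
    mask = masked()
    print(df[~mask].shape)              # dataframe with the outliers dropped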
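
The cut-off computed by ``_set_chi_threshold`` (used by :meth:`mahalanobis` and :meth:`robust_mahalanobis`) is the square root of the chi-squared percent point function at ``1 - significance``, with one degree of freedom per feature column.  The short sketch below only prints that cut-off for a few significance levels and feature counts to show how it scales; the values chosen are illustrative.

    import numpy as np
    from scipy import stats

    # threshold = sqrt(chi2.ppf(1 - sig, df=C)) for C feature columns
    for sig in (0.025, 0.01, 0.001):
        for n_features in (2, 3, 10):
            cutoff = np.sqrt(stats.chi2.ppf(1. - sig, df=n_features))
            print(f'sig={sig}, C={n_features}: cut-off ~ {cutoff:.2f}')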
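
:meth:`robust_mahalanobis` fits scikit-learn's ``MinCovDet`` to a multivariate normal sample drawn from the data's empirical mean and covariance, then uses the robust estimates ``location_`` and ``covariance_`` in place of ``np.mean`` and ``np.cov``.  The sketch below (illustrative, with hypothetical data) shows ``MinCovDet`` directly on contaminated data to make clear what those robust estimates provide.

    import numpy as np
    from sklearn.covariance import MinCovDet

    rng = np.random.RandomState(0)
    X = rng.normal(0., 1., size=(200, 3))
    X[:5] += 10.                            # contaminate a few rows
    mcd = MinCovDet(random_state=0).fit(X)
    print('robust mean:', mcd.location_)    # stays near 0 despite contamination
    print('sample mean:', X.mean(axis=0))   # pulled toward the contaminated rows
    print('robust covariance shape:', mcd.covariance_.shape)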