Source code for zensols.dataset.dimreduce

"""Dimension reduction wrapper and utility classes.

"""
__author__ = 'Paul Landes'

from typing import Dict, List, Tuple, Union, Any
from dataclasses import dataclass, field
import logging
import numpy as np
from sklearn.preprocessing import normalize, StandardScaler
from sklearn.decomposition import PCA, TruncatedSVD
from sklearn.manifold import TSNE
from zensols.util import APIError
from zensols.config import Dictable
from zensols.persist import persisted

logger = logging.getLogger(__name__)


@dataclass
class DimensionReducer(Dictable):
    """Reduce the dimensionality of a dataset.

    """
    _DICTABLE_ATTRIBUTES = {'n_points'}

    data: np.ndarray = field(repr=False)
    """The data that will be dimensionally reduced."""

    dim: int = field()
    """The lowered dimension space."""

    reduction_meth: str = field(default='pca')
    """One of ``pca``, ``svd``, or ``tsne``."""

    normalize: str = field(default='unit')
    """One of:

      * ``unit``: normalize to unit vectors

      * ``standardize``: standardize by removing the mean and scaling to unit
        variance

      * ``None``: make no modifications to the data

    """
    model_args: Dict[str, Any] = field(default_factory=dict)
    """Additional kwargs to pass to the model initializer."""

    def _normalize(self, data: np.ndarray) -> np.ndarray:
        """Normalize ``data`` as configured by :obj:`normalize`, or return it
        unchanged when :obj:`normalize` is ``None``.

        """
        if self.normalize is None:
            return data
        elif self.normalize == 'standardize':
            return StandardScaler().fit_transform(data)
        elif self.normalize == 'unit':
            return normalize(data)
        raise APIError(f'Unknown normalization method: {self.normalize}')

    @persisted('_dim_reduced')
    def _dim_reduce(self) -> Tuple[np.ndarray, Union[PCA, TruncatedSVD, TSNE]]:
        model = None
        data: np.ndarray = self.data
        if logger.isEnabledFor(logging.DEBUG):
            logger.debug(f'using {self.reduction_meth} ({self.dim}) ' +
                         f'on {data.shape}')
        data = self._normalize(data)
        if self.reduction_meth == 'pca':
            model = PCA(self.dim, **self.model_args)
            data = model.fit_transform(data)
        elif self.reduction_meth == 'svd':
            model = TruncatedSVD(self.dim, **self.model_args)
            data = model.fit_transform(data)
        elif self.reduction_meth == 'tsne':
            # t-SNE is slow on wide data, so first reduce to 50 dimensions
            # with PCA as suggested by the scikit-learn documentation
            if data.shape[-1] > 50:
                data = PCA(50).fit_transform(data)
            params = dict(init='pca', learning_rate='auto')
            params.update(self.model_args)
            model = TSNE(self.dim, **params)
            data = model.fit_transform(data)
        else:
            raise APIError('Unknown dimension reduction method: ' +
                           self.reduction_meth)
        if logger.isEnabledFor(logging.DEBUG):
            logger.debug(f'reduced shape: {data.shape}')
        return data, model

    @property
    def n_points(self) -> int:
        """The number of data points (rows) in :obj:`data`."""
        return self.data.shape[0]

    @property
    @persisted('_reduced')
    def reduced(self) -> np.ndarray:
        """The dimension reduced data."""
        return self._dim_reduce()[0]

    @property
    def model(self) -> Union[PCA, TruncatedSVD, TSNE]:
        """The model used to reduce the data, fit by :meth:`_dim_reduce`."""
        return self._dim_reduce()[1]

    def _get_reduced_data(self, data: np.ndarray) -> np.ndarray:
        """Return ``data``, or the reduced data mapped back to the original
        feature space when ``data`` is ``None``.

        """
        data = self.reduced if data is None else data
        if data.shape[-1] != self.data.shape[-1]:
            X: np.ndarray = self.model.inverse_transform(data)
        else:
            X: np.ndarray = data
        return X
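
# An illustrative usage sketch (not part of the library): reduce random data
# to two dimensions with the default PCA method.  The synthetic data and its
# shape are assumptions made only for this example.
example_data: np.ndarray = np.random.rand(100, 20)
example_reducer = DimensionReducer(data=example_data, dim=2)
# the reduced data has one 2D point per original 20 dimensional point
assert example_reducer.reduced.shape == (100, 2)
# the fit scikit-learn estimator is also available
assert isinstance(example_reducer.model, PCA)
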
@dataclass
class DecomposeDimensionReducer(DimensionReducer):
    """A dimensionality reducer that uses eigenvector decomposition such as
    PCA or SVD.

    """
    _DICTABLE_ATTRIBUTES = DimensionReducer._DICTABLE_ATTRIBUTES | \
        {'description'}

    def __post_init__(self):
        assert self.is_decompose_method(self.reduction_meth), \
            f'Not a decomposition method: {self.reduction_meth}'
    @staticmethod
    def is_decompose_method(reduction_meth: str) -> bool:
        """Return whether the reduction is a decomposition method.

        :see: :obj:`reduction_meth`

        """
        return reduction_meth in {'pca', 'svd'}
    def get_components(self, data: np.ndarray = None,
                       one_dir: bool = True) -> List[np.ndarray]:
        """Create the start and end points that make up each PCA component,
        which is useful for rendering lines for visualization.

        :param data: used in place of :obj:`data` for the component
                     calculation using the (already) trained model

        :param one_dir: whether to create components one way from the mean,
                        or two ways (forward and backward) from the mean

        :return: a list of numpy arrays, each a start and end point stacked
                 for each component

        """
        comps: List[np.ndarray] = []
        X: np.ndarray = self._get_reduced_data(data)
        # fit a covariance matrix on the data
        cov_matrix: np.ndarray = np.cov(X.T)
        # find the center from where the components start, mapped to the
        # reduced dimensional space (the origin for mean centered models)
        trans_mean: np.ndarray = self.model.transform([X.mean(axis=0)])[0]
        # the components of the model are the eigenvectors of the covariance
        # matrix
        evecs: np.ndarray = self.model.components_
        # the eigenvalues of the covariance matrix
        evs: np.ndarray = self.model.explained_variance_
        for eigenvector, eigenvalue in zip(evecs, evs):
            # map a data point as a component back to the original data space
            end: np.ndarray = np.dot(cov_matrix, eigenvector) / eigenvalue
            # map to the reduced dimensional space
            end = self.model.transform([end])[0]
            start: np.ndarray = trans_mean
            if not one_dir:
                # make the component "double sided"
                start = start - end
            comps.append(np.stack((start, end)))
        return comps
    @property
    def description(self) -> Dict[str, Any]:
        """An object graph of data that describes the results of the model."""
        model: Union[PCA, TruncatedSVD] = self.model
        evs: List[float] = model.explained_variance_ratio_.tolist()
        noise: float = None
        if hasattr(model, 'noise_variance_'):
            noise = model.noise_variance_
        return {'components': len(model.components_),
                'noise': noise,
                'total_variance': sum(evs),
                'explained_variances': evs}
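
# An illustrative sketch (not part of the library): decompose random data,
# collect each component as a renderable line segment, and report the model
# description.  The synthetic data and the (commented) matplotlib call are
# assumptions made only for this example.
example_decomp = DecomposeDimensionReducer(
    data=np.random.rand(200, 10), dim=2, reduction_meth='pca')
for comp in example_decomp.get_components(one_dir=False):
    # each component is a (2, 2) array stacking a start and end point in the
    # reduced space, double sided around the mean since one_dir is False
    start, end = comp
    # with matplotlib: plt.plot([start[0], end[0]], [start[1], end[1]])
# the description reports the component count and variance statistics
print(example_decomp.description['total_variance'])
print(example_decomp.description['explained_variances'])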