"""Dimension reduction wrapper and utility classes."""__author__='Paul Landes'fromtypingimportDict,List,Tuple,Union,Anyfromdataclassesimportdataclass,fieldimportloggingimportnumpyasnpfromsklearn.preprocessingimportnormalize,StandardScalerfromsklearn.decompositionimportPCA,TruncatedSVDfromsklearn.manifoldimportTSNEfromzensols.utilimportAPIErrorfromzensols.configimportDictablefromzensols.persistimportpersistedlogger=logging.getLogger(__name__)
@dataclass
class DimensionReducer(Dictable):
    """Reduce the dimensionality of a dataset.

    """
    _DICTABLE_ATTRIBUTES = {'n_points'}

    data: np.ndarray = field(repr=False)
    """The data that will be dimensionally reduced."""

    dim: int = field()
    """The lowered dimension space."""

    reduction_meth: str = field(default='pca')
    """One of ``pca``, ``svd``, or ``tsne``."""

    normalize: str = field(default='unit')
    """One of:

      * ``unit``: normalize to unit vectors

      * ``standardize``: standardize by removing the mean and scaling to unit
        variance

      * ``None``: make no modifications to the data

    """
    model_args: Dict[str, Any] = field(default_factory=dict)
    """Additional kwargs to pass to the model initializer."""

    def _normalize(self, data: np.ndarray) -> np.ndarray:
        if self.normalize == 'standardize':
            x = StandardScaler().fit_transform(data)
        elif self.normalize == 'unit':
            x = normalize(data)
        else:
            # leave the data untouched when no normalization is configured
            x = data
        return x

    @persisted('_dim_reduced')
    def _dim_reduce(self) -> Tuple[np.ndarray, Union[PCA, TruncatedSVD, TSNE]]:
        model = None
        data = self.data
        if logger.isEnabledFor(logging.DEBUG):
            logger.debug(f'using {self.reduction_meth} ({self.dim}) ' +
                         f'on {data.shape}')
        if self.normalize:
            if self.normalize == 'standardize':
                data = StandardScaler().fit_transform(data)
            elif self.normalize == 'unit':
                data = normalize(data)
            else:
                raise APIError(
                    f'Unknown normalization method: {self.normalize}')
        if self.reduction_meth == 'pca':
            model = PCA(self.dim, **self.model_args)
            data = model.fit_transform(data)
        elif self.reduction_meth == 'svd':
            model = TruncatedSVD(self.dim, **self.model_args)
            data = model.fit_transform(data)
        elif self.reduction_meth == 'tsne':
            if data.shape[-1] > 50:
                data = PCA(50).fit_transform(data)
            params = dict(init='pca', learning_rate='auto')
            params.update(self.model_args)
            model = TSNE(self.dim, **params)
            data = model.fit_transform(data)
        else:
            raise APIError('Unknown dimension reduction method: ' +
                           self.reduction_meth)
        if logger.isEnabledFor(logging.DEBUG):
            logger.debug(f'reduced shape: {data.shape}')
        return data, model

    @property
    def n_points(self) -> int:
        return self.data.shape[0]

    @property
    @persisted('_reduced')
    def reduced(self) -> np.ndarray:
        return self._dim_reduce()[0]

    @property
    def model(self) -> Union[PCA, TruncatedSVD, TSNE]:
        return self._dim_reduce()[1]

    def _get_reduced_data(self, data: np.ndarray) -> np.ndarray:
        data: np.ndarray = self.reduced if data is None else data
        if data.shape[-1] != self.data.shape[-1]:
            X = self.model.inverse_transform(data)
        else:
            X: np.ndarray = data
        return X

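# A minimal usage sketch, not part of the original API: the random data and
# the ``_example_reduce`` function name are illustrative assumptions only.
# It reduces a 100 x 20 matrix to 2 dimensions with PCA, then with t-SNE on
# standardized data.
def _example_reduce() -> np.ndarray:
    data: np.ndarray = np.random.rand(100, 20)
    pca_red = DimensionReducer(data=data, dim=2, reduction_meth='pca')
    tsne_red = DimensionReducer(data=data, dim=2, reduction_meth='tsne',
                                normalize='standardize')
    # ``reduced`` computes (and caches) the projected data
    assert pca_red.reduced.shape == (100, 2)
    return tsne_red.reduced
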
@dataclass
class DecomposeDimensionReducer(DimensionReducer):
    """A dimensionality reducer that uses eigenvector decomposition such as PCA
    or SVD.

    """
    _DICTABLE_ATTRIBUTES = DimensionReducer._DICTABLE_ATTRIBUTES | \
        {'description'}

    def __post_init__(self):
        assert self.is_decompose_method(self.reduction_meth)
    @staticmethod
    def is_decompose_method(reduction_meth: str) -> bool:
        """Return whether the reduction is a decomposition method.

        :see: :obj:`reduction_meth`

        """
        return reduction_meth == 'pca' or reduction_meth == 'svd'
    def get_components(self, data: np.ndarray = None,
                       one_dir: bool = True) -> List[np.ndarray]:
        """Create the start and end points that make up each PCA component,
        which is useful for rendering lines for visualization.

        :param data: used in place of :obj:`data` for the component
                     calculation using the (already) trained model

        :param one_dir: whether to create components one way from the mean, or
                        two ways (forward and backward) from the mean

        :return: a list of numpy arrays, each with a start and end point
                 stacked for each component

        """
        comps: List[np.ndarray] = []
        X: np.ndarray = self._get_reduced_data(data)
        # fit a covariance matrix on the data
        cov_matrix: np.ndarray = np.cov(X.T)
        # find the center from where the PCA components start; fall back to
        # the reduced data when no data is given
        if data is None:
            data = self.reduced
        trans_mean: np.ndarray = data.mean(axis=0)
        # the components of the model are the eigenvectors of the covariance
        # matrix
        evecs: np.ndarray = self.model.components_
        # the eigenvalues of the covariance matrix
        evs: np.ndarray = self.model.explained_variance_
        for eigenvector, eigenvalue in zip(evecs, evs):
            # map a data point as a component back to the original data space
            end: np.ndarray = np.dot(cov_matrix, eigenvector) / eigenvalue
            # map to the reduced dimensional space
            end = self.model.transform([end])[0]
            start: np.ndarray = trans_mean
            if not one_dir:
                # make the component "double sided"
                start = start - end
            comps.append(np.stack((start, end)))
        return comps
    @property
    def description(self) -> Dict[str, Any]:
        """An object graph of data that describes the results of the model."""
        model = self.model
        tot_ev: float = 0
        evs: List[float] = []
        for ev in model.explained_variance_ratio_:
            evs.append(ev)
            tot_ev += ev
        noise: float = None
        if hasattr(model, 'noise_variance_'):
            noise = model.noise_variance_
        return {'components': len(model.components_),
                'noise': noise,
                'total_variance': tot_ev,
                'explained_variances': evs}
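
# A minimal end-to-end sketch under stated assumptions, not part of the
# original module: the ``matplotlib`` dependency, the random data and the
# ``_plot_components`` name are hypothetical.  It prints the model summary
# from ``description`` and renders the PCA components as line segments over
# the reduced data.
def _plot_components(data: np.ndarray = None):
    import matplotlib.pyplot as plt
    if data is None:
        data = np.random.rand(200, 10)
    reducer = DecomposeDimensionReducer(data=data, dim=2)
    # summary of the fit model: component count, noise, total and
    # per-component explained variance
    print(reducer.description)
    X: np.ndarray = reducer.reduced
    plt.scatter(X[:, 0], X[:, 1], s=5)
    # each component is a (2, dim) array with the start and end points stacked
    for comp in reducer.get_components(one_dir=False):
        plt.plot(comp[:, 0], comp[:, 1], color='red')
    plt.show()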