
"""Generate and vectorize language features.

"""
__author__ = 'Paul Landes'

from typing import List, Tuple, Set, Union, Dict, Iterable
from dataclasses import dataclass, field
import logging
import sys
from functools import reduce
import torch
import numpy as np
from torch import Tensor
from zensols.deeplearn.vectorize import (
    VectorizerError,
    FeatureContext,
    TensorFeatureContext,
    SparseTensorFeatureContext,
    MultiFeatureContext,
    EncodableFeatureVectorizer,
    OneHotEncodedEncodableFeatureVectorizer,
    AggregateEncodableFeatureVectorizer,
    TransformableFeatureVectorizer,
)
from zensols.nlp import (
    FeatureToken, FeatureSentence, FeatureDocument, TokenContainer,
)
from ..embed import WordEmbedModel
from . import (
    SpacyFeatureVectorizer, FeatureDocumentVectorizer,
    TextFeatureType, MultiDocumentVectorizer,
)

logger = logging.getLogger(__name__)


@dataclass
class EnumContainerFeatureVectorizer(FeatureDocumentVectorizer):
    """Encode tokens found in the container by aggregating the spaCy
    vectorizers output.  The result is a concatenated binary representation
    of all configured token level features for each token.  This adds only
    token vectorizer features generated by the spaCy vectorizers (subclasses
    of :class:`.SpacyFeatureVectorizer`), and not the features themselves
    (such as ``is_stop`` etc).

    All spaCy features given by
    :obj:`~.FeatureDocumentVectorizerManager.spacy_vectorizers` are encoded.
    However, only those given in :obj:`decoded_feature_ids` are produced in
    the output tensor after decoding.

    The motivation for encoding all, but decoding a subset of features is
    feature selection during training.  This is because encoding the features
    (in a sparse matrix) takes comparatively less time and space than having
    to re-encode all batches.

    Rows are tokens, columns intervals of features.  The encoded matrix is
    sparse, and decoded as a dense matrix.

    :shape: (|sentences|, |sentinel tokens|, |decoded features|)

    :see: :class:`.SpacyFeatureVectorizer`

    """
    ATTR_EXP_META = ('decoded_feature_ids',)
    DESCRIPTION = 'spacy feature vectorizer'
    FEATURE_TYPE = TextFeatureType.TOKEN

    decoded_feature_ids: Set[str] = field(default=None)
    """The spaCy generated features used during *only* decoding (see class
    docs).  Examples include ``norm``, ``ent``, ``dep``, ``tag``.  When set
    to ``None``, use all those given in
    :obj:`~.FeatureDocumentVectorizerManager.spacy_vectorizers`.

    """

    def _get_shape_with_feature_ids(self, feature_ids: Set[str]):
        """Compute the shape based on what spaCy feature ids are given.

        :param feature_ids: the spaCy feature ids used to filter the result

        """
        flen = 0
        for fvec in self.manager.spacy_vectorizers.values():
            if feature_ids is None or fvec.feature_id in feature_ids:
                flen += fvec.shape[1]
        return -1, self.token_length, flen

    def _get_shape_decode(self) -> Tuple[int, int]:
        """Return the shape needed for the tensor when encoding."""
        return self._get_shape_with_feature_ids(None)

    def _get_shape_for_document(self, doc: FeatureDocument):
        """Return the shape of the vectorized output for the given document."""
        return (len(doc.sents),
                self.manager.get_token_length(doc),
                self._get_shape_decode()[-1])

    def _get_shape(self) -> Tuple[int, int]:
        """Compute the shape based on what spaCy feature ids are given."""
        return self._get_shape_with_feature_ids(self.decoded_feature_ids)

    def _populate_feature_vectors(self, sent: FeatureSentence, six: int,
                                  fvec: SpacyFeatureVectorizer, arr: Tensor,
                                  col_start: int, col_end: int):
        """Populate ``arr`` with every feature available from the vectorizer
        set defined in the manager.  This fills in the corresponding vectors
        from the spaCy vectorizer ``fvec`` across all tokens for a column
        range.

        """
        attr_name = fvec.feature_id
        col_end = col_start + fvec.shape[1]
        toks = sent.tokens[:arr.shape[1]]
        for tix, tok in enumerate(toks):
            val = getattr(tok, attr_name)
            vec = fvec.from_spacy(val)
            if vec is not None:
                if logger.isEnabledFor(logging.DEBUG):
                    logger.debug(f'adding vec {fvec} for {tok}: {vec.shape}')
                arr[six, tix, col_start:col_end] = vec

    def _encode(self, doc: FeatureDocument) -> FeatureContext:
        """Encode tokens found in the container by aggregating the spaCy
        vectorizers output.

        """
        arr = self.torch_config.zeros(self._get_shape_for_document(doc))
        if logger.isEnabledFor(logging.DEBUG):
            logger.debug(f'type array shape: {arr.shape}')
        sent: FeatureSentence
        for six, sent in enumerate(doc.sents):
            col_start = 0
            for fvec in self.manager.spacy_vectorizers.values():
                col_end = col_start + fvec.shape[1]
                self._populate_feature_vectors(
                    sent, six, fvec, arr, col_start, col_end)
                col_start = col_end
        if logger.isEnabledFor(logging.DEBUG):
            logger.debug(f'encoded array shape: {arr.shape}')
        return SparseTensorFeatureContext.instance(
            self.feature_id, arr, self.torch_config)

    def _slice_by_attributes(self, arr: Tensor) -> Tensor:
        """Create a new tensor from column based slices of the encoded tensor
        for each specified feature id given in :obj:`decoded_feature_ids`.

        """
        keeps = set(self.decoded_feature_ids)
        col_start = 0
        tensors = []
        for fvec in self.manager.spacy_vectorizers.values():
            col_end = col_start + fvec.shape[1]
            fid = fvec.feature_id
            if logger.isEnabledFor(logging.DEBUG):
                logger.debug(f'type={fid}, to keep={keeps}')
            if fid in keeps:
                tensors.append(arr[:, :, col_start:col_end])
                keeps.remove(fid)
            col_start = col_end
        if len(keeps) > 0:
            raise VectorizerError(f'Unknown feature type IDs: {keeps}')
        sarr = torch.cat(tensors, dim=2)
        if logger.isEnabledFor(logging.DEBUG):
            logger.debug(f'slice dim: {sarr.shape}')
        return sarr

    def to_symbols(self, tensor: Tensor) -> List[List[Dict[str, float]]]:
        """Reverse map the tensor to spaCy features.

        :return: a list of sentences, each with a list of tokens, each
                 having a map of name/count pairs

        """
        sents = []
        for six in range(tensor.size(0)):
            toks = []
            sents.append(toks)
            for tix in range(tensor.size(1)):
                col_start = 0
                by_fid = {}
                toks.append(by_fid)
                for fvec in self.manager.spacy_vectorizers.values():
                    col_end = col_start + fvec.shape[1]
                    fid = fvec.feature_id
                    vec = tensor[six, tix, col_start:col_end]
                    cnts = dict(filter(lambda x: x[1] > 0,
                                       zip(fvec.as_list, vec.tolist())))
                    by_fid[fid] = cnts
                    col_start = col_end
        return sents

    def _decode(self, context: FeatureContext) -> Tensor:
        arr = super()._decode(context)
        if logger.isEnabledFor(logging.DEBUG):
            logger.debug(f'decoded features: {self.decoded_feature_ids}, ' +
                         f'shape: {arr.shape}')
        self._assert_decoded_doc_dim(arr, 3)
        if self.decoded_feature_ids is not None:
            arr = self._slice_by_attributes(arr)
        return arr
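

# Illustrative usage sketch (not part of the original source): ``mng`` is
# assumed to be a configured FeatureDocumentVectorizerManager and ``'enum'``
# a hypothetical feature ID registered for this vectorizer class.
def _example_enum_usage(mng, doc: FeatureDocument):
    vec: EnumContainerFeatureVectorizer = mng['enum']
    ctx: FeatureContext = vec.encode(doc)   # sparse, all spaCy features
    arr: Tensor = vec.decode(ctx)           # dense, only decoded_feature_ids
    # reverse map back to human readable spaCy symbol counts per token
    return vec.to_symbols(arr)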


@dataclass
class CountEnumContainerFeatureVectorizer(FeatureDocumentVectorizer):
    """Vectorize the counts of parsed spaCy features.  This generates the
    count of tokens as an S X (M * N) tensor where S is the number of
    sentences, M is the number of token feature ids and N is the number of
    columns of the output of the :class:`.SpacyFeatureVectorizer` vectorizer.
    Each column position's count represents the number of counts for that
    spaCy symbol for that index position in the output of
    :class:`.SpacyFeatureVectorizer`.

    This class uses the same efficiency in decoding features given in
    :class:`.EnumContainerFeatureVectorizer`.

    :shape: (|sentences|, |decoded features|)

    """
    ATTR_EXP_META = ('decoded_feature_ids',)
    DESCRIPTION = 'token level feature counts'
    FEATURE_TYPE = TextFeatureType.DOCUMENT

    decoded_feature_ids: Set[str] = field(default=None)

    def _get_shape(self) -> Tuple[int, int]:
        """Compute the shape based on what spaCy feature ids are given."""
        feature_ids = self.decoded_feature_ids
        flen = 0
        for fvec in self.manager.spacy_vectorizers.values():
            if feature_ids is None or fvec.feature_id in feature_ids:
                flen += fvec.shape[1]
        return -1, flen

    def get_feature_counts(self, sent: FeatureSentence,
                           fvec: SpacyFeatureVectorizer) -> Tensor:
        """Return the count of all tokens as an S X N tensor where S is the
        number of sentences and N is the columns of the ``fvec`` vectorizer.
        Each column position's count represents the number of counts for that
        spaCy symbol for that index position in ``fvec``.

        """
        fid = fvec.feature_id
        fcounts = self.torch_config.zeros(fvec.shape[1])
        for tok in sent.tokens:
            val = getattr(tok, fid)
            fnid = fvec.id_from_spacy(val, -1)
            if fnid > -1:
                fcounts[fnid] += 1
        return fcounts

    def _encode(self, doc: FeatureDocument) -> FeatureContext:
        if logger.isEnabledFor(logging.DEBUG):
            logger.debug(f'encoding doc: {doc}')
        sent_arrs = []
        for sent in doc.sents:
            if logger.isEnabledFor(logging.DEBUG):
                logger.debug(f'encoding sentence: {sent}')
            tok_arrs = []
            for fvec in self.manager.spacy_vectorizers.values():
                cnts: Tensor = self.get_feature_counts(sent, fvec)
                if logger.isEnabledFor(logging.DEBUG):
                    logger.debug(f'encoding with {fvec}')
                tok_arrs.append(cnts)
            sent_arrs.append(torch.cat(tok_arrs))
        arr = torch.stack(sent_arrs)
        if logger.isEnabledFor(logging.DEBUG):
            logger.debug(f'encoded shape: {arr.shape}')
        return SparseTensorFeatureContext.instance(
            self.feature_id, arr, self.torch_config)

    def _slice_by_attributes(self, arr: Tensor) -> Tensor:
        """Create a new tensor from column based slices of the encoded tensor
        for each specified feature id given in :obj:`decoded_feature_ids`.

        """
        keeps = set(self.decoded_feature_ids)
        col_start = 0
        tensors = []
        for fvec in self.manager.spacy_vectorizers.values():
            col_end = col_start + fvec.shape[1]
            fid = fvec.feature_id
            if logger.isEnabledFor(logging.DEBUG):
                logger.debug(f'type={fid}, to keep={keeps}')
            if fid in keeps:
                keep_vec = arr[:, col_start:col_end]
                tensors.append(keep_vec)
                keeps.remove(fid)
            col_start = col_end
        if len(keeps) > 0:
            raise VectorizerError(f'Unknown feature type IDs: {keeps}')
        sarr = torch.cat(tensors, dim=1)
        if logger.isEnabledFor(logging.DEBUG):
            logger.debug(f'slice dim: {sarr.shape}')
        return sarr

    def to_symbols(self, tensor: Tensor) -> List[Dict[str, float]]:
        """Reverse map the tensor to spaCy features.

        :return: a list of sentences, each a map of name/count pairs

        """
        sents = []
        for six in range(tensor.size(0)):
            col_start = 0
            by_fid = {}
            sents.append(by_fid)
            arr = tensor[six]
            for fvec in self.manager.spacy_vectorizers.values():
                col_end = col_start + fvec.shape[1]
                fid = fvec.feature_id
                vec = arr[col_start:col_end]
                cnts = dict(filter(lambda x: x[1] > 0,
                                   zip(fvec.as_list, vec.tolist())))
                by_fid[fid] = cnts
                col_start = col_end
        return sents

    def _decode(self, context: FeatureContext) -> Tensor:
        arr = super()._decode(context)
        if logger.isEnabledFor(logging.DEBUG):
            logger.debug(f'decoded features: {self.decoded_feature_ids}, ' +
                         f'shape: {arr.shape}')
        if self.decoded_feature_ids is not None:
            arr = self._slice_by_attributes(arr)
        if logger.isEnabledFor(logging.DEBUG):
            logger.debug(f'decoded shape: {arr.shape}')
        return arr
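

# Illustrative sketch (not part of the original source): ``mng`` and the
# 'count' feature ID are hypothetical; shows encoding a document and reverse
# mapping the decoded counts back to spaCy symbols per sentence.
def _example_count_usage(mng, doc: FeatureDocument):
    vec: CountEnumContainerFeatureVectorizer = mng['count']
    arr: Tensor = vec.decode(vec.encode(doc))   # (|sentences|, |features|)
    # e.g. [{'tag': {'NN': 2, ...}, 'dep': {...}}, ...] (hypothetical output)
    return vec.to_symbols(arr)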


@dataclass
class DepthFeatureDocumentVectorizer(FeatureDocumentVectorizer):
    """Generate the depths of tokens based on how deep they are in a head
    dependency tree.

    Even though this is a document level vectorizer and is usually added in a
    join layer rather than stacked on to the embedded layer, it still assumes
    congruence with the token length, which is used in its shape.

    **Important**: do not combine sentences into a single document with
    :meth:`~zensols.nlp.container.FeatureDocument.combine_sentences` since
    features are created as a dependency parse tree at the sentence level.
    Otherwise, the dependency relations are broken and result in a zeroed
    tensor.

    :shape: (|sentences|, |sentinel tokens|, 1)

    """
    DESCRIPTION = 'head depth'
    FEATURE_TYPE = TextFeatureType.TOKEN

    def _get_shape(self) -> Tuple[int, int]:
        return -1, self.token_length, 1

    def encode(self, doc: Union[Tuple[FeatureDocument], FeatureDocument]) -> \
            FeatureContext:
        ctx: TensorFeatureContext
        if isinstance(doc, (tuple, list)):
            self._assert_doc(doc)
            docs = doc
            comb_doc = FeatureDocument.combine_documents(docs)
            n_toks = self.manager.get_token_length(comb_doc)
            arrs = tuple(map(
                lambda d: self._encode_doc(d.combine_sentences(), n_toks),
                docs))
            arr = torch.cat(arrs, dim=0)
            arr = arr.unsqueeze(-1)
            ctx = SparseTensorFeatureContext.instance(
                self.feature_id, arr, self.torch_config)
        else:
            ctx = super().encode(doc)
        return ctx

    def _encode(self, doc: FeatureDocument) -> FeatureContext:
        n_toks = self.manager.get_token_length(doc)
        arr = self._encode_doc(doc, n_toks)
        arr = arr.unsqueeze(-1)
        return SparseTensorFeatureContext.instance(
            self.feature_id, arr, self.torch_config)

    def _encode_doc(self, doc: FeatureDocument, n_toks: int) -> Tensor:
        n_sents = len(doc.sents)
        arr = self.torch_config.zeros((n_sents, n_toks))
        u_doc = doc.uncombine_sentences()
        if logger.isEnabledFor(logging.DEBUG):
            logger.debug(f'encoding doc: {len(doc)}/{len(u_doc)}: {doc}')
        # if the doc is combined as several sentences concatenated in one,
        # unpack and write all features in one row
        if len(doc) != len(u_doc):
            soff = 0
            for sent in u_doc.sents:
                self._transform_sent(sent, arr, 0, soff, n_toks)
                soff += len(sent)
        else:
            # otherwise, each row is a separate sentence
            for six, sent in enumerate(doc.sents):
                self._transform_sent(sent, arr, six, 0, n_toks)
        if logger.isEnabledFor(logging.DEBUG):
            logger.debug(f'encoded shape: {arr.shape}')
        return arr

    def _transform_sent(self, sent: FeatureSentence, arr: Tensor,
                        six: int, soff: int, slen: int):
        head_depths = self._get_head_depth(sent)
        for tix, tok, depth in head_depths:
            off = tix + soff
            # the root (depth 0) gets 1, decaying as tokens get deeper
            val = 1. / (depth + 1)
            in_range = (off < slen)
            if logger.isEnabledFor(logging.DEBUG):
                logger.debug(f'setting ({six}, {off}) = {val}: set={in_range}')
            if in_range:
                arr[six, off] = val

    def _dep_branch(self, node: FeatureToken, toks: Tuple[FeatureToken],
                    tid_to_idx: Dict[int, int], depth: int,
                    depths: Dict[int, int]) -> \
            Dict[FeatureToken, List[FeatureToken]]:
        idx = tid_to_idx.get(node.i)
        if idx is not None:
            depths[idx] = depth
            for c in node.children:
                cix = tid_to_idx.get(c)
                if cix is not None:
                    child = toks[cix]
                    self._dep_branch(child, toks, tid_to_idx,
                                     depth + 1, depths)

    def _get_head_depth(self, sent: FeatureSentence) -> \
            Tuple[Tuple[int, FeatureToken, int]]:
        """Calculate the depth of tokens in a sentence.

        :param sent: the sentence that has the tokens to get depths

        :return: a tuple of (sentence token index, token, depth)

        """
        tid_to_idx: Dict[int, int] = {}
        toks = sent.tokens
        for i, tok in enumerate(toks):
            tid_to_idx[tok.i] = i
        if logger.isEnabledFor(logging.DEBUG):
            logger.debug('|'.join(
                map(lambda t: f'{tid_to_idx[t.i]}:{t.i}:{t.text}({t.dep_})',
                    sent.token_iter())))
            logger.debug(f'tree: {sent.dependency_tree}')
        if logger.isEnabledFor(logging.DEBUG):
            logger.debug(f'tokens: {toks}')
        root = tuple(
            filter(lambda t: t.dep_ == 'ROOT' and not t.is_punctuation, toks))
        if len(root) == 1:
            root = root[0]
            tree = {tid_to_idx[root.i]: 0}
            try:
                self._dep_branch(root, toks, tid_to_idx, 1, tree)
            except Exception as e:
                dstr: str = '<no detail available>'
                try:
                    dstr = f'root: {root}, tree: {tree}'
                except Exception as e2:
                    dstr = f'{dstr} <error: {e2}>'
                raise VectorizerError(
                    f'Could not vectorize depth for sentence: <{dstr}>') \
                    from e
            return map(lambda x: (x[0], toks[x[0]], x[1]), tree.items())
        else:
            return ()
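

# Illustrative sketch (not part of the original source): the vectorizer
# stores the reciprocal depth of each token in its sentence's dependency
# tree; for a hypothetical (token index -> depth) map as built by
# _get_head_depth, the values decay from the root.
def _example_depth_values():
    tree = {1: 0, 0: 1, 2: 1, 3: 2}   # hypothetical parse: token 1 is ROOT
    # the root (depth 0) maps to 1.0, its children to 0.5, and so on
    return {ix: 1. / (d + 1) for ix, d in tree.items()}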


@dataclass
class OneHotEncodedFeatureDocumentVectorizer(
        FeatureDocumentVectorizer, OneHotEncodedEncodableFeatureVectorizer):
    """Vectorize nominal enumerated features into one-hot encoded vectors.
    The feature is taken from a :class:`~zensols.nlp.FeatureToken`.  If
    :obj:`level` is ``token`` then the features are token attributes
    identified by :obj:`feature_attribute`.  If the :obj:`level` is
    ``document``, the feature is taken from the document.

    :shape:
        * level = document: (1, |categories|)
        * level = token: (|sentences|, |sentinel tokens|, |categories|)

    """
    DESCRIPTION = 'encoded feature document vectorizer'

    feature_attribute: Tuple[str] = field(default=None)
    """The feature attributes to vectorize."""

    level: str = field(default='token')
    """The level at which to take the attribute value, which is either
    ``document`` or ``token``.

    """

    def __post_init__(self):
        super().__post_init__()
        self.optimize_bools = False

    @property
    def feature_type(self) -> TextFeatureType:
        return {'document': TextFeatureType.DOCUMENT,
                'token': TextFeatureType.TOKEN,
                }[self.level]

    def _get_shape(self) -> Tuple[int, int]:
        if self.level == 'document':
            return -1, super()._get_shape()[1]
        else:
            return -1, self.token_length, super()._get_shape()[1]

    def _encode(self, doc: FeatureDocument) -> FeatureContext:
        attr = self.feature_attribute
        if self.level == 'document':
            arr = self.torch_config.zeros((1, self.shape[1]))
            feats = [getattr(doc, attr)]
            self._encode_cats(feats, arr)
        elif self.level == 'token':
            # not tested
            tlen = self.manager.get_token_length(doc)
            arr = self.torch_config.zeros((len(doc), tlen, self.shape[2]))
            for six, sent in enumerate(doc.sents):
                feats = tuple(map(lambda s: getattr(s, attr), sent))
                self._encode_cats(feats, arr[six])
        else:
            raise VectorizerError(f'Unknown doc level: {self.level}')
        if logger.isEnabledFor(logging.DEBUG):
            logger.debug(f'vectorized: {len(doc)} sents into {arr.shape}')
        return SparseTensorFeatureContext.instance(
            self.feature_id, arr, self.torch_config)
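

# Illustrative sketch (not part of the original source): a document level
# configuration one-hot encodes a single document attribute; the 'topic'
# attribute values and categories here are hypothetical.
def _example_one_hot():
    categories = ('sports', 'politics', 'science')   # nominal values
    doc_topic = 'science'                            # value taken from the doc
    arr = torch.zeros((1, len(categories)))
    arr[0, categories.index(doc_topic)] = 1          # -> tensor([[0., 0., 1.]])
    return arr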


@dataclass
class TokenEmbeddingFeatureVectorizer(
        AggregateEncodableFeatureVectorizer, FeatureDocumentVectorizer):
    """An
    :class:`~zensols.deepnlp.vectorize.AggregateEncodableFeatureVectorizer`
    that is useful for token level classification (i.e. NER).  It uses a
    delegate to first vectorize the features, then concatenates them into one
    aggregate.

    In shape terms, this takes the single sentence position.  The additional
    unsqueezed dimensions set with :obj:`add_dims` are useful when the
    delegate vectorizer encodes booleans or any other value that does not
    take an additional dimension.

    :shape: (1, |tokens|, <delegate vectorizer shape>[, <unsqueeze
            dimensions>])

    """
    DESCRIPTION = 'token aggregate vectorizer'

    level: TextFeatureType = field(default=TextFeatureType.TOKEN)
    """The level at which to take the attribute value (see
    :class:`.TextFeatureType`).

    """

    add_dims: int = field(default=0)
    """The number of dimensions to add (see class docs)."""

    def _get_shape(self):
        dim = [1]
        dim.extend(super()._get_shape())
        dim.extend([1] * self.add_dims)
        return tuple(dim)

    @property
    def feature_type(self) -> TextFeatureType:
        return self.level

    def encode(self, doc: Union[Tuple[FeatureDocument], FeatureDocument]) -> \
            FeatureContext:
        return TransformableFeatureVectorizer.encode(self, doc)

    def _decode(self, context: MultiFeatureContext) -> Tensor:
        tensor: Tensor = super()._decode(context)
        for _ in range(self.add_dims):
            tensor = tensor.unsqueeze(-1)
        return tensor
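

# Illustrative sketch (not part of the original source): shows the effect of
# the add_dims unsqueeze loop in _decode on a hypothetical decoded tensor.
def _example_add_dims():
    tensor = torch.zeros((1, 4))   # hypothetical decoded (1, |tokens|) shape
    for _ in range(2):             # add_dims = 2
        tensor = tensor.unsqueeze(-1)
    return tensor.shape            # torch.Size([1, 4, 1, 1])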


@dataclass
class StatisticsFeatureDocumentVectorizer(FeatureDocumentVectorizer):
    """Vectorizes basic surface language statistics, which include:

        * character count
        * token count
        * min token length in characters
        * max token length in characters
        * average token length in characters (|characters| / |tokens|)
        * sentence count (for FeatureDocuments)
        * average sentence length (|tokens| / |sentences|)
        * min sentence length
        * max sentence length

    :shape: (1, 9,)

    """
    DESCRIPTION = 'statistics'
    FEATURE_TYPE = TextFeatureType.DOCUMENT

    def _get_shape(self) -> Tuple[int, int]:
        return -1, 9

    def _encode(self, doc: FeatureDocument) -> FeatureContext:
        n_toks = len(doc.tokens)
        n_sents = 1
        min_tlen = sys.maxsize
        max_tlen = 0
        ave_tlen = 1
        min_slen = sys.maxsize
        max_slen = 0
        ave_slen = 1
        n_char = 0
        for t in doc.tokens:
            tlen = len(t.norm)
            n_char += tlen
            min_tlen = min(min_tlen, tlen)
            max_tlen = max(max_tlen, tlen)
        ave_tlen = n_char / n_toks
        if isinstance(doc, FeatureDocument):
            n_sents = len(doc.sents)
            ave_slen = n_toks / n_sents
            for s in doc.sents:
                slen = len(s.tokens)
                min_slen = min(min_slen, slen)
                max_slen = max(max_slen, slen)
        stats = (n_char, n_toks, min_tlen, max_tlen, ave_tlen,
                 n_sents, ave_slen, min_slen, max_slen)
        arr = self.torch_config.from_iterable(stats).unsqueeze(0)
        if logger.isEnabledFor(logging.DEBUG):
            logger.debug(f'array shape: {arr.shape}')
        return TensorFeatureContext(self.feature_id, arr)
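

# Illustrative sketch (not part of the original source): computes the same
# nine statistics by hand for a hypothetical two sentence document.
def _example_statistics():
    sents = [['the', 'quick'], ['fox']]   # hypothetical normalized tokens
    toks = [t for s in sents for t in s]
    tlens = [len(t) for t in toks]
    slens = [len(s) for s in sents]
    n_char, n_toks, n_sents = sum(tlens), len(toks), len(sents)
    return (n_char, n_toks, min(tlens), max(tlens), n_char / n_toks,
            n_sents, n_toks / n_sents, min(slens), max(slens))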


@dataclass
class OverlappingFeatureDocumentVectorizer(MultiDocumentVectorizer):
    """Vectorize the number of normalized and lemmatized tokens (in this
    order) shared across multiple documents.  The input to this feature
    vectorizer is a tuple of N :class:`.FeatureDocument` instances.

    :shape: (2,)

    """
    DESCRIPTION = 'overlapping token counts'

    def _get_shape(self) -> Tuple[int, int]:
        return 2,

    @staticmethod
    def _norms(ac: TokenContainer, bc: TokenContainer) -> Set[str]:
        a = set(map(lambda s: s.norm.lower(), ac.token_iter()))
        b = set(map(lambda s: s.norm.lower(), bc.token_iter()))
        return a & b

    @staticmethod
    def _lemmas(ac: TokenContainer, bc: TokenContainer) -> Set[str]:
        a = set(map(lambda s: s.lemma_.lower(), ac.token_iter()))
        b = set(map(lambda s: s.lemma_.lower(), bc.token_iter()))
        return a & b

    def _encode(self, docs: Tuple[FeatureDocument]) -> FeatureContext:
        norms = reduce(self._norms, docs)
        lemmas = reduce(self._lemmas, docs)
        arr = self.torch_config.from_iterable((len(norms), len(lemmas)))
        return TensorFeatureContext(self.feature_id, arr)
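

# Illustrative sketch (not part of the original source): demonstrates the
# pairwise set intersection reduce used above on plain token sets.
def _example_overlap():
    docs = [{'the', 'fast', 'fox'}, {'the', 'fox', 'ran'}]
    shared = reduce(lambda a, b: a & b, docs)   # {'the', 'fox'}
    return len(shared)                          # 2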


@dataclass
class MutualFeaturesContainerFeatureVectorizer(MultiDocumentVectorizer):
    """Vectorize the shared count of all tokens as an S X (M * N) tensor,
    where S is the number of sentences, M is the number of token feature ids
    and N is the columns of the output of the
    :class:`.SpacyFeatureVectorizer` vectorizer.

    This uses an instance of :class:`CountEnumContainerFeatureVectorizer` to
    compute across each spaCy feature and then sums them up for only those
    features shared.  If at least one shared document has a zero count, the
    feature is zeroed.

    The input to this feature vectorizer is a tuple of N
    :class:`.TokenContainer` instances.

    :shape: (|sentences|, |decoded features|,) from the referenced
            :class:`CountEnumContainerFeatureVectorizer` given by
            :obj:`count_vectorizer_feature_id`

    """
    DESCRIPTION = 'mutual feature counts'

    count_vectorizer_feature_id: str = field()
    """The string feature ID configured in the
    :class:`.FeatureDocumentVectorizerManager` of the
    :class:`CountEnumContainerFeatureVectorizer` to use for the count
    features.

    """

    @property
    def count_vectorizer(self) -> CountEnumContainerFeatureVectorizer:
        """Return the count vectorizer used for the count features.

        :see: :obj:`count_vectorizer_feature_id`

        """
        return self.manager[self.count_vectorizer_feature_id]

    @property
    def ones(self) -> Tensor:
        """Return a tensor of ones for the shape of this instance."""
        return self.torch_config.ones((1, self.shape[1]))

    def _get_shape(self) -> Tuple[int, int]:
        return -1, self.count_vectorizer.shape[1]

    def _encode(self, docs: Tuple[FeatureDocument]) -> FeatureContext:
        ctxs = tuple(map(self.count_vectorizer.encode,
                         map(lambda doc: doc.combine_sentences(), docs)))
        return MultiFeatureContext(self.feature_id, ctxs)

    def _decode(self, context: MultiFeatureContext) -> Tensor:
        def decode_context(ctx):
            sents = self.count_vectorizer.decode(ctx)
            return torch.sum(sents, axis=0)

        ones = self.ones
        arrs = tuple(map(decode_context, context.contexts))
        if len(arrs) == 1:
            # return the single document as a mutual count against itself
            return arrs[0]
        else:
            arrs = torch.stack(arrs, axis=0).squeeze(1)
            if logger.isEnabledFor(logging.DEBUG):
                logger.debug(f'combined counts (doc/row): {arrs.shape}')
            # clone so the operations of this vectorizer do not affect the
            # tensors from the delegate count vectorizer
            cnts = self.torch_config.clone(arrs)
            # multiply counts across all docs so any 0 count feature will be
            # 0 in the mask
            prod = cnts.prod(axis=0).unsqueeze(0)
            # create 2 X N with the count product and ones
            cat_ones = torch.cat((prod, ones))
            # keep 0s for no count features or 1 if there is at least one
            # for the mask
            mask = torch.min(cat_ones, axis=0)[0]
            if logger.isEnabledFor(logging.DEBUG):
                logger.debug(f'counts mask: {cat_ones.shape}')
            # use the mask to zero out counts that aren't mutual across all
            # documents, then sum the counts across documents
            return (cnts * mask).sum(axis=0).unsqueeze(0)
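

# Illustrative sketch (not part of the original source): demonstrates the
# mutual-count masking trick used in _decode above with plain tensors.  Any
# feature with a zero count in at least one document zeros out the product,
# and the min against ones caps the product into a 0/1 mask.
def _example_mutual_mask():
    cnts = torch.tensor([[2., 0., 3.],    # counts for document 1
                         [1., 4., 2.]])   # counts for document 2
    ones = torch.ones((1, 3))
    prod = cnts.prod(dim=0).unsqueeze(0)  # 0 where any doc has 0
    mask = torch.min(torch.cat((prod, ones)), dim=0)[0]
    return (cnts * mask).sum(dim=0).unsqueeze(0)   # tensor([[3., 0., 5.]])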


@dataclass
class WordEmbeddingFeatureVectorizer(EncodableFeatureVectorizer):
    """Vectorizes string tokens into word embedding vectors.  This class
    works directly with the string tokens rather than
    :class:`~zensols.nlp.FeatureDocument` instances.  It can be useful when
    there's a need to vectorize tokens outside of a feature document
    (i.e. ``cui2vec``).

    """
    FEATURE_TYPE = TextFeatureType.EMBEDDING
    DESCRIPTION = 'word embedding encoder'

    embed_model: WordEmbedModel = field()
    """The word embedding model that has the string tokens to vector
    mapping."""

    def _get_shape(self):
        return (-1, self.embed_model.vector_dimension)

    def _encode(self, keys: Iterable[str]) -> FeatureContext:
        em: WordEmbedModel = self.embed_model
        vecs: Tuple[np.ndarray, ...] = tuple(map(lambda k: em.get(k), keys))
        arr: np.ndarray = np.stack(vecs)
        return TensorFeatureContext(self.feature_id, torch.from_numpy(arr))
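

# Illustrative sketch (not part of the original source): stacks per token
# vectors the same way _encode does, using a hypothetical 4 dimensional
# embedding lookup in place of a real WordEmbedModel.
def _example_word_embedding():
    lookup = {'heart': np.ones(4), 'attack': np.zeros(4)}   # hypothetical
    keys = ('heart', 'attack')
    arr = np.stack(tuple(map(lookup.get, keys)))            # shape (2, 4)
    return torch.from_numpy(arr)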