Source code for zensols.deepnlp.transformer.vectorizers

"""Contains classes that are used to vectorize documents in to transformer
embeddings.

"""
from __future__ import annotations
__author__ = 'Paul Landes'
from typing import Tuple, List, Dict, Union, Sequence, Any
from dataclasses import dataclass, field
from abc import ABCMeta
import sys
import logging
from itertools import chain
from io import TextIOBase
import torch
from torch import Tensor
from zensols.persist import persisted, Deallocatable
from zensols.deeplearn.vectorize import (
    VectorizerError, TensorFeatureContext, EncodableFeatureVectorizer,
    FeatureContext, AggregateEncodableFeatureVectorizer,
    NominalEncodedEncodableFeatureVectorizer, MaskFeatureVectorizer,
)
from zensols.nlp import FeatureDocument, FeatureSentence
from zensols.deepnlp.vectorize import (
    EmbeddingFeatureVectorizer, TextFeatureType, FeatureDocumentVectorizer
)
from . import (
    TransformerEmbedding, TransformerResource,
    TransformerDocumentTokenizer, TokenizedDocument, TokenizedFeatureDocument,
)

logger = logging.getLogger(__name__)


class TransformerFeatureContext(FeatureContext, Deallocatable):
    """A vectorizer feature context used with
    :class:`.TransformerEmbeddingFeatureVectorizer`.

    """

    def __init__(self, feature_id: str,
                 document: Union[TokenizedDocument, FeatureDocument]):
        """
        :param feature_id: the feature ID used to identify this context

        :param document: the document used to create the transformer
                         embeddings

        """
        super().__init__(feature_id)
        Deallocatable.__init__(self)
        self._document = document

    def get_document(self, vectorizer: TransformerFeatureVectorizer) -> \
            TokenizedDocument:
        document = self._document
        if isinstance(document, FeatureDocument):
            document = vectorizer.tokenize(document)
        return document

    def get_feature_document(self) -> FeatureDocument:
        if not isinstance(self._document, FeatureDocument):
            raise VectorizerError(
                f'Expecting FeatureDocument but got: {type(self._document)}')
        return self._document

    def deallocate(self):
        super().deallocate()
        self._try_deallocate(self._document)
        del self._document


@dataclass
class TransformerFeatureVectorizer(EmbeddingFeatureVectorizer,
                                   FeatureDocumentVectorizer):
    """Base class for vectorizers that use transformer models.  This class also
    tokenizes documents.

    """
    is_labeler: bool = field(default=False)
    """If ``True``, make this a labeling specific vectorizer.  Otherwise,
    certain layers will use the output of the vectorizer as features rather
    than the labels.

    """
    encode_tokenized: bool = field(default=False)
    """Whether to tokenize the document on encoding.  Set this to ``True`` only
    if the huggingface model ID (e.g. ``bert-base-cased``) will not change
    after vectorization/batching.

    Setting this to ``True`` tells the vectorizer to tokenize during encoding,
    and thus will speed experimentation by providing the tokenized tensors to
    the model directly.

    """
    def __post_init__(self):
        if self.encode_transformed and not self.encode_tokenized:
            raise VectorizerError("""\
Cannot transform while not tokenizing on the encoding side.  Either set
encode_transformed to False or encode_tokenized to True.""")

    def _assert_token_output(self, expected: str = 'last_hidden_state'):
        if self.embed_model.output != expected:
            raise VectorizerError(f"""\
Some vectorizers only work at the token level, so output such as \
'{expected}', which provides an output for each token in the transformer \
embedding, is required, got: '{self.embed_model.output}' for attribute \
`output` in '{self.name}'""")

    @property
    def feature_type(self) -> TextFeatureType:
        if self.is_labeler:
            return TextFeatureType.NONE
        else:
            return self.FEATURE_TYPE

    @property
    def word_piece_token_length(self) -> int:
        return self.embed_model.tokenizer.word_piece_token_length

    def _get_shape(self) -> Tuple[int, int]:
        return self.word_piece_token_length, self.embed_model.vector_dimension

    def _get_tokenizer(self) -> TransformerDocumentTokenizer:
        emb: TransformerEmbedding = self.embed_model
        return emb.tokenizer

    def _get_resource(self) -> TransformerResource:
        return self._get_tokenizer().resource

    def _create_context(self, doc: FeatureDocument) -> \
            TransformerFeatureContext:
        if self.encode_tokenized:
            doc = self.tokenize(doc).detach()
        return TransformerFeatureContext(self.feature_id, doc)

    def _context_to_document(self, ctx: TransformerFeatureContext) -> \
            TokenizedDocument:
        return ctx.get_document(self)

    def tokenize(self, doc: FeatureDocument) -> TokenizedFeatureDocument:
        """Tokenize the document into a token document used by the encoding
        phase.

        :param doc: the document to be tokenized

        """
        emb: TransformerEmbedding = self.embed_model
        if logger.isEnabledFor(logging.DEBUG):
            logger.debug(f'synthesized document: {doc}')
        return emb.tokenize(doc)
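

# Illustrative sketch (not part of the original module): how the encode side
# of a subclass typically uses ``tokenize``.  The ``vectorizer`` and ``doc``
# arguments are hypothetical placeholders for an already configured subclass
# instance and a parsed document.
def _example_tokenize_sketch(vectorizer: TransformerFeatureVectorizer,
                             doc: FeatureDocument) -> TokenizedDocument:
    # tokenize into word pieces with the embedding model's tokenizer
    tdoc: TokenizedFeatureDocument = vectorizer.tokenize(doc)
    # detaching keeps only the tensors so the context can be pickled when
    # ``encode_tokenized`` is set (mirrors ``_create_context`` above)
    return tdoc.detach()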


@dataclass
class TransformerEmbeddingFeatureVectorizer(TransformerFeatureVectorizer):
    """A feature vectorizer used to create transformer (e.g. BERT) embeddings.
    The class uses the :obj:`.embed_model`, which is of type
    :class:`.TransformerEmbedding`.

    Note that the encoding input should ideally be sentences shorter than 512
    tokens.  However, this vectorizer can accommodate both
    :class:`.FeatureSentence` and :class:`.FeatureDocument` instances.

    """
    DESCRIPTION = 'transformer document embedding'
    FEATURE_TYPE = TextFeatureType.EMBEDDING

    def __post_init__(self):
        super().__post_init__()
        if self.encode_transformed and self.embed_model.trainable:
            # once the transformer last hidden state is dumped during encode,
            # the parameters needed to train the model properly are lost
            raise VectorizerError('a trainable model can not encode ' +
                                  'transformed vectorized features')

    def _encode(self, doc: FeatureDocument) -> FeatureContext:
        return self._create_context(doc)

    def _decode(self, context: TransformerFeatureContext) -> Tensor:
        emb: TransformerEmbedding = self.embed_model
        if logger.isEnabledFor(logging.INFO):
            logger.info(f'decoding {context} with trainable: {emb.trainable}')
        arr: Tensor
        doc: TokenizedDocument = self._context_to_document(context)
        if emb.trainable:
            arr = doc.tensor
            if logger.isEnabledFor(logging.DEBUG):
                logger.debug(f'passing through tensor: {arr.shape}')
        else:
            if logger.isEnabledFor(logging.DEBUG):
                logger.debug(f'transforming doc: {context}')
            arr = emb.transform(doc)
        if logger.isEnabledFor(logging.INFO):
            logger.info(f'decoded trans layer {arr.shape} on {arr.device}')
        return arr
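

# Illustrative sketch (not part of the original module): an encode/decode
# round trip with a configured TransformerEmbeddingFeatureVectorizer.  The
# ``vec`` and ``doc`` arguments are hypothetical placeholders for an instance
# created by the application context and a parsed document.
def _example_embedding_round_trip(vec: TransformerEmbeddingFeatureVectorizer,
                                  doc: FeatureDocument) -> Tensor:
    # encoding creates a picklable feature context; with ``encode_tokenized``
    # set, the document is tokenized here rather than at decode time
    ctx: FeatureContext = vec.encode(doc)
    # decoding either passes the tokenized tensors through (trainable model)
    # or runs the transformer to produce the last hidden state
    arr: Tensor = vec.decode(ctx)
    # shape: (sentences, word piece tokens, embedding dimension)
    return arr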


class TransformerExpanderFeatureContext(TransformerFeatureContext):
    """A vectorizer feature context used with
    :class:`.TransformerExpanderFeatureVectorizer`.

    """
    contexts: Tuple[FeatureContext] = field()
    """The subordinate contexts."""

    def __init__(self, feature_id: str, contexts: Tuple[FeatureContext],
                 document: Union[TokenizedDocument, FeatureDocument]):
        """
        :param feature_id: the feature ID used to identify this context

        :param contexts: subordinate contexts given to
                         :class:`.MultiFeatureContext`

        :param document: the document used to create the transformer
                         embeddings

        """
        super().__init__(feature_id, document)
        self.contexts = contexts

    def deallocate(self):
        super().deallocate()
        if hasattr(self, 'contexts'):
            self._try_deallocate(self.contexts)
            del self.contexts


@dataclass
class TransformerExpanderFeatureVectorizer(TransformerFeatureVectorizer):
    """A vectorizer that expands linguistic feature vectors to their respective
    locations as word piece token vectors.

    This is used to concatenate linguistic features with BERT (and other
    transformer) embeddings.  Each linguistic token is copied in the word
    piece token location across all vectorizers and sentences.

    :shape: (-1, token length, X), where X is the sum of all the delegate
            shapes across all three dimensions

    """
    DESCRIPTION = 'transformer expander'
    FEATURE_TYPE = TextFeatureType.TOKEN

    delegate_feature_ids: Tuple[str] = field(default=None)
    """A list of feature IDs of vectorizers whose output will be expanded."""

    def __post_init__(self):
        super().__post_init__()
        if self.delegate_feature_ids is None:
            raise VectorizerError('expected attribute: delegate_feature_ids')
        self._assert_token_output()
        self._validated = False

    def _validate(self):
        if not self._validated:
            for vec in self.delegates:
                if hasattr(vec, 'feature_type') and \
                   vec.feature_type != TextFeatureType.TOKEN:
                    raise VectorizerError('Only token level vectorizers are ' +
                                          f'supported, but got {vec}')
        self._validated = True

    def _get_shape(self) -> Tuple[int, int]:
        shape = [-1, self.word_piece_token_length, 0]
        vec: FeatureDocumentVectorizer
        for vec in self.delegates:
            shape[2] += vec.shape[-1]
        return tuple(shape)

    @property
    @persisted('_delegates', allocation_track=False)
    def delegates(self) -> EncodableFeatureVectorizer:
        """The delegates used for encoding and decoding the linguistic
        features.

        """
        return tuple(map(lambda f: self.manager[f], self.delegate_feature_ids))

    def _encode(self, doc: FeatureDocument) -> FeatureContext:
        udoc: Union[TokenizedDocument, FeatureDocument] = doc
        self._validate()
        if self.encode_tokenized:
            udoc: TokenizedDocument = self.tokenize(doc).detach()
        cxs = tuple(map(lambda vec: vec.encode(doc), self.delegates))
        return TransformerExpanderFeatureContext(self.feature_id, cxs, udoc)

    def _decode(self, context: TransformerExpanderFeatureContext) -> Tensor:
        doc: TokenizedDocument = self._context_to_document(context)
        arrs: List[Tensor] = []
        # decode subordinate contexts
        vec: FeatureDocumentVectorizer
        ctx: FeatureContext
        for vec, ctx in zip(self.delegates, context.contexts):
            src = vec.decode(ctx)
            if logger.isEnabledFor(logging.DEBUG):
                logger.debug(f'decoded shape ({vec.feature_id}): {src.shape}')
            arrs.append(src)
        # get the mapping per sentence
        wps_sents = tuple(map(lambda s: doc.map_word_pieces(s), doc.offsets))
        tlen = self.word_piece_token_length
        # use variable length tokens
        if tlen <= 0:
            tlen = max(chain.from_iterable(
                chain.from_iterable(
                    map(lambda s: map(lambda t: t[1], s), wps_sents))))
            # max finds the largest index, so add 1 for size
            tlen += 1
            # add another (to be zero) for the ending sentence boundary
            tlen += 1 if doc.boundary_tokens else 0
        # number of sentences
        n_sents = len(wps_sents)
        # feature dimension (last dimension)
        dim = sum(map(lambda x: x.size(-1), arrs))
        # tensor to populate
        marr = self.torch_config.zeros((n_sents, tlen, dim))
        if logger.isEnabledFor(logging.DEBUG):
            logger.debug(f'sents: {n_sents}, token length: {tlen}, dim: {dim}')
        sent: Tensor
        arr: Tensor
        wps: Tuple[Tuple[Tensor, List[int]]]
        marrix = 0
        # iterate feature vectors
        for arr in arrs:
            ln = arr.size(-1)
            meix = marrix + ln
            if logger.isEnabledFor(logging.DEBUG):
                logger.debug(f'feature range: [{marrix}:{meix}]')
            # iterate sentences
            for six, (sent, wps) in enumerate(zip(doc.offsets, wps_sents)):
                if logger.isEnabledFor(logging.DEBUG):
                    logger.debug(f'expanding for {arr.shape} in ' +
                                 f'[{six},:,{marrix}:{meix}]')
                # iterate linguistic / word piece tokens
                for tix, wpixs in wps:
                    # for each word piece mapping, copy the source feature
                    # vector to the target, thereby expanding and increasing
                    # the size of the last dimension
                    for wix in wpixs:
                        if False and logger.isEnabledFor(logging.DEBUG):
                            logger.debug(f'[{six}, {wix}, {marrix}:{meix}] ' +
                                         f'= [{six}, {tix}]')
                        marr[six, wix, marrix:meix] = arr[six, tix]
            marrix += ln
        return marr
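

# Illustrative sketch (not part of the original module): the expansion done in
# TransformerExpanderFeatureVectorizer._decode, reduced to plain torch.  The
# sizes and the token-to-word-piece mapping below are made up for
# illustration.
def _example_expand_word_pieces() -> Tensor:
    # one sentence, three linguistic tokens, two features per token
    feats = torch.tensor([[1., 10.], [2., 20.], [3., 30.]])
    # token 0 -> word piece 1, token 1 -> word pieces 2 and 3, token 2 -> 4;
    # position 0 (the start special token) stays zero
    wp_map = {0: [1], 1: [2, 3], 2: [4]}
    tlen = 6  # word piece length including special tokens and padding
    marr = torch.zeros((1, tlen, feats.size(-1)))
    for tix, wpixs in wp_map.items():
        for wix in wpixs:
            # copy the linguistic token's feature vector to each of its word
            # piece positions
            marr[0, wix, :] = feats[tix]
    return marr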


@dataclass
class LabelTransformerFeatureVectorizer(TransformerFeatureVectorizer,
                                        metaclass=ABCMeta):
    """A base class for vectorizing by mapping tokens to transformer-consumable
    word piece tokens.  This includes creating labels and masks.

    :shape: (|sentences|, |max word piece length|)

    """
    is_labeler: bool = field(default=True)
    """If ``True``, make this a labeling specific vectorizer.  Otherwise,
    certain layers will use the output of the vectorizer as features rather
    than the labels.

    """
    FEATURE_TYPE = TextFeatureType.TOKEN

    def _get_shape(self) -> Tuple[int, int]:
        return (-1, self.word_piece_token_length)

    def _decode_sentence(self, sent_ctx: FeatureContext) -> Tensor:
        arr: Tensor = super()._decode_sentence(sent_ctx)
        return arr.unsqueeze(2)


@dataclass
class TransformerNominalFeatureVectorizer(AggregateEncodableFeatureVectorizer,
                                          LabelTransformerFeatureVectorizer):
    """This creates word piece labels that map back to the linguistic tokens.
    This class uses a
    :class:`~zensols.deeplearn.vectorize.NominalEncodedEncodableFeatureVectorizer`
    to map from string labels to their nominal long values.  This allows a
    single instance and centralized location where the label mapping happens
    in case other (non-transformer) components need to vectorize labels.

    :shape: (|sentences|, |max word piece length|)

    """
    DESCRIPTION = 'transformer seq labeler'

    delegate_feature_id: str = field(default=None)
    """The feature ID for the aggregate encodable feature vectorizer."""

    label_all_tokens: bool = field(default=False)
    """If ``True``, label all word piece tokens with the corresponding
    linguistic token label.  Otherwise, the default padded value is used, and
    thus, ignored by the loss function when calculating loss.

    """
    annotations_attribute: str = field(default='annotations')
    """The attribute used to get the features from the
    :class:`~zensols.nlp.FeatureSentence`.  For example,
    :class:`~zensols.nlp.TokenAnnotatedFeatureSentence` has an ``annotations``
    attribute.

    """
    def __post_init__(self):
        super().__post_init__()
        if self.delegate_feature_id is None:
            raise VectorizerError('Expected attribute: delegate_feature_id')
        self._assert_token_output()

    def _get_attributes(self, sent: FeatureSentence) -> Sequence[Any]:
        return getattr(sent, self.annotations_attribute)

    def _create_decoded_pad(self, shape: Tuple[int]) -> Tensor:
        return self.create_padded_tensor(shape, self.delegate.data_type)

    def _encode_nominals(self, doc: FeatureDocument) -> Tensor:
        delegate: NominalEncodedEncodableFeatureVectorizer = self.delegate
        tdoc: TokenizedDocument = self.tokenize(doc)
        by_label: Dict[str, int] = delegate.by_label
        dtype: torch.dtype = delegate.data_type
        lab_all: bool = self.label_all_tokens
        n_sents: int = len(doc)
        # even if self.word_piece_token_length > 0, the TokenizedDocument
        # length gives the correct encoded size given the tokenizer's
        # configuration; otherwise it will pad too wide, causing an error in
        # TransformerSequence
        n_toks: int = len(tdoc)
        if logger.isEnabledFor(logging.DEBUG):
            logger.debug(f'encoding using {n_toks} length')
        arr = self.create_padded_tensor((n_sents, n_toks), dtype)
        if logger.isEnabledFor(logging.DEBUG):
            logger.debug(f'output shape: {arr.shape}/{self.shape}')
        sent: FeatureSentence
        for six, sent in enumerate(doc):
            sent_labels: Sequence[Any] = self._get_attributes(sent)
            word_ids: Tensor = tdoc.offsets[six]
            previous_word_idx: int = None
            tix: int
            word_idx: int
            for tix, word_idx in enumerate(word_ids):
                # special tokens have a word id of -1; they keep the padded
                # value so they are automatically ignored in the loss function
                if word_idx == -1:
                    pass
                # we set the label for the first word piece of each word
                elif word_idx != previous_word_idx:
                    lab: str = sent_labels[word_idx]
                    arr[six][tix] = by_label[lab]
                # for the other word pieces of a word, we set the label to
                # either the current label or the padded value, depending on
                # the label_all_tokens flag
                elif lab_all:
                    arr[six][tix] = by_label[sent_labels[word_idx]]
                previous_word_idx = word_idx
        return arr

    def _encode(self, doc: FeatureDocument) -> FeatureContext:
        ctx: FeatureContext
        if self.encode_tokenized:
            arr: Tensor = self._encode_nominals(doc)
            ctx = TensorFeatureContext(self.feature_id, arr)
        else:
            ctx = self._create_context(doc)
        return ctx

    def _decode(self, context: FeatureContext) -> Tensor:
        if isinstance(context, TransformerFeatureContext):
            doc: FeatureDocument = context.get_feature_document()
            arr: Tensor = self._encode_nominals(doc)
            context = TensorFeatureContext(self.feature_id, arr)
        return LabelTransformerFeatureVectorizer._decode(self, context)

    def write(self, depth: int = 0, writer: TextIOBase = sys.stdout):
        super().write(depth, writer)
        self._write_line('delegate:', depth, writer)
        self._write_object(self.delegate, depth + 1, writer)
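

# Illustrative sketch (not part of the original module): the label alignment
# performed by TransformerNominalFeatureVectorizer._encode_nominals, reduced
# to plain torch.  A word id of -1 marks a special token; only the first word
# piece of a linguistic token is labeled unless ``label_all_tokens`` is set.
# All values below are made up for illustration.
def _example_align_labels(label_all_tokens: bool = False) -> Tensor:
    by_label: Dict[str, int] = {'O': 0, 'B-ORG': 1}
    sent_labels: Sequence[str] = ['B-ORG', 'O']
    # word ids per word piece: [CLS], token 0, token 1 (two pieces), [SEP]
    word_ids: Sequence[int] = [-1, 0, 1, 1, -1]
    pad = -100  # padded value ignored by the loss function
    arr = torch.full((1, len(word_ids)), pad, dtype=torch.long)
    previous_word_idx: int = None
    for tix, word_idx in enumerate(word_ids):
        if word_idx == -1:
            # special tokens keep the padded value
            pass
        elif word_idx != previous_word_idx:
            # the first word piece of a linguistic token gets the label
            arr[0][tix] = by_label[sent_labels[word_idx]]
        elif label_all_tokens:
            # remaining word pieces optionally repeat the label
            arr[0][tix] = by_label[sent_labels[word_idx]]
        previous_word_idx = word_idx
    return arr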


@dataclass
class TransformerMaskFeatureVectorizer(LabelTransformerFeatureVectorizer):
    """Creates a mask that is ``True`` for word piece tokens and ``False`` for
    special tokens and padding.  This maps tokens to word piece tokens like
    :class:`.TransformerNominalFeatureVectorizer`.

    :shape: (|sentences|, |max word piece length|)

    """
    DESCRIPTION = 'transformer mask'

    data_type: Union[str, None, torch.dtype] = field(default='bool')
    """The mask tensor type.  To use the int type that matches the resolution
    of the manager's :obj:`torch_config`, use ``DEFAULT_INT``.

    """
    def __post_init__(self):
        super().__post_init__()
        self.data_type = MaskFeatureVectorizer.str_to_dtype(
            self.data_type, self.manager.torch_config)
        if logger.isEnabledFor(logging.DEBUG):
            logger.debug(f'init mask data type: {self.data_type}')

    def _create_decoded_pad(self, shape: Tuple[int]) -> Tensor:
        return self.torch_config.zeros(shape, dtype=self.data_type)

    def _encode_mask(self, doc: FeatureDocument) -> Tensor:
        tdoc: TokenizedDocument = self.tokenize(doc)
        arr: Tensor = tdoc.attention_mask.type(dtype=self.data_type)
        if logger.isEnabledFor(logging.DEBUG):
            logger.debug(f'mask type: {arr.dtype}')
        return arr

    def _encode(self, doc: FeatureDocument) -> FeatureContext:
        ctx: FeatureContext
        if self.encode_tokenized:
            arr: Tensor = self._encode_mask(doc)
            ctx = TensorFeatureContext(self.feature_id, arr)
        else:
            ctx = self._create_context(doc)
        return ctx

    def _decode(self, context: FeatureContext) -> Tensor:
        if isinstance(context, TransformerFeatureContext):
            doc: FeatureDocument = context.get_feature_document()
            arr: Tensor = self._encode_mask(doc)
            context = TensorFeatureContext(self.feature_id, arr)
        return super()._decode(context)
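

# Illustrative sketch (not part of the original module): what
# TransformerMaskFeatureVectorizer._encode_mask produces, reduced to plain
# torch.  The attention mask values below are made up for illustration.
def _example_word_piece_mask() -> Tensor:
    # two sentences padded to five word pieces: 1 marks a real word piece,
    # 0 marks padding
    attention_mask = torch.tensor([[1, 1, 1, 0, 0],
                                   [1, 1, 1, 1, 1]])
    # cast to the configured data type as in ``_encode_mask``
    return attention_mask.type(dtype=torch.bool)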