Source code for zensols.nlp.container

"""Domain objects that define features associated with text.

"""
from __future__ import annotations
__author__ = 'Paul Landes'
from typing import List, Tuple, Iterable, Dict, Type, Any, ClassVar, Set, Union
from dataclasses import dataclass, field
import dataclasses
from abc import ABCMeta, abstractmethod
import sys
import logging
import textwrap as tw
import itertools as it
from itertools import chain
from io import TextIOBase
from frozendict import frozendict
from interlap import InterLap
from spacy.tokens import Doc, Span, Token
from zensols.persist import PersistableContainer, persisted, PersistedWork
from . import NLPError, TextContainer, FeatureToken, LexicalSpan
from .spannorm import SpanNormalizer, DEFAULT_FEATURE_TOKEN_NORMALIZER

logger = logging.getLogger(__name__)


class TokenContainer(PersistableContainer, TextContainer, metaclass=ABCMeta):
    """A base class for token container classes such as
    :class:`.FeatureSentence` and :class:`.FeatureDocument`.  In addition to
    the defined methods, each instance has a ``text`` attribute, which is the
    original text of the document.

    """
    _PERSITABLE_TRANSIENT_ATTRIBUTES: ClassVar[Set[str]] = {'_token_norm'}

    def __post_init__(self):
        super().__init__()
        self._norm = PersistedWork('_norm', self, transient=True)
        self._entities = PersistedWork('_entities', self, transient=True)
        self._token_norm: SpanNormalizer = DEFAULT_FEATURE_TOKEN_NORMALIZER
    @abstractmethod
    def token_iter(self, *args, **kwargs) -> Iterable[FeatureToken]:
        """Return an iterator over the token features.

        :param args: the arguments given to :meth:`itertools.islice`

        """
        pass
    @staticmethod
    def strip_tokens(token_iter: Iterable[FeatureToken]) -> \
            Iterable[FeatureToken]:
        """Strip beginning and ending whitespace.  This uses
        :obj:`~.tok.SpacyFeatureToken.is_space`, which is ``True`` for spaces,
        tabs and newlines.

        :param token_iter: a stream of tokens

        :return: the non-whitespace middle tokens

        """
        first_tok: bool = False
        space_toks: List[FeatureToken] = []
        tok: FeatureToken
        for tok in token_iter:
            if tok.is_space:
                if first_tok:
                    space_toks.append(tok)
            else:
                first_tok = True
                stok: FeatureToken
                for stok in space_toks:
                    yield stok
                space_toks.clear()
                yield tok
    def strip_token_iter(self, *args, **kwargs) -> Iterable[FeatureToken]:
        """Strip beginning and ending whitespace (see :meth:`strip_tokens`)
        using :meth:`token_iter`.

        """
        return self.strip_tokens(self.token_iter(*args, **kwargs))
    def strip(self, in_place: bool = True) -> TokenContainer:
        """Strip beginning and ending whitespace (see :meth:`strip_tokens`)
        and :obj:`text`.

        """
        self._clear_persistable_state()
        cont: TokenContainer = self if in_place else self.clone()
        cont._strip()
        return cont
    @abstractmethod
    def _strip(self):
        pass
    def norm_token_iter(self, *args, **kwargs) -> Iterable[str]:
        """Return an iterable over the normalized token strings.

        :param args: the arguments given to :meth:`itertools.islice`

        """
        return map(lambda t: t.norm, self.token_iter(*args, **kwargs))
    @property
    @persisted('_norm')
    def norm(self) -> str:
        """The normalized version of the sentence."""
        return self._token_norm.get_norm(self.token_iter())

    @property
    @persisted('_canonical', transient=True)
    def canonical(self) -> str:
        """A canonical representation of the container: the non-space tokens
        separated by :obj:`CANONICAL_DELIMITER`.

        """
        return self._token_norm.get_canonical(self.token_iter())

    @property
    @persisted('_tokens', transient=True)
    def tokens(self) -> Tuple[FeatureToken, ...]:
        """Return the token features as a tuple.

        """
        return tuple(self.token_iter())

    @property
    @persisted('_token_len', transient=True)
    def token_len(self) -> int:
        """Return the number of tokens."""
        return sum(1 for i in self.token_iter())

    @property
    @persisted('_lexspan', transient=True)
    def lexspan(self) -> LexicalSpan:
        """The document indexed lexical span using :obj:`idx`.

        """
        toks: Tuple[FeatureToken, ...] = self.tokens
        if len(toks) == 0:
            return LexicalSpan.EMPTY_SPAN
        else:
            return LexicalSpan(toks[0].lexspan.begin, toks[-1].lexspan.end)

    @persisted('_interlap', transient=True)
    def _get_interlap(self) -> InterLap:
        """Create an interlap with all tokens of the container added."""
        il = InterLap()
        # adding with an inline tuple is ~3 times faster than a list, and ~9
        # times faster than an individual add in a for loop
        spans: Tuple[Tuple[int, int]] = tuple(
            map(lambda t: (t.lexspan.begin, t.lexspan.end - 1, t),
                self.token_iter()))
        if len(spans) > 0:
            il.add(spans)
        return il
    def map_overlapping_tokens(self, spans: Iterable[LexicalSpan],
                               inclusive: bool = True) -> \
            Iterable[Tuple[FeatureToken, ...]]:
        """Return a tuple of tokens, each tuple in the range given by the
        respective span in ``spans``.

        :param spans: the document 0-index character based inclusive spans to
                      compare with :obj:`.FeatureToken.lexspan`

        :param inclusive: whether to include the end component (+1) in the
                          overlap check

        :return: a tuple of matching tokens for the respective ``span`` query

        """
        def map_span(s: LexicalSpan) -> Tuple[FeatureToken]:
            toks = map(lambda m: m[2], il.find(s.astuple))
            # we have to manually check non-inclusive right intervals since
            # InterLap includes them
            if not inclusive:
                toks = filter(lambda t: t.lexspan.overlaps_with(s, False),
                              toks)
            return tuple(toks)

        il = self._get_interlap()
        return map(map_span, spans)
    def get_overlapping_tokens(self, span: LexicalSpan,
                               inclusive: bool = True) -> \
            Iterable[FeatureToken]:
        """Get all tokens that overlap lexical span ``span``.

        :param span: the document 0-index character based inclusive span to
                     compare with :obj:`.FeatureToken.lexspan`

        :param inclusive: whether to include the end component (+1) in the
                          overlap check

        :return: a token sequence containing the 0 index offset of ``span``

        """
        return next(iter(self.map_overlapping_tokens((span,), inclusive)))
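    # A minimal usage sketch of the overlap queries above, assuming ``cont``
    # is any parsed TokenContainer (such as a FeatureDocument created by a
    # configured FeatureDocumentParser):
    #
    #     span = LexicalSpan(0, 15)                 # first 15 characters
    #     toks = cont.get_overlapping_tokens(span)
    #     print([t.norm for t in toks])             # tokens touching the span
    #
    # map_overlapping_tokens() runs the same query for several spans at once,
    # yielding one tuple of tokens per query span.
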
    def get_overlapping_span(self, span: LexicalSpan,
                             inclusive: bool = True) -> TokenContainer:
        """Return a feature span that includes the lexical scope of
        ``span``."""
        sent = FeatureSentence(tokens=self.tokens, text=self.text)
        doc = FeatureDocument(sents=(sent,), text=self.text)
        return doc.get_overlapping_document(span, inclusive=inclusive)
    @abstractmethod
    def to_sentence(self, limit: int = sys.maxsize,
                    contiguous_i_sent: Union[str, bool] = False,
                    delim: str = '') -> FeatureSentence:
        """Coerce this instance to a single sentence.  No token data is
        updated, so :obj:`.FeatureToken.i_sent` values keep their original
        indexes.  These sentence indexes will be inconsistent when called on a
        :class:`.FeatureDocument` unless ``contiguous_i_sent`` is set to
        ``True``.

        :param limit: the max number of sentences to create (only the starting
                      sentences are kept)

        :param contiguous_i_sent: if ``True``, ensure all tokens have
                                  :obj:`.FeatureToken.i_sent` values that are
                                  contiguous for the returned instance; if this
                                  value is ``reset``, the token indices start
                                  from 0

        :param delim: a string added between each constituent sentence

        :return: an instance of ``FeatureSentence`` that represents this token
                 sequence

        """
        pass
    def _set_contiguous_tokens(self, contiguous_i_sent: Union[str, bool],
                               reference: TokenContainer):
        if contiguous_i_sent is False:
            pass
        elif contiguous_i_sent == 'reset':
            for i, tok in enumerate(self.token_iter()):
                tok.i_sent = i
        elif contiguous_i_sent is True:
            for ref_tok, tok in zip(reference.token_iter(),
                                    self.token_iter()):
                tok.i_sent = ref_tok.i
        else:
            raise ValueError(
                f'Bad value for contiguous_i_sent: {contiguous_i_sent}')
    @abstractmethod
    def to_document(self, limit: int = sys.maxsize) -> FeatureDocument:
        """Coerce this instance into a document.

        """
        pass
    def clone(self, cls: Type[TokenContainer] = None, **kwargs) -> \
            TokenContainer:
        """Clone an instance of this token container.

        :param cls: the type of the new instance

        :param kwargs: arguments to add as attributes to the clone

        :return: the cloned instance of this instance

        """
        cls = self.__class__ if cls is None else cls
        return cls(**kwargs)
    @property
    @persisted('_entities')
    def entities(self) -> Tuple[FeatureSpan, ...]:
        """The named entities of the container with each multi-word entity as
        an element.

        """
        return self._get_entities()

    @abstractmethod
    def _get_entities(self) -> Tuple[FeatureSpan, ...]:
        pass

    @property
    @persisted('_tokens_by_idx', transient=True)
    def tokens_by_idx(self) -> Dict[int, FeatureToken]:
        """A map of tokens with keys as their character offset and values as
        tokens.

        **Limitations**: Multi-word entities will have a mapping only for the
        first word of that entity if tokens were split by spaces (for example
        with :class:`~zensols.nlp.SplitTokenMapper`).  However,
        :obj:`tokens_by_i` does not have this limitation.

        :see: :obj:`tokens_by_i`

        :see: :obj:`zensols.nlp.FeatureToken.idx`

        """
        by_idx = {}
        cnt = 0
        tok: FeatureToken
        for tok in self.token_iter():
            by_idx[tok.idx] = tok
            cnt += 1
        assert cnt == self.token_len
        return frozendict(by_idx)

    @property
    @persisted('_tokens_by_i', transient=True)
    def tokens_by_i(self) -> Dict[int, FeatureToken]:
        """A map of tokens with keys as their position offset and values as
        tokens.  The entries also include named entity tokens that are grouped
        as multi-word tokens.  This is helpful for multi-word entities that
        were split (for example with :class:`~zensols.nlp.SplitTokenMapper`),
        and thus, have many-to-one mapped indexes.

        :see: :obj:`zensols.nlp.FeatureToken.i`

        """
        return frozendict(self._get_tokens_by_i())

    @abstractmethod
    def _get_tokens_by_i(self) -> Dict[int, FeatureToken]:
        pass
    def update_indexes(self):
        """Update all :obj:`.FeatureToken.i` attributes to those provided by
        :obj:`tokens_by_i`.  This corrects the many-to-one token index mapping
        for split multi-word named entities.

        :see: :obj:`tokens_by_i`

        """
        i: int
        ft: FeatureToken
        for i, ft in self.tokens_by_i.items():
            ft.i = i
    @abstractmethod
    def update_entity_spans(self, include_idx: bool = True):
        """Update the entity token spans to match the :obj:`norm` text.  This
        is helpful when entities are embedded after splitting text, which
        becomes the :obj:`.FeatureToken.norm` values.  However, the token
        spans still index the original multi-word entities, which leads to
        norms that are not equal to the text spans.  This synchronizes the
        token span indexes with the norms.

        :param include_idx: whether to update :obj:`.SpacyFeatureToken.idx` as
                            well

        """
        pass
    def reindex(self, reference_token: FeatureToken = None):
        """Re-index tokens, which is useful for situations where a 0-index
        offset is assumed for sub-documents created with
        :meth:`.FeatureDocument.get_overlapping_document` or
        :meth:`.FeatureDocument.get_overlapping_sentences`.

        The following data are modified:

          * :obj:`.FeatureToken.i`
          * :obj:`.FeatureToken.idx`
          * :obj:`.FeatureToken.i_sent`
          * :obj:`.FeatureToken.sent_i` (see :obj:`.SpacyFeatureToken.sent_i`)
          * :obj:`.FeatureToken.lexspan`
            (see :obj:`.SpacyFeatureToken.lexspan`)
          * :obj:`entities`
          * :obj:`lexspan`
          * :obj:`tokens_by_i`
          * :obj:`tokens_by_idx`
          * :obj:`.FeatureSpan.tokens_by_i_sent`
          * :obj:`.FeatureSpan.dependency_tree`

        """
        toks: Tuple[FeatureToken] = self.tokens
        if len(toks) > 0:
            if reference_token is None:
                reference_token = toks[0]
            self._reindex(reference_token.clone())
        self.clear()
    def _reindex(self, tok: FeatureToken):
        offset_i, offset_idx = tok.i, tok.idx
        sent_i = tok.sent_i if hasattr(tok, 'sent_i') else None
        tok: FeatureToken
        for tok in self.tokens:
            idx: int = tok.idx - offset_idx
            span = LexicalSpan(idx, idx + len(tok.text))
            tok.i -= offset_i
            tok.idx = idx
            tok.lexspan = span
        if sent_i is not None:
            for tok in self.tokens:
                tok.sent_i -= sent_i
    def clear(self):
        """Clear all cached state."""
        self._clear_persistable_state()
    def write(self, depth: int = 0, writer: TextIOBase = sys.stdout,
              include_original: bool = False,
              include_normalized: bool = True,
              n_tokens: int = sys.maxsize, inline: bool = False):
        """Write the text container.

        :param include_original: whether to include the original text

        :param include_normalized: whether to include the normalized text

        :param n_tokens: the number of tokens to write

        :param inline: whether to print the tokens on one line each

        """
        super().write(depth, writer,
                      include_original=include_original,
                      include_normalized=include_normalized)
        if n_tokens > 0:
            self._write_line('tokens:', depth, writer)
            for t in it.islice(self.token_iter(), n_tokens):
                if inline:
                    t.write_attributes(depth + 1, writer,
                                       inline=True, include_type=False)
                else:
                    t.write(depth + 1, writer)
    def write_text(self, depth: int = 0, writer: TextIOBase = sys.stdout,
                   include_original: bool = False,
                   include_normalized: bool = True,
                   limit: int = sys.maxsize):
        """Write only the text of the container.

        :param include_original: whether to include the original text

        :param include_normalized: whether to include the normalized text

        :param limit: the max number of characters to print

        """
        inc_both: bool = include_original and include_normalized
        add_depth = 1 if inc_both else 0
        if include_original:
            if inc_both:
                self._write_line('[O]:', depth, writer)
            text: str = tw.shorten(self.text, limit)
            self._write_wrap(text, depth + add_depth, writer)
        if include_normalized:
            if inc_both:
                self._write_line('[N]:', depth, writer)
            norm: str = tw.shorten(self.norm, limit)
            self._write_wrap(norm, depth + add_depth, writer)
    def __getitem__(self, key: Union[LexicalSpan, int]) -> \
            Union[FeatureToken, TokenContainer]:
        if isinstance(key, LexicalSpan):
            return self.get_overlapping_span(key, inclusive=False)
        return self.tokens[key]

    def __setstate__(self, state: Dict[str, Any]):
        super().__setstate__(state)
        self._token_norm: SpanNormalizer = DEFAULT_FEATURE_TOKEN_NORMALIZER

    def __eq__(self, other: TokenContainer) -> bool:
        if self is other:
            return True
        else:
            a: FeatureToken
            b: FeatureToken
            for a, b in zip(self.token_iter(), other.token_iter()):
                if a != b:
                    return False
            return self.token_len == other.token_len and \
                self.text == other.text

    def __lt__(self, other: FeatureToken) -> int:
        return self.norm < other.norm

    def __hash__(self) -> int:
        return sum(map(hash, self.token_iter()))

    def __str__(self):
        return TextContainer.__str__(self)

    def __repr__(self):
        return TextContainer.__repr__(self)
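
# A minimal usage sketch of the TokenContainer API above, assuming ``doc`` is
# a FeatureDocument produced by an application configured
# FeatureDocumentParser (parser setup is outside this module):
#
#     print(doc.norm)                    # normalized text
#     print(doc.token_len)               # number of tokens
#     for tok in doc.token_iter(5):      # first five tokens
#         print(tok.norm, tok.lexspan)
#     for ent in doc.entities:           # multi-word named entities
#         print(ent.norm)
#     trimmed = doc.strip(in_place=False)
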

@dataclass(eq=False, repr=False)
class FeatureSpan(TokenContainer):
    """A span of tokens as a :class:`.TokenContainer`, much like
    :class:`spacy.tokens.Span`.

    """
    _PERSITABLE_TRANSIENT_ATTRIBUTES: ClassVar[Set[str]] = \
        TokenContainer._PERSITABLE_TRANSIENT_ATTRIBUTES | \
        {'spacy_span', '_token_val'}
    """Don't serialize the spaCy span on persistence pickling."""

    tokens: Tuple[FeatureToken, ...] = field()
    """The tokens that make up the span."""

    text: str = field(default=None)
    """The original raw text of the span."""

    spacy_span: Span = field(default=None, repr=False, compare=False)
    """The parsed spaCy span this feature set is based on.

    :see: :meth:`.FeatureDocument.spacy_doc`

    """
    def __post_init__(self):
        super().__post_init__()
        if self.text is None:
            self.text = ' '.join(map(lambda t: t.text, self.tokens))
        # the _tokens setter is called to set the tokens before spacy_span is
        # set; so call it again since now we have spacy_span set
        self._set_entity_spans()

    @property
    def _tokens(self) -> Tuple[FeatureToken, ...]:
        return self._tokens_val

    @_tokens.setter
    def _tokens(self, tokens: Tuple[FeatureToken, ...]):
        if not isinstance(tokens, tuple):
            raise NLPError(
                f'Expecting tuple of tokens, but got {type(tokens)}')
        self._tokens_val = tokens
        self._ents: List[Tuple[int, int]] = []
        self._set_entity_spans()
        if hasattr(self, '_norm'):
            # __post_init__ is called after this setter for EMPTY_SENTENCE
            self._norm.clear()

    def _set_entity_spans(self):
        if self.spacy_span is not None:
            for ents in self.spacy_span.ents:
                start, end = None, None
                ents = iter(ents)
                try:
                    start = end = next(ents)
                    while True:
                        end = next(ents)
                except StopIteration:
                    pass
                if start is not None:
                    self._ents.append((start.idx, end.idx))

    def _strip(self):
        self.tokens = tuple(self.strip_tokens(self.tokens))
        self.text = self.text.strip()
    def to_sentence(self, limit: int = sys.maxsize,
                    contiguous_i_sent: Union[str, bool] = False,
                    delim: str = '') -> FeatureSentence:
        if limit == 0:
            return iter(())
        else:
            clone = self.clone(FeatureSentence)
            if contiguous_i_sent:
                clone._set_contiguous_tokens(contiguous_i_sent, self)
            return clone
    def to_document(self) -> FeatureDocument:
        return FeatureDocument((self.to_sentence(),))
    def clone(self, cls: Type[TokenContainer] = None, **kwargs) -> \
            TokenContainer:
        params = dict(kwargs)
        if 'tokens' not in params:
            params['tokens'] = tuple(
                map(lambda t: t.clone(), self._tokens_val))
        if 'text' not in params:
            params['text'] = self.text
        clone = super().clone(cls, **params)
        clone._ents = list(self._ents)
        return clone
    def token_iter(self, *args, **kwargs) -> Iterable[FeatureToken]:
        if len(args) == 0:
            return iter(self._tokens_val)
        else:
            return it.islice(self._tokens_val, *args, **kwargs)
    @property
    def token_len(self) -> int:
        return len(self._tokens_val)

    def _is_mwe(self) -> bool:
        """True when this is a span with the same indexes because it was
        parsed as a single token into a multi-word expression (i.e. entity).

        """
        if self.token_len > 1:
            return self._tokens_val[0].i != self._tokens_val[1].i
        return False

    @property
    @persisted('_tokens_by_i_sent', transient=True)
    def tokens_by_i_sent(self) -> Dict[int, FeatureToken]:
        """A map of tokens with keys as their sentential position offset and
        values as tokens.

        :see: :obj:`zensols.nlp.FeatureToken.i`

        """
        by_i_sent: Dict[int, FeatureToken] = {}
        cnt: int = 0
        tok: FeatureToken
        for tok in self.token_iter():
            by_i_sent[tok.i_sent] = tok
            cnt += 1
        assert cnt == self.token_len
        # add indexes for multi-word entities that otherwise have mappings for
        # only the first word of the entity
        ent_span: FeatureSpan
        for ent_span in self.entities:
            im: int = 0 if ent_span._is_mwe() else 1
            t: FeatureToken
            for i, t in enumerate(ent_span):
                by_i_sent[t.i_sent + (im * i)] = t
        return frozendict(by_i_sent)

    def _get_tokens_by_i(self) -> Dict[int, FeatureToken]:
        by_i: Dict[int, FeatureToken] = {}
        cnt: int = 0
        tok: FeatureToken
        for tok in self.token_iter():
            by_i[tok.i] = tok
            cnt += 1
        assert cnt == self.token_len
        # add indexes for multi-word entities that otherwise have mappings for
        # only the first word of the entity
        ent_span: Tuple[FeatureToken, ...]
        for ent_span in self.entities:
            im: int = 0 if ent_span._is_mwe() else 1
            t: FeatureToken
            for i, t in enumerate(ent_span):
                by_i[t.i + (im * i)] = t
        return by_i

    def _get_entities(self) -> Tuple[FeatureSpan, ...]:
        ents: List[FeatureSpan] = []
        for start, end in self._ents:
            ent: List[FeatureToken] = []
            tok: FeatureToken
            for tok in self.token_iter():
                if tok.idx >= start and tok.idx <= end:
                    ent.append(tok)
            if len(ent) > 0:
                span = FeatureSpan(
                    tokens=tuple(ent),
                    text=' '.join(map(lambda t: t.norm, ent)))
                ents.append(span)
        return tuple(ents)
    def update_indexes(self):
        super().update_indexes()
        i_sent: int
        ft: FeatureToken
        for i_sent, ft in self.tokens_by_i_sent.items():
            ft.i_sent = i_sent
    def update_entity_spans(self, include_idx: bool = True):
        split_ents: List[Tuple[int, int]] = []
        fspan: FeatureSpan
        for fspan in self.entities:
            beg: int = fspan[0].idx
            tok: FeatureToken
            for tok in fspan:
                ls: LexicalSpan = tok.lexspan
                end: int = beg + len(tok.norm)
                if ls.begin != beg or ls.end != end:
                    ls = LexicalSpan(beg, end)
                    tok.lexspan = ls
                    if include_idx:
                        tok.idx = beg
                split_ents.append((beg, beg))
                beg = end + 1
        self._ents = split_ents
        self._entities.clear()
    def _reindex(self, tok: FeatureToken):
        offset_idx: int = tok.idx
        super()._reindex(tok)
        for i, tok in enumerate(self.tokens):
            tok.i_sent = i
        self._ents = list(map(
            lambda t: (t[0] - offset_idx, t[1] - offset_idx), self._ents))

    def _branch(self, node: FeatureToken, toks: Tuple[FeatureToken, ...],
                tid_to_idx: Dict[int, int]) -> \
            Dict[FeatureToken, List[FeatureToken]]:
        clds = {}
        for c in node.children:
            cix = tid_to_idx.get(c)
            if cix:
                child = toks[cix]
                clds[child] = self._branch(child, toks, tid_to_idx)
        return clds

    @property
    @persisted('_dependency_tree', transient=True)
    def dependency_tree(self) -> Dict[FeatureToken, List[Dict[FeatureToken]]]:
        tid_to_idx: Dict[int, int] = {}
        toks = self.tokens
        for i, tok in enumerate(toks):
            tid_to_idx[tok.i] = i
        root = tuple(
            filter(lambda t: t.dep_ == 'ROOT' and not t.is_punctuation, toks))
        if len(root) == 1:
            return {root[0]: self._branch(root[0], toks, tid_to_idx)}
        else:
            return {}

    def _from_dictable(self, recurse: bool, readable: bool,
                       class_name_param: str = None) -> Dict[str, Any]:
        return {'text': self.text,
                'tokens': self._from_object(self.tokens, recurse, readable)}

    def __len__(self) -> int:
        return self.token_len

    def __iter__(self):
        return self.token_iter()

# keep the dataclass semantics, but allow for a setter
FeatureSpan.tokens = FeatureSpan._tokens

@dataclass(eq=False, repr=False)
class FeatureSentence(FeatureSpan):
    """A container class of tokens that make a sentence.  Instances of this
    class iterate over :class:`.FeatureToken` instances, and can create
    documents with :meth:`to_document`.

    """
    EMPTY_SENTENCE: ClassVar[FeatureSentence]
    def to_sentence(self, limit: int = sys.maxsize,
                    contiguous_i_sent: Union[str, bool] = False,
                    delim: str = '') -> FeatureSentence:
        if limit == 0:
            return iter(())
        else:
            if not contiguous_i_sent:
                return self
            else:
                clone = self.clone(FeatureSentence)
                clone._set_contiguous_tokens(contiguous_i_sent, self)
                return clone
    def to_document(self) -> FeatureDocument:
        return FeatureDocument((self,))
    def get_overlapping_span(self, span: LexicalSpan,
                             inclusive: bool = True) -> TokenContainer:
        doc = FeatureDocument(sents=(self,), text=self.text)
        return doc.get_overlapping_document(span, inclusive=inclusive)

FeatureSentence.EMPTY_SENTENCE = FeatureSentence(tokens=(), text='')
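
# A minimal usage sketch for FeatureSpan and FeatureSentence, assuming
# ``sent`` is a parsed FeatureSentence (for example ``doc[0]`` from a parsed
# FeatureDocument):
#
#     print(len(sent), sent.norm)        # token count and normalized text
#     print(sent.tokens_by_i_sent)       # sentence position -> token map
#     single_doc = sent.to_document()    # wrap the sentence in a document
#
# FeatureSentence.EMPTY_SENTENCE is a shared zero-token instance useful as a
# default or sentinel value.
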

@dataclass(eq=False, repr=False)
class FeatureDocument(TokenContainer):
    """A container class of tokens that make a document.  This class contains
    a one-to-many relationship of sentences.  However, it can be treated like
    any :class:`.TokenContainer` to fetch tokens.  Instances of this class
    iterate over :class:`.FeatureSentence` instances.

    :param sents: the sentences defined for this document

    .. document private functions
    .. automethod:: _combine_documents

    """
    EMPTY_DOCUMENT: ClassVar[FeatureDocument] = None
    """A zero length document."""

    _PERSITABLE_TRANSIENT_ATTRIBUTES: ClassVar[Set[str]] = \
        TokenContainer._PERSITABLE_TRANSIENT_ATTRIBUTES | {'spacy_doc'}
    """Don't serialize the spaCy document on persistence pickling."""

    sents: Tuple[FeatureSentence, ...] = field()
    """The sentences that make up the document."""

    text: str = field(default=None)
    """The original raw text of the document."""

    spacy_doc: Doc = field(default=None, repr=False, compare=False)
    """The parsed spaCy document this feature set is based on.  As explained
    in :class:`~zensols.nlp.FeatureToken`, spaCy documents are heavyweight and
    problematic to pickle.  For this reason, this attribute is dropped when
    pickled, and only here for ad-hoc predictions.

    """
    def __post_init__(self):
        super().__post_init__()
        if self.text is None:
            self.text = ''.join(map(lambda s: s.text, self.sent_iter()))
        if not isinstance(self.sents, tuple):
            raise NLPError(
                f'Expecting tuple of sentences, but got {type(self.sents)}')
    def set_spacy_doc(self, doc: Doc):
        ft_to_i: Dict[int, FeatureToken] = self.tokens_by_i
        st_to_i: Dict[int, Token] = {st.i: st for st in doc}
        i: int
        ft: FeatureToken
        for i, ft in ft_to_i.items():
            st: Token = st_to_i.get(i)
            if st is not None:
                ft.spacy_token = st
        ss: Span
        for ft, ss in zip(self.sents, doc.sents):
            ft.spacy_span = ss
        self.spacy_doc = doc
    def _strip(self):
        sent: FeatureSentence
        for sent in self.sents:
            sent.strip()
        self.text = self.text.strip()
    def clone(self, cls: Type[TokenContainer] = None, **kwargs) -> \
            TokenContainer:
        """
        :param kwargs: if ``copy_spacy`` is ``True``, the spaCy document is
                       copied to the clone in addition to the parameters
                       passed to the new clone's initializer

        """
        params = dict(kwargs)
        if 'sents' not in params:
            params['sents'] = tuple(map(lambda s: s.clone(), self.sents))
        if 'text' not in params:
            params['text'] = self.text
        if params.pop('copy_spacy', False):
            for ss, cs in zip(self.sents, params['sents']):
                cs.spacy_span = ss.spacy_span
            params['spacy_doc'] = self.spacy_doc
        return super().clone(cls, **params)
    def token_iter(self, *args, **kwargs) -> Iterable[FeatureToken]:
        sent_toks = chain.from_iterable(
            map(lambda s: s.token_iter(), self.sents))
        if len(args) == 0:
            return sent_toks
        else:
            return it.islice(sent_toks, *args, **kwargs)
    def sent_iter(self, *args, **kwargs) -> Iterable[FeatureSentence]:
        if len(args) == 0:
            return iter(self.sents)
        else:
            return it.islice(self.sents, *args, **kwargs)
    @property
    def max_sentence_len(self) -> int:
        """Return the number of tokens in the longest sentence of the
        document.

        """
        return max(map(len, self.sent_iter()))

    def _sent_class(self) -> Type[FeatureSentence]:
        if len(self.sents) > 0:
            cls = self.sents[0].__class__
        else:
            cls = FeatureSentence
        return cls
    def to_sentence(self, limit: int = sys.maxsize,
                    contiguous_i_sent: Union[str, bool] = False,
                    delim: str = '') -> FeatureSentence:
        sents: Tuple[FeatureSentence, ...] = tuple(self.sent_iter(limit))
        toks: Iterable[FeatureToken] = chain.from_iterable(
            map(lambda s: s.tokens, sents))
        stext: str = delim.join(map(lambda s: s.text, sents))
        cls: Type = self._sent_class()
        sent: FeatureSentence = cls(tokens=tuple(toks), text=stext)
        sent._ents = list(chain.from_iterable(map(lambda s: s._ents, sents)))
        sent._set_contiguous_tokens(contiguous_i_sent, self)
        return sent
    def _combine_update(self, other: FeatureDocument):
        """Update internal data structures from another combined document.
        This includes merging entities.

        :see: :class:`.CombinerFeatureDocumentParser`

        :see: :class:`.MappingCombinerFeatureDocumentParser`

        """
        ss: FeatureSentence
        ts: FeatureSentence
        for ss, ts in zip(other, self):
            ents = set(ss._ents) | set(ts._ents)
            ts._ents = sorted(ents, key=lambda x: x[0])
    def to_document(self) -> FeatureDocument:
        return self
    @persisted('_id_to_sent_pw', transient=True)
    def _id_to_sent(self) -> Dict[int, int]:
        id_to_sent = {}
        for six, sent in enumerate(self):
            for tok in sent:
                id_to_sent[tok.idx] = six
        return id_to_sent

    def _get_tokens_by_i(self) -> Dict[int, FeatureToken]:
        by_i = {}
        for sent in self.sents:
            by_i.update(sent.tokens_by_i)
        return by_i
    def update_indexes(self):
        sent: FeatureSentence
        for sent in self.sents:
            sent.update_indexes()
    def update_entity_spans(self, include_idx: bool = True):
        sent: FeatureSentence
        for sent in self.sents:
            sent.update_entity_spans(include_idx)
        self._entities.clear()
    def _reindex(self, *args):
        sent: FeatureSentence
        for sent in self.sents:
            sent._reindex(*args)
    def clear(self):
        """Clear all cached state."""
        super().clear()
        sent: FeatureSentence
        for sent in self.sents:
            sent.clear()
    def sentence_index_for_token(self, token: FeatureToken) -> int:
        """Return the index of the parent sentence having ``token``."""
        return self._id_to_sent()[token.idx]
    def sentence_for_token(self, token: FeatureToken) -> FeatureSentence:
        """Return the parent sentence that has ``token``."""
        six: int = self.sentence_index_for_token(token)
        return self.sents[six]
    def sentences_for_tokens(self, tokens: Tuple[FeatureToken, ...]) -> \
            Tuple[FeatureSentence, ...]:
        """Find sentences having a set of tokens.

        :param tokens: the query used to find the containing sentences

        :return: the document ordered tuple of sentences containing ``tokens``

        """
        id_to_sent = self._id_to_sent()
        sent_ids = sorted(set(map(lambda t: id_to_sent[t.idx], tokens)))
        return tuple(map(lambda six: self[six], sent_ids))
    def _combine_documents(self, docs: Tuple[FeatureDocument, ...],
                           cls: Type[FeatureDocument],
                           concat_tokens: bool,
                           **kwargs) -> FeatureDocument:
        """Override if there are any fields in your dataclass.  In most cases,
        the only time this is called is by an embedding vectorizer to batch
        multiple sentences into a single document, so the only features that
        matter are at the sentence level.

        :param docs: the documents to combine into one

        :param cls: the class of the instance to create

        :param concat_tokens: if ``True``, each sentence of the returned
                              document is the concatenated tokens of the
                              respective document; otherwise simply
                              concatenate sentences into one document

        :param kwargs: additional keyword arguments to pass to the new feature
                       document's initializer

        """
        if concat_tokens:
            sents = tuple(chain.from_iterable(
                map(lambda d: d.combine_sentences(), docs)))
        else:
            sents = tuple(chain.from_iterable(docs))
        if 'text' not in kwargs:
            kwargs = dict(kwargs)
            kwargs['text'] = ' '.join(map(lambda d: d.text, docs))
        return cls(sents, **kwargs)
    @classmethod
    def combine_documents(cls, docs: Iterable[FeatureDocument],
                          concat_tokens: bool = True,
                          **kwargs) -> FeatureDocument:
        """Coerce a tuple of token containers (either documents or sentences)
        into one synthesized document.

        :param docs: the documents to combine into one

        :param cls: the class of the instance to create

        :param concat_tokens: if ``True``, each sentence of the returned
                              document is the concatenated tokens of the
                              respective document; otherwise simply
                              concatenate sentences into one document

        :param kwargs: additional keyword arguments to pass to the new feature
                       document's initializer

        """
        docs = tuple(docs)
        if len(docs) == 0:
            doc = cls([], **kwargs)
        else:
            fdoc = docs[0]
            doc = fdoc._combine_documents(
                docs, type(fdoc), concat_tokens, **kwargs)
        return doc
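    # A sketch of document combination, assuming ``doc_a`` and ``doc_b`` are
    # parsed FeatureDocument instances:
    #
    #     merged = FeatureDocument.combine_documents(
    #         (doc_a, doc_b), concat_tokens=False)
    #     assert len(merged) == len(doc_a) + len(doc_b)
    #
    # With ``concat_tokens=True`` (the default) each input document is first
    # collapsed with combine_sentences(), so the result has one sentence per
    # input document.
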
    @persisted('_combine_all_sentences_pw', transient=True)
    def _combine_all_sentences(self) -> FeatureDocument:
        if len(self.sents) == 1:
            return self
        else:
            sent_cls = self._sent_class()
            sent = sent_cls(self.tokens)
            doc = dataclasses.replace(self)
            doc.sents = (sent,)
            doc._combined = True
            return doc
    def combine_sentences(self, sents: Iterable[FeatureSentence] = None) -> \
            FeatureDocument:
        """Combine the sentences in this document into a new document with a
        single sentence.

        :param sents: the sentences to combine in the new document, or all if
                      ``None``

        """
        if sents is None:
            return self._combine_all_sentences()
        else:
            sents: Tuple[FeatureSentence] = tuple(sents)
            cls = type(sents[0]) if len(sents) > 0 else FeatureSentence
            sent: FeatureSentence = cls(tuple(
                chain.from_iterable(map(lambda s: s.token_iter(), sents))))
            return self.__class__((sent,))
    def _reconstruct_sents_iter(self) -> Iterable[FeatureSentence]:
        sent: FeatureSentence
        for sent in self.sents:
            stoks: List[FeatureToken] = []
            ip_sent: int = -1
            tok: FeatureToken
            for tok in sent:
                # when the token's sentence index goes back to 0, we have a
                # full sentence
                if tok.i_sent < ip_sent:
                    sent = FeatureSentence(tuple(stoks))
                    stoks = []
                    yield sent
                stoks.append(tok)
                ip_sent = tok.i_sent
            if len(stoks) > 0:
                yield FeatureSentence(tuple(stoks))
    def uncombine_sentences(self) -> FeatureDocument:
        """Reconstruct the sentence structure that we combined in
        :meth:`combine_sentences`.  If that has not been done in this
        instance, then return ``self``.

        """
        if hasattr(self, '_combined'):
            return FeatureDocument(tuple(self._reconstruct_sents_iter()))
        else:
            return self
    def _get_entities(self) -> Tuple[FeatureSpan, ...]:
        return tuple(chain.from_iterable(
            map(lambda s: s.entities, self.sents)))
    def get_overlapping_span(self, span: LexicalSpan,
                             inclusive: bool = True) -> TokenContainer:
        """Return a feature span that includes the lexical scope of
        ``span``."""
        return self.get_overlapping_document(span, inclusive=inclusive)
    def get_overlapping_sentences(self, span: LexicalSpan,
                                  inclusive: bool = True) -> \
            Iterable[FeatureSentence]:
        """Return sentences that overlap with ``span`` from this document.

        :param span: indicates the portion of the document to retain

        :param inclusive: whether to include the end component (+1) in the
                          overlap check

        """
        for sent in self.sents:
            if sent.lexspan.overlaps_with(span):
                yield sent
    def get_overlapping_document(self, span: LexicalSpan,
                                 inclusive: bool = True) -> FeatureDocument:
        """Get the portion of the document that overlaps ``span``.  Sentences
        completely enclosed in a span are copied.  Otherwise, new sentences
        are created from those tokens that overlap the span.

        :param span: indicates the portion of the document to retain

        :param inclusive: whether to include the end component (+1) in the
                          overlap check

        :return: a new document that contains the 0 index offset of ``span``

        """
        send: int = 1 if inclusive else 0
        doc = self.clone()
        if span != self.lexspan:
            doc_text: str = self.text
            sents: List[FeatureSentence] = []
            for sent in self.sent_iter():
                toks: List[FeatureToken] = list(
                    sent.get_overlapping_tokens(span, inclusive))
                if len(toks) == 0:
                    continue
                elif len(toks) == len(sent):
                    pass
                else:
                    text: str = doc_text[toks[0].lexspan.begin:
                                         toks[-1].lexspan.end - 1 + send]
                    hang: int = (span.end + send) - toks[-1].lexspan.end
                    if hang < 0:
                        tok: FeatureToken = toks[-1]
                        clone = tok.clone()
                        clone.norm = tok.norm[:hang]
                        clone.text = tok.text[:hang]
                        toks[-1] = clone
                    hang = toks[0].lexspan.begin - span.begin
                    if hang < 0:
                        hang *= -1
                        tok = toks[0]
                        clone = tok.clone()
                        clone.norm = tok.norm[hang:]
                        clone.text = tok.text[hang:]
                        toks[0] = clone
                    sent = sent.clone(tokens=tuple(toks), text=text)
                sents.append(sent)
            text: str = doc_text[span.begin:span.end + send]
            doc.sents = tuple(sents)
            doc.text = text
            body_len = sum(
                1 for _ in doc.get_overlapping_tokens(span, inclusive))
            assert body_len == doc.token_len
        return doc
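    # A sketch of span slicing, assuming ``doc`` is a parsed FeatureDocument:
    #
    #     sub = doc.get_overlapping_document(LexicalSpan(0, 20))
    #     print(sub.text)                  # text limited to the span
    #     sub2 = doc[LexicalSpan(0, 20)]   # __getitem__ slices non-inclusively
    #
    # Tokens that only partially overlap the span boundaries are cloned and
    # truncated so the sub-document's text and norms stay consistent.
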
    def from_sentences(self, sents: Iterable[FeatureSentence],
                       deep: bool = False) -> FeatureDocument:
        """Return a new cloned document using the given sentences.

        :param sents: the sentences to add to the new cloned document

        :param deep: whether or not to clone the sentences

        :see: :meth:`clone`

        """
        if deep:
            sents = tuple(map(lambda s: s.clone(), sents))
        clone = self.clone(sents=tuple(sents))
        clone.text = ' '.join(map(lambda s: s.text, sents))
        clone.spacy_doc = None
        return clone
    def write(self, depth: int = 0, writer: TextIOBase = sys.stdout,
              n_sents: int = sys.maxsize, n_tokens: int = 0,
              include_original: bool = False,
              include_normalized: bool = True):
        """Write the document and optionally sentence features.

        :param n_sents: the number of sentences to write

        :param n_tokens: the number of tokens to print across all sentences

        :param include_original: whether to include the original text

        :param include_normalized: whether to include the normalized text

        """
        TextContainer.write(self, depth, writer,
                            include_original=include_original,
                            include_normalized=include_normalized)
        self._write_line('sentences:', depth, writer)
        s: FeatureSentence
        for s in it.islice(self.sents, n_sents):
            s.write(depth + 1, writer, n_tokens=n_tokens,
                    include_original=include_original,
                    include_normalized=include_normalized)
    def _from_dictable(self, recurse: bool, readable: bool,
                       class_name_param: str = None) -> Dict[str, Any]:
        return {'text': self.text,
                'sentences': self._from_object(self.sents, recurse, readable)}

    def __getitem__(self, key: Union[LexicalSpan, int]) -> \
            Union[FeatureSentence, TokenContainer]:
        if isinstance(key, LexicalSpan):
            return self.get_overlapping_span(key, inclusive=False)
        return self.sents[key]

    def __eq__(self, other: FeatureDocument) -> bool:
        if self is other:
            return True
        else:
            a: FeatureSentence
            b: FeatureSentence
            for a, b in zip(self.sents, other.sents):
                if a != b:
                    return False
            return len(self.sents) == len(other.sents) and \
                self.text == other.text

    def __hash__(self) -> int:
        return sum(map(hash, self.sents))

    def __len__(self):
        return len(self.sents)

    def __iter__(self):
        return self.sent_iter()

FeatureDocument.EMPTY_DOCUMENT = FeatureDocument(sents=(), text='')
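
# A minimal sketch of sentence-level access, assuming ``doc`` is a parsed
# FeatureDocument with at least two sentences:
#
#     first_tok = doc.tokens[0]
#     print(doc.sentence_for_token(first_tok).norm)
#     squashed = doc.combine_sentences()      # one-sentence document
#     restored = squashed.uncombine_sentences()
#     assert len(restored) == len(doc)
#
# FeatureDocument.EMPTY_DOCUMENT is a shared zero-sentence instance that can
# be used as a default value.
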

@dataclass(eq=False, repr=False)
class TokenAnnotatedFeatureSentence(FeatureSentence):
    """A feature sentence that contains token annotations.

    """
    annotations: Tuple[Any, ...] = field(default=())
    """A token level annotation, which is one-to-one to tokens."""
    def to_document(self) -> FeatureDocument:
        return TokenAnnotatedFeatureDocument((self.to_sentence(),))
    def write(self, depth: int = 0, writer: TextIOBase = sys.stdout,
              **kwargs):
        super().write(depth, writer, **kwargs)
        n_ann = len(self.annotations)
        self._write_line(f'annotations ({n_ann}): {self.annotations}',
                         depth, writer)

@dataclass(eq=False, repr=False)
class TokenAnnotatedFeatureDocument(FeatureDocument):
    """A feature document that contains token annotations.  Sentences can be
    modeled with :class:`.TokenAnnotatedFeatureSentence` or just
    :class:`.FeatureSentence` since this sets the ``annotations`` attribute
    when combining.

    """
    @persisted('_combine_sentences', transient=True)
    def combine_sentences(self) -> FeatureDocument:
        """Combine all the sentences in this document into a new document with
        a single sentence.

        """
        if len(self.sents) == 1:
            return self
        else:
            sent_cls = self._sent_class()
            anns = chain.from_iterable(map(lambda s: s.annotations, self))
            sent = sent_cls(self.tokens)
            sent.annotations = tuple(anns)
            doc = dataclasses.replace(self)
            doc.sents = [sent]
            doc._combined = True
            return doc

    def _combine_documents(self, docs: Tuple[FeatureDocument, ...],
                           cls: Type[FeatureDocument],
                           concat_tokens: bool) -> FeatureDocument:
        if concat_tokens:
            return super()._combine_documents(docs, cls, concat_tokens)
        else:
            sents = chain.from_iterable(docs)
            text = ' '.join(chain.from_iterable(map(lambda s: s.text, docs)))
            anns = chain.from_iterable(map(lambda s: s.annotations, self))
            doc = cls(tuple(sents), text)
            doc.sents[0].annotations = tuple(anns)
            return doc

    @property
    @persisted('_annotations', transient=True)
    def annotations(self) -> Tuple[Any, ...]:
        """A token level annotation, which is one-to-one to tokens."""
        return tuple(chain.from_iterable(map(lambda s: s.annotations, self)))
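
# A minimal sketch of the annotated containers, assuming ``sent`` is a parsed
# FeatureSentence and ``tags`` is a sequence of labels with one entry per
# token (both obtained elsewhere):
#
#     ann_sent = TokenAnnotatedFeatureSentence(
#         tokens=sent.tokens, text=sent.text, annotations=tuple(tags))
#     ann_doc = ann_sent.to_document()
#     print(ann_doc.annotations)       # one annotation per token, in order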