Source code for zensols.nlp.chunker

"""Clasess that segment text from :class:`.FeatureDocument` instances, but
retain the original structure by preserving sentence and token indicies.

"""
__author__ = 'Paul Landes'

from typing import ClassVar, Tuple, List, Iterable, Optional
from dataclasses import dataclass, field
from abc import ABCMeta, abstractmethod
import textwrap as tw
import re
import logging
from . import LexicalSpan, TokenContainer, FeatureSentence, FeatureDocument

logger = logging.getLogger(__name__)


@dataclass
class Chunker(object, metaclass=ABCMeta):
    """Splits :class:`~zensols.nlp.container.TokenContainer` instances using
    the regular expression :obj:`pattern`.  Matched containers (the type of
    container depends on the subclass) are given when the instance is used as
    an iterable.  The document of all parsed containers is given when the
    instance is used as a callable.

    """
    doc: FeatureDocument = field()
    """The document that contains the entire text (i.e. :class:`.Note`)."""

    pattern: re.Pattern = field()
    """The chunk regular expression.  There should be a default for each
    subclass.

    """
    sub_doc: FeatureDocument = field(default=None)
    """A lexical span created document of :obj:`doc`, which defaults to the
    global document.  Providing this and :obj:`char_offset` allows use of a
    document without having to use :meth:`.TokenContainer.reindex`.

    """
    char_offset: int = field(default=None)
    """The 0-index absolute character offset where :obj:`sub_doc` starts.
    However, if the value is -1, the beginning character offset of the first
    token in :obj:`sub_doc` is used instead.

    """
    def __post_init__(self):
        if self.sub_doc is None:
            self.sub_doc = self.doc

    def _get_coff(self) -> int:
        coff: int = self.char_offset
        if coff is None:
            coff = self.doc.lexspan.begin
        if coff == -1:
            coff = next(self.sub_doc.token_iter()).lexspan.begin
        return coff

    def __iter__(self) -> Iterable[TokenContainer]:
        def match_to_span(m: re.Match) -> LexicalSpan:
            s: Tuple[int, int] = m.span(1)
            return LexicalSpan(s[0] + coff, s[1] + coff)

        def trunc(s: str) -> str:
            sh: str = tw.shorten(s, 50).replace('\n', '\\n')
            return f'<<{sh}>>'

        conts: List[TokenContainer] = []
        if self.sub_doc.token_len > 0:
            # offset from the global document (if a subdoc from get_overlap...)
            coff: int = self._get_coff()
            # the text to match on, or ``gtext`` if there is no subdoc
            subdoc_text: str = self.sub_doc.text
            # the global document text
            gtext: str = self.doc.text
            # all regular expression matches found in ``subdoc_text``
            matches: List[LexicalSpan] = \
                list(map(match_to_span, self.pattern.finditer(subdoc_text)))
            # guard on the no-matches-found edge case
            if len(matches) > 0:
                subdoc_len: int = len(subdoc_text) + coff
                start: int = matches[0].begin
                end: int = matches[-1].end
                if logger.isEnabledFor(logging.DEBUG):
                    logger.debug(f'coff: {coff}, start={start}, end={end}')
                # add a front content match when the first match does not
                # start on the first character
                if start > coff:
                    fms = LexicalSpan(coff, start - 1)
                    matches.insert(0, fms)
                    if logger.isEnabledFor(logging.DEBUG):
                        logger.debug(f'adding start match: {start}, {coff}: ' +
                                     f'{gtext[fms[0]:fms[1]]}')
                # add any trailing content when the last match does not
                # include the last character
                if subdoc_len > end:
                    matches.append(LexicalSpan(end, subdoc_len))
                # treat matches as a stack with the front of the list as the top
                while len(matches) > 0:
                    # pop the first match off the stack
                    span: LexicalSpan = matches.pop(0)
                    cont: TokenContainer = None
                    if logger.isEnabledFor(logging.DEBUG):
                        st: str = trunc(gtext[span[0]:span[1]])
                        logger.debug(
                            f'span begin: {span.begin}, start: {start}, ' +
                            f'match {span}: {st}')
                    if span.begin > start:
                        # when the match comes after the last ending marker,
                        # add this content to the last match entry
                        cont = self._create_container(
                            LexicalSpan(start, span.begin - 1))
                        if logger.isEnabledFor(logging.DEBUG):
                            logger.debug(f'create (trailing): {cont}')
                        # content exists if it's text we keep (i.e. non-space)
                        if cont is not None:
                            if len(conts) > 0:
                                # tack on to the last entry since it trailed
                                # (probably after a newline)
                                conts[-1] = self._merge_containers(
                                    conts[-1], cont)
                            else:
                                # add a new entry
                                conts.append(cont)
                            # indicate we already added the content so we
                            # don't double add it
                            cont = None
                        # we dealt with the trailing content from the previous
                        # span, but we haven't taken care of this span yet
                        matches.insert(0, span)
                    else:
                        # create and add the content for the exact match
                        # (again, we skip empty space etc.)
                        cont = self._create_container(span)
                        if logger.isEnabledFor(logging.DEBUG):
                            st: str = trunc(gtext[span[0]:span[1]])
                            logger.debug(f'create (not empty) {st} -> {cont}')
                    if cont is not None:
                        conts.append(cont)
                    # walk past this span to detect unmatched content for the
                    # next iteration (if there is one)
                    start = span.end + 1
        # adhere to the iterable contract for potentially more dynamic
        # subclasses
        return iter(conts)

    def _merge_containers(self, a: TokenContainer, b: TokenContainer) -> \
            TokenContainer:
        """Merge two token containers into one.  This is used for straggling
        content between matches that is tacked on to the previous entry.

        """
        if logger.isEnabledFor(logging.DEBUG):
            logger.debug(f'merging: {a}||{b}')
        return FeatureDocument((a, b)).to_sentence()

    @abstractmethod
    def _create_container(self, span: LexicalSpan) -> Optional[TokenContainer]:
        """Create content from :obj:`doc` and :obj:`sub_doc` as a subdocument
        for span ``span``.

        """
        pass
    @abstractmethod
    def to_document(self, conts: Iterable[TokenContainer]) -> FeatureDocument:
        """Create a document from the containers produced by this chunker."""
        pass
    def __call__(self) -> FeatureDocument:
        return self.to_document(self)
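
# The following is an illustrative sketch (not part of the original module) of
# the two methods a subclass provides: ``_create_container`` narrows the
# document to a matched span and ``to_document`` assembles the final document.
# The hypothetical ``LineChunker`` treats each non-blank line as its own
# sentence, mirroring the concrete subclasses that follow.
#
# @dataclass
# class LineChunker(Chunker):
#     pattern: re.Pattern = field(
#         default=re.compile(r'^(.+)$', re.MULTILINE))
#
#     def _create_container(self, span: LexicalSpan) -> \
#             Optional[TokenContainer]:
#         sent = self.doc.get_overlapping_document(span).to_sentence()
#         sent.strip()
#         return sent if sent.token_len > 0 else None
#
#     def to_document(self, conts: Iterable[TokenContainer]) -> \
#             FeatureDocument:
#         sents = tuple(conts)
#         return FeatureDocument(
#             sents=sents, text='\n'.join(s.text for s in sents))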
@dataclass
class ParagraphChunker(Chunker):
    """A :class:`.Chunker` that splits text into paragraphs as separate
    documents.  Matched paragraphs are given when used as an iterable.  For
    this reason, this class will probably be used as an iterable since clients
    will usually want just the separated paragraphs as documents.

    """
    DEFAULT_SPAN_PATTERN: ClassVar[re.Pattern] = re.compile(
        r'(.+?)(?:(?=\n{2})|\Z)', re.MULTILINE | re.DOTALL)
    """The default paragraph regular expression, which uses a positive
    lookahead on two newlines (or end of text) so paragraph spacing is not
    included in the match.

    """
    pattern: re.Pattern = field(default=DEFAULT_SPAN_PATTERN)
    """The paragraph regular expression, which defaults to
    :obj:`DEFAULT_SPAN_PATTERN`.

    """
    def _merge_containers(self, a: TokenContainer, b: TokenContainer) -> \
            TokenContainer:
        if logger.isEnabledFor(logging.DEBUG):
            logger.debug(f'merging: {a}||{b}')
        # return documents to keep as much of the sentence structure as
        # possible
        return FeatureDocument.combine_documents((a, b))

    def _create_container(self, span: LexicalSpan) -> Optional[TokenContainer]:
        doc: FeatureDocument = self.doc.get_overlapping_document(span)
        slen: int = len(doc.sents)
        # remove empty sentences created by double newlines, which happens at
        # the beginning or end of the document
        sents: Tuple[FeatureSentence] = tuple(
            filter(lambda s: len(s) > 0, map(lambda x: x.strip(), doc)))
        if slen != len(sents):
            # when we find surrounding whitespace, create a (sentence)
            # stripped document
            doc = FeatureDocument(sents=tuple(sents), text=doc.text.strip())
        if len(doc.sents) > 0:
            # we still need to strip per sentence for whitespace added at the
            # sentence level
            return doc.strip()
    def to_document(self, conts: Iterable[TokenContainer]) -> FeatureDocument:
        """It usually makes more sense to use instances of this class as an
        iterable rather than calling this method (see the class docs).

        """
        return FeatureDocument.combine_documents(conts)
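
# Usage sketch (illustrative, not part of the original module): ``doc`` is
# assumed to be a :class:`.FeatureDocument` parsed elsewhere from text with
# blank-line separated paragraphs.  Iterating yields one document per
# paragraph; calling the chunker combines them back into a single document.
def _paragraph_chunker_example(doc: FeatureDocument) -> FeatureDocument:
    chunker = ParagraphChunker(doc=doc)
    # each ``para`` is a document for one paragraph
    for para in chunker:
        print(para.text)
    # equivalent to ``chunker.to_document(chunker)``
    return chunker()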
@dataclass
class ListItemChunker(Chunker):
    """A :class:`.Chunker` that splits list items and enumerated lists into
    separate sentences.  Matched sentences are given when used as an iterable.
    This is useful when spaCy incorrectly chunks lists into sentences; instead
    a regular expression finds lines that start with a decimal or with list
    characters such as ``-`` and ``+``.

    """
    DEFAULT_SPAN_PATTERN: ClassVar[re.Pattern] = re.compile(
        r'^((?:[0-9-+]+|[a-zA-Z]+:)[^\n]+)$', re.MULTILINE)
    """The default list item regular expression, which matches an initial list
    item character or an initial enumeration digit.

    """
    pattern: re.Pattern = field(default=DEFAULT_SPAN_PATTERN)
    """The list item regular expression, which defaults to
    :obj:`DEFAULT_SPAN_PATTERN`.

    """
    def _create_container(self, span: LexicalSpan) -> Optional[TokenContainer]:
        doc: FeatureDocument = self.doc.get_overlapping_document(span)
        sent: FeatureSentence = doc.to_sentence()
        # skip empty sentences, usually (spaCy) sentence chunked from text
        # with two newlines in a row
        sent.strip()
        if sent.token_len > 0:
            if logger.isEnabledFor(logging.DEBUG):
                logger.debug(f'narrowed sent: <{sent.text}>')
            return sent
    def to_document(self, conts: Iterable[TokenContainer]) -> FeatureDocument:
        sents: Tuple[FeatureSentence] = tuple(conts)
        if logger.isEnabledFor(logging.DEBUG):
            logger.debug('creating doc from:')
            for s in sents:
                logger.debug(f'  {s}')
        return FeatureDocument(
            sents=sents,
            text='\n'.join(map(lambda s: s.text, sents)))
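
# Usage sketch (illustrative, not part of the original module): ``doc`` is
# assumed to be a :class:`.FeatureDocument` parsed elsewhere from text that
# contains a dash/plus or enumerated list.  Each iterated container is a
# :class:`.FeatureSentence` for one list item; calling the chunker gives a
# document with one sentence per item.
def _list_item_chunker_example(doc: FeatureDocument) -> FeatureDocument:
    chunker = ListItemChunker(doc=doc)
    # each ``item`` is a sentence for a single list item
    for item in chunker:
        print(item.text)
    return chunker()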