Source code for zensols.nlp.component

"""Components useful for reuse.

"""
__author__ = 'Paul Landes'

from typing import List, Tuple, Dict, Any, Union, Sequence, Optional
from dataclasses import dataclass, field
import logging
import re
from itertools import chain
import json
from spacy.tokenizer import Tokenizer
from spacy.language import Language
from spacy.tokens.doc import Doc
from spacy.matcher import Matcher
from spacy.tokens import Span, Token
from . import NLPError

logger = logging.getLogger(__name__)


@Language.component('remove_sent_boundaries')
def create_remove_sent_boundaries_component(doc: Doc):
    """Remove sentence boundaries from tokens.

    :param doc: the spaCy document from which to remove sentence boundaries

    """
    for token in doc:
        # this will entirely disable spaCy's sentence detection
        token.is_sent_start = False
    return doc
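

# A minimal usage sketch of the component registered above, assuming this
# module has been imported so spaCy can find it; the blank pipeline and
# sample text are illustrative:
#
#   import spacy
#   nlp = spacy.blank('en')
#   nlp.add_pipe('sentencizer')
#   nlp.add_pipe('remove_sent_boundaries')
#   doc = nlp('First sentence. Second sentence.')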


@dataclass
class EntityRecognizer(object):
    """Base class for named entity recognizers that match either regular
    expressions or spaCy token patterns.  Both subclasses allow an optional
    label for each respective pattern or regular expression.  If a label is
    provided, the match is added as a named entity with that label.  In any
    case, a span is created on the token, and in some cases, the document is
    retokenized.

    """
    nlp: Language = field()
    """The NLP model."""

    name: str = field()
    """The component name."""

    import_file: Optional[str] = field()
    """An optional JSON file used to append the pattern configuration."""

    patterns: List = field()
    """A list of the regular expressions to find."""

    def __post_init__(self):
        if self.import_file is not None:
            self._append_config(self.patterns)

    def _append_config(self, patterns: List):
        if logger.isEnabledFor(logging.DEBUG):
            logger.debug(f'creating regex component for: {self.name}')
        if self.import_file is not None:
            if logger.isEnabledFor(logging.DEBUG):
                logger.debug(f'reading config file: {self.import_file}')
            with open(self.import_file) as f:
                add_pats = json.load(f)
            if logger.isEnabledFor(logging.DEBUG):
                logger.debug(f'adding to patterns: {add_pats}')
            patterns.extend(add_pats)

    def _make_span(self, doc: Doc, start: int, end: int, label: str,
                   is_char: bool, retok: bool):
        span: Span
        if is_char:
            if label is None:
                span = doc.char_span(start, end)
            else:
                span = doc.char_span(start, end, label=label)
        else:
            if label is None:
                span = Span(doc, start, end)
            else:
                span = Span(doc, start, end, label=label)
        if logger.isEnabledFor(logging.DEBUG):
            logger.debug(f'merge span ({start}, {end}) for {label}: {span}')
        if span is not None:
            # the span is None when the match does not map to a valid token
            # sequence
            if logger.isEnabledFor(logging.DEBUG):
                logger.debug(f'match: {span.text}')
            if label is not None:
                doc.ents += (span,)
            if retok:
                # https://github.com/explosion/spaCy/discussions/4806
                with doc.retokenize() as retokenizer:
                    # iterate over all spans and merge them into one token;
                    # this is done after setting the entities, otherwise it
                    # would cause mismatched indices
                    retokenizer.merge(span)

    def _split_span(self, doc: Doc, span_ix: List[Tuple[int, int]],
                    label: str):
        if label is not None:
            raise NLPError('Labels for splitting spans not yet supported')
        doc_text: str = doc.text
        toks: Tuple[Token, ...] = tuple(
            filter(lambda t: t.idx == span_ix[0][0], doc))
        if len(toks) == 0:
            stext: str = doc_text[span_ix[0][0]:span_ix[-1][1]]
            raise NLPError(
                f'Could not find token {stext} at {span_ix[0][0]} in {doc}')
        tok: Token = toks[0]
        orths: Tuple[str, ...] = tuple(
            map(lambda s: doc_text[s[0]:s[1]], span_ix))
        heads: List[Token] = [tok] * len(span_ix)
        #attrs = {'LABEL': [label] * len(span_ix)}
        if logger.isEnabledFor(logging.DEBUG):
            logger.debug(f'orths: {orths}, heads={heads}')
        with doc.retokenize() as retokenizer:
            retokenizer.split(tok, orths, heads=heads)
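

# The ``import_file`` field above is read with ``json.load`` and its entries
# are appended to ``patterns`` by ``_append_config``.  A sketch of writing
# such a file, assuming entries shaped like the regular expression patterns
# of the subclass below (the file name, label and expression are
# illustrative):
#
#   import json
#   with open('extra-patterns.json', 'w') as f:
#       json.dump([['VERSION', [r'v[0-9]+\.[0-9]+']]], f)
#
# The file could then be given as the ``path`` of the 'regexner' factory
# defined below, which becomes this ``import_file``.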


@dataclass
class RegexEntityRecognizer(EntityRecognizer):
    """Merges regular expression matches as a :class:`~spacy.tokens.Span`.
    After matches are found, re-tokenization merges them into one token per
    match.

    """
    patterns: List[Tuple[str, List[re.Pattern]]] = field()
    """A list of the regular expressions to find."""

    def __call__(self, doc: Doc) -> Doc:
        for label, regex_list in self.patterns:
            if logger.isEnabledFor(logging.DEBUG):
                logger.debug(f'label: {label}, regex: {regex_list}')
            matches = map(lambda r: re.finditer(r, doc.text), regex_list)
            for match in chain.from_iterable(matches):
                start, end = match.span()
                self._make_span(doc, start, end, label, True, True)
        return doc


@Language.factory(
    'regexner', default_config={'patterns': [], 'path': None})
def create_regexner_component(
        nlp: Language, name: str,
        patterns: Sequence[Tuple[Optional[str],
                                 Sequence[Union[re.Pattern, str]]]],
        path: str = None):
    def map_rlist(rlist):
        rl = map(lambda x: x if isinstance(x, re.Pattern) else re.compile(x),
                 rlist)
        return tuple(rl)

    regexes = map(lambda x: (x[0], map_rlist(x[1])), patterns)
    return RegexEntityRecognizer(nlp, name, path, list(regexes))
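

# A minimal sketch of adding the 'regexner' factory above to a pipeline; it
# assumes this module has been imported so the factory is registered, and
# the label, expression and text are illustrative:
#
#   import spacy
#   nlp = spacy.blank('en')
#   nlp.add_pipe('regexner', config={
#       'patterns': [['DOSE', ['[0-9]+ ?(?:mg|ml)']]]})
#   doc = nlp('The patient was given 50 mg twice a day.')
#   print([(e.text, e.label_) for e in doc.ents])  # '50 mg' labeled 'DOSE'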


@dataclass
class PatternEntityRecognizer(EntityRecognizer):
    """Adds entities based on spaCy rule-based matcher patterns.

    :see: `Rule matching <https://spacy.io/usage/rule-based-matching>`_

    """
    _NULL_LABEL = '<_>'

    patterns: List[Tuple[str, List[List[Dict[str, Any]]]]] = field()
    """The patterns given to the :class:`~spacy.matcher.Matcher`."""

    def __post_init__(self):
        super().__post_init__()
        self._matchers = []
        self._labels = {}
        for label, patterns in self.patterns:
            if logger.isEnabledFor(logging.DEBUG):
                logger.debug(f'label: {label}')
                logger.debug(f'pattern: {patterns}')
            matcher = Matcher(self.nlp.vocab)
            label = self._NULL_LABEL if label is None else label
            matcher.add(label, patterns, on_match=self._add_event_ent)
            self._labels[id(matcher)] = label
            self._matchers.append(matcher)

    def _add_event_ent(self, matcher, doc, i, matches):
        match_id, start, end = matches[i]
        label = self._labels[id(matcher)]
        label = None if label == self._NULL_LABEL else label
        self._make_span(doc, start, end, label, False, False)

    def __call__(self, doc: Doc) -> Doc:
        for matcher in self._matchers:
            match: List[Tuple[int, int, int]] = matcher(doc)
            if logger.isEnabledFor(logging.DEBUG):
                logger.debug(f'matched: {match}')
                logger.debug(f'doc ents: {doc.ents}')
        return doc


@Language.factory(
    'patner', default_config={'patterns': [], 'path': None})
def create_patner_component(
        nlp: Language, name: str,
        patterns: List[Tuple[Optional[str], List[List[Dict[str, Any]]]]],
        path: str = None):
    return PatternEntityRecognizer(nlp, name, path, list(patterns))
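

# A similar sketch for the 'patner' factory above using a spaCy token
# pattern; the label, pattern and sample text are illustrative:
#
#   import spacy
#   nlp = spacy.blank('en')
#   nlp.add_pipe('patner', config={
#       'patterns': [['ORG', [[{'IS_TITLE': True}, {'LOWER': 'inc'}]]]]})
#   doc = nlp('She works at Acme inc in Chicago.')
#   print([(e.text, e.label_) for e in doc.ents])  # 'Acme inc' tagged 'ORG'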


@Language.factory('whitespace_tokenizer')
def create_whitespace_tokenizer_component(nlp: Language, name: str):
    nlp.tokenizer = Tokenizer(nlp.vocab, token_match=re.compile(r'\S+').match)
    # this factory only configures the spaCy model, so return the identity
    return lambda x: x
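

# A sketch of the factory above, which replaces the model's tokenizer so
# tokens are split only on whitespace; the sample text is illustrative:
#
#   import spacy
#   nlp = spacy.blank('en')
#   nlp.add_pipe('whitespace_tokenizer')
#   doc = nlp('non-trivial, whitespace-only tokenization')
#   print([t.text for t in doc])  # punctuation stays attached to its token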


@dataclass
class RegexSplitter(EntityRecognizer):
    """Splits tokens on regular expression group matches."""

    patterns: List[Tuple[str, List[re.Pattern]]] = field()
    """A list of the regular expressions to find."""

    def __call__(self, doc: Doc) -> Doc:
        for label, regex_list in self.patterns:
            if logger.isEnabledFor(logging.DEBUG):
                logger.debug(f'label: {label}, regex: {regex_list}')
            matches = map(lambda r: re.finditer(r, doc.text), regex_list)
            match: re.Match
            for match in chain.from_iterable(matches):
                if logger.isEnabledFor(logging.DEBUG):
                    logger.debug(f'match: {match}')
                spans: List[Tuple[int, int]] = []
                for i in range(1, len(match.groups()) + 1):
                    s: Tuple[int, int] = match.span(i)
                    if logger.isEnabledFor(logging.DEBUG):
                        logger.debug(f'match: {s} ({match.group(i)})')
                    spans.append(s)
                self._split_span(doc, spans, label)
        return doc


@Language.factory(
    'regexsplit', default_config={'patterns': [], 'path': None})
def create_regexsplit_component(
        nlp: Language, name: str,
        patterns: Sequence[Tuple[Optional[str],
                                 Sequence[Union[re.Pattern, str]]]],
        path: str = None):
    def map_rlist(rlist):
        rl = map(lambda x: x if isinstance(x, re.Pattern) else re.compile(x),
                 rlist)
        return tuple(rl)

    regexes = map(lambda x: (x[0], map_rlist(x[1])), patterns)
    return RegexSplitter(nlp, name, path, list(regexes))
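

# A minimal sketch of the 'regexsplit' factory above, paired with the
# whitespace tokenizer so a grouped expression splits one token in two; the
# expression and sample text are illustrative:
#
#   import spacy
#   nlp = spacy.blank('en')
#   nlp.add_pipe('whitespace_tokenizer')
#   nlp.add_pipe('regexsplit', config={
#       'patterns': [[None, ['([a-z]+)([0-9]+)']]]})
#   doc = nlp('sample abc123 text')
#   print([t.text for t in doc])  # 'abc123' is split into 'abc' and '123'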