"""Normalize text and map Spacy documents.
"""
__author__ = 'Paul Landes'
from typing import List, Iterable, Tuple, Union, Dict
from dataclasses import dataclass, field
from abc import abstractmethod, ABC
import logging
import re
from itertools import chain
from spacy.tokens import Token, Span, Doc
from zensols.config import ConfigFactory
from . import LexicalSpan
logger = logging.getLogger(__name__)
@dataclass
class TokenNormalizer(object):
"""Base token extractor returns tuples of tokens and their normalized
version.
Configuration example::
[default_token_normalizer]
class_name = zensols.nlp.TokenNormalizer
embed_entities = False
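
A minimal usage sketch (assumes a loaded spaCy pipeline such as
``en_core_web_sm``; the model name and sample text are illustrative)::

    import spacy
    nlp = spacy.load('en_core_web_sm')
    doc = nlp('Apple is based in Cupertino.')
    norm = TokenNormalizer(embed_entities=True)
    for tok, text in norm.normalize(doc):
        # with embed_entities=True, named entities come back as spans
        print(type(tok).__name__, text)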
"""
embed_entities: bool = field(default=True)
"""Whether or not to replace tokens with their respective named entity
version.
"""
def __embed_entities(self, doc: Doc):
"""For each token, return the named entity form if it exists.
:param doc: the spacy document to iterate over
"""
tlen = len(doc)
ents = {}
for ent in doc.ents:
if logger.isEnabledFor(logging.DEBUG):
logger.debug(f'adding entity start: {ent.start} -> {ent}')
ents[ent.start] = ent
if logger.isEnabledFor(logging.DEBUG):
logger.debug(f'entities: {ents}')
i = 0
while i < tlen:
if i in ents:
ent = ents[i]
if logger.isEnabledFor(logging.DEBUG):
logger.debug(f'adding entity: {ent}')
yield ent
i = ent.end
else:
tok = doc[i]
if logger.isEnabledFor(logging.DEBUG):
logger.debug(f'adding token: {tok}')
yield tok
i += 1
def _to_token_tuple(self, doc: Doc) -> Iterable[Tuple[Token, str]]:
"Normalize the document in to (token, normal text) tuples."
if logger.isEnabledFor(logging.DEBUG):
logger.debug(f'embedding entities: {self.embed_entities}')
if self.embed_entities:
toks = self.__embed_entities(doc)
else:
toks = doc
toks = map(lambda t: (t, t.orth_,), toks)
return toks
def _map_tokens(self, token_tups: Iterable[Tuple[Token, str]]) -> \
Iterable[Tuple[Token, str]]:
"""Map token tuples in sub classes.
:param token_tups: tuples generated from ``_to_token_tuple``
"""
return None
def normalize(self, doc: Doc) -> Iterable[Tuple[Token, str]]:
"""Normalize Spacey document ``doc`` in to (token, normal text) tuples.
"""
tlist = self._to_token_tuple(doc)
maps = self._map_tokens(tlist)
if maps is not None:
tlist = tuple(maps)
return iter(tlist)
def __str__(self):
if hasattr(self, 'name'):
name = self.name
else:
name = type(self).__name__
return f'{name}: embed={self.embed_entities}'
def __repr__(self):
return self.__str__()
@dataclass
class TokenMapper(ABC):
"""Abstract class used to transform token tuples generated from
:meth:`.TokenNormalizer.normalize`.
"""
@abstractmethod
def map_tokens(self, token_tups: Iterable[Tuple[Token, str]]) -> \
Iterable[Tuple[Token, str]]:
"""Transform token tuples.
"""
pass
@dataclass
class SplitTokenMapper(TokenMapper):
"""Splits the normalized text on a per token basis with a regular
expression.
Configuration example::
[split_token_mapper]
class_name = zensols.nlp.SplitTokenMapper
regex = r'[ ]'
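
A minimal programmatic sketch (``doc`` is any parsed
:class:`~spacy.tokens.Doc`; passing a pre-compiled :class:`re.Pattern`
bypasses the ``eval`` in ``__post_init__``, which otherwise expects a quoted
pattern string as in the configuration above)::

    import re
    from itertools import chain
    mapper = SplitTokenMapper(regex=re.compile('-'))
    tups = TokenNormalizer().normalize(doc)
    # map_tokens returns an iterable of iterables, so flatten one level
    for tok, norm in chain.from_iterable(mapper.map_tokens(tups)):
        print(tok.orth_, norm)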
"""
regex: Union[re.Pattern, str] = field(default=r'[ ]')
"""The regular expression to use for splitting tokens."""
def __post_init__(self):
if not isinstance(self.regex, re.Pattern):
self.regex = re.compile(eval(self.regex))
def map_tokens(self, token_tups: Iterable[Tuple[Token, str]]) -> \
Iterable[Tuple[Token, str]]:
rg = self.regex
return map(lambda t: map(lambda s: (t[0], s), re.split(rg, t[1])),
token_tups)
@dataclass
class JoinTokenMapper(object):
"""Join tokens based on a regular expression. It does this by creating
spans in the spaCy component (first in the tuple) and using the span text as
the normalized token.
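
A configuration sketch in the style of the other mappers (the section name
and pattern are illustrative, and it is assumed the class is exported from
``zensols.nlp`` like its siblings)::

    [join_token_mapper]
    class_name = zensols.nlp.JoinTokenMapper
    regex = r'[0-9]+ [a-z]+'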
"""
regex: Union[re.Pattern, str] = field(default=r'[ ]')
"""The regular expression to use for joining tokens"""
separator: str = field(default=None)
"""The string used to separate normalized tokens in matches. If ``None``,
use the token text.
"""
def __post_init__(self):
if not isinstance(self.regex, re.Pattern):
self.regex = re.compile(eval(self.regex))
def _loc(self, doc: Doc, tok: Union[Token, Span]) -> Tuple[int, int]:
if isinstance(tok, Span):
etok = doc[tok.end - 1]
start = doc[tok.start].idx
end = etok.idx + len(etok.orth_)
else:
start = tok.idx
end = tok.idx + len(tok.orth_)
return start, end
def map_tokens(self, token_tups: Iterable[Tuple[Token, str]]) -> \
Iterable[Tuple[Token, str]]:
def map_match(t: Token) -> str:
tup = tix2tup.get(t.idx)
if tup is not None:
return tup[1]
tups = tuple(token_tups)
stok: Token = tups[0][0]
etok: Token = tups[-1][0]
doc: Doc = stok.doc
src: Span = doc.char_span(stok.idx, etok.idx + len(etok.orth_))
matches: List[Span] = []
tix2tup: Dict[int, Tuple[Token, str]]
if self.separator is not None:
tix2tup = {doc[t[0].start].idx
if isinstance(t[0], Span) else t[0].idx: t
for t in tups}
for match in re.finditer(self.regex, src.text):
start, end = match.span()
span: Span = doc.char_span(start, end)
# this is a Span object or None if match doesn't map to valid token
# sequence
if span is not None:
matches.append(span)
if len(matches) > 0:
mtups = []
mix = 0
mlen = len(matches)
stack = list(tups)
while len(stack) > 0:
tup = stack.pop(0)
tok = tup[0]
tok_loc = LexicalSpan.from_token(tok)
next_tup = tup
if mix < mlen:
match: Span = matches[mix]
if logger.isEnabledFor(logging.DEBUG):
logger.debug(f'matched: {match}')
mloc = LexicalSpan.from_token(match)
if mloc.overlaps_with(tok_loc):
mix += 1
match_text = match.text
if self.separator is not None:
norms = map(map_match, doc[match.start:match.end])
norms = filter(lambda t: t is not None, norms)
match_text = self.separator.join(norms)
next_tup = (match, match_text)
while len(stack) > 0:
tup = stack.pop(0)
tok = tup[0]
tok_loc = self._loc(doc, tok)
if not mloc.overlaps_with(tok_loc):
stack.insert(0, tup)
break
mtups.append(next_tup)
tups = (mtups,)
return tups
@dataclass
class SplitEntityTokenMapper(TokenMapper):
"""Splits embedded entities (or any :class:`~spacy.token.span.Span`) in to
separate tokens. This is useful for splitting up entities as tokens after
being grouped with :obj:`.TokenNormalizer.embed_entities`. Note,
``embed_entities`` must be ``True`` to create the entities as they come
from spaCy as spans. This then can be used to create
:class:`.SpacyFeatureToken` with spans that have the entity.
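
A configuration sketch (the section name is illustrative; the normalizer
feeding this mapper must be configured with ``embed_entities = True``)::

    [split_ent_token_mapper]
    class_name = zensols.nlp.SplitEntityTokenMapper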
"""
token_unit_type: bool = field(default=False)
"""Whether to generate tokens for each split span or a one token span."""
copy_attributes: Tuple[str, ...] = field(default=('label', 'label_'))
"""Attributes to copy from the span to the split token."""
def map_tokens(self, token_tups: Iterable[Tuple[Token, str]]) -> \
Iterable[Tuple[Token, str]]:
def map_tup(tup):
if logger.isEnabledFor(logging.DEBUG):
logger.debug(f'setm: mapping tup: {tup}')
if isinstance(tup[0], Span):
span = tup[0]
for tix in range(span.end - span.start):
if not token_unit_type:
tok = span[tix:tix + 1]
else:
tok = span[tix]
for attr in cp_attribs:
setattr(tok, attr, getattr(span, attr))
if logger.isEnabledFor(logging.DEBUG):
logger.debug(f'setm: split: {tok}')
yield (tok, tok.orth_)
else:
yield tup
token_unit_type = self.token_unit_type
cp_attribs = self.copy_attributes
return map(map_tup, token_tups)
@dataclass
class LemmatizeTokenMapper(TokenMapper):
"""Lemmatize tokens and optional remove entity stop words.
**Important:** This completely ignores the normalized input token string
and essentially just replaces it with the lemma found in the token
instance.
Configuration example::
[lemma_token_mapper]
class_name = zensols.nlp.LemmatizeTokenMapper
:param lemmatize: lemmatize if ``True``; this is an option to allow (only)
the removal of the first stop word in named entities
:param remove_first_stop: whether to remove the first stop word in named
entities when ``embed_entities`` is ``True``
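
A minimal programmatic sketch (``doc`` is any parsed
:class:`~spacy.tokens.Doc`)::

    from itertools import chain
    norm = TokenNormalizer(embed_entities=True)
    mapper = LemmatizeTokenMapper(remove_first_stop=True)
    tups = mapper.map_tokens(norm.normalize(doc))
    for tok, lemma in chain.from_iterable(tups):
        # tokens are lemmatized; entities are lowered, dropping a
        # leading stop word
        print(tok.orth_, lemma)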
"""
lemmatize: bool = field(default=True)
remove_first_stop: bool = field(default=False)
def _lemmatize(self, tok_or_ent):
if isinstance(tok_or_ent, Token):
stok = tok_or_ent.lemma_
else:
if self.remove_first_stop and tok_or_ent[0].is_stop:
tok_or_ent = tok_or_ent[1:]
stok = tok_or_ent.text.lower()
return stok
def map_tokens(self, token_tups: Iterable[Tuple[Token, str]]) -> \
Iterable[Tuple[Token, str]]:
return (map(lambda x: (x[0], self._lemmatize(x[0])), token_tups),)
@dataclass
class FilterTokenMapper(TokenMapper):
"""Filter tokens based on token (Spacy) attributes.
Configuration example::
[filter_token_mapper]
class_name = zensols.nlp.FilterTokenMapper
remove_stop = True
remove_punctuation = True
"""
remove_stop: bool = field(default=False)
remove_space: bool = field(default=False)
remove_pronouns: bool = field(default=False)
remove_punctuation: bool = field(default=False)
remove_determiners: bool = field(default=False)
def __post_init__(self):
if logger.isEnabledFor(logging.DEBUG):
logger.debug(f'created {self.__class__}: ' +
f'remove_stop: {self.remove_stop}, ' +
f'remove_space: {self.remove_space}, ' +
f'remove_pronouns: {self.remove_pronouns}, ' +
f'remove_punctuation: {self.remove_punctuation}, ' +
f'remove_determiners: {self.remove_determiners}')
def _filter(self, tok_or_ent_tup):
tok_or_ent = tok_or_ent_tup[0]
keep = False
if logger.isEnabledFor(logging.DEBUG):
logger.debug(f'filter: {tok_or_ent} ({type(tok_or_ent)})')
if isinstance(tok_or_ent, Token):
t = tok_or_ent
if logger.isEnabledFor(logging.DEBUG):
logger.debug(f'token {t}: l={len(t)}, ' +
f's={t.is_stop}, p={t.is_punct}')
if (not self.remove_stop or not t.is_stop) and \
(not self.remove_space or not t.is_space) and \
(not self.remove_pronouns or not t.pos_ == 'PRON') and \
(not self.remove_punctuation or not t.is_punct) and \
(not self.remove_determiners or not t.tag_ == 'DT') and \
len(t) > 0:
keep = True
else:
keep = True
if logger.isEnabledFor(logging.DEBUG):
logger.debug(f'filter: keeping={keep}')
return keep
def map_tokens(self, token_tups: Iterable[Tuple[Token, str]]) -> \
Iterable[Tuple[Token, str]]:
if logger.isEnabledFor(logging.DEBUG):
logger.debug('filter mapper: map_tokens')
return (filter(self._filter, token_tups),)
@dataclass
class FilterRegularExpressionMapper(TokenMapper):
"""Filter tokens based on normalized form regular expression.
"""
regex: Union[re.Pattern, str] = field(default=r'[ ]+')
"""The regular expression to use for splitting tokens."""
invert: bool = field(default=False)
"""If ``True`` then remove rather than keep everything that matches.."""
def __post_init__(self):
if not isinstance(self.regex, re.Pattern):
self.regex = re.compile(eval(self.regex))
def _filter(self, tup: Tuple[Token, str]):
token, norm = tup
match = self.regex.match(norm) is not None
if self.invert:
match = not match
return match
def map_tokens(self, token_tups: Iterable[Tuple[Token, str]]) -> \
Iterable[Tuple[Token, str]]:
if logger.isEnabledFor(logging.DEBUG):
logger.debug('filter mapper: map_tokens')
return (filter(self._filter, token_tups),)
@dataclass
class SubstituteTokenMapper(TokenMapper):
"""Replace a regular expression in normalized token text.
Configuration example::
[subs_token_mapper]
class_name = zensols.nlp.SubstituteTokenMapper
regex = r'[ \\t]'
replace_char = _
"""
regex: str = field(default='')
"""The regular expression to use for substitution."""
replace_char: str = field(default='')
"""The character that is used for replacement."""
def __post_init__(self):
self.regex = re.compile(eval(self.regex))
def map_tokens(self, token_tups: Iterable[Tuple[Token, str]]) -> \
Iterable[Tuple[Token, str]]:
return (map(lambda x: (x[0], re.sub(
self.regex, self.replace_char, x[1])), token_tups),)
@dataclass
class LambdaTokenMapper(TokenMapper):
"""Use a lambda expression to map a token tuple.
This is handy for specialized behavior that can be added directly to a
configuration file.
Configuration example::
[lc_lambda_token_mapper]
class_name = zensols.nlp.LambdaTokenMapper
map_lambda = lambda x: (x[0], f'<{x[1].lower()}>')
"""
add_lambda: str = field(default=None)
map_lambda: str = field(default=None)
def __post_init__(self):
if self.add_lambda is None:
self.add_lambda = lambda x: ()
else:
self.add_lambda = eval(self.add_lambda)
if self.map_lambda is None:
self.map_lambda = lambda x: x
else:
self.map_lambda = eval(self.map_lambda)
def map_tokens(self, token_tups: Iterable[Tuple[Token, str]]) -> \
Iterable[Tuple[Token, str]]:
return (map(self.map_lambda, token_tups),)
@dataclass
class MapTokenNormalizer(TokenNormalizer):
"""A normalizer that applies a sequence of :class:`.TokenMapper` instances
to transform the normalized token text. The members of the
``mapper_class_list`` are sections of the application configuration.
Configuration example::
[map_filter_token_normalizer]
class_name = zensols.nlp.MapTokenNormalizer
mapper_class_list = list: filter_token_mapper
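
A sketch chaining several mappers (the section names refer to the mapper
examples above and are illustrative; the instances are applied in list
order)::

    [map_filter_token_normalizer]
    class_name = zensols.nlp.MapTokenNormalizer
    mapper_class_list = list: filter_token_mapper, lemma_token_mapper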
"""
config_factory: ConfigFactory = field(default=None)
"""The factory that created this instance and used to create the mappers.
"""
mapper_class_list: List[str] = field(default_factory=list)
"""The configuration section names to create from the application
configuration factory, which is added to :obj:`mappers`. This field
settings is deprecated; use :obj:`mappers` instead.
"""
def __post_init__(self):
self.mappers = list(map(self.config_factory, self.mapper_class_list))
def _map_tokens(self, token_tups: Iterable[Tuple[Token, str]]) -> \
Iterable[Tuple[Token, str]]:
for mapper in self.mappers:
if logger.isEnabledFor(logging.DEBUG):
logger.debug(f'mapping token_tups with {mapper}')
token_tups = chain.from_iterable(mapper.map_tokens(token_tups))
return token_tups
def __str__(self) -> str:
s = super().__str__()
maps = ', '.join(map(str, self.mapper_class_list))
return f'{s}, {maps}'