"""A heuristic text indexing and search class.
"""
__author__ = 'Paul Landes'
from typing import Dict, List, Tuple, Iterable, Set, Type
from dataclasses import dataclass, field
import logging
import re
import textwrap as tw
from zensols.persist import persisted
from .container import (
LexicalSpan, FeatureToken, TokenContainer, FeatureDocument, FeatureSentence
)
logger = logging.getLogger(__name__)
TokenOrth = Tuple[str, FeatureToken]
"""A tuple of a token's stripped orthographic text and the token itself."""
@dataclass
class FeatureDocumentIndexer(object):
"""A utility class that indexes and searches for text in potentially
whitespace mangled documents. It does this by trying more efficient means
first, then resorts to methods that are more computationaly expensive.
"""
doc: FeatureDocument = field()
"""The document to index."""
@staticmethod
def _get_norm(cont: TokenContainer, no_space: bool = False) -> str:
"""Create normalized text by removing continuous whitespace with a
single space or the empty string.
"""
repl: str = '' if no_space else ' '
return re.sub(r'\s+', repl, cont.text).strip()
@staticmethod
def _get_tok_orths(cont: TokenContainer) -> Tuple[TokenOrth, ...]:
"""Return tuples of (<orthographic text>, <token>)."""
return tuple(map(lambda t: (t.text.strip(), t),
filter(lambda t: not t.is_space, cont.token_iter())))
@classmethod
def _spans_equal(cls: Type, a: str, b: str) -> bool:
"""Return whether strings ``a`` and ``b`` are the same and log it."""
if logger.isEnabledFor(logging.DEBUG):
logger.debug(f'cmp <{cls._shorten(a)}> ?= <{cls._shorten(b)}>')
return a == b
@staticmethod
def _get_pack2ix(text: str) -> Dict[int, int]:
"""Return a dictionary of character positions in ``text`` to respective
positions in the same string without whitespace.
"""
ixs: List[int] = []
ws: Set[str] = set(' \r\n\t')
ix: int = 0
c: str
for c in text:
ixs.append(ix)
if c not in ws:
ix += 1
return dict(zip(ixs, range(len(ixs))))
@property
@persisted('_text2sent')
def text2sent(self) -> Dict[str, FeatureSentence]:
"""Return a dictionary of sentence normalized text to respective
sentence in :obj:`.doc`.
"""
return {self._get_norm(s): s for s in self.doc}
@property
@persisted('_doc_tok_orths')
def doc_tok_orths(self) -> Tuple[TokenOrth, ...]:
"""Reutrn tuples of (<orthographic text>, <token>)."""
return self._get_tok_orths(self.doc)
@property
@persisted('_packed_doc_text')
def packed_doc_text(self) -> str:
"""Return the document' (:obj:`doc`) no-space normalized text."""
return self._get_norm(self.doc, True)
@property
@persisted('_pack2ix')
def pack2ix(self) -> Dict[int, int]:
"""Return a dictionary of character positions in the document
(:obj:`doc`) text to respective positions in the same string without
whitespace.
"""
return self._get_pack2ix(self.doc.text.rstrip())
@staticmethod
def _shorten(s: str) -> str:
"""Shorten text used for logging."""
return tw.shorten(s, 80)
def _find_start_offset(self, query: TokenContainer,
candidate: TokenContainer) -> TokenContainer:
"""Find the sub-sentence by exact matches on the indexed text."""
if logger.isEnabledFor(logging.DEBUG):
logger.debug(f'looking for candidate in <{query.text}>')
cont: TokenContainer = None
at_toks: Iterable[FeatureToken] = filter(
lambda t: not t.is_space, query.token_iter())
ca_toks: Iterable[FeatureToken] = filter(
lambda t: not t.is_space, candidate.token_iter())
at: FeatureToken
ct: FeatureToken
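        # walk the query and candidate token streams in lock step, stopping
        # at the first mismatching token text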
for i, (at, ct) in enumerate(zip(at_toks, ca_toks)):
if at.text != ct.text:
break
if i > 0:
lspan = LexicalSpan(candidate.lexspan[0], ct.lexspan[1])
cont = self.doc.get_overlapping_span(lspan)
if logger.isEnabledFor(logging.DEBUG):
logger.debug(f'found sentence start: <{cont.text}>')
return cont
def _find_by_sent_ix(self, sent_ix: int, query: TokenContainer,
targ_comp_text: str) -> TokenContainer:
"""Find the sentence by an index (when provided)."""
sent: TokenContainer = None
query_text: str = query.text
if sent_ix < len(self.doc):
candidate: TokenContainer = self.doc[sent_ix]
cand_norm: str = self._get_norm(candidate)
sent = self.text2sent.get(targ_comp_text)
if sent is None and candidate is not None:
if cand_norm.startswith(query_text) or \
query_text.startswith(cand_norm):
sent = self._find_start_offset(query, candidate)
if sent is not None and \
not self._spans_equal(targ_comp_text, self._get_norm(sent)):
sent = None
return sent
def _find_doc_offset(self, query: TokenContainer,
targ_comp_text: str) -> TokenContainer:
"""Find the sub-sentence by finding subsequences of token text."""
        dorth: Tuple[TokenOrth, ...] = self.doc_tok_orths
        dtoks: Tuple[str, ...] = tuple(map(lambda t: t[0], dorth))
atoks: Tuple[str, ...] = tuple(map(
lambda t: t[0], self._get_tok_orths(query)))
alen: int = len(atoks)
        dpos: int = -1
        span: TokenContainer = None
        if logger.isEnabledFor(logging.TRACE):
            logger.trace(f'atoks: <{atoks}>')
            logger.trace(f'dtoks: <{dtoks}>')
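        # scan for the query tokens as a contiguous subsequence of the
        # document tokens; e.g. dtoks = ('The', 'dog', 'ran') and
        # atoks = ('dog', 'ran') match at dpos = 1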
for i in range(len(dtoks)):
if dtoks[i:i + alen] == atoks:
dpos = i
break
if logger.isEnabledFor(logging.DEBUG):
logger.debug(f'find offset: pos: {dpos}')
if dpos > -1:
lspan = LexicalSpan(dorth[dpos][1].lexspan[0],
dorth[dpos + alen - 1][1].lexspan[1])
span: TokenContainer = self.doc.get_overlapping_span(lspan)
if logger.isEnabledFor(logging.DEBUG):
logger.debug(f'found in doc: <{self._shorten(span.text)}>')
if not self._spans_equal(targ_comp_text, self._get_norm(span)):
span = None
return span
def _find_by_char(self, query: TokenContainer) -> TokenContainer:
"""Find the sub-span by removing all space, which is needed in cases
where parsed tokens (such as entites and MIMIC redacted tokens) have
space.
"""
span: TokenContainer = None
targ_comp: str = self._get_norm(query, True)
doc_comp: str = self.packed_doc_text
pack_ix: int = doc_comp.find(targ_comp)
if logger.isEnabledFor(logging.DEBUG):
logger.debug(f'find by char: packed text index: {pack_ix}')
logger.debug(f'comp span: <{self._shorten(query.text)}>')
if logger.isEnabledFor(logging.TRACE):
logger.trace(f'annotated: <{targ_comp}>')
logger.trace(f'doc: <{doc_comp}>')
if pack_ix > -1:
pack2ix: Dict[int, int] = self.pack2ix
start_ix: int = pack2ix.get(pack_ix)
end_ix: int = pack2ix.get(pack_ix + len(targ_comp) - 1)
if logger.isEnabledFor(logging.DEBUG):
logger.debug(f'span: ({start_ix}, {end_ix})')
if start_ix is not None and end_ix is not None:
lspan = LexicalSpan(start_ix, end_ix)
if logger.isEnabledFor(logging.DEBUG):
logger.debug(f'doc span: {lspan}')
span = self.doc.get_overlapping_span(lspan)
span_comp: str = self._get_norm(span, True)
if not self._spans_equal(targ_comp, span_comp):
span = None
return span
def find(self, query: TokenContainer, sent_ix: int = None) -> \
TokenContainer:
"""Find a sentence in document :obj:`doc`. If a sentence index is
given, it treats the query as a sentence to find in :obj:`doc`.
:param query: the sentence to find in :obj:`doc`
:param sent_ix: the sentence index hint if available
:return: the matched text from :obj:`doc`
"""
targ_comp_text: str = self._get_norm(query)
span: TokenContainer = None
if sent_ix is not None:
span = self._find_by_sent_ix(sent_ix, query, targ_comp_text)
if span is None:
span = self._find_doc_offset(query, targ_comp_text)
if span is None:
span = self._find_by_char(query)
return span