"""A heuristic text indexing and search class.
"""
__author__ = 'Paul Landes'
from typing import Dict, List, Tuple, Iterable, Set, Type
from dataclasses import dataclass, field
import logging
import re
import textwrap as tw
from zensols.persist import persisted
from .container import (
    LexicalSpan, FeatureToken, TokenContainer, FeatureDocument, FeatureSentence
)
logger = logging.getLogger(__name__)
TokenOrth = Tuple[str, FeatureToken]
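"""Pairs of (<orthographic text>, <token>) used for whitespace agnostic token
matching."""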
@dataclass
class FeatureDocumentIndexer(object):
    """A utility class that indexes and searches for text in potentially
    whitespace mangled documents.  It does this by trying more efficient means
    first, then resorts to methods that are more computationaly expensive.
    """
    doc: FeatureDocument = field()
    """The document to index."""
    @staticmethod
    def _get_norm(cont: TokenContainer, no_space: bool = False) -> str:
        """Create normalized text by removing continuous whitespace with a
        single space or the empty string.
        """
        repl: str = '' if no_space else ' '
        return re.sub(r'\s+', repl, cont.text).strip()
    @staticmethod
    def _get_tok_orths(cont: TokenContainer) -> Tuple[TokenOrth, ...]:
        """Return tuples of (<orthographic text>, <token>)."""
        return tuple(map(lambda t: (t.text.strip(), t),
                         filter(lambda t: not t.is_space, cont.token_iter())))
    @classmethod
    def _spans_equal(cls: Type, a: str, b: str) -> bool:
        """Return whether strings ``a`` and ``b`` are the same and log it."""
        if logger.isEnabledFor(logging.DEBUG):
            logger.debug(f'cmp <{cls._shorten(a)}> ?= <{cls._shorten(b)}>')
        return a == b
    @staticmethod
    def _get_pack2ix(text: str) -> Dict[int, int]:
        """Return a dictionary of character positions in ``text`` to respective
        positions in the same string without whitespace.
        """
        ixs: List[int] = []
        ws: Set[str] = set(' \r\n\t')
        ix: int = 0
        c: str
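        # for each original character position, record the number of
        # non-whitespace characters seen so far (its position in the packed
        # string); zipping packed -> original and letting later duplicates win
        # maps each packed position back to an original position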
        for c in text:
            ixs.append(ix)
            if c not in ws:
                ix += 1
        return dict(zip(ixs, range(len(ixs))))
    @property
    @persisted('_text2sent')
    def text2sent(self) -> Dict[str, FeatureSentence]:
        """Return a dictionary of sentence normalized text to respective
        sentence in :obj:`.doc`.
        """
        return {self._get_norm(s): s for s in self.doc}
    @property
    @persisted('_doc_tok_orths')
    def doc_tok_orths(self) -> Tuple[TokenOrth, ...]:
        """Reutrn tuples of (<orthographic text>, <token>)."""
        return self._get_tok_orths(self.doc)
    @property
    @persisted('_packed_doc_text')
    def packed_doc_text(self) -> str:
        """Return the document' (:obj:`doc`) no-space normalized text."""
        return self._get_norm(self.doc, True)
    @property
    @persisted('_pack2ix')
    def pack2ix(self) -> Dict[int, int]:
        """Return a dictionary of character positions in the document
        (:obj:`doc`) text to respective positions in the same string without
        whitespace.
        """
        return self._get_pack2ix(self.doc.text.rstrip())
    @staticmethod
    def _shorten(s: str) -> str:
        """Shorten text used for logging."""
        return tw.shorten(s, 80)
    def _find_start_offset(self, query: TokenContainer,
                           candidate: TokenContainer) -> TokenContainer:
        """Find the sub-sentence by exact matches on the indexed text."""
        if logger.isEnabledFor(logging.DEBUG):
            logger.debug(f'looking for candidate in <{query.text}>')
        cont: TokenContainer = None
        at_toks: Iterable[FeatureToken] = filter(
            lambda t: not t.is_space, query.token_iter())
        ca_toks: Iterable[FeatureToken] = filter(
            lambda t: not t.is_space, candidate.token_iter())
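        # walk the query and candidate tokens in parallel from the start and
        # stop at the first orthographic mismatch; the candidate is then cut at
        # the last compared token's lexical span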
        # guard against containers with no non-space tokens, which would
        # otherwise leave ``i`` unbound
        i: int = 0
        at: FeatureToken
        ct: FeatureToken
        for i, (at, ct) in enumerate(zip(at_toks, ca_toks)):
            if at.text != ct.text:
                break
        if i > 0:
            lspan = LexicalSpan(candidate.lexspan[0], ct.lexspan[1])
            cont = self.doc.get_overlapping_span(lspan)
            if logger.isEnabledFor(logging.DEBUG):
                logger.debug(f'found sentence start: <{cont.text}>')
        return cont
    def _find_by_sent_ix(self, sent_ix: int, query: TokenContainer,
                         targ_comp_text: str) -> TokenContainer:
        """Find the sentence by an index (when provided)."""
        sent: TokenContainer = None
        query_text: str = query.text
        if sent_ix < len(self.doc):
            candidate: TokenContainer = self.doc[sent_ix]
            cand_norm: str = self._get_norm(candidate)
            sent = self.text2sent.get(targ_comp_text)
            if sent is None and candidate is not None:
                if cand_norm.startswith(query_text) or \
                   query_text.startswith(cand_norm):
                    sent = self._find_start_offset(query, candidate)
        if sent is not None and \
           not self._spans_equal(targ_comp_text, self._get_norm(sent)):
            sent = None
        return sent
    def _find_doc_offset(self, query: TokenContainer,
                         targ_comp_text: str) -> TokenContainer:
        """Find the sub-sentence by finding subsequences of token text."""
        dorth: Tuple[TokenOrth, ...] = self.doc_tok_orths
        dtoks: Tuple[str, ...] = tuple(map(lambda t: t[0], dorth))
        atoks: Tuple[str, ...] = tuple(map(
            lambda t: t[0], self._get_tok_orths(query)))
        alen: int = len(atoks)
        dpos: int = -1
        if logger.isEnabledFor(logging.TRACE):
            logger.trace(f'atoks: <{atoks}>')
            logger.trace(f'dtoks: <{dtoks}>')
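        # naive linear scan for the query token sequence in the document tokens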
        for i in range(len(dtoks)):
            if dtoks[i:i + alen] == atoks:
                dpos = i
                break
        if logger.isEnabledFor(logging.DEBUG):
            logger.debug(f'find offset: pos: {dpos}')
        if dpos > -1:
            lspan = LexicalSpan(dorth[dpos][1].lexspan[0],
                                dorth[dpos + alen - 1][1].lexspan[1])
            span: TokenContainer = self.doc.get_overlapping_span(lspan)
            if logger.isEnabledFor(logging.DEBUG):
                logger.debug(f'found in doc: <{self._shorten(span.text)}>')
            if not self._spans_equal(targ_comp_text, self._get_norm(span)):
                span = None
            return span
    def _find_by_char(self, query: TokenContainer) -> TokenContainer:
        """Find the sub-span by removing all space, which is needed in cases
        where parsed tokens (such as entites and MIMIC redacted tokens) have
        space.
        """
        span: TokenContainer = None
        targ_comp: str = self._get_norm(query, True)
        doc_comp: str = self.packed_doc_text
        pack_ix: int = doc_comp.find(targ_comp)
        if logger.isEnabledFor(logging.DEBUG):
            logger.debug(f'find by char: packed text index: {pack_ix}')
            logger.debug(f'comp span: <{self._shorten(query.text)}>')
        if logger.isEnabledFor(logging.TRACE):
            logger.trace(f'annotated: <{targ_comp}>')
            logger.trace(f'doc: <{doc_comp}>')
        if pack_ix > -1:
            pack2ix: Dict[int, int] = self.pack2ix
            start_ix: int = pack2ix.get(pack_ix)
            end_ix: int = pack2ix.get(pack_ix + len(targ_comp) - 1)
            if logger.isEnabledFor(logging.DEBUG):
                logger.debug(f'span: ({start_ix}, {end_ix})')
            if start_ix is not None and end_ix is not None:
                lspan = LexicalSpan(start_ix, end_ix)
                if logger.isEnabledFor(logging.DEBUG):
                    logger.debug(f'doc span: {lspan}')
                span = self.doc.get_overlapping_span(lspan)
                span_comp: str = self._get_norm(span, True)
                if not self._spans_equal(targ_comp, span_comp):
                    span = None
        return span
    def find(self, query: TokenContainer, sent_ix: int = None) -> \
            TokenContainer:
        """Find a sentence in document :obj:`doc`.  If a sentence index is
        given, it treats the query as a sentence to find in :obj:`doc`.
        :param query: the sentence to find in :obj:`doc`
        :param sent_ix: the sentence index hint if available
        :return: the matched text from :obj:`doc`
        """
        targ_comp_text: str = self._get_norm(query)
        span: TokenContainer = None
        if sent_ix is not None:
            span = self._find_by_sent_ix(sent_ix, query, targ_comp_text)
        if span is None:
            span = self._find_doc_offset(query, targ_comp_text)
        if span is None:
            span = self._find_by_char(query)
        return span