# Source code for zensols.nlp.score

"""Produces matching scores.

"""
from __future__ import annotations
__author__ = 'Paul Landes'
from typing import (
    Tuple, Set, Dict, Iterable, List, ClassVar, Union, Optional, Type
)
from dataclasses import dataclass, field
from abc import ABCMeta, ABC, abstractmethod
import logging
import sys
from io import TextIOBase
import nltk.translate.bleu_score as bleu
import numpy as np
from zensols.introspect import ClassImporter
from zensols.config import Dictable
from zensols.persist import persisted
from zensols.nlp import TokenContainer
from . import NLPError

logger = logging.getLogger(__name__)


class ScorerError(NLPError):
    """Raised for any scoring errors (this module)."""
@dataclass
class Score(Dictable, metaclass=ABCMeta):
    """Individual scores returned from :class:`.ScoreMethod`.

    """
    def asrow(self, meth: str) -> Dict[str, float]:
        """Flatten this score into a mapping keyed by ``<meth>_<field>``.

        :param meth: the scoring method name used to prefix each key

        """
        row: Dict[str, float] = {}
        for name, value in self.asdict().items():
            row[f'{meth}_{name}'] = value
        return row
@dataclass(eq=False)
class ErrorScore(Score):
    """A replacement instance when scoring fails from a raised exception.

    """
    method: str = field(repr=False)
    """The method of the :class:`.ScoreMethod` that raised the exception."""

    exception: Exception = field()
    """The exception that was raised."""

    replace_score: Score = field(default=None)
    """The score to use in place of this score.  Otherwise :meth:`asrow`
    return a single :obj:`numpy.nan` like :class:`.FloatScore`.

    """
    def asrow(self, meth: str) -> Dict[str, float]:
        """Return the replacement score's row when available, otherwise a
        single NaN entry keyed by the method name.

        """
        if self.replace_score is not None:
            return self.replace_score.asrow(self.method)
        return {self.method: np.nan}

    # fixed: was ``__eq___`` (three trailing underscores), so this comparator
    # was never used as ``__eq__``; also fixed the ``other.exeption`` typo
    # that would have raised AttributeError
    def __eq__(self, other) -> bool:
        return self.method == other.method and \
            str(self.exception) == str(other.exception)
@dataclass
class FloatScore(Score):
    """Float container.  This is needed to create the flat result container
    structure.  Object creation becomes less important since most clients will
    use :meth:`.ScoreSet.asnumpy`.

    """
    NAN_INSTANCE: ClassVar[FloatScore] = None
    """Used to add to ErrorScore for harmonic means replacements."""

    value: float = field()
    """The value of score."""

    def asrow(self, meth: str) -> Dict[str, float]:
        """Return a single-entry mapping from ``meth`` to :obj:`value`."""
        row: Dict[str, float] = {meth: self.value}
        return row


FloatScore.NAN_INSTANCE = FloatScore(np.nan)
@dataclass
class HarmonicMeanScore(Score):
    """A score having a precision, recall and the harmonic mean of the two,
    F-score.

    """
    NAN_INSTANCE: ClassVar[HarmonicMeanScore] = None
    """Used to add to ErrorScore for harmonic means replacements."""

    precision: float = field()
    recall: float = field()
    f_score: float = field()


HarmonicMeanScore.NAN_INSTANCE = HarmonicMeanScore(np.nan, np.nan, np.nan)
@dataclass
class ScoreResult(Dictable):
    """A result of scores created by a :class:`.ScoreMethod`.

    """
    scores: Dict[str, Tuple[Score]] = field()
    """The scores by method name."""

    correlation_id: Optional[str] = field(default=None)
    """An ID for correlating back to the :class:`.TokenContainer`."""

    def __len__(self) -> int:
        return len(self.scores)

    def __getitem__(self, k: str) -> Dict[str, Tuple[Score]]:
        return self.scores[k]

    def write(self, depth: int = 0, writer: TextIOBase = sys.stdout):
        """Write the scores, prefixed with the correlation ID when set."""
        dct = super().asdict()
        # the correlation ID is written separately (or omitted), not as a
        # dictionary entry
        del dct['correlation_id']
        if self.correlation_id is not None:
            self._write_line(f'correlation ID: {self.correlation_id}',
                             depth, writer)
            self._write_dict(dct, depth + 1, writer)
        else:
            self._write_dict(dct, depth, writer)
@dataclass
class ScoreSet(Dictable):
    """All scores returned from :class:`.Scorer`.

    """
    results: Tuple[ScoreResult] = field()
    """A tuple with each element having the results of the respective sentence
    pair in :obj:`.ScoreContext.sents`.  Each element is a dictionary with the
    method as the key with results as the values as output of the
    :class:`.ScoreMethod`.  This is created in :class:`.Scorer`.

    """
    correlation_id_col: str = field(default='id')
    """The column name for the :obj:`.ScoreResult.correlation_id` added to
    Numpy arrays and Pandas dataframes.  If ``None``, then the correlation IDs
    are used as the index.

    """
    def __len__(self) -> int:
        return len(self.results)

    def __iter__(self) -> Iterable[Dict[str, Tuple[Score]]]:
        return iter(self.results)

    def __getitem__(self, i: int) -> Dict[str, Tuple[Score]]:
        return self.results[i]

    @property
    def has_correlation_id(self) -> bool:
        """Whether the results have correlation IDs."""
        return len(self.results) > 0 and \
            self.results[0].correlation_id is not None

    def as_numpy(self, add_correlation: bool = True) -> \
            Tuple[List[str], np.ndarray]:
        """Return the Numpy array with column descriptors of the results.
        Spacy depends on Numpy, so this package will always be available.

        :param add_correlation: whether to add the correlation ID (if there is
                                one), using :obj:`correlation_id_col`

        """
        cols: Set[str] = set()
        rows: List[Dict[str, float]] = []
        result: ScoreResult
        for result in self.results:
            row: Dict[str, float] = {}
            rows.append(row)
            meth: str
            score: Score
            # fixed: the inner loop previously rebound ``result``, shadowing
            # the outer :class:`.ScoreResult` loop variable
            for meth, score in result.scores.items():
                rdat: Dict[str, float] = score.asrow(meth)
                row.update(rdat)
                cols.update(rdat.keys())
        cols: List[str] = sorted(cols)
        nd_rows: List[np.ndarray] = []
        for row in rows:
            # missing methods for a row become None entries via dict.get
            nd_rows.append(np.array(tuple(map(row.get, cols))))
        arr = np.stack(nd_rows)
        if add_correlation and self.has_correlation_id:
            ids = np.array(tuple(map(lambda r: r.correlation_id,
                                     self.results)))
            ids = np.expand_dims(ids, 1)
            arr = np.append(arr, ids, axis=1)
            cols.append(self.correlation_id_col)
        return cols, arr

    def as_dataframe(self, add_correlation: bool = True) -> \
            'pandas.DataFrame':
        """This gets data from :meth:`as_numpy` and returns it as a Pandas
        dataframe.

        :param add_correlation: whether to add the correlation ID (if there is
                                one), using :obj:`correlation_id_col`

        :return: an instance of :class:`pandas.DataFrame` of the results

        """
        import pandas as pd
        cols, arr = self.as_numpy(add_correlation=False)
        df = pd.DataFrame(arr, columns=cols)
        if add_correlation and self.has_correlation_id:
            # add as a dataframe, otherwise string correlation IDs cast the
            # numpy array to a string
            cid: str = self.correlation_id_col
            cids: Tuple[Union[str, int]] = tuple(
                map(lambda r: r.correlation_id, self.results))
            if cid is None:
                df.index = cids
            else:
                cols: List[str] = df.columns.tolist()
                df[cid] = cids
                cols.insert(0, cid)
                df = df[cols]
        return df

    def write(self, depth: int = 0, writer: TextIOBase = sys.stdout):
        """Write each :class:`.ScoreResult` in :obj:`results`."""
        self._write_line('results:', depth, writer)
        self._write_iterable(self.results, depth + 1, writer)
@dataclass
class ScoreContext(Dictable):
    """Input needed to create score(s) using :class:`.Scorer`.

    """
    pairs: Tuple[Tuple[TokenContainer, TokenContainer]] = field()
    """Sentence, span or document pairs to score (order matters for some
    scoring methods such as rouge).  Depending on the scoring method the
    ordering of the sentence pairs should be:

      * ``(<summary>, <source>)``

      * ``(<gold>, <prediction>)``

      * ``(<references>, <candidates>)``

    See :class:`.ScoreMethod` implementations for more information about pair
    ordering.

    """
    methods: Set[str] = field(default=None)
    """A set of strings, each indicating the :class:`.ScoreMethod` used to
    score :obj:`pairs`.

    """
    norm: bool = field(default=True)
    """Whether to use the normalized tokens, otherwise use the original
    text.

    """
    correlation_ids: Tuple[Union[int, str]] = field(default=None)
    """The IDs to correlate with each sentence pair, or ``None`` to skip
    correlating them.  The length of this tuple must be that of :obj:`pairs`.

    """
    def __post_init__(self):
        self.validate()

    def validate(self):
        """Raise an error when :obj:`pairs` and :obj:`correlation_ids` have
        different lengths.

        :raises ScorerError: when the lengths do not match

        """
        if self.correlation_ids is None:
            return
        if len(self.pairs) != len(self.correlation_ids):
            raise ScorerError(
                'Expecting same length pairs to correlation IDs but got: ' +
                f'{len(self.pairs)} != {len(self.correlation_ids)}')
@dataclass
class ScoreMethod(ABC):
    """An abstract base class for scoring methods (bleu, rouge, etc).

    """
    reverse_sents: bool = field(default=False)
    """Whether to reverse the order of the sentences."""

    @classmethod
    def _get_external_modules(cls: Type) -> Tuple[str, ...]:
        """Return a list of external module names needed by this method."""
        return ()

    @classmethod
    def missing_modules(cls: Type) -> Tuple[str]:
        """Return a list of missing modules needed by this score method."""
        missing: List[str] = []
        mod: str
        for mod in cls._get_external_modules():
            try:
                ClassImporter.get_module(mod)
            except ModuleNotFoundError:
                missing.append(mod)
        return missing

    @classmethod
    def is_available(cls: Type) -> bool:
        """Whether or not this method is available on this system."""
        return len(cls.missing_modules()) == 0

    @abstractmethod
    def _score(self, meth: str, context: ScoreContext) -> Iterable[Score]:
        """See :meth:`score`"""
        pass

    def score(self, meth: str, context: ScoreContext) -> Iterable[Score]:
        """Score the sentences in ``context`` using method identifier
        ``meth``.

        :param meth: the identifier such as ``bleu``

        :param context: the context containing the data to score

        :return: the results, which are usually :class:`float` or
                 :class:`.Score`

        """
        scores: Iterable[Score]
        if logger.isEnabledFor(logging.DEBUG):
            logger.debug(f'scoring meth: {meth}, ' +
                         f'reverse: {self.reverse_sents}')
        if not isinstance(context.pairs[0][0], TokenContainer):
            raise ScorerError(f'Wrong type: {type(context.pairs[0][0])} ' +
                              f' for first item, expecting {TokenContainer}')
        if not isinstance(context.pairs[0][1], TokenContainer):
            # fixed: report the second item's type (previously repeated the
            # first item's type in the message)
            raise ScorerError(f'Wrong type: {type(context.pairs[0][1])} ' +
                              f' for second item, expecting {TokenContainer}')
        try:
            if self.reverse_sents:
                prev_pairs = context.pairs
                try:
                    context.pairs = tuple(map(
                        lambda x: (x[1], x[0]), context.pairs))
                    # fixed: realize the generator while ``context.pairs`` is
                    # still reversed; previously ``tuple(scores)`` ran after
                    # the ``finally`` restored the pairs, so lazy ``_score``
                    # generators saw the un-reversed pairs
                    scores = tuple(self._score(meth, context))
                finally:
                    context.pairs = prev_pairs
            else:
                # force generators to realize scores and force any raised
                # exceptions
                scores = tuple(self._score(meth, context))
        except Exception as e:
            logger.info(e, exc_info=True)
            scores = tuple([ErrorScore(meth, e)] * len(context.pairs))
        return scores

    def _tokenize(self, context: ScoreContext) -> \
            Iterable[Tuple[Tuple[str], Tuple[str]]]:
        """Yield a tuple of token text tuples for each pair in ``context``,
        using normalized token text when ``context.norm`` is set.

        """
        s1: TokenContainer
        s2: TokenContainer
        for s1, s2 in context.pairs:
            s1t: Tuple[str]
            s2t: Tuple[str]
            if context.norm:
                s1t = tuple(map(lambda t: t.norm, s1.token_iter()))
                s2t = tuple(map(lambda t: t.norm, s2.token_iter()))
            else:
                s1t = tuple(map(lambda t: t.text, s1.token_iter()))
                s2t = tuple(map(lambda t: t.text, s2.token_iter()))
            yield (s1t, s2t)
@dataclass
class ExactMatchScoreMethod(ScoreMethod):
    """A scoring method that return 1 for exact matches and 0 otherwise.

    """
    equality_measure: str = field(default='norm')
    """The method by which to compare, which is one of:

      * ``norm``: compare with :meth:`.TokenContainer.norm`

      * ``text``: compare with :obj:`.TokenContainer.text`

      * ``equal``: compare using a Python object ``__eq__`` equal compare,
        which also compares the token values

    """
    def _score(self, meth: str, context: ScoreContext) -> \
            Iterable[FloatScore]:
        """Yield a 1/0 exact-match score for each pair in ``context``.

        :raises ScorerError: when :obj:`equality_measure` is unknown

        """
        first: TokenContainer
        second: TokenContainer
        for first, second in context.pairs:
            if self.equality_measure == 'norm':
                matched: bool = first.norm == second.norm
            elif self.equality_measure == 'text':
                matched: bool = first.text == second.text
            elif self.equality_measure == 'equal':
                matched: bool = first == second
            else:
                raise ScorerError(
                    f"No equality measure: '{self.equality_measure}'")
            yield FloatScore(1. if matched else 0.)
@dataclass
class LevenshteinDistanceScoreMethod(ScoreMethod):
    """A scoring method that computes the Levenshtein distance.

    """
    form: str = field(default='canon')
    """The form of the of the text used for the evaluation, which is one of:

      * ``text``: the original text with :obj:`.TokenContainer.text`

      * ``norm``: the normalized text using :meth:`.TokenContainer.norm`

      * ``canon``: :obj:`.TokenContainer.canonical` to normalize out
        whitespace for better comparisons

    """
    normalize: bool = field(default=True)
    """Whether to normalize the return value as the *distance / the max
    length of both sentences*.

    """
    @classmethod
    def _get_external_modules(cls: Type) -> Tuple[str, ...]:
        return ('editdistance',)

    def _score(self, meth: str, context: ScoreContext) -> \
            Iterable[FloatScore]:
        """Yield an (optionally normalized) edit distance for each pair.

        :raises ScorerError: when :obj:`form` is unknown

        """
        # fixed: removed a dead nested ``container_to_str`` function that
        # referenced a nonexistent ``self.use_norm`` attribute
        import editdistance
        s1t: TokenContainer
        s2t: TokenContainer
        for s1t, s2t in context.pairs:
            t1: str
            t2: str
            if self.form == 'text':
                # the original (un-normalized) text
                t1, t2 = s1t.text, s2t.text
            elif self.form == 'norm':
                # the token-normalized text
                t1, t2 = s1t.norm, s2t.norm
            elif self.form == 'canon':
                # the whitespace-canonicalized form
                t1, t2 = s1t.canonical, s2t.canonical
            else:
                raise ScorerError(f"No form: '{self.form}'")
            val: float = editdistance.eval(t1, t2)
            if self.normalize:
                text_len: int = max(len(t1), len(t2))
                # fixed: guard the division when both texts are empty, which
                # previously raised ZeroDivisionError; two empties match
                val = 1. if text_len == 0 else 1. - (val / text_len)
            yield FloatScore(val)
@dataclass
class BleuScoreMethod(ScoreMethod):
    """The BLEU scoring method using the :mod:`nltk` package.  The first
    sentences are the references and the second are the hypothesis.

    """
    smoothing_function: bleu.SmoothingFunction = field(default=None)
    """This is an implementation of the smoothing techniques for segment-level
    BLEU scores.

    Citation:

    `Chen and Cherry (2014)`_ A Systematic Comparison of Smoothing Techniques
    for Sentence-Level BLEU. In WMT14.

    .. _Chen and Cherry (2014): http://acl2014.org/acl2014/W14-33/pdf/W14-3346.pdf

    """
    weights: Tuple[float, ...] = field(default=(0.25, 0.25, 0.25, 0.25))
    """Weights for each n-gram.  For example: a tuple of float weights for
    unigrams, bigrams, trigrams and so on can be given:
    ``weights = (0.1, 0.3, 0.5, 0.1)``.

    """
    silence_warnings: bool = field(default=False)
    """Silence the BLEU warning of n-grams not matching ``The hypothesis
    contains 0 counts of 3-gram overlaps...``

    """
    def __post_init__(self):
        if self.silence_warnings:
            import warnings
            # suppress the NLTK warning emitted when higher-order n-grams
            # have no overlap ("The hypothesis contains 0 counts...")
            warnings.filterwarnings(
                'ignore',
                message='[.\n]+The hypothesis contains 0 counts.*')

    def _score(self, meth: str, context: ScoreContext) -> \
            Iterable[FloatScore]:
        """Yield a sentence-level BLEU score for each tokenized pair."""
        refs: Tuple[str]
        hyp: Tuple[str]
        for refs, hyp in self._tokenize(context):
            yield FloatScore(bleu.sentence_bleu(
                [refs], hyp,
                weights=self.weights,
                smoothing_function=self.smoothing_function))
@dataclass
class RougeScoreMethod(ScoreMethod):
    """The ROUGE scoring method using the :mod:`rouge_score` package.

    """
    feature_tokenizer: bool = field(default=True)
    """Whether to use the :class:`.TokenContainer` tokenization, otherwise use
    the :mod:`rouge_score` package.

    """
    @classmethod
    def _get_external_modules(cls: Type) -> Tuple[str, ...]:
        return ('rouge_score',)

    def _score(self, meth: str, context: ScoreContext) -> \
            Iterable[HarmonicMeanScore]:
        """Yield a :class:`.HarmonicMeanScore` for each pair in ``context``.

        """
        from rouge_score import rouge_scorer

        class Tokenizer(object):
            # resolve the pre-computed token tuple by container identity;
            # ``sents`` is rebound in the loop below before each ``score``
            # call (closure is resolved at call time)
            @staticmethod
            def tokenize(sent: TokenContainer) -> Tuple[str]:
                return sents[id(sent)]

        s1: TokenContainer
        s2: TokenContainer
        if self.feature_tokenizer:
            scorer = rouge_scorer.RougeScorer([meth], tokenizer=Tokenizer)
            pairs = zip(context.pairs, self._tokenize(context))
            for (s1, s2), (s1t, s2t) in pairs:
                sents = {id(s1): s1t, id(s2): s2t}
                res: Dict[str, Score] = scorer.score(s1, s2)
                yield HarmonicMeanScore(*res[meth])
        else:
            scorer = rouge_scorer.RougeScorer([meth])
            for s1, s2 in context.pairs:
                # fixed: previously referenced ``context.s1``/``context.s2``,
                # attributes that do not exist on :class:`.ScoreContext`; use
                # the pair's text instead
                res: Dict[str, Score] = scorer.score(s1.text, s2.text)
                yield HarmonicMeanScore(*res[meth])
@dataclass
class Scorer(object):
    """A class that scores sentences using a set of registered methods
    (:obj:`methods`).

    """
    methods: Dict[str, ScoreMethod] = field(default=None)
    """The registered scoring methods available, which are accessed from
    :obj:`.ScoreContext.meth`.

    """
    default_methods: Set[str] = field(default=None)
    """Methods (keys from :obj:`methods`) to use when none are provided in the
    :obj:`.ScoreContext.meth` in the call to :meth:`score`.

    """
    @persisted('_get_missing_modules_pw', cache_global=True)
    def _get_missing_modules(self) -> Tuple[str]:
        """Prune methods whose external modules are not installed and return
        all missing module names.

        """
        missing: List[str] = []
        not_avail: List[str] = []
        name: str
        meth: ScoreMethod
        for name, meth in self.methods.items():
            missing_mods: Tuple[str] = meth.missing_modules()
            if len(missing_mods) > 0:
                logger.warning(f'method {meth} is not available: ' +
                               f'missing {missing_mods}')
                not_avail.append(name)
                missing.extend(missing_mods)
        # remove after iteration to avoid mutating the dict while iterating
        for name in not_avail:
            del self.methods[name]
        # fixed: previously returned ``tuple(missing_mods)``, which was only
        # the last method's missing modules (and a NameError when ``methods``
        # is empty) rather than the accumulated list
        return tuple(missing)

    def score(self, context: ScoreContext) -> ScoreSet:
        """Score the sentences in ``context``.

        :param context: the context containing the data to score

        :return: the results for each method indicated in ``context``

        """
        by_meth: Dict[str, Tuple[Score]] = {}
        by_res: List[ScoreResult] = []
        meths: Iterable[str] = context.methods
        if meths is None:
            if self.default_methods is None:
                meths = self.methods.keys()
            else:
                meths = self.default_methods
        # prune any methods with missing external modules before scoring
        self._get_missing_modules()
        meth: str
        for meth in meths:
            smeth: ScoreMethod = self.methods.get(meth)
            if smeth is None:
                raise ScorerError(f"No scoring method: '{meth}'")
            by_meth[meth] = tuple(smeth.score(meth, context))
        for i in range(len(context.pairs)):
            item_res: Dict[str, Score] = {}
            corr_id: str = None
            meth: str
            if context.correlation_ids is not None:
                corr_id = context.correlation_ids[i]
            res_tup: Tuple[Score]
            # for each scored pair
            for meth, res_tup in by_meth.items():
                item_res[meth] = res_tup[i]
            by_res.append(ScoreResult(item_res, correlation_id=corr_id))
        return ScoreSet(results=tuple(by_res))

    def __call__(self, context: ScoreContext) -> ScoreSet:
        """See :meth:`score`."""
        return self.score(context)