
"""Stash implementations that use existing factory stashes.

"""
__author__ = 'Paul Landes'

from typing import Iterable, List, Tuple, Set, Any, Union, Type
from dataclasses import dataclass, field
import os
import logging
from zensols.util import Failure
from zensols.config import Configurable
from zensols.persist import Stash, PreemptiveStash, PrimeableStash
from .stash import MultiProcessor, MultiProcessStash, PoolMultiProcessor

logger = logging.getLogger(__name__)


@dataclass
class MultiProcessDefaultStash(MultiProcessStash):
    """Just like :class:`.MultiProcessStash`, but provide defaults as a
    convenience.

    """
    chunk_size: int = field(default=0)
    """The size of each group of data sent to the child process to be handled;
    in some cases the child process will get a chunk of data smaller than this
    (the last) but never more; if this number is 0, then evenly divide the work
    so that each worker takes the largest amount of work to minimize the number
    of chunks (in this case the data is tupleized).

    """
    workers: Union[int, float] = field(default=1)
    """The number of processes spawned to accomplish the work, or 0 to use all
    CPU cores.  If this is a negative number, it is added to the number of CPU
    processors, so -1 results in one fewer worker utilized than the number of
    CPUs, which is a good policy for a busy server.  If the number is a float,
    it is taken as the percentage of the number of processes and must be in the
    range (0, 1].

    """
    processor_class: Type[MultiProcessor] = field(default=PoolMultiProcessor)
    """The class of the processor to use for the handling of the work."""

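# Illustrative sketch (not part of the library): one way the ``workers``
# field documented above could resolve to a concrete process count.  The
# function ``_example_resolve_workers`` is hypothetical; the actual
# resolution happens inside the :mod:`zensols.multi` processing machinery.
def _example_resolve_workers(workers: Union[int, float], n_cpus: int) -> int:
    """Map a ``workers`` setting to a process count (illustration only)."""
    if isinstance(workers, float):
        # a float in (0, 1] is taken as a percentage of the CPU count
        return max(1, int(n_cpus * workers))
    if workers == 0:
        # zero means use all CPU cores
        return n_cpus
    if workers < 0:
        # negative values are added to the CPU count: -1 => n_cpus - 1
        return n_cpus + workers
    return workers
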
@dataclass(init=False)
class MultiProcessFactoryStash(MultiProcessDefaultStash):
    """Like :class:`~zensols.persist.domain.FactoryStash`, but uses a
    subordinate factory stash to generate the data in subprocess(es) in the
    same manner as the super class :class:`.MultiProcessStash`.

    Attributes :obj:`chunk_size` and :obj:`workers` both default to ``0``.

    """
    factory: Stash = field()
    """The stash that creates the data, which is not to be confused with the
    :obj:`delegate`, which persists the data.

    """
    enable_preemptive: Union[str, bool] = field(default=False)
    """If ``False``, do not invoke the :obj:`factory` instance's data
    calculation.  If the value is ``always``, always assume the data is not
    calculated, which forces the factory prime.  Otherwise, if ``None``, call
    the super class data calculation, falling back on the :obj:`factory` if
    the super class returns ``False``.

    """
    def __init__(self, config: Configurable, name: str, factory: Stash,
                 enable_preemptive: bool = False, **kwargs):
        """Initialize with attributes :obj:`chunk_size` and :obj:`workers`
        both defaulting to ``0``.

        :param config: the application configuration meant to be used by an
                       :class:`~zensols.config.importfac.ImportConfigFactory`

        :param name: the name of the parent stash used to create the chunk,
                     and subsequently process this chunk

        :param factory: the stash that creates the data to be persisted

        :param enable_preemptive: whether to invoke the :obj:`factory`
                                  instance's data calculation

        """
        if 'chunk_size' not in kwargs:
            kwargs['chunk_size'] = 0
        if 'workers' not in kwargs:
            kwargs['workers'] = 0
        super().__init__(config=config, name=name, **kwargs)
        self.factory = factory
        self.enable_preemptive = enable_preemptive

    def _calculate_has_data(self) -> bool:
        has_data = False
        if self.enable_preemptive != 'always':
            has_data = super()._calculate_has_data()
        if not has_data and \
           self.enable_preemptive and \
           isinstance(self.factory, PreemptiveStash):
            has_data = self.factory._calculate_has_data()
        return has_data

    def prime(self):
        if isinstance(self.factory, PrimeableStash):
            if logger.isEnabledFor(logging.DEBUG):
                self._debug(f'priming factory: {self.factory}')
            self.factory.prime()
        super().prime()

    def _create_data(self) -> Iterable[Any]:
        return self.factory.keys()

    def _process(self, chunk: List[Any]) -> Iterable[Tuple[str, Any]]:
        k: str
        for k in chunk:
            if logger.isEnabledFor(logging.INFO):
                pid: int = os.getpid()
                logger.info(f'processing key {k} in process {pid}')
            val: Any = self._load_from_factory(k)
            yield (k, val)

    def _load_from_factory(self, k: str) -> Any:
        return self.factory.load(k)

    def clear(self):
        super().clear()
        self.factory.clear()

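# Illustrative sketch (hypothetical, not part of the library): the minimal
# shape of a :obj:`factory` stash usable with
# :class:`.MultiProcessFactoryStash`: ``keys`` enumerates the work and
# ``load`` creates one item, with ``load`` invoked in child processes.
# This assumes :class:`~zensols.persist.ReadOnlyStash` as the base class
# since a factory only creates data.
from zensols.persist import ReadOnlyStash


class _ExampleSquareFactory(ReadOnlyStash):
    """A toy factory that squares integer keys (illustration only)."""

    def keys(self) -> Iterable[str]:
        return map(str, range(100))

    def exists(self, name: str) -> bool:
        return name.isdigit() and int(name) < 100

    def load(self, name: str) -> int:
        # the expensive per-item computation would run in a child process
        return int(name) ** 2
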
@dataclass(init=False)
class MultiProcessRobustStash(MultiProcessFactoryStash):
    """Like :class:`.MultiProcessFactoryStash`, but robustly recovers from a
    previously failed process.  It does this by calculating the keys the
    factory stash has but the delegate does not, and then starts the work to
    generate them using the factory stash in child processes.

    """
    protect_work: bool = field(default=True)
    """If ``True``, :class:`~zensols.util.fail.Failure` instances are created
    for items that cannot be created due to an exception being thrown.

    """
    def __init__(self, config: Configurable, name: str,
                 protect_work: bool = True, **kwargs):
        super().__init__(config=config, name=name, **kwargs)
        self.protect_work = protect_work
        # force _create_data via prime to create missing keys
        self._set_has_data(False)

    def _get_missing_keys(self) -> Iterable[Any]:
        fkeys: Set[str] = set(self.factory.keys())
        dkeys: Set[str] = set(self.delegate.keys())
        return fkeys - dkeys

    def _create_data(self) -> Iterable[Any]:
        return self._get_missing_keys()

    def invalidate(self):
        """Force a check for missing keys and start the work to recover them.
        This should only be needed after the creation of the stash if files
        were deleted under its nose.

        """
        self._set_has_data(False)
        self.prime()

    def iterfails(self) -> Iterable[Tuple[str, Failure]]:
        """Return an iterable of all the items that failed due to a raised
        exception.

        """
        return filter(lambda t: isinstance(t[1], Failure), self.items())

    def reprocess_failures(self) -> Set[str]:
        """Reprocess failures.  First find them in the dataset, delete them,
        then start the work to recreate them.

        :return: the set of failure keys that have been deleted

        """
        deleted: Set[str] = set()
        k: str
        for k in map(lambda t: t[0], self.iterfails()):
            deleted.add(k)
            self.delegate.delete(k)
        self.invalidate()
        return deleted

    def _load_from_factory(self, k: str) -> Any:
        item: Any
        if self.protect_work:
            try:
                item = super()._load_from_factory(k)
            except Exception as e:
                msg = (f'Could not load from stash {type(self.factory)} ' +
                       f'in {type(self)} (section={self.name}): {e}')
                item = Failure(e, self, message=msg)
        else:
            item = super()._load_from_factory(k)
        return item

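
# Illustrative recovery workflow (a sketch under assumptions, not library
# code): with ``protect_work`` enabled, failed items are persisted as
# :class:`~zensols.util.Failure` instances, which can then be inspected and
# recreated.  The stash instance is assumed to be created elsewhere, for
# example by an ``ImportConfigFactory``.
def _example_report_and_retry(stash: MultiProcessRobustStash) -> None:
    """Log each recorded failure, then recreate the failed items."""
    k: str
    fail: Failure
    for k, fail in stash.iterfails():
        # each Failure wraps the exception raised while creating the item
        logger.warning(f'failed item {k}: {fail.message}')
    # delete the failures and restart the factory work for those keys
    retried: Set[str] = stash.reprocess_failures()
    logger.info(f'recreated {len(retried)} previously failed items')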