"""Stash extensions to distribute item creation over multiple processes.
"""
__author__ = 'Paul Landes'
from typing import Iterable, List, Any, Tuple, Callable, Union, Type
from abc import ABCMeta, abstractmethod
from dataclasses import dataclass, field
import sys
import os
import logging
import math
from multiprocessing import Pool
from zensols.util.time import time
from zensols.config import Configurable, ConfigFactory, ImportConfigFactory
from zensols.persist import (
Stash, PrimablePreemptiveStash, PreemptiveStash, PrimeableStash,
chunks, Deallocatable,
)
from zensols.cli import LogConfigurator
logger = logging.getLogger(__name__)
@dataclass
class ChunkProcessor(object):
"""Represents a chunk of work created by the parent and processed on the
child.
"""
config: Configurable = field()
"""The application context configuration used to create the parent stash.
"""
name: str = field()
"""The name of the parent stash used to create the chunk, and subsequently
process this chunk.
"""
chunk_id: int = field()
"""The nth chunk."""
data: object = field()
"""The data created by the parent to be processed."""
def _create_stash(self) -> Tuple[ImportConfigFactory, Any]:
fac = ImportConfigFactory(self.config)
with time(f'factory inst {self.name} for chunk {self.chunk_id}',
logging.INFO):
inst = fac.instance(self.name)
inst.is_child = True
return fac, inst
def process(self) -> int:
"""Create the stash used to process the data, then persisted in the
stash.
"""
factory, stash = self._create_stash()
cnt = 0
self.config_factory = factory
stash._init_child(self)
if logger.isEnabledFor(logging.INFO):
logger.info(f'processing chunk {self.chunk_id} ' +
f'with stash {stash.__class__}')
with time('processed {cnt} items for chunk {self.chunk_id}'):
for i, (id, inst) in enumerate(stash._process(self.data)):
if logger.isEnabledFor(logging.DEBUG):
logger.debug(f'dumping {id} -> {inst.__class__}')
stash.delegate.dump(id, inst)
Deallocatable._try_deallocate(inst)
cnt += 1
Deallocatable._try_deallocate(stash)
Deallocatable._try_deallocate(factory)
return cnt
def __str__(self):
data = self.data
if data is not None:
if isinstance(data, list) and len(data) > 0:
data = data[0]
dtype = data.__class__.__name__
else:
dtype = 'None'
return f'{self.name} ({self.chunk_id}): data: {dtype}'
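# Illustrative sketch (not part of the library): a ChunkProcessor is normally
# created in the parent by MultiProcessStash._create_chunk_processor (below),
# pickled to the child process, and its ``process`` method invoked there. A
# hand-rolled equivalent, assuming a hypothetical stash section named
# 'mystash' in ``config``:
#
#   processor = ChunkProcessor(config, 'mystash', 0, [item1, item2])
#   n_dumped = processor.process()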
@dataclass
class MultiProcessor(object, metaclass=ABCMeta):
"""A base class used by :class:`.MultiProcessStash` to divid the work up
into chunks. This should be subclassed if the behavior of how divided work
is to be processed is needed.
.. automethod:: _process_work
"""
name: str = field()
"""The name of the multi-processor."""
@staticmethod
def _process_work(processor: ChunkProcessor) -> int:
"""Process a chunk of data in the child process that was created by the
parent process.
"""
        if logger.isEnabledFor(logging.DEBUG):
            logger.debug(f'processing processor {processor}')
with time(f'processed processor {processor}'):
return processor.process()
def invoke_work(self, workers: int, chunk_size: int,
data: Iterable[Any]) -> int:
fn: Callable = self.__class__._process_work
if logger.isEnabledFor(logging.INFO):
logger.info(f'{self.name}: spawning work in {type(self)} with ' +
f'chunk size {chunk_size} across {workers} workers')
return self._invoke_work(workers, chunk_size, data, fn)
@abstractmethod
def _invoke_work(self, workers: int, chunk_size: int,
data: Iterable[Any], fn: Callable) -> int:
pass
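# Illustrative sketch (not part of the library): a thread based
# MultiProcessor subclass showing how ``_invoke_work`` can be overridden to
# change how the divided work is processed. Threads are an assumption here;
# they can be useful when the work is I/O bound and forking is unwanted.
class _ThreadMultiProcessor(MultiProcessor):
    def _invoke_work(self, workers: int, chunk_size: int,
                     data: Iterable[Any], fn: Callable) -> int:
        from concurrent.futures import ThreadPoolExecutor
        with ThreadPoolExecutor(max_workers=workers) as executor:
            # map each chunk processor through the worker function and
            # total the per-chunk item counts
            return sum(executor.map(fn, data))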
class PoolMultiProcessor(MultiProcessor):
"""Uses :class:`multiprocessing.Pool` to fork/exec processes to do the work.
"""
    def _invoke_pool(self, pool: Pool, fn: Callable,
                     data: Iterable[Any]) -> List[int]:
if pool is None:
return tuple(map(fn, data))
else:
return pool.map(fn, data)
def _invoke_work(self, workers: int, chunk_size: int,
data: Iterable[Any], fn: Callable) -> int:
if workers == 1:
with time('processed singleton chunk'):
cnt = self._invoke_pool(None, fn, data)
else:
with Pool(workers) as p:
with time('processed chunks'):
cnt = self._invoke_pool(p, fn, data)
return cnt
class SingleMultiProcessor(PoolMultiProcessor):
"""Does all work in the current process.
"""
def _invoke_work(self, workers: int, chunk_size: int,
data: Iterable[Any], fn: Callable) -> int:
return super()._invoke_work(1, chunk_size, data, fn)
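# Illustrative usage sketch: swapping in SingleMultiProcessor via the
# ``processor_class`` field (see MultiProcessDefaultStash below) keeps all
# work in the calling process, which makes breakpoint debugging of
# ``_process`` implementations possible:
#
#   stash.processor_class = SingleMultiProcessor
#   stash.prime()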
@dataclass
class MultiProcessStash(PrimablePreemptiveStash, metaclass=ABCMeta):
"""A stash that forks processes to process data in a distributed fashion.
    The stash is typically created by an :class:`.ImportConfigFactory` in the
    child process. Work is chunked (grouped) and then sent to child processes.
In each, a new instance of this same stash is created using
:class:`.ImportConfigFactory` and then an abstract method is called to dump
the data.
Implementation details:
* The :obj:`delegate` stash is used to manage the actual persistence of
the data.
    * This implementation of :meth:`prime` forks processes to accomplish
      the work.
    * The ``processor_class`` attribute is not set directly on this class
      since subclasses already have non-default fields. However, it is set to
      :class:`.PoolMultiProcessor` by default.
The :meth:`_create_data` and :meth:`_process` methods must be
implemented.
.. document private functions
.. automethod:: _create_data
.. automethod:: _process
.. automethod:: _create_chunk_processor
:see: :class:`zensols.config.factory.ImportConfigFactory`
"""
ATTR_EXP_META = ('chunk_size', 'workers')
LOG_CONFIG_SECTION = 'multiprocess_log_config'
"""The name of the section to use to configure the log system. This section
should be an instance definition of a :class:`.LogConfigurator`.
"""
config: Configurable = field()
"""The application configuration meant to be populated by
:class:`zensols.config.factory.ImportClassFactory`."""
name: str = field()
"""The name of the instance in the configuration."""
chunk_size: int = field()
"""The size of each group of data sent to the child process to be handled;
in some cases the child process will get a chunk of data smaller than this
(the last) but never more; if this number is 0, then evenly divide the work
    so that each worker takes the largest amount of work to minimize the number
    of chunks (in this case the data is materialized into a tuple).
"""
workers: Union[int, float] = field()
"""The number of processes spawned to accomplish the work or 0 to use all
CPU cores. If this is a negative number, add the number of CPU processors
with this number, so -1 would result in one fewer works utilized than the
number of CPUs, which is a good policy for a busy server.
If the number is a float, then it is taken to be the percentage of the
number of processes. If it is a float, the value must be in range (0, 1].
"""
processor_class: Type[MultiProcessor] = field(init=False)
"""The class of the processor to use for the handling of the work."""
def __post_init__(self):
super().__post_init__()
self.is_child = False
if not hasattr(self, 'processor_class'):
            # subclasses like `MultiProcessDefaultStash` add this as a field,
# which will already be set by the time this is called
self.processor_class: Type[MultiProcessor] = None
@abstractmethod
def _create_data(self) -> Iterable[Any]:
"""Create data in the parent process to be processed in the child
        process(es) in chunks. The returned data is grouped into sublists and
passed to :meth:`_process`.
:return: an iterable of data to be processed
"""
pass
@abstractmethod
def _process(self, chunk: List[Any]) -> Iterable[Tuple[str, Any]]:
"""Process a chunk of data, each created by ``_create_data`` as a group
in a subprocess.
:param: chunk: a list of data generated by :meth:`_create_data` to be
processed in this method
:return: an iterable of ``(key, data)`` tuples, where ``key`` is used
as the string key for the stash and the return value is the
data returned by methods like :meth:`load`
"""
pass
def _init_child(self, processor: ChunkProcessor):
"""Initialize the child process.
:param processor: the chunk processor that created this stash in the
child process
"""
self._config_child_logging(processor.config_factory)
def _config_child_logging(self, factory: ConfigFactory):
"""Initalize the logging system in the child process.
:param factory: the factory that was used to create this stash and
child app configi environment
"""
warn = None
config = factory.config
if config.has_option('section', self.LOG_CONFIG_SECTION):
conf_sec = config.get_option('section', self.LOG_CONFIG_SECTION)
if isinstance(factory, ImportConfigFactory):
log_conf = factory.instance(conf_sec)
if isinstance(log_conf, LogConfigurator):
log_conf.config()
else:
warn = f'unknown configuration object: {type(log_conf)}'
else:
                warn = f'unknown factory type: {type(factory)}'
if warn is not None:
print(f'warning: can not configure child process logging: {warn}',
file=sys.stderr)
def _create_chunk_processor(self, chunk_id: int, data: Any) -> \
ChunkProcessor:
"""Factory method to create the ``ChunkProcessor`` instance.
"""
if logger.isEnabledFor(logging.DEBUG):
self._debug(f'creating chunk processor for id {chunk_id}')
return ChunkProcessor(self.config, self.name, chunk_id, data)
def _spawn_work(self) -> int:
"""Chunks and invokes a multiprocessing pool to invokes processing on
the children.
"""
multi_proc: MultiProcessor
if self.processor_class is None:
multi_proc = PoolMultiProcessor(self.name)
else:
multi_proc = self.processor_class(self.name)
chunk_size, workers = self.chunk_size, self.workers
        if isinstance(workers, float):
            percent = workers
            avail = os.cpu_count()
            workers = math.ceil(percent * avail)
            if logger.isEnabledFor(logging.DEBUG):
                logger.debug(f'calculating as {percent} of ' +
                             f'total {avail}: {workers}')
        elif workers <= 0:
            workers = os.cpu_count() + workers
data = self._create_data()
if chunk_size == 0:
data = tuple(data)
chunk_size = math.ceil(len(data) / workers)
data = map(lambda x: self._create_chunk_processor(*x),
enumerate(chunks(data, chunk_size)))
return multi_proc.invoke_work(workers, chunk_size, data)
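    # Worked example of the resolution above (assuming an 8 core machine):
    # workers=-1 yields 8 + -1 = 7 processes; workers=0.5 yields
    # ceil(0.5 * 8) = 4 processes; with chunk_size=0, 100 data items and
    # 4 workers, chunk_size becomes ceil(100 / 4) = 25, producing 4 chunks.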
def prime(self):
"""If the delegate stash data does not exist, use this implementation to
generate the data and process in children processes.
"""
super().prime()
if logger.isEnabledFor(logging.DEBUG):
self._debug(f'multi prime, is child: {self.is_child}')
has_data = self.has_data
if logger.isEnabledFor(logging.DEBUG):
self._debug(f'has data: {has_data}')
if not has_data:
with time('completed work in {self.__class__.__name__}'):
self._spawn_work()
self._reset_has_data()
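# Illustrative sketch (not part of the library): a minimal subclass
# implementing the two abstract methods. ``_create_data`` runs once in the
# parent; ``_process`` runs in each child on one chunk and yields key/value
# pairs that are dumped to the delegate. The squaring task is hypothetical.
@dataclass
class _ExampleSquareStash(MultiProcessStash):
    def _create_data(self) -> Iterable[int]:
        # the work to distribute, created in the parent process
        return range(100)

    def _process(self, chunk: List[int]) -> Iterable[Tuple[str, int]]:
        # runs in a child process on one chunk of the data
        for i in chunk:
            yield (str(i), i * i)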
@dataclass
class MultiProcessDefaultStash(MultiProcessStash):
"""Just like :class:`.MultiProcessStash`, but provide defaults as a
convenience.
"""
chunk_size: int = field(default=0)
"""The size of each group of data sent to the child process to be handled;
in some cases the child process will get a chunk of data smaller than this
(the last) but never more; if this number is 0, then evenly divide the work
    so that each worker takes the largest amount of work to minimize the number
    of chunks (in this case the data is materialized into a tuple).
"""
workers: Union[int, float] = field(default=1)
"""The number of processes spawned to accomplish the work or 0 to use all
CPU cores. If this is a negative number, add the number of CPU processors
with this number, so -1 would result in one fewer works utilized than the
number of CPUs, which is a good policy for a busy server.
If the number is a float, then it is taken to be the percentage of the
number of processes. If it is a float, the value must be in range (0, 1].
"""
processor_class: Type[MultiProcessor] = field(default=PoolMultiProcessor)
"""The class of the processor to use for the handling of the work."""
@dataclass(init=False)
class MultiProcessFactoryStash(MultiProcessDefaultStash):
"""Like :class:`~zensols.persist.FactoryStash`, but uses a subordinate
    factory stash to generate the data in one or more subprocesses, in the
    same manner as the super class :class:`.MultiProcessStash`.
Attributes :obj:`chunk_size` and :obj:`workers` both default to ``0``.
"""
factory: Stash = field()
"""The stash that creates the data, which is not to be confused with the
:obj:`delegate`, which persists the data.
"""
    enable_preemptive: Union[bool, str] = field()
"""If ``False``, do not invoke the :obj:`factory` instance's data
calculation. If the value is ``always``, then always assume the data is not
    calculated, which forces the factory prime. Otherwise, if ``None``, then
call the super class data calculation falling back on the :obj:`factory` if
the super returns ``False``.
"""
    def __init__(self, config: Configurable, name: str, factory: Stash,
                 enable_preemptive: Union[bool, str] = False, **kwargs):
"""Initialize with attributes :obj:`chunk_size` and :obj:`workers` both
defaulting to ``0``.
        :param config: the application configuration meant to be populated by
                       :class:`zensols.config.factory.ImportConfigFactory`
:param name: the name of the parent stash used to create the chunk, and
subsequently process this chunk
"""
if 'chunk_size' not in kwargs:
kwargs['chunk_size'] = 0
if 'workers' not in kwargs:
kwargs['workers'] = 0
super().__init__(config=config, name=name, **kwargs)
self.factory = factory
self.enable_preemptive = enable_preemptive
def _calculate_has_data(self) -> bool:
has_data = False
if self.enable_preemptive != 'always':
has_data = super()._calculate_has_data()
if not has_data and \
self.enable_preemptive and \
isinstance(self.factory, PreemptiveStash):
has_data = self.factory._calculate_has_data()
return has_data
def prime(self):
if isinstance(self.factory, PrimeableStash):
if logger.isEnabledFor(logging.DEBUG):
self._debug(f'priming factory: {self.factory}')
self.factory.prime()
super().prime()
def _create_data(self) -> Iterable[Any]:
return self.factory.keys()
def _process(self, chunk: List[Any]) -> Iterable[Tuple[str, Any]]:
k: str
for k in chunk:
if logger.isEnabledFor(logging.INFO):
pid: int = os.getpid()
logger.info(f'processing key {k} in process {pid}')
val: Any = self.factory.load(k)
yield (k, val)
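# Illustrative usage sketch (hypothetical section name): a
# MultiProcessFactoryStash parallelizes an existing but slow factory stash;
# ``_create_data`` yields the factory's keys and each child calls
# ``factory.load`` on its share, with the results persisted by the delegate:
#
#   fac = ImportConfigFactory(config)
#   stash: MultiProcessFactoryStash = fac.instance('fact_stash')
#   stash.prime()  # forks children, each loading a chunk of keys
#   value = stash['some_key']  # subsequently served from the delegate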