zensols.multi package

Submodules

zensols.multi.stash module

Stash extensions to distribute item creation over multiple processes.

class zensols.multi.stash.ChunkProcessor(config, name, chunk_id, data)

Bases: object

Represents a chunk of work created by the parent and processed on the child.

__init__(config, name, chunk_id, data)

chunk_id: int

The nth chunk.

config: Configurable

The application context configuration used to create the parent stash.

data: object

The data created by the parent to be processed.

name: str

The name of the parent stash used to create the chunk, and subsequently process this chunk.

process()

Create the stash used to process the data, then persist the results in that stash.

Return type:

int
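To make the parent/child split concrete, here is a stdlib-only sketch of a chunk processor. ToyChunkProcessor and EchoStash are hypothetical stand-ins: a dict mapping stash names to constructors replaces the real Configurable, a dict replaces the delegate stash, and the returned int is assumed to be the number of items processed, which the documentation above does not state.

```python
from dataclasses import dataclass
from typing import Any, Callable, Dict, Iterable, List, Tuple


class EchoStash:
    """Hypothetical stash: stores each item under its string form."""

    def __init__(self) -> None:
        # A dict stands in for the delegate stash that persists the data.
        self.delegate: Dict[str, Any] = {}

    def _process(self, chunk: List[Any]) -> Iterable[Tuple[str, Any]]:
        for item in chunk:
            yield str(item), item


@dataclass
class ToyChunkProcessor:
    """Work unit built in the parent and run in a child (hypothetical)."""
    # Assumption: stash name -> zero-argument constructor, in place of the
    # application configuration used by the real class.
    config: Dict[str, Callable[[], Any]]
    name: str
    chunk_id: int
    data: List[Any]

    def process(self) -> int:
        # Re-create the parent stash by name in this process.
        stash = self.config[self.name]()
        count = 0
        for key, value in stash._process(self.data):
            # Persist each (key, value) pair through the stash's delegate.
            stash.delegate[key] = value
            count += 1
        # Assumption: report how many items this chunk produced.
        return count
```

Carrying config and name rather than a live stash instance lets the child rebuild the stash on its own side, which matches the attribute descriptions above.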

class zensols.multi.stash.MultiProcessDefaultStash(delegate, config, name, chunk_size=0, workers=1, processor_class=<class 'zensols.multi.stash.PoolMultiProcessor'>)

Bases: MultiProcessStash

Like MultiProcessStash, but provides defaults as a convenience.

__init__(delegate, config, name, chunk_size=0, workers=1, processor_class=<class 'zensols.multi.stash.PoolMultiProcessor'>)

chunk_size: int = 0

The size of each group of data sent to a child process. A child process may receive a chunk smaller than this (the last one), but never a larger one. If this number is 0, the work is divided evenly so that each worker takes the largest amount of work, which minimizes the number of chunks (in this case the data is tupleized).
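The chunking rule above can be sketched in plain Python. make_chunks is a hypothetical helper, not part of zensols.multi; it assumes workers has already been resolved to a positive process count, and it materializes the whole input, which the library may not do.

```python
import math
from typing import Any, Iterable, List, Sequence


def make_chunks(data: Iterable[Any], chunk_size: int,
                workers: int) -> List[Sequence[Any]]:
    """Group data into chunks following the chunk_size rule (a sketch)."""
    if chunk_size == 0:
        # Tupleize the data so its length is known, then size the chunks
        # so each worker gets one chunk, minimizing the number of chunks.
        items: Sequence[Any] = tuple(data)
        if not items:
            return []
        chunk_size = math.ceil(len(items) / workers)
    else:
        items = list(data)
    # Every chunk has exactly chunk_size items except possibly the last,
    # which may be smaller but never larger.
    return [items[i:i + chunk_size] for i in range(0, len(items), chunk_size)]
```

For example, make_chunks(range(10), 0, 3) produces three tuples of sizes 4, 4 and 2, one per worker.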

processor_class

The class of the processor to use for the handling of the work.

alias of PoolMultiProcessor

workers: Union[int, float] = 1

The number of processes spawned to accomplish the work, or 0 to use all CPU cores. If this is a negative number, it is added to the number of CPU cores, so -1 results in one fewer worker than the number of CPUs, which is a good policy for a busy server.

If the number is a float, it is taken as a fraction of the number of CPU processors and must be in the range (0, 1].
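The workers rules can be read as a small resolution function. resolve_workers is hypothetical; its cpu_count parameter exists only so results are reproducible, and rounding a fractional value up with math.ceil while never going below one process is an assumption rather than documented behavior.

```python
import math
import os
from typing import Optional, Union


def resolve_workers(workers: Union[int, float],
                    cpu_count: Optional[int] = None) -> int:
    """Turn a workers setting into a concrete process count (hypothetical)."""
    cpus = cpu_count if cpu_count is not None else (os.cpu_count() or 1)
    if isinstance(workers, float):
        # A float is a fraction of the CPU count and must be in (0, 1].
        if not 0 < workers <= 1:
            raise ValueError(f'float workers must be in (0, 1]: {workers}')
        return max(1, math.ceil(workers * cpus))
    if workers <= 0:
        # 0 means all cores; a negative value is added to the core count.
        return max(1, cpus + workers)
    # A positive int is used as-is.
    return workers
```

On an 8-core machine, 0 resolves to 8, -1 to 7 and 0.5 to 4.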

class zensols.multi.stash.MultiProcessFactoryStash(config, name, factory, enable_preemptive=False, **kwargs)

Bases: MultiProcessDefaultStash

Like FactoryStash, but uses a subordinate factory stash to generate the data in one or more subprocesses, in the same manner as the base class MultiProcessStash.

Attributes chunk_size and workers both default to 0.

__init__(config, name, factory, enable_preemptive=False, **kwargs)

Initialize with attributes chunk_size and workers both defaulting to 0.

Parameters:
  • config (Configurable) – the application configuration meant to be populated by zensols.config.factory.ImportClassFactory

  • name (str) – the name of the parent stash used to create the chunk, and subsequently process this chunk

enable_preemptive: bool

If False, do not invoke the factory instance’s data calculation. If the value is 'always', always assume the data is not calculated, which forces the factory to prime. Otherwise, if None, call the superclass data calculation, falling back on the factory if the superclass returns False.
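One reading of the three documented enable_preemptive values, written as a hypothetical function that receives the superclass and factory checks as callables. Treating any value other than False and 'always' (including None) as the fallback case is an assumption, since the description does not cover True.

```python
from typing import Callable, Union

Preemptive = Union[bool, str, None]


def calculate_has_data(enable_preemptive: Preemptive,
                       super_check: Callable[[], bool],
                       factory_check: Callable[[], bool]) -> bool:
    """Decide whether data already exists (hypothetical reading of the doc)."""
    if enable_preemptive == 'always':
        # Always report missing data, which forces the factory to prime.
        return False
    if enable_preemptive is False:
        # Never ask the factory; rely only on the superclass check.
        return super_check()
    # None (or any other value): superclass first, then the factory.
    return super_check() or factory_check()
```

Passing the checks as callables makes it visible that the factory is never consulted when the value is False.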

factory: Stash

The stash that creates the data, which is not to be confused with the delegate, which persists the data.

prime()

If the delegate stash data does not exist, use this implementation to generate the data and process it in child processes.

class zensols.multi.stash.MultiProcessStash(delegate, config, name, chunk_size, workers)

Bases: PrimablePreemptiveStash

A stash that forks processes to process data in a distributed fashion. The stash is typically created by an ImportConfigFactory in the child process. Work is chunked (grouped) and then sent to child processes. In each child, a new instance of this same stash is created using ImportConfigFactory, and then an abstract method is called to dump the data.

Implementation details:

  • The delegate stash is used to manage the actual persistence of the data.

  • This implementation of prime() forks processes to accomplish the work.

  • The processor_class attribute is not set directly on this class because subclasses already have non-default fields; however, it is set to PoolMultiProcessor by default.

The _create_data() and _process() methods must be implemented.

abstract _create_data()

Create data in the parent process to be processed in chunks by the child process(es). The returned data is grouped into sublists and passed to _process().

Return type:

Iterable[Any]

Returns:

an iterable of data to be processed

abstract _process(chunk)

Process, in a subprocess, one chunk of data, where each chunk is a group of items created by _create_data().

Param:

chunk: a list of data generated by _create_data() to be processed in this method

Return type:

Iterable[Tuple[str, Any]]

Returns:

an iterable of (key, data) tuples, where key is used as the string key for the stash and data is the value returned by methods like load()
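The contract of the two abstract methods can be illustrated without the library. ToyMultiProcessStash and SquareStash are hypothetical: a dict plays the delegate, and prime() runs each chunk serially in the current process instead of forking, so the sketch shows only the data flow from _create_data() through _process() into the delegate. A real subclass of MultiProcessStash would implement just the two abstract methods.

```python
from abc import ABC, abstractmethod
from typing import Any, Dict, Iterable, List, Optional, Tuple


class ToyMultiProcessStash(ABC):
    """Stdlib stand-in for the MultiProcessStash contract (hypothetical)."""

    def __init__(self, chunk_size: int = 2) -> None:
        self.chunk_size = chunk_size
        # A dict plays the role of the delegate stash that persists data.
        self.delegate: Dict[str, Any] = {}

    @abstractmethod
    def _create_data(self) -> Iterable[Any]:
        """Parent side: produce the items to be processed."""

    @abstractmethod
    def _process(self, chunk: List[Any]) -> Iterable[Tuple[str, Any]]:
        """Child side: turn one chunk into (key, data) pairs."""

    def prime(self) -> None:
        # Generate data only when the delegate holds nothing yet.
        if self.delegate:
            return
        items = list(self._create_data())
        for start in range(0, len(items), self.chunk_size):
            chunk = items[start:start + self.chunk_size]
            # Serial stand-in for sending the chunk to a child process.
            for key, value in self._process(chunk):
                self.delegate[key] = value

    def load(self, key: str) -> Optional[Any]:
        self.prime()
        return self.delegate.get(key)


class SquareStash(ToyMultiProcessStash):
    """Example subclass: keys are numbers as strings, values their squares."""

    def _create_data(self) -> Iterable[int]:
        return range(5)

    def _process(self, chunk: List[int]) -> Iterable[Tuple[str, int]]:
        for n in chunk:
            yield str(n), n * n
```

Calling SquareStash().load('3') primes the stash on first access and returns 9; a key that _process() never produced returns None.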

_create_chunk_processor(chunk_id, data)

Factory method to create the ChunkProcessor instance.

Return type:

ChunkProcessor

See:

zensols.config.factory.ImportConfigFactory

ATTR_EXP_META = ('chunk_size', 'workers')

LOG_CONFIG_SECTION = 'multiprocess_log_config'

The name of the section to use to configure the log system. This section should be an instance definition of a LogConfigurator.

__init__(delegate, config, name, chunk_size, workers)

chunk_size: int

The size of each group of data sent to a child process. A child process may receive a chunk smaller than this (the last one), but never a larger one. If this number is 0, the work is divided evenly so that each worker takes the largest amount of work, which minimizes the number of chunks (in this case the data is tupleized).

config: Configurable

The application configuration meant to be populated by zensols.config.factory.ImportClassFactory.

name: str

The name of the instance in the configuration.

prime()

If the delegate stash data does not exist, use this implementation to generate the data and process it in child processes.

processor_class: Type[MultiProcessor]

The class of the processor to use for the handling of the work.

workers: Union[int, float]

The number of processes spawned to accomplish the work, or 0 to use all CPU cores. If this is a negative number, it is added to the number of CPU cores, so -1 results in one fewer worker than the number of CPUs, which is a good policy for a busy server.

If the number is a float, it is taken as a fraction of the number of CPU processors and must be in the range (0, 1].

class zensols.multi.stash.MultiProcessor(name)

Bases: object

A base class used by MultiProcessStash to divide the work up into chunks. Subclass it to change how the divided work is processed.

static _process_work(processor)

Process a chunk of data in the child process that was created by the parent process.

Return type:

int

__init__(name)

invoke_work(workers, chunk_size, data)

Return type:

int

name: str

The name of the multi-processor.

class zensols.multi.stash.PoolMultiProcessor(name)

Bases: MultiProcessor

Uses multiprocessing.Pool to fork/exec processes to do the work.
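A sketch of the pool pattern PoolMultiProcessor is described as using: chunks are mapped over a multiprocessing.Pool and each child reports an item count that the parent sums. process_chunk and run_pool are hypothetical module-level functions (Pool needs picklable, top-level callables), and the explicit 'fork' start method assumes a POSIX system and may differ from what the library does.

```python
import multiprocessing as mp
from typing import Any, List, Sequence


def process_chunk(chunk: Sequence[Any]) -> int:
    """Child-side work for one chunk; returns how many items it handled."""
    # Placeholder work: a real processor would build and run a chunk processor.
    return len(chunk)


def run_pool(workers: int, chunks: List[Sequence[Any]]) -> int:
    """Map chunks over a process pool and total the per-chunk counts."""
    # Assumption: the 'fork' start method (POSIX only) keeps this sketch short.
    ctx = mp.get_context('fork')
    with ctx.Pool(processes=workers) as pool:
        # map blocks until every chunk has been processed by some child.
        return sum(pool.map(process_chunk, chunks))
```

Replacing the pool with a plain loop over the chunks gives the in-process behavior described for SingleMultiProcessor.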

class zensols.multi.stash.SingleMultiProcessor(name)

Bases: PoolMultiProcessor

Does all work in the current process.

Module contents

Submodules provide multiprocessing conveniences to parallelize work.