"""Abstracts the concept of a Python ``dict`` with additional functionality.
"""
__author__ = 'Paul Landes'
from typing import Any, Iterable, Tuple, Union, Set
from dataclasses import dataclass, field, InitVar
from abc import abstractmethod, ABC, ABCMeta
import sys
import logging
import itertools as it
from pathlib import Path
from . import PersistableError
logger = logging.getLogger(__name__)
[docs]
class NotPickleable:
    """A marker class whose instances refuse to be pickled.

    Any attempt to obtain the instance state through :meth:`__getstate__`
    fails with an error.

    """
    def __getstate__(self):
        """Reject the request for the instance state.

        :raises PersistableError: always; it surfaces through the
                                  :mod:`pickle` package when it asks for the
                                  state of the instance

        """
        msg = f'Instances are not pickleable: {type(self)}'
        raise PersistableError(msg)
[docs]
class chunks(object):
    """An iterable that splits any other iterable in to chunks.  Each element
    returned is a list of elements of the given size or smaller.  The element
    that might be smaller is the remainder of the iterable once it is
    exhausted.

    """
    def __init__(self, iterable: Iterable[Any], size: int,
                 enum: bool = False):
        """Initialize the chunker.

        :param iterable: any iterable object

        :param size: the maximum size of each chunk; a non-positive size
                     produces no chunks

        :param enum: if ``True``, each element of a chunk is a tuple
                     ``(index, item)`` where ``index`` is the position of the
                     item within its chunk (starting at 0 for every chunk)

        """
        self.iterable = iterable
        self.size = size
        self.enum = enum

    def __iter__(self):
        """Start a new iteration session over the source iterable and return
        this instance as the iterator.

        """
        self.iterable_session = iter(self.iterable)
        return self

    def __next__(self):
        """Return the next chunk as a list.

        :raises StopIteration: when the source iterable is exhausted

        """
        # allow ``next(chunks(...))`` without a preceding ``iter()`` call by
        # lazily creating the session
        session = getattr(self, 'iterable_session', None)
        if session is None:
            session = self.iterable_session = iter(self.iterable)
        # clamp so a non-positive size yields no chunks rather than an error
        taken = it.islice(session, max(self.size, 0))
        chunk = list(enumerate(taken)) if self.enum else list(taken)
        if not chunk:
            raise StopIteration()
        return chunk
[docs]
class Stash(ABC):
    """A dictionary-like pure virtual class for CRUDing data, most of which
    read and write to/from the file system.  One major difference is
    dictionaries iterate over keys while stashes iterate over ``(key, value)``
    items.

    Note that there are subtle differences between a :class:`.Stash` and a
    ``dict`` when generating or accessing data.  For example, when indexing,
    obtaining the value is sometimes *forced* by using some mechanism to
    create the item.  When using ``get`` it relaxes this creation mechanism
    for some implementations.

    """
    @abstractmethod
    def load(self, name: str) -> Any:
        """Load a data value from the pickled data with key ``name``.
        Semantically, this method loads the data using the stash's
        implementation.  For example :class:`.DirectoryStash` loads the data
        from a file if it exists, but factory type stashes will always
        re-generate the data.

        :see: :meth:`get`

        """
        pass

    def get(self, name: str, default: Any = None) -> Any:
        """Load an object or a default if key ``name`` doesn't exist.
        Semantically, this method tries not to re-create the data if it
        already exists.  This means that if a stash has built-in caching
        mechanisms, this method uses it.

        :param name: the key of the data to fetch

        :param default: the value returned when :meth:`exists` reports that
                        ``name`` is not in the stash

        :see: :meth:`load`

        """
        if self.exists(name):
            item = self.load(name)
        else:
            item = default
        return item

    def exists(self, name: str) -> bool:
        """Return ``True`` if data with key ``name`` exists.

        **Implementation note**: This :meth:`.Stash.exists` method is very
        inefficient (it scans all of :meth:`keys`) and should be overridden.

        """
        return any(k == name for k in self.keys())

    @abstractmethod
    def dump(self, name: str, inst: Any):
        "Persist data value ``inst`` with key ``name``."
        pass

    @abstractmethod
    def delete(self, name: str = None):
        """Delete the resource for data pointed to by ``name`` or the entire
        resource if ``name`` is not given.

        """
        pass

    def clear(self):
        """Delete all data from the stash by calling :meth:`delete` on every
        key.

        **Important**: Exercise caution with this method, of course.

        """
        if logger.isEnabledFor(logging.DEBUG):
            self._debug(f'clearing stash {self.__class__}')
        # snapshot the keys first: deleting while iterating over a live key
        # view (i.e. ``dict.keys()``) raises ``RuntimeError``
        for k in tuple(self.keys()):
            if logger.isEnabledFor(logging.DEBUG):
                self._debug(f'deleting key: {k}')
            self.delete(k)

    @abstractmethod
    def keys(self) -> Iterable[str]:
        """Return an iterable of keys in the collection.

        """
        pass

    def key_groups(self, n):
        "Return an iterable of groups (lists) of keys, each of size at most ``n``."
        return chunks(self.keys(), n)

    def values(self) -> Iterable[Any]:
        """Return the values in the stash, each fetched by indexing with the
        respective key.

        """
        return map(lambda k: self.__getitem__(k), self.keys())

    def items(self) -> Iterable[Tuple[str, Any]]:
        """Return an iterable of all stash items as ``(key, value)`` tuples."""
        return map(lambda k: (k, self.__getitem__(k)), self.keys())

    def _debug(self, msg: str):
        """Utility debugging method that adds the class name to the message to
        document the source stash.

        This makes no checks for if debugging is enabled since it is assumed
        the caller will do so for avoiding double checks of the logger level.

        """
        logger.debug(f'[{self.__class__.__name__}] {msg}')

    def __getitem__(self, key):
        """Return the item for ``key`` using :meth:`get`.

        :raises KeyError: if :meth:`get` returns ``None``

        """
        item = self.get(key)
        if item is None:
            raise KeyError(key)
        return item

    def __setitem__(self, key, value):
        self.dump(key, value)

    def __delitem__(self, key):
        self.delete(key)

    def __contains__(self, key):
        return self.exists(key)

    def __iter__(self):
        # unlike ``dict``, iteration yields (key, value) tuples
        return map(lambda x: (x, self.__getitem__(x),), self.keys())

    def __len__(self):
        # count without materializing the keys
        return sum(1 for _ in self.keys())
[docs]
@dataclass
class NoopStash(Stash):
    """A stash that does nothing: it stores no data and never has any keys.

    """
    def load(self, name: str) -> Any:
        """Return ``None`` for every key."""
        return None

    def get(self, name: str, default: Any = None) -> Any:
        """Return ``default`` for every key."""
        return default

    def exists(self, name: str) -> bool:
        """Return ``False`` for every key."""
        return False

    def dump(self, name: str, inst: Any):
        """Ignore the request to persist ``inst``."""

    def delete(self, name: str = None):
        """Ignore the request to delete ``name``."""

    def keys(self) -> Iterable[str]:
        """Return an empty iterator."""
        return iter(tuple())
[docs]
@dataclass
class ReadOnlyStash(Stash):
    """An abstract base class for subclasses that do not support write methods
    (i.e. :meth:`dump`).  This class is useful to extend for factory type
    classes that generate data.  Paired with container classes such as
    :class:`.DictionaryStash` provide persistence in a reusable way.

    The only methods that need to be implemented are :meth:`load` and
    :meth:`keys`.  However, it is recommended to implement :meth:`exists` to
    speed things up.

    Setting attribute ``strict`` to ``True`` will raise a
    :class:`.PersistableError` for any modification attempts.  Otherwise,
    setting it to ``False`` (the default) silently ignores and does nothing on
    :meth:`.dump`, :meth:`delete` and :meth:`clear`.

    Example::

        class RangeStash(ReadOnlyStash):
            def __init__(self, n, end: int = None):
                super().__init__()
                self.n = n
                self.end = end

            def load(self, name: str) -> Any:
                if self.exists(name):
                    return name

            def keys(self) -> Iterable[str]:
                if self.end is not None:
                    return map(str, range(self.n, self.end))
                else:
                    return map(str, range(self.n))

            def exists(self, name: str) -> bool:
                n = int(name)
                if self.end is None:
                    if (n >= self.n):
                        return False
                elif (n < self.n) or (n >= self.end):
                    return False
                return True

    """
    def __post_init__(self):
        # modification attempts are silently ignored unless a client sets this
        # to ``True``
        self.strict = False

    def _ro_check(self, meth: str):
        """Raise an error for a write attempt when in strict mode.

        :param meth: the name of the write method that was invoked, used
                     (capitalized) in the error message

        :raises PersistableError: if attribute ``strict`` is ``True``

        """
        if self.strict:
            meth = meth.capitalize()
            raise PersistableError(
                f'{meth} not implemented for read only stashes ({type(self)})')

    def dump(self, name: str, inst: Any):
        """Do nothing, or raise an error in strict mode."""
        self._ro_check('dump')

    def delete(self, name: str = None):
        """Do nothing, or raise an error in strict mode."""
        self._ro_check('delete')

    def clear(self):
        """Do nothing, or raise an error in strict mode."""
        self._ro_check('clear')
[docs]
@dataclass
class CloseableStash(Stash):
    """A stash holding a resource that needs to be closed with :meth:`close`.

    """
    @abstractmethod
    def close(self):
        """Close all resources created by the stash."""
[docs]
class DelegateDefaults:
    """Defaults set in :class:`.DelegateStash`.

    """
    # when True, DelegateStash raises an error for a delegate that is not a
    # Stash instance rather than logging a warning; setting to True breaks
    # stash reloads from ImportConfigFactory, so set to True for tests etc
    CLASS_CHECK = False

    # the initial value of ``delegate_attr`` on new DelegateStash instances
    DELEGATE_ATTR = False
[docs]
@dataclass
class DelegateStash(CloseableStash, metaclass=ABCMeta):
    """Delegate pattern: every stash method is forwarded to :obj:`delegate`.

    A minimum functioning implementation needs the :meth:`load` and
    :meth:`keys` methods overridden.  Inheriting and implementing a
    :class:`.Stash` such as this is usually used as the ``factory`` in a
    :class:`.FactoryStash`.

    This class delegates attribute fetches to the delegate for the
    unimplemented methods and attributes using a decorator pattern when
    attribute :py:obj:`delegate_attr` is set to ``True``.

    **Note:** Delegate attribute fetching can cause strange and unexpected
    behavior, so use this functionality with care.  It is advised to leave it
    off if unexpected ``AttributeError`` are raised due to incorrect attribute
    access or method dispatching.

    :see: :py:obj:`delegate_attr`

    """
    delegate: Stash = field()
    """The stash to delegate method invocations."""

    def __post_init__(self):
        """Validate the delegate and initialize ``delegate_attr`` from
        :class:`.DelegateDefaults`.

        :raises PersistableError: if the delegate is ``None``, or if it is not
                                  a :class:`.Stash` and
                                  ``DelegateDefaults.CLASS_CHECK`` is on

        """
        delegate = self.delegate
        if delegate is None:
            raise PersistableError('Delegate not set')
        if not isinstance(delegate, Stash):
            msg = f'not a stash: {delegate.__class__} or reloaded'
            if DelegateDefaults.CLASS_CHECK:
                raise PersistableError(msg)
            # class checking is off: only warn about the suspicious delegate
            logger.warning(msg)
        self.delegate_attr = DelegateDefaults.DELEGATE_ATTR

    def __getattr__(self, attr, default=None):
        """Fetch attributes not found on this instance, optionally from the
        delegate when ``delegate_attr`` is on.

        """
        # ``delegate_attr`` itself is not set yet: report it as off, which
        # also keeps the ``self.delegate_attr`` access below from recursing
        if attr == 'delegate_attr':
            return False
        if not self.delegate_attr:
            # delegation is off: defer to the default attribute lookup
            return super().__getattribute__(attr)
        try:
            delegate = super().__getattribute__('delegate')
        except AttributeError:
            raise AttributeError(
                f"'{self.__class__.__name__}' object has no attribute " +
                f"'{attr}'; delegate not set'")
        return delegate.__getattribute__(attr)

    def _debug_meth(self, meth: str):
        """Log (at debug level) that ``meth`` is being called on the delegate.

        """
        if not logger.isEnabledFor(logging.DEBUG):
            return
        self._debug(
            f'calling method <{meth}> on delegate {type(self.delegate)}')

    def load(self, name: str) -> Any:
        """Return what the delegate loads for ``name``, or ``None`` if there is
        no delegate.

        """
        self._debug_meth('load')
        if self.delegate is None:
            return None
        return self.delegate.load(name)

    def get(self, name: str, default: Any = None) -> Any:
        """Load an object or a default if key ``name`` doesn't exist.

        **Implementation note:** sub classes will probably want to override
        this method given the super method is cavalier about calling
        :meth:`exists` and :meth:`load`.  Based on the implementation, this
        can be problematic.

        """
        self._debug_meth('get')
        if self.delegate is not None:
            return self.delegate.get(name, default)
        return super().get(name, default)

    def exists(self, name: str) -> bool:
        """Return whether the delegate has ``name``, or ``False`` if there is
        no delegate.

        """
        self._debug_meth('exists')
        return self.delegate is not None and self.delegate.exists(name)

    def dump(self, name: str, inst):
        """Persist ``inst`` with key ``name`` using the delegate and return
        what the delegate returns.

        """
        self._debug_meth('dump')
        if self.delegate is None:
            return None
        return self.delegate.dump(name, inst)

    def delete(self, name=None):
        """Delete ``name`` using the delegate."""
        self._debug_meth('delete')
        if self.delegate is None:
            return
        self.delegate.delete(name)

    def keys(self) -> Iterable[str]:
        """Return the keys of the delegate, or an empty tuple if there is no
        delegate.

        """
        self._debug_meth('keys')
        if self.delegate is None:
            return ()
        return self.delegate.keys()

    def clear(self):
        """Clear the delegate."""
        self._debug_meth('clear')
        if self.delegate is None:
            return
        if logger.isEnabledFor(logging.DEBUG):
            self._debug(
                f'calling super clear on {self.delegate.__class__}')
        self.delegate.clear()

    def close(self):
        """Close the delegate and return what its ``close`` returns."""
        self._debug_meth('close')
        if self.delegate is None:
            return None
        return self.delegate.close()
[docs]
@dataclass
class ReadOnlyDelegateStash(DelegateStash, ReadOnlyStash):
    """Makes any stash read only: reads go to the delegate while the write
    methods use the :class:`.ReadOnlyStash` behavior.

    """
    def __post_init__(self):
        # the next ``__post_init__`` in the MRO (``DelegateStash`` for this
        # class) is followed by an explicit call to the read only initializer
        super().__post_init__()
        ReadOnlyStash.__post_init__(self)

    def dump(self, name: str, inst: Any):
        """Use the read only ``dump`` instead of the delegating one."""
        ReadOnlyStash.dump(self, name, inst)

    def delete(self, name: str = None):
        """Use the read only ``delete`` instead of the delegating one."""
        ReadOnlyStash.delete(self, name)

    def clear(self):
        """Use the read only ``clear`` instead of the delegating one."""
        ReadOnlyStash.clear(self)
[docs]
@dataclass
class KeyLimitStash(DelegateStash):
    """A stash that limits the number of generated keys useful for debugging.
    For most stashes, this also limits the iteration output since that is
    based on key mapping.

    """
    ATTR_EXP_META = ('limit',)

    limit: int = field(default=sys.maxsize)
    """The max number of keys provided as a slice of the delegate's keys."""

    def keys(self) -> Iterable[str]:
        """Return at most :obj:`limit` keys taken from the front of the super
        class's (delegate) keys.

        """
        return it.islice(super().keys(), self.limit)

    def exists(self, name: str) -> bool:
        """Return whether ``name`` is one of the limited :meth:`keys`."""
        limited_keys = self.keys()
        return name in limited_keys
[docs]
@dataclass
class KeySubsetStash(ReadOnlyDelegateStash):
    """A stash that exposes a subset of the keys available in the
    :obj:`delegate`.

    """
    key_subset: InitVar[Union[Path, Set[str]]] = field()
    """A subset of the keys available.  If this is set to a
    :class:`~pathlib.Path`, then the keys are read from a newline delimited
    file.

    """
    dynamic_subset: InitVar[bool] = field()
    """Whether the delegate keys are dynamic, which forces inefficient key
    checks on the delegate.

    """
    def __post_init__(self, key_subset: Union[Path, Set[str]],
                      dynamic_subset: bool):
        """Store the key subset as a ``frozenset``, reading it from the file
        (one key per line, blank lines skipped) when given as a path.

        """
        super().__post_init__()
        if isinstance(key_subset, Path):
            with open(key_subset) as f:
                stripped = map(str.strip, f)
                # materialize while the file is open; a lazy iterator would be
                # exhausted after the first ``keys``/``exists`` call
                self._key_subset = frozenset(k for k in stripped if k)
        else:
            self._key_subset = frozenset(key_subset)
        self._dynamic_subset = dynamic_subset

    def load(self, name: str) -> Any:
        """Load ``name`` from the delegate only if it is an exposed key,
        otherwise return ``None``.

        """
        if self.exists(name):
            return super().load(name)

    def get(self, name: str, default: Any = None) -> Any:
        """Get ``name`` from the delegate only if it is an exposed key,
        otherwise return ``default``.

        """
        if self.exists(name):
            return super().get(name, default)
        return default

    def keys(self) -> Iterable[str]:
        """Return the exposed keys.  For a dynamic subset these are the subset
        keys that the delegate currently has, which agrees with
        :meth:`exists`.

        """
        if self._dynamic_subset:
            # ``intersection`` accepts any iterable of delegate keys
            return self._key_subset.intersection(super().keys())
        return self._key_subset

    def exists(self, name: str) -> bool:
        """Return whether ``name`` is in the subset and, for a dynamic subset,
        also exists in the delegate.

        """
        if self._dynamic_subset and not super().exists(name):
            return False
        return name in self._key_subset
[docs]
@dataclass
class PreemptiveStash(DelegateStash):
    """Provide support for preemptively creating data in a stash.  It provides
    this with :obj:`has_data` and provides a means of keeping track if the
    data has yet been created.

    **Implementation note**: This stash retrieves data from the delegate
    without checking to see if it exists first since the data might not have
    been (preemptively) yet created.

    """
    def __post_init__(self):
        super().__post_init__()
        # tri-state cache: ``None`` means not yet calculated
        self._has_data = None

    def get(self, name: str, default: Any = None) -> Any:
        """Return what :meth:`load` gives for ``name``, or ``default`` if that
        is ``None``.  See class doc's implementation note.

        """
        item = self.load(name)
        if item is None:
            item = default
        return item

    @property
    def has_data(self) -> bool:
        """Return whether or not the stash has any data available or not.

        """
        return self._calculate_has_data()

    def _calculate_has_data(self) -> bool:
        """Return ``True`` if the delegate has keys.  The result is cached
        until :meth:`_reset_has_data` is called.

        """
        if self._has_data is None:
            try:
                # fetching the first key is enough to know there is data
                next(iter(self.delegate.keys()))
            except StopIteration:
                self._has_data = False
            else:
                self._has_data = True
        return self._has_data

    def _reset_has_data(self):
        """Reset the state of whether the stash has data or not.

        """
        self._has_data = None

    def _set_has_data(self, has_data: bool = True):
        """Set the state of whether the stash has data or not.

        """
        self._has_data = has_data

    def clear(self):
        """Clear the delegate and reset the cached has data state."""
        if logger.isEnabledFor(logging.DEBUG):
            self._debug('clearing and resetting has data state')
        super().clear()
        self._reset_has_data()
[docs]
class Primeable(ABC):
    """Any subclass that has the ability (and need) to do preprocessing.  For
    stashes, this means processing before any CRUD method is invoked.  For all
    other classes it usually is some processing that must be done in a single
    process.

    """
    def prime(self):
        """Log (at info level) that this instance is being primed."""
        if not logger.isEnabledFor(logging.INFO):
            return
        logger.info(f'priming {type(self)}...')
[docs]
@dataclass
class PrimeableStash(Stash, Primeable):
    """Any subclass that has the ability to do processing before any CRUD
    method is invoked.

    """
    def prime(self):
        """Prime the delegate when this instance is a :class:`.DelegateStash`
        whose delegate is itself a :class:`.PrimeableStash`.

        """
        if not isinstance(self, DelegateStash):
            return
        delegate = self.delegate
        if isinstance(delegate, PrimeableStash):
            delegate.prime()

    def get(self, name: str, default: Any = None) -> Any:
        """Prime, then defer to the super class ``get``."""
        self.prime()
        return super().get(name, default)

    def load(self, name: str) -> Any:
        """Prime, then defer to the super class ``load``."""
        self.prime()
        return super().load(name)

    def keys(self) -> Iterable[str]:
        """Prime, then defer to the super class ``keys``."""
        self.prime()
        return super().keys()
[docs]
@dataclass
class PrimablePreemptiveStash(PrimeableStash, PreemptiveStash):
    """A stash that's primable and preemptive; it only combines
    :class:`.PrimeableStash` and :class:`.PreemptiveStash`.

    """
[docs]
@dataclass
class ProtectiveStash(DelegateStash):
    """A stash that guards :meth:`dump` so that when :class:`Exception` is
    raised, the instance of the exception is dumped instead of the instance
    data.

    """
    log_errors: bool = field()
    """When ``True`` log caught exceptions as warnings."""

    def dump(self, name: str, inst: Any):
        """Dump ``inst`` under ``name``; if that raises, dump the raised
        exception under ``name`` instead.

        """
        try:
            super().dump(name, inst)
        except Exception as error:
            if self.log_errors:
                msg = f"Could not dump '{name}', using as value: {error}"
                logger.warning(msg, exc_info=True)
            # store the failure itself as the value of the key
            super().dump(name, error)
[docs]
@dataclass
class FactoryStash(PreemptiveStash):
    """A stash that defers the creation of new items to another
    :obj:`factory` stash.  It first tries to get the data from the
    :obj:`delegate` stash and, when that gives nothing, uses the
    :obj:`factory` to create the data when loading with :meth:`load`.  The
    factory created item is then dumped back to the delegate.

    """
    ATTR_EXP_META = ('enable_preemptive',)

    factory: Stash = field()
    """The stash used to create using ``load`` and ``keys``."""

    enable_preemptive: bool = field(default=True)
    """If ``False``, do not invoke the super class's data calculation."""

    dump_factory_nones: bool = field(default=True)
    """Whether to pass on ``None`` values to the delegate when the factory
    creates them.

    """
    def _calculate_has_data(self) -> bool:
        """Return the super class's calculation, or ``False`` when
        :obj:`enable_preemptive` is off.

        """
        if not self.enable_preemptive:
            return False
        return super()._calculate_has_data()

    def load(self, name: str) -> Any:
        """Load ``name`` from the delegate, falling back to the factory (and
        dumping the created item to the delegate) when the delegate returns
        ``None``.

        """
        item = super().load(name)
        if logger.isEnabledFor(logging.DEBUG):
            self._debug(f'loaded item {name} -> {type(item)}')
        if item is not None:
            # the delegate already had it
            return item
        if logger.isEnabledFor(logging.DEBUG):
            self._debug(f'resetting data and loading from factory: {name}')
        item = self.factory.load(name)
        should_dump = item is not None or self.dump_factory_nones
        if should_dump:
            if logger.isEnabledFor(logging.DEBUG):
                self._debug(f'dumping {name} -> {type(item)}')
            super().dump(name, item)
            # the delegate changed, so the cached has data state is stale
            self._reset_has_data()
            if logger.isEnabledFor(logging.DEBUG):
                self._debug(f'reset data: has_data={self.has_data}')
        return item

    def keys(self) -> Iterable[str]:
        """Return the delegate keys when it has data, otherwise the factory
        keys.

        """
        if not self.has_data:
            if logger.isEnabledFor(logging.DEBUG):
                self._debug('factory keys')
            return self.factory.keys()
        if logger.isEnabledFor(logging.DEBUG):
            self._debug('super (delegate) keys')
        return super().keys()

    def clear(self):
        """Clear the delegate, and also the factory unless it is a
        :class:`.ReadOnlyStash`.

        """
        super().clear()
        factory = self.factory
        if isinstance(factory, ReadOnlyStash):
            return
        factory.clear()
[docs]
@dataclass
class CacheFactoryStash(FactoryStash):
    """Like :class:`.FactoryStash` but suitable for :class:`.ReadOnlyStash`
    factory instances that have a defined key set and only need a backing
    stash for caching.

    """
    dump_factory_nones: bool = field(default=False)
    """Whether to pass on ``None`` values to the delegate when the factory
    creates them.

    """
    def keys(self) -> Iterable[str]:
        """Return the keys of the :obj:`factory`."""
        factory_keys = self.factory.keys()
        return factory_keys

    def exists(self, name: str) -> bool:
        """Return whether ``name`` exists in the :obj:`factory`."""
        found = self.factory.exists(name)
        return found