"""Contains general purpose persistence library classes.
"""
__author__ = 'Paul Landes'
from typing import Any, Union, Callable, Tuple, ClassVar, Dict
from abc import ABC
import logging
import collections
import traceback
from io import StringIO
from zensols.util import APIError
logger = logging.getLogger(__name__)
[docs]
class Deallocatable(ABC):
"""All subclasses have the ability to deallocate any resources. This is
useful for cases where there could be reference cycles or deallocation
(i.e. CUDA tensors) need happen implicitly and faster.
.. document private functions
.. automethod:: _print_undeallocated
.. automethod:: _deallocate_attribute
.. automethod:: _try_deallocate
"""
PRINT_TRACE: ClassVar[bool] = False
"""When ``True``, print the stack trace when deallocating with
:meth:`deallocate`.
"""
ALLOCATION_TRACKING: ClassVar[bool] = False
"""Enables allocation tracking. When this if ``False``, this functionality
is not used and disabled.
"""
_ALLOCATIONS: Dict[int, Any] = {}
"""The data structure that retains all allocated instances.
"""
# when true, recurse through deallocatable instances while freeing
_RECURSIVE: ClassVar[bool] = False
[docs]
def __init__(self):
super().__init__()
if self.ALLOCATION_TRACKING:
k = id(self)
sio = StringIO()
traceback.print_stack(file=sio)
if logger.isEnabledFor(logging.DEBUG):
logger.debug(f'adding allocated key: {k} -> {type(self)}')
self._ALLOCATIONS[k] = (self, sio.getvalue())
[docs]
def deallocate(self):
"""Deallocate all resources for this instance.
"""
k = id(self)
if self.PRINT_TRACE:
traceback.print_stack()
if logger.isEnabledFor(logging.DEBUG):
logger.debug(f'deallocating {k}: {self._deallocate_str()}')
self._mark_deallocated(k)
@classmethod
def _num_deallocations(cls) -> int:
"""Return the number of objects currently allocated."""
return len(cls._ALLOCATIONS)
def _mark_deallocated(self, obj: Any = None):
"""Mark ``obj`` as deallocated regardless if it is, or ever will be
deallocated. After this is called, it will not be reported in such
methods as :meth:`_print_undeallocated`.
"""
if obj is None:
k = id(self)
else:
k = obj
if self.ALLOCATION_TRACKING:
if k in self._ALLOCATIONS:
if logger.isEnabledFor(logging.DEBUG):
logger.debug(f'removing allocated key: {k}')
del self._ALLOCATIONS[k]
else:
if logger.isEnabledFor(logging.DEBUG):
logger.debug(f'no key to deallocate: {k} ' +
f'({self._deallocate_str()})')
[docs]
@staticmethod
def _try_deallocate(obj: Any, recursive: bool = False) -> bool:
"""If ``obj`` is a candidate for deallocation, deallocate it.
:param obj: the object instance to deallocate
:return: ``True`` if the object was deallocated, otherwise return
``False`` indicating it can not and was not deallocated
"""
cls = globals()['Deallocatable']
recursive = recursive or cls._RECURSIVE
if logger.isEnabledFor(logging.DEBUG):
logger.debug(f'trying to deallocate: {type(obj)}')
if isinstance(obj, cls):
obj.deallocate()
return True
elif recursive and isinstance(obj, (tuple, list, set)):
for o in obj:
if logger.isEnabledFor(logging.DEBUG):
logger.debug(f'deallocate tuple item: {type(o)}')
cls._try_deallocate(o, recursive)
return True
elif recursive and isinstance(obj, dict):
for o in obj.values():
if logger.isEnabledFor(logging.DEBUG):
logger.debug(f'deallocate dict item: {type(o)}')
cls._try_deallocate(o, recursive)
return True
return False
[docs]
def _deallocate_attribute(self, attrib: str) -> bool:
"""Deallocate attribute ``attrib`` if possible, which means it both
exists and extends from this class.
"""
deallocd = False
if hasattr(self, attrib):
inst = getattr(self, attrib)
deallocd = self._try_deallocate(inst)
if logger.isEnabledFor(logging.DEBUG):
logging.debug(f'deallocated {type(self)}.{attrib}')
delattr(self, attrib)
return deallocd
def _deallocate_attributes(self, attribs: Tuple[str, ...]) -> int:
"""Deallocates all attributes in ``attribs`` using
:meth:`_deallocate_attribute`.
"""
cnt = 0
for attrib in attribs:
if self._deallocate_attribute(attrib):
cnt += 1
return cnt
[docs]
@classmethod
def _print_undeallocated(cls, include_stack: bool = False,
only_counts: bool = False,
fail: bool = False):
"""Print all unallocated objects.
:param include_stack: if ``True`` print out the stack traces of all the
unallocated references; if ``only_counts`` is
``True``, this is ignored
:param only_counts: if ``True`` only print the counts of each
unallocated class with counts for each
:param fail: if ``True``, raise an exception if there are any
unallocated references found
"""
allocs = cls._ALLOCATIONS
if len(allocs) > 0:
print(f'total allocations: {len(allocs)}')
if only_counts:
cls_counts = collections.defaultdict(lambda: 0)
for cls in map(lambda o: type(o[0]), allocs.values()):
cls_counts[cls] += 1
for k in sorted(cls_counts.keys(), key=lambda x: x.__name__):
print(f'{k}: {cls_counts[k]}')
else:
for k, (v, stack) in allocs.items():
vstr = str(type(v))
if hasattr(v, 'name'):
vstr = f'{vstr} ({v.name})'
print(f'{k} -> {vstr}')
if include_stack:
print(stack)
if fail:
cls.assert_dealloc()
@classmethod
def _deallocate_all(cls):
"""Deallocate all the objects that have not yet been and clear the data
structure.
"""
allocs = cls._ALLOCATIONS
to_dealloc = tuple(allocs.values())
allocs.clear()
for obj, trace in to_dealloc:
obj.deallocate()
def _deallocate_str(self) -> str:
return str(self.__class__)
[docs]
@classmethod
def assert_dealloc(cls):
cnt = len(cls._ALLOCATIONS)
if cnt > 0:
raise APIError(f'resource leak with {cnt} intances')
[docs]
class dealloc_recursive(object):
[docs]
def __init__(self):
self.org_rec_state = Deallocatable._RECURSIVE
def __enter__(self):
Deallocatable._RECURSIVE = True
def __exit__(self, type, value, traceback):
Deallocatable._RECURSIVE = self.org_rec_state
[docs]
class dealloc(object):
"""Object used with a ``with`` scope for deallocating any subclass of
:class:`.Deallocatable`. The first argument can also be a function, which
is useful when tracking deallocations when ``track`` is ``True``.
Example::
with dealloc(lambda: ImportClassFactory('some/path')) as fac:
return fac.instance('stash')
"""
[docs]
def __init__(self, inst: Union[Callable, Deallocatable],
track: bool = False, include_stack: bool = False):
"""
:param inst: either an object instance to deallocate or a callable that
creates the instance to deallocate
:param track: when ``True``, set
:obj:`.Deallocatable.ALLOCATION_TRACKING` to ``True`` to
start tracking allocations
:param include_stack: adds stack traces in the call to
:meth:`.Deallocatable._print_undeallocated`
"""
self.track = track
self.include_stack = include_stack
self.org_track = Deallocatable.ALLOCATION_TRACKING
if track:
Deallocatable.ALLOCATION_TRACKING = True
if callable(inst) and not isinstance(inst, Deallocatable):
inst = inst()
self.inst = inst
def __enter__(self):
return self.inst
def __exit__(self, type, value, traceback):
self.inst.deallocate()
if self.track:
Deallocatable._print_undeallocated(self.include_stack)
Deallocatable.ALLOCATION_TRACKING = self.org_track