Source code for zensols.config.configbase

"""Abstract base class for a configuration read from a file.

"""
from __future__ import annotations
__author__ = 'Paul Landes'
from typing import (
    Tuple, Sequence, Dict, Set, Iterable, List, Any, Union, Optional, Type
)
from abc import ABCMeta, abstractmethod
import sys
import logging
from collections import OrderedDict
import inspect
from pathlib import Path
from io import TextIOBase
from . import ConfigurationError, Serializer, Dictable, Settings

logger = logging.getLogger(__name__)


class ConfigurableError(ConfigurationError):
    """Base class raised for any configuration based errors."""
class ConfigurableFileNotFoundError(ConfigurableError):
    """Raised when a configuration file is not found for those file based
    instances of :class:`.Configurable`.

    """
    def __init__(self, path: Path, source: Union[Path, Any] = None):
        """Initialize.

        :param path: the file that was not found; kept as :obj:`path`

        :param source: kept as :obj:`source`; it is appended to the error
                       message only when it is a :class:`~pathlib.Path`

        """
        origin = f' loading from {source}' if isinstance(source, Path) else ''
        super().__init__(f"No such file: '{path}'{origin}")
        self.path = path
        self.source = source
class Configurable(Dictable, metaclass=ABCMeta):
    """An abstract base class that represents an application specific
    configuration.

    Note that many of the getters are implemented in ``configparser``.
    However, they are reimplemented here for consistency among parsers.

    """
    # the only per-instance attributes of this base class: the fallback
    # section name, the optional parent configurable and the serializer
    __slots__ = ('default_section', '_parent', 'serializer')
def __init__(self, default_section: str = None, *,
             parent: Configurable = None):
    """Initialize.

    :param default_section: used as the default section when none is given
                            on the get methods such as :meth:`get_option`;
                            defaults to ``default``

    :param parent: stored and exposed through the :obj:`parent` property

    """
    self.default_section = \
        'default' if default_section is None else default_section
    self.serializer = self._create_serializer()
    self._parent = parent
def _create_serializer(self) -> Serializer:
    """Create the serializer stored on this instance as ``serializer``."""
    serializer = Serializer()
    return serializer

@property
def parent(self) -> Optional[Configurable]:
    """The configurable that creates this instance."""
    return self._parent

def _get_children(self, name: str = None, section: str = None) -> \
        List[Configurable]:
    """Collect this instance and every configurable reachable from it
    through ``children`` attributes and :obj:`parent` links.

    NOTE(review): the walk keeps no record of visited nodes, so a child
    that is reachable from its own parent's ``children`` would recurse
    without bound -- confirm callers never build such a graph.

    :param name: when given together with ``section``, keep only
                 configurables that have this option in ``section``

    :param section: when given, keep only initialized configurables that
                    have this section (or option, if ``name`` is given)

    :return: the reachable configurables, filtered if ``section`` is given

    """
    def walk(node: Configurable, found: List[Configurable]):
        found.append(node)
        for child in getattr(node, 'children', ()):
            walk(child, found)
        ancestor = node.parent
        if ancestor is not None:
            walk(ancestor, found)

    def qualifies(cand: Configurable) -> bool:
        # the initialization check comes first: otherwise infinite
        # recursive loop
        if not cand._is_initialized():
            return False
        if name is None:
            return section in cand.sections
        return cand.has_option(name, section)

    reachable: List[Configurable] = []
    walk(self, reachable)
    if section is None:
        return reachable
    return [cand for cand in reachable if qualifies(cand)]
@abstractmethod
def get_options(self, section: str = None) -> Dict[str, str]:
    """Get all options for a section.

    :param section: section in the ini file to fetch the value; defaults to
                    constructor's ``default_section``

    """
@abstractmethod
def has_option(self, name: str, section: str = None) -> bool:
    """Return whether the option ``name`` exists.

    :param name: the name of the option

    :param section: the section to look in

    """
def get_option(self, name: str, section: str = None) -> str:
    """Return an option from ``section`` with ``name``.

    :param name: the name of the option to fetch

    :param section: section in the ini file to fetch the value; defaults to
                    constructor's ``default_section``

    :raises ConfigurableError: if the section has no options or the option
                               is missing (or ``None``)

    """
    # resolve the section once so the error message names the section that
    # was actually searched rather than ``None``
    section = section or self.default_section
    opts = self.get_options(section)
    val = None if opts is None else opts.get(name)
    if val is None:
        raise ConfigurableError(
            f"No option '{name}' found in section: {section}")
    return val
def reload(self):
    """Reload the configuration from the backing store.

    This base implementation does nothing.

    """
    return None
def get_option_list(self, name: str, section: str = None) -> List[str]:
    """Just like :meth:`get_option` but parse the value as a list with the
    serializer's ``parse_list``.

    :param name: the name of the option to fetch

    :param section: section in the ini file to fetch the value; defaults to
                    constructor's ``default_section``

    """
    raw: str = self.get_option(name, section)
    items: List[str] = self.serializer.parse_list(raw)
    return items
def get_option_boolean(self, name: str, section: str = None) -> bool:
    """Just like :meth:`get_option` but parse as a boolean (any case
    ``true``).

    :param name: the name of the option to fetch

    :param section: section in the ini file to fetch the value; defaults to
                    constructor's ``default_section``

    :return: the value itself when it is already a ``bool``, otherwise
             ``True`` only for a non-empty string equal to ``true`` in any
             case

    """
    val = self.get_option(name, section)
    # tree based configurations can hold real booleans, which have no
    # ``lower`` method
    if isinstance(val, bool):
        return val
    return bool(val) and val.lower() == 'true'
def get_option_int(self, name: str, section: str = None) -> Optional[int]:
    """Just like :meth:`get_option` but parse as an integer.

    :param name: the name of the option to fetch

    :param section: section in the ini file to fetch the value; defaults to
                    constructor's ``default_section``

    :return: the integer value, or ``None`` if the value is empty

    """
    val = self.get_option(name, section)
    # a numeric zero (possible in tree based configurations) is a real
    # value rather than a missing one; booleans keep their prior handling
    is_number = isinstance(val, (int, float)) and not isinstance(val, bool)
    if is_number or val:
        return int(val)
    return None
def get_option_float(self, name: str, section: str = None) -> \
        Optional[float]:
    """Just like :meth:`get_option` but parse as a float.

    :param name: the name of the option to fetch

    :param section: section in the ini file to fetch the value; defaults to
                    constructor's ``default_section``

    :return: the float value, or ``None`` if the value is empty

    """
    val = self.get_option(name, section)
    # a numeric zero (possible in tree based configurations) is a real
    # value rather than a missing one; booleans keep their prior handling
    is_number = isinstance(val, (int, float)) and not isinstance(val, bool)
    if is_number or val:
        return float(val)
    return None
def get_option_path(self, name: str, section: str = None):
    """Just like :meth:`get_option` but return the value wrapped in a
    ``pathlib.Path``.

    :return: the path, or ``None`` if the option value is ``None``

    """
    val = self.get_option(name, section)
    return None if val is None else Path(val)
def get_option_object(self, name: str, section: str = None):
    """Just like :meth:`get_option` but parse as an object per object
    syntax rules.

    :return: the parsed object, or ``None`` if the option value is empty

    :see: :meth:`.Serializer.parse_object`

    """
    val = self.get_option(name, section)
    if not val:
        return None
    return self.serializer.parse_object(val)
@property
def options(self) -> Dict[str, Any]:
    """All options from the default section, as returned by
    :meth:`get_options` called without a section.

    """
    default_opts: Dict[str, Any] = self.get_options()
    return default_opts
def populate(self, obj: Union[Dict[str, str], Any] = None,
             section: str = None, parse_types: bool = True) -> \
        Union[Dict[str, Any], Settings]:
    """Set attributes in ``obj`` with ``setattr`` from all the values in
    ``section``.

    :param obj: the object to populate

    :param section: the section with the source data used to populate;
                    defaults to constructor's ``default_section``

    :param parse_types: whether to parse string values into Python types

    :return: the result of the serializer's ``populate_state``

    :raises ConfigurableError: if the section has no options

    """
    target_sec = section if section is not None else self.default_section
    opts = self.get_options(target_sec)
    if opts is not None:
        return self.serializer.populate_state(opts, obj, parse_types)
    # needed for the YamlConfig class
    raise ConfigurableError(
        f"No section from which to populate: '{target_sec}'")
def __getitem__(self, section: str = None) -> Settings:
    """Return the result of :meth:`populate` for ``section``."""
    settings = self.populate(section=section)
    return settings

def __getstate__(self) -> Tuple[Any, ...]:
    """Return the parent class's state as a tuple, with every
    :class:`.Configurable` element replaced by ``None``.

    """
    # null out parent
    state = super().__getstate__()
    return tuple(
        None if isinstance(item, Configurable) else item for item in state)

@property
def sections(self) -> Set[str]:
    """All sections of the configuration file.

    This base implementation returns an empty set.

    """
    return frozenset()
def set_option(self, name: str, value: str, section: str = None):
    """Set an option on this configurable.

    This base implementation always raises.

    :param name: the name of the option

    :param value: the value to set

    :param section: the section (if applies) to add the option

    :raises NotImplementedError: if this class does not support this
                                 operation

    """
    raise NotImplementedError
def copy_sections(self, to_populate: Configurable,
                  sections: Iterable[str] = None,
                  robust: bool = False) -> Optional[Exception]:
    """Copy sections from this configurable to ``to_populate``.

    :param to_populate: the target configuration object

    :param sections: the sections to copy, or ``None`` to copy all of
                     :obj:`sections`

    :param robust: if ``True``, when an exception other than
                   :class:`.ConfigurableError` occurs (namely interpolation
                   exceptions), remove the section from the target instead
                   of raising

    :return: the last exception that occurred while copying, or ``None``

    :raises ConfigurableError: if a section has no options

    """
    last_ex = None
    names = self.sections if sections is None else sections
    for sec in names:
        if logger.isEnabledFor(logging.DEBUG):
            logger.debug(f'copying section {sec}')
        try:
            opts: Dict[str, Any] = self.get_options(sec)
            if opts is None:
                raise ConfigurableError(f"No such section: '{sec}'")
            for k, v in opts.items():
                to_populate.set_option(k, v, sec)
        except Exception as e:
            # robust is needed by lib.ConfigurationImporter._load(); but
            # deal only with interpolation errors, so configuration errors
            # always propagate
            if isinstance(e, ConfigurableError) or not robust:
                raise e
            to_populate.remove_section(sec)
            last_ex = e
    return last_ex
def _copy_import_section(self, sections: Sequence[str],
                         target: Configurable = None):
    """Copy import section definitions defined in this instance to
    ``target``.

    :param sections: the name of the sections that have import definitions
                     (per :meth:`.Configurable.from_section`)

    :param target: the configurable to populate, which defaults to this
                   instance

    :raises ConfigurableError: if any of ``sections`` has no options

    """
    from . import ConfigurableFactory
    dest: Configurable = self if target is None else target
    # every section is read up front, before any of them is imported
    definitions: Dict[str, Dict[str, Any]] = \
        {sec: self.get_options(sec) for sec in sections}
    for sec_name, params in definitions.items():
        if params is None:
            raise ConfigurableError(f"No section '{sec_name}' in {dest}")
        if logger.isEnabledFor(logging.DEBUG):
            logger.debug(f'import sec: {sec_name}')
        imported: Configurable = ConfigurableFactory.from_section(
            params, sec_name, parent=self)
        imported.copy_sections(dest)
def remove_section(self, section: str):
    """Remove a section with the given name.

    This base implementation always raises.

    :param section: the name of the section to remove

    :raises NotImplementedError: always, in this base implementation

    """
    raise NotImplementedError
def merge(self, to_populate: Configurable):
    """Merge the sections of ``to_populate`` with this instance, clobbering
    any overlapping properties in the process.

    As written, this calls ``to_populate.copy_sections`` with this instance
    as the copy target, so the options of every section of ``to_populate``
    are set on this instance.

    NOTE(review): the parameter name suggests the opposite direction --
    confirm with callers before changing either the name or the call.

    :param to_populate: the configurable whose sections are copied

    """
    section_names = to_populate.sections
    to_populate.copy_sections(self, section_names)
def _get_calling_module(self, depth: int = 0):
    """Get the first module in the call stack that is not this module, or
    ``None`` if the call originated from this module.

    NOTE(review): ``depth`` indexes into each stack record returned by
    :func:`inspect.stack` (index 0 being the frame object) rather than
    selecting a stack level -- confirm this is intended.

    :param depth: the index into each stack record given to
                  :func:`inspect.getmodule`

    """
    for record in inspect.stack():
        mod = inspect.getmodule(record[depth])
        logger.debug(f'calling module: {mod}')
        if mod is None:
            continue
        if mod.__name__ != __name__:
            return mod
    return None
def resource_filename(self, resource_name: str, module_name: str = None):
    """Return a resource based on a file name.  This uses
    :meth:`.Serializer.resource_filename` to find the resources.  If it
    doesn't find it, it returns a path on the file system.

    :param resource_name: the file name of the resource to obtain (or name
                          if obtained from an installed module)

    :param module_name: the name of the module to obtain the data, which
                        defaults to the name of the first calling module
                        that is not this one

    :return: a path on the file system or resource of the installed module

    """
    if module_name is None:
        caller = self._get_calling_module()
        logger.debug(f'calling module: {caller}')
        if caller is not None:
            module_name = caller.__name__
    locate = self.serializer.resource_filename
    return locate(resource_name, module_name)
def write(self, depth: int = 0, writer: TextIOBase = sys.stdout):
    """Write every section, in sorted order, followed by its options as
    ``<name>: <value>`` lines one level deeper.

    :param depth: the indentation level given to ``_write_line``

    :param writer: the sink given to ``_write_line``

    :raises ConfigurationError: if a section has no options or its options
                                are not a ``dict``

    """
    for sec in sorted(self.sections):
        self._write_line(sec, depth, writer)
        opts: Dict[str, str] = self.get_options(sec)
        if opts is None:
            raise ConfigurationError(f'No such section: {sec}')
        if not isinstance(opts, dict):
            raise ConfigurationError(
                f"Expecting dict but got {type(opts)} in section '{sec}'")
        option_depth = depth + 1
        for key, val in sorted(opts.items(), key=lambda item: item[0]):
            self._write_line(f'{key}: {val}', option_depth, writer)
def asdict(self, *args, **kwargs) -> Dict[str, Any]:
    """Return the sections as an ordered dictionary of ordered option
    dictionaries, both sorted by key.

    The arguments are accepted but not used.

    """
    secs = OrderedDict()
    for sec in sorted(self.sections):
        opts = self.get_options(sec)
        secs[sec] = OrderedDict((k, opts[k]) for k in sorted(opts.keys()))
    return secs
def as_deep_dict(self) -> Dict[str, Any]:
    """Return a deep :class:`builtins.dict` with the top level with section
    names as keys and deep (i.e. ``json:``) values as nested dictionaries.

    Each section's dictionary is filled by :meth:`populate`.

    """
    secs = OrderedDict((sec, OrderedDict()) for sec in sorted(self.sections))
    for sec, values in secs.items():
        self.populate(values, sec)
    return secs
def as_one_tier_dict(self, *args, **kwargs) -> Dict[str, Any]:
    """Return a flat one-tier :class:`builtins.dict` with keys in
    ``<section>:<option>`` format.

    The arguments are accepted but not used.

    """
    return {f'{sec}:{k}': v
            for sec, opts in self.asdict(False).items()
            for k, v in opts.items()}
def _get_short_str(self) -> str:
    """Return the class name, used by :meth:`__str__`."""
    return type(self).__name__

def _get_container_desc(self, include_type: bool = True,
                        max_path_len: int = 3) -> str:
    """Return the class name; the parameters are not used here."""
    return type(self).__name__

def _raise(self, msg: str, err: Exception = None):
    """Raise a :class:`.ConfigurableError` with ``msg`` followed by where
    the configuration came from.

    The location is the ``config_file`` attribute when it is a string or a
    path (made absolute), otherwise :meth:`_get_container_desc`.

    :param msg: the error message

    :param err: if given, the raised error is chained from it

    :raises ConfigurableError: always

    """
    config_file: Optional[Union[Path, str]] = \
        getattr(self, 'config_file', None)
    if isinstance(config_file, str):
        where = f'file {config_file}'
    elif isinstance(config_file, Path):
        where = f'file {config_file.absolute()}'
    else:
        where = self._get_container_desc()
    msg = f'{msg} in {where}'
    # the two raises stay separate: ``from None`` would suppress the context
    if err is not None:
        raise ConfigurableError(msg) from err
    raise ConfigurableError(msg)

def __str__(self):
    return '<' + self._get_short_str() + '>'

def __repr__(self):
    return self.__str__()
class TreeConfigurable(Configurable, metaclass=ABCMeta):
    """A hierarchical configuration.  The sections are the root nodes, but
    each section's values can be nested :class:`~builtins.dict` instances.
    These values are traversable with a string dot path notation.

    """
def __init__(self, default_section: str = None,
             default_vars: Dict[str, Any] = None,
             sections_name: str = 'sections',
             sections: Set[str] = None,
             parent: Configurable = None):
    """Initialize.

    :param default_section: used as the default section when none is given
                            on the get methods such as :meth:`get_option`;
                            defaults to ``default``

    :param default_vars: used in place of missing variables during value
                         interpolation; **deprecated**: this will go away
                         in a future release

    :param sections_name: the dot notated path to the variable that has a
                          list of sections

    :param sections: used as the set of sections for this instance

    :param parent: passed on to the parent class initializer

    """
    super().__init__(default_section=default_section, parent=parent)
    self.sections_name = sections_name
    # an empty (or missing) mapping is replaced with a fresh dictionary
    self.default_vars = default_vars or {}
    self._sections = sections
    self._options = None
@abstractmethod
def _get_config(self) -> Dict[str, Any]:
    """Return the nested dictionary backing :obj:`config`."""

@abstractmethod
def _set_config(self, source: Dict[str, Any]):
    """Set the nested dictionary backing :obj:`config`."""

@abstractmethod
def _is_initialized(self) -> bool:
    """Return whether the configuration is initialized with data."""

@property
def config(self) -> Dict[str, Any]:
    """The configuration as a nested set of :class:`~builtins.dict`.

    :see: :meth:`invalidate`

    """
    return self._get_config()

@config.setter
def config(self, source: Dict[str, Any]):
    """The configuration as a nested set of :class:`~builtins.dict`.

    :see: :meth:`invalidate`

    """
    self._set_config(source)

@property
def root(self) -> Optional[str]:
    """The root name of the configuration file, if one exists.  If more
    than one root exists, return the first.

    The value is computed on first access and kept in ``_root``.

    """
    if not hasattr(self, '_root'):
        top_keys: Iterable[str] = self.config.keys()
        self._root = next(iter(top_keys)) if len(top_keys) > 0 else None
    return self._root

@classmethod
def _is_primitive(cls, obj) -> bool:
    """Return whether ``obj`` is treated as a leaf value rather than a
    node to descend into.

    """
    leaf_types = (float, int, bool, str, set, list, tuple, Type, Path)
    return isinstance(obj, leaf_types)

def _flatten(self, context: Dict[str, Any], path: str,
             n: Dict[str, Any], sep: str = '.'):
    """Recursively add every leaf under ``n`` to ``context``, keyed by its
    ``sep`` joined path.

    :param context: the dictionary to populate

    :param path: the path of ``n``, which is empty for the top level

    :param n: the node to flatten

    :param sep: the string used to join path elements

    """
    if logger.isEnabledFor(logging.DEBUG):
        logger.debug(f'path: {path}, n: <{n}>, context: <{context}>')
    if n is None or self._is_primitive(n):
        context[path] = n
        return
    if not isinstance(n, (dict, Settings)):
        self._raise(f'Unknown yaml type {type(n)}: {n}')
        return
    for key, child in n.items():
        child_path = path + sep + key if len(path) else key
        self._flatten(context, child_path, child, sep)
def invalidate(self):
    """This should be called when the underlying :obj:`config` object graph
    changes *under the nose* of this instance.

    It recomputes the set of flattened keys (seeded with the default
    variables) and clears the cached sections, options and root.

    """
    flat: Dict[str, Any] = dict(self.default_vars)
    self._flatten(flat, '', self.config)
    self._all_keys = set(flat)
    self._sections = None
    self._options = None
    if hasattr(self, '_root'):
        del self._root
def _find_node(self, n: Union[Dict, Any], path: str, name: str):
    """Search ``n`` depth first for the node whose dot joined path equals
    ``name``.

    Only subtrees whose path can lead to ``name`` are descended, so a
    lookup does not visit the whole tree.  The first match with a value
    other than ``None`` is returned, exactly as an exhaustive search would.

    :param n: the node to search

    :param path: the dot joined path of ``n``, which is empty for the top
                 level

    :param name: the dot joined path to find

    :return: the found node or ``None`` if there is no such node (or its
             value is ``None``)

    """
    def on_route(candidate) -> bool:
        # an empty key adds nothing to the path of its children and
        # non-string values cannot be compared, so those are never pruned
        if not candidate or not isinstance(candidate, str) \
           or not isinstance(name, str):
            return True
        return candidate == name or name.startswith(candidate + '.')

    debug: bool = logger.isEnabledFor(logging.DEBUG)
    if debug:
        logger.debug(f'search: n={n}, path={path}, name={name}')
    if path == name:
        if debug:
            logger.debug(f'found: <{n}>')
        return n
    if isinstance(n, dict):
        for k, v in n.items():
            k = path + '.' + k if len(path) else k
            if not on_route(k):
                continue
            v = self._find_node(v, k, name)
            if v is not None:
                if debug:
                    logger.debug(f'found {name} -> {v}')
                return v
    if debug:
        logger.debug('not found: {}'.format(name))
    return None
def get_tree(self, name: Optional[str] = None) -> Dict[str, Any]:
    """Get the node in the configuration, which is a nested set of
    :class:`~builtins.dict` instances as an object graph.

    :param name: the dotted notation indicating which node in the tree to
                 retrieve; ``None`` returns the whole :obj:`config`

    """
    tree = self.config
    return tree if name is None else self._find_node(tree, '', name)
def _get_option(self, name: str) -> Any:
    """Return the value at the dot path ``name``.

    :return: the node if it is a primitive, otherwise the default variable
             with that name if there is one, otherwise ``None`` for a
             missing (or ``null``) node

    """
    node = self.get_tree(name)
    if self._is_primitive(node):
        return node
    elif self.default_vars is not None and name in self.default_vars:
        return self.default_vars[name]
    elif node is None:
        # values in YAML can be set to ``null``
        return None
    else:
        self._raise(f'Unknown type or state: {name} ({type(node)})')

@property
def options(self) -> Dict[str, Any]:
    """All flattened dot path keys mapped to their values.

    The mapping is cached once it is complete.  It is built aside and only
    then published, so an error raised while resolving a key does not
    leave a partially filled cache behind for later calls.

    """
    if self._options is None:
        # accessed only for its side effects before ``_all_keys`` is read;
        # presumably this triggers the load that populates it -- confirm
        # against the subclasses
        self.config
        resolved: Dict[str, Any] = {}
        for k in self._all_keys:
            resolved[k] = self._get_option(k)
        self._options = resolved
    return self._options
def has_option(self, name: str, section: str = None) -> bool:
    """Return whether ``name`` is a key of :obj:`options`.

    :param name: the dot encoded path of the option

    :param section: ignored

    """
    return name in self.options
def get_option(self, name: str, section: str = None) -> str:
    """Return an option using a dot encoded path.

    A default variable named ``name`` takes precedence over the
    configuration's own options.

    :param name: the dot encoded path of the option

    :param section: ignored

    """
    defaults = self.default_vars
    if defaults is not None and name in defaults:
        return defaults[name]
    opts = self.options
    if name in opts:
        return opts[name]
    self._raise(f"No such option: '{name}'")
def get_options(self, name: str = None) -> Dict[str, Any]:
    """Return the node at the dot path ``name``.

    A default variable named ``name`` takes precedence over the
    configuration tree.

    :param name: the dot encoded path of the node, which defaults to
                 constructor's ``default_section``

    :return: the default variable or the tree node, which is ``None`` when
             there is no such node

    """
    name = self.default_section if name is None else name
    defaults = self.default_vars
    if defaults is not None and name in defaults:
        return defaults[name]
    node = self.get_tree(name)
    # a string leaf is an option rather than a set of options; the default
    # variables were already consulted above, so nothing is left to try
    if not isinstance(node, str):
        return node
    self._raise(f'No such option: {name}')
def _get_at_depth(self, node: Any, s_level: int, level: int,
                  path: List[str]) -> Set[str]:
    """Return the dot joined paths of the dictionary valued entries found
    ``s_level`` levels below the top of the tree.

    :param node: the node currently visited

    :param s_level: the level at which names are collected

    :param level: the level of ``node``

    :param path: the keys leading to ``node``, which is modified while
                 descending and restored afterward

    """
    def qualify(key: str) -> str:
        return '.'.join(path) + '.' + key if len(path) > 0 else key

    if not isinstance(node, dict):
        return set()
    if level < s_level:
        found: Set[str] = set()
        for key, child in node.items():
            path.append(key)
            below = self._get_at_depth(child, s_level, level + 1, path)
            path.pop()
            found.update(below)
        return found
    if level == s_level:
        # only entries that are themselves dictionaries count
        names = (qualify(k) for k, v in node.items() if isinstance(v, dict))
        return {n for n in names if n is not None}
    return set()

def _find_sections(self) -> Set[str]:
    """Compute the sections from the entry at ``<root>.<sections_name>``.

    A string entry is parsed as a list, an integer entry is used as the
    depth at which dictionary valued nodes are collected, and any other
    entry is used as the collection of names.  Without such an entry the
    dictionary valued top level nodes are used.

    """
    sec_key = f'{self.root}.{self.sections_name}'
    if not self.has_option(sec_key):
        return self._get_at_depth(self.get_tree(None), 0, 0, [])
    declared = self.get_tree(sec_key)
    if isinstance(declared, str):
        declared = self.get_option_list(sec_key)
    elif isinstance(declared, int):
        declared = self._get_at_depth(
            self.get_tree(None), declared, 0, [])
    return frozenset(declared)

@property
def sections(self) -> Set[str]:
    """The sections by finding the :obj:`section_name` based from the
    :obj:`root`.

    The result is computed on first access and kept until reset.

    """
    if self._sections is None:
        self._sections = self._find_sections()
    return self._sections

@sections.setter
def sections(self, sections: Set[str]):
    self._sections = sections
def write(self, depth: int = 0, writer: TextIOBase = sys.stdout):
    """Write the whole :obj:`config` tree with ``_write_dict``.

    :param depth: the indentation level given to ``_write_dict``

    :param writer: the sink given to ``_write_dict``

    """
    tree = self.config
    self._write_dict(tree, depth, writer)
def asdict(self, *args, **kwargs) -> Dict[str, Any]:
    """Return :obj:`config` itself (not a copy); the arguments are accepted
    but not used.

    """
    tree: Dict[str, Any] = self.config
    return tree