Source code for zensols.config.configbase

"""Abstract base class for a configuration read from a file.

"""
from __future__ import annotations
__author__ = 'Paul Landes'
from typing import Dict, Set, Iterable, List, Any, Union, Optional, Type, Tuple
from abc import ABCMeta, abstractmethod
import sys
import logging
from collections import OrderedDict
import inspect
from pathlib import Path
from io import TextIOBase
from . import ConfigurationError, Serializer, Dictable, Settings

# logger for this module, named after the module's ``__name__``
logger = logging.getLogger(__name__)


class ConfigurableError(ConfigurationError):
    """Root of the exceptions raised for configuration related errors.

    """
class ConfigurableFileNotFoundError(ConfigurableError):
    """Signals that the file backing a file based :class:`.Configurable` could
    not be found.

    """
    def __init__(self, path: Path, source: Union[Path, Any] = None):
        """Initialize.

        :param path: the file that was not found; kept in :obj:`path`

        :param source: where ``path`` was loaded from; it is only added to the
                       error message when it is a :class:`~pathlib.Path`, and
                       is always kept in :obj:`source`

        """
        # only a path-like origin is worth reporting to the user
        origin = f' loading from {source}' if isinstance(source, Path) else ''
        super().__init__(f"No such file: '{path}'" + origin)
        self.path = path
        self.source = source
class Configurable(Dictable, metaclass=ABCMeta):
    """An abstract base class that represents an application specific
    configuration.

    Note that many of the getters are implemented in ``configparser``.
    However, they are reimplemented here for consistency among parsers.

    """
    def __init__(self, default_section: str = None):
        """Initialize.

        :param default_section: used as the default section when none is given
                                on the get methods such as :meth:`get_option`;
                                defaults to ``default``

        """
        if default_section is None:
            self.default_section = 'default'
        else:
            self.default_section = default_section
        self.serializer = self._create_serializer()

    def _create_serializer(self) -> Serializer:
        """Create the serializer used to parse option values."""
        return Serializer()

    @abstractmethod
    def get_options(self, section: str = None) -> Dict[str, str]:
        """Get all options for a section.

        :param section: section in the ini file to fetch the value; defaults
                        to constructor's ``default_section``

        """
        pass

    @abstractmethod
    def has_option(self, name: str, section: str = None) -> bool:
        """Return whether option ``name`` exists.

        :param name: the name of the option

        :param section: the section to look in

        """
        pass

    def get_option(self, name: str, section: str = None) -> str:
        """Return an option from ``section`` with ``name``.

        :param name: the name of the option

        :param section: section in the ini file to fetch the value; defaults
                        to constructor's ``default_section``

        :raises ConfigurableError: if the section has no (non-``None``) value
                                   for ``name``

        """
        # resolve once so the error message names the section really searched
        sec_name = section or self.default_section
        opts = self.get_options(sec_name)
        val = None if opts is None else opts.get(name)
        if val is None:
            raise ConfigurableError(
                f"No option '{name}' found in section: {sec_name}")
        return val

    def reload(self):
        """Reload the configuration from the backing store.  This
        implementation does nothing.

        """
        pass

    def get_option_list(self, name: str, section: str = None) -> List[str]:
        """Just like :meth:`get_option` but parse the value as a list with the
        serializer's ``parse_list``.

        :param section: section in the ini file to fetch the value; defaults
                        to constructor's ``default_section``

        """
        val = self.get_option(name, section)
        return self.serializer.parse_list(val)

    def get_option_boolean(self, name: str, section: str = None) -> bool:
        """Just like :meth:`get_option` but parse as a boolean (any case
        ``true``).  Any other value, including an empty one, is ``False``.

        :param section: section in the ini file to fetch the value; defaults
                        to constructor's ``default_section``

        """
        val = self.get_option(name, section)
        val = val.lower() if val else 'false'
        return val == 'true'

    def get_option_int(self, name: str, section: str = None) -> Optional[int]:
        """Just like :meth:`get_option` but parse as an integer.

        :param section: section in the ini file to fetch the value; defaults
                        to constructor's ``default_section``

        :return: the parsed integer or ``None`` if the value is empty

        """
        val = self.get_option(name, section)
        if val:
            return int(val)

    def get_option_float(self, name: str, section: str = None) -> \
            Optional[float]:
        """Just like :meth:`get_option` but parse as a float.

        :return: the parsed float or ``None`` if the value is empty

        """
        val = self.get_option(name, section)
        if val:
            return float(val)

    def get_option_path(self, name: str, section: str = None) -> \
            Optional[Path]:
        """Just like :meth:`get_option` but return a ``pathlib.Path`` object of
        the string.

        :return: the path or ``None`` if an overriding :meth:`get_option`
                 returned ``None``

        """
        val = self.get_option(name, section)
        path = None
        if val is not None:
            path = Path(val)
        return path

    def get_option_object(self, name: str, section: str = None):
        """Just like :meth:`get_option` but parse as an object per object
        syntax rules.

        :see: :meth:`.Serializer.parse_object`

        :return: the parsed object or ``None`` if the value is empty

        """
        val = self.get_option(name, section)
        if val:
            return self.serializer.parse_object(val)

    @property
    def options(self) -> Dict[str, Any]:
        """All options from the default section.

        """
        return self.get_options()

    def populate(self, obj: Any = None, section: str = None,
                 parse_types: bool = True) -> Union[dict, Settings]:
        """Populate ``obj`` with the values of ``section`` by delegating to the
        serializer's ``populate_state``.

        :param obj: the object to populate, passed as is to the serializer

        :param section: the section to read; defaults to constructor's
                        ``default_section``

        :param parse_types: passed as is to the serializer

        :raises ConfigurableError: if :meth:`get_options` returns ``None`` for
                                   the section

        """
        section = self.default_section if section is None else section
        sec = self.get_options(section)
        if sec is None:
            # needed for the YamlConfig class
            raise ConfigurableError(
                f"No section from which to populate: '{section}'")
        return self.serializer.populate_state(sec, obj, parse_types)

    def __getitem__(self, section: str = None) -> Settings:
        """Shorthand for :meth:`populate` on ``section``."""
        return self.populate(section=section)

    @property
    def sections(self) -> Set[str]:
        """All sections of the configuration file.  This implementation
        returns an empty set.

        """
        return frozenset()

    def set_option(self, name: str, value: str, section: str = None):
        """Set an option on this configurable.

        :param name: the name of the option

        :param value: the value to set

        :param section: the section (if applies) to add the option

        :raises NotImplementedError: if this class does not support this
                                     operation

        """
        raise NotImplementedError()

    def copy_sections(self, to_populate: Configurable,
                      sections: Iterable[str] = None,
                      robust: bool = False) -> Optional[Exception]:
        """Copy all sections from this configurable to ``to_populate``.

        :param to_populate: the target configuration object

        :param sections: the sections to populate or ``None`` to copy all

        :param robust: if ``True``, when any exception other than
                       :class:`.ConfigurableError` occurs (namely
                       interpolation exceptions), don't copy and remove the
                       section in the target configuration

        :return: the last exception that occurred while trying to copy the
                 properties, or ``None`` if there was none

        :raises ConfigurableError: if a section to copy does not exist

        """
        last_ex = None
        if sections is None:
            sections = self.sections
        for sec in sections:
            if logger.isEnabledFor(logging.DEBUG):
                logger.debug(f'copying section {sec}')
            try:
                opts: Dict[str, Any] = self.get_options(sec)
                if opts is None:
                    raise ConfigurableError(f"No such section: '{sec}'")
                for k, v in opts.items():
                    to_populate.set_option(k, v, sec)
            # robust is needed by lib.ConfigurationImporter._load(); but deal
            # only with interpolation errors
            except ConfigurableError:
                raise
            except Exception as e:
                if not robust:
                    raise
                to_populate.remove_section(sec)
                last_ex = e
        return last_ex

    def remove_section(self, section: str):
        """Remove a section with the given name.

        :raises NotImplementedError: if this class does not support this
                                     operation

        """
        raise NotImplementedError()

    def merge(self, to_populate: Configurable):
        """Merge with ``to_populate``, clobbering any overlapping properties
        in the process.

        NOTE(review): as implemented, this calls
        ``to_populate.copy_sections(self, to_populate.sections)``, which
        copies the sections of ``to_populate`` *into this instance*.  Confirm
        against callers which direction is intended before changing it.

        :param to_populate: the other configuration object

        """
        to_populate.copy_sections(self, to_populate.sections)

    def _get_calling_module(self, depth: int = 0):
        """Get the nearest module in the call stack that is not this module.

        :param depth: the index into each stack entry handed to
                      :func:`inspect.getmodule`; ``0`` is the frame object

        :return: the module, or ``None`` if every resolvable frame belongs to
                 this module

        """
        debug: bool = logger.isEnabledFor(logging.DEBUG)
        for frame in inspect.stack():
            mod = inspect.getmodule(frame[depth])
            if debug:
                logger.debug(f'calling module: {mod}')
            if mod is not None:
                mod_name = mod.__name__
                if mod_name != __name__:
                    return mod

    def resource_filename(self, resource_name: str, module_name: str = None):
        """Return a resource based on a file name by delegating to the
        serializer's ``resource_filename``.

        :param resource_name: the file name of the resource to obtain (or name
                              if obtained from an installed module)

        :param module_name: the name of the module to obtain the data, which
                            defaults to the name of the calling module

        :return: a path on the file system or resource of the installed module

        """
        if module_name is None:
            mod = self._get_calling_module()
            logger.debug(f'calling module: {mod}')
            if mod is not None:
                module_name = mod.__name__
        return self.serializer.resource_filename(resource_name, module_name)

    def write(self, depth: int = 0, writer: TextIOBase = sys.stdout):
        """Write each section name followed by its sorted ``key: value``
        options, the latter indented one level deeper.

        :param depth: the starting indentation depth

        :param writer: the data sink

        :raises ConfigurationError: if a section has no options or the options
                                    are not a :class:`dict`

        """
        for sec in sorted(self.sections):
            self._write_line(sec, depth, writer)
            opts: Dict[str, str] = self.get_options(sec)
            if opts is None:
                raise ConfigurationError(f'No such section: {sec}')
            if not isinstance(opts, dict):
                raise ConfigurationError(
                    f"Expecting dict but got {type(opts)} in section '{sec}'")
            for k in sorted(opts.keys()):
                v = opts[k]
                self._write_line(f'{k}: {v}', depth + 1, writer)

    def asdict(self, *args, **kwargs) -> Dict[str, Any]:
        """Return the sections as keys, each mapped to its options, with both
        levels sorted by key.  The arguments are ignored.

        :raises ConfigurationError: if a section has no options

        """
        secs = OrderedDict()
        for sec in sorted(self.sections):
            opts = self.get_options(sec)
            if opts is None:
                # fail the same way write() does rather than on ``None.keys``
                raise ConfigurationError(f'No such section: {sec}')
            svs = OrderedDict()
            secs[sec] = svs
            for k in sorted(opts.keys()):
                svs[k] = opts[k]
        return secs

    def as_deep_dict(self) -> Dict[str, Any]:
        """Return a deep :class:`builtins.dict` with the top level with section
        names as keys and deep (i.e. ``json:``) values as nested dictionaries.

        """
        secs = OrderedDict()
        for sec in sorted(self.sections):
            svs = OrderedDict()
            secs[sec] = svs
            self.populate(svs, sec)
        return secs

    def as_one_tier_dict(self, *args, **kwargs) -> Dict[str, Any]:
        """Return a flat one-tier :class:`builtins.dict` with keys in
        ``<section>:<option>`` format.

        """
        flat: Dict[str, Any] = {}
        for sec, opts in self.asdict(False).items():
            for k, v in opts.items():
                flat[f'{sec}:{k}'] = v
        return flat

    def _get_section_short_str(self):
        """Return the first section or the empty string if there are none."""
        try:
            return next(iter(self.sections))
        except StopIteration:
            return ''

    def _get_short_str(self) -> str:
        """Return the class name followed by a section in curly braces."""
        sec = self._get_section_short_str()
        return f'{self.__class__.__name__}{{{sec}}}'

    def _get_container_desc(self, include_type: bool = True,
                            max_path_len: int = 3) -> str:
        """Return a description of this container used in error messages.
        This implementation ignores the parameters and returns the class name.

        """
        return self.__class__.__name__

    def _raise(self, msg: str, err: Exception = None):
        """Raise a :class:`.ConfigurableError` with ``msg`` suffixed with where
        the configuration came from.

        :param msg: the error message

        :param err: if given, the exception set as the cause

        """
        config_file: Optional[Union[Path, str]] = None
        if hasattr(self, 'config_file'):
            config_file = self.config_file
        if isinstance(config_file, str):
            msg = f'{msg} in file {config_file}'
        elif isinstance(config_file, Path):
            msg = f'{msg} in file {config_file.absolute()}'
        else:
            msg = f'{msg} in {self._get_container_desc()}'
        # ``raise ... from None`` would suppress the implicit context, so the
        # two cases are kept apart
        if err is None:
            raise ConfigurableError(msg)
        else:
            raise ConfigurableError(msg) from err

    def __str__(self):
        return f'<{self._get_short_str()}>'

    def __repr__(self):
        return self.__str__()
class TreeConfigurable(Configurable, metaclass=ABCMeta):
    """A hierarchical configuration.  The sections are the root nodes, but each
    section's values can be nested :class:`~builtins.dict` instances.  These
    values are traversable with a string dot path notation.

    """
    def __init__(self, default_section: str = None,
                 default_vars: Dict[str, Any] = None,
                 sections_name: str = 'sections',
                 sections: Set[str] = None):
        """Initialize.

        :param default_section: used as the default section when none is given
                                on the get methods such as :meth:`get_option`;
                                defaults to ``default``

        :param default_vars: used in place of missing variables during value
                             interpolation; **deprecated**: this will go away
                             in a future release

        :param sections_name: the dot notated path to the variable that has a
                              list of sections

        :param sections: used as the set of sections for this instance

        """
        super().__init__(default_section=default_section)
        self.sections_name = sections_name
        self.default_vars = default_vars if default_vars else {}
        self._sections = sections
        self._options = None

    @abstractmethod
    def _get_config(self) -> Dict[str, Any]:
        """Return the configuration backing :obj:`config`."""
        pass

    @abstractmethod
    def _set_config(self, source: Dict[str, Any]):
        """Set the configuration backing :obj:`config`."""
        pass

    @property
    def config(self) -> Dict[str, Any]:
        """The configuration as a nested set of :class:`~builtins.dict`.

        :see: :meth:`invalidate`

        """
        return self._get_config()

    @config.setter
    def config(self, source: Dict[str, Any]):
        """The configuration as a nested set of :class:`~builtins.dict`.

        :see: :meth:`invalidate`

        """
        self._set_config(source)

    @property
    def root(self) -> Optional[str]:
        """The root name of the configuration file, if one exists.  If more
        than one root exists, return the first.  The value is cached until
        :meth:`invalidate` is called.

        """
        if not hasattr(self, '_root'):
            root_keys: Iterable[str] = self.config.keys()
            if len(root_keys) > 0:
                self._root = next(iter(root_keys))
            else:
                self._root = None
        return self._root

    @classmethod
    def _is_primitive(cls, obj) -> bool:
        """Return whether ``obj`` is treated as a leaf value of the tree."""
        return isinstance(obj, (float, int, bool, str, set, list, tuple,
                                Type, Path))

    def _flatten(self, context: Dict[str, Any], path: str,
                 n: Dict[str, Any], sep: str = '.'):
        """Recursively add the leaves under ``n`` to ``context`` keyed by their
        ``sep`` separated path.

        :param context: the dictionary to populate

        :param path: the path of ``n``, which is empty for the top level

        :param n: the node to flatten

        :param sep: the path separator

        """
        if logger.isEnabledFor(logging.DEBUG):
            logger.debug(f'path: {path}, n: <{n}>, context: <{context}>')
        if n is None:
            context[path] = None
        elif self._is_primitive(n):
            context[path] = n
        elif isinstance(n, (dict, Settings)):
            for k, v in n.items():
                k = path + sep + k if len(path) else k
                self._flatten(context, k, v, sep)
        else:
            self._raise(f'Unknown yaml type {type(n)}: {n}')

    def invalidate(self):
        """This should be called when the underlying :obj:`config` object graph
        changes *under the nose* of this instance.  It recomputes the set of
        option keys and clears the cached sections, options and root.

        """
        context = {}
        context.update(self.default_vars)
        self._flatten(context, '', self.config)
        self._all_keys = set(context.keys())
        self._sections = None
        self._options = None
        if hasattr(self, '_root'):
            del self._root

    def _find_node(self, n: Union[Dict, Any], path: str, name: str):
        """Search depth first under ``n`` for the node with dot path ``name``.

        :param n: the node to search

        :param path: the dot path of ``n``

        :param name: the dot path to find

        :return: the node, or ``None`` if not found; a node set to ``None``
                 can not be told apart from a missing one

        """
        if logger.isEnabledFor(logging.DEBUG):
            logger.debug(f'search: n={n}, path={path}, name={name}')
        if path == name:
            if logger.isEnabledFor(logging.DEBUG):
                logger.debug(f'found: <{n}>')
            return n
        elif isinstance(n, dict):
            for k, v in n.items():
                k = path + '.' + k if len(path) else k
                v = self._find_node(v, k, name)
                if v is not None:
                    if logger.isEnabledFor(logging.DEBUG):
                        logger.debug(f'found {name} -> {v}')
                    return v
            if logger.isEnabledFor(logging.DEBUG):
                logger.debug('not found: {}'.format(name))

    def get_tree(self, name: Optional[str] = None) -> Dict[str, Any]:
        """Get the node in the configuration, which is a nested set
        :class:`~builtins.dict` instances as an object graph.

        :param name: the dotted notation indicating which node in the tree to
                     retrieve, or ``None`` for the entire :obj:`config`

        """
        if name is None:
            return self.config
        return self._find_node(self.config, '', name)

    def _get_option(self, name: str) -> str:
        """Return the leaf value at dot path ``name``, falling back to
        :obj:`default_vars`.

        """
        node = self.get_tree(name)
        if self._is_primitive(node):
            return node
        elif self.default_vars is not None and name in self.default_vars:
            return self.default_vars[name]
        elif node is None:
            # values in YAML can be set to ``null``
            return None
        else:
            self._raise(f'Unknown type or state: {name} ({type(node)})')

    @property
    def options(self) -> Dict[str, Any]:
        """All options keyed by their dot path, computed on first access and
        cached until :meth:`invalidate` is called.

        """
        if self._options is None:
            # accessed only for its side effect before ``_all_keys`` is read;
            # presumably subclasses load lazily here -- TODO confirm
            self.config
            self._options = {}
            for k in self._all_keys:
                self._options[k] = self._get_option(k)
        return self._options

    def has_option(self, name: str, section: str = None) -> bool:
        """Return whether ``name`` is a dot path of an option.

        :param section: ignored

        """
        opts = self.options
        return name in opts

    def get_option(self, name: str, section: str = None) -> str:
        """Return an option using a dot encoded path.

        :param section: ignored

        """
        if self.default_vars is not None and name in self.default_vars:
            return self.default_vars[name]
        else:
            ops = self.options
            if name in ops:
                return ops[name]
            else:
                self._raise(f'No such option: {name}')

    def get_options(self, name: str = None) -> Dict[str, Any]:
        """Return the node at dot path ``name``.

        :param name: the dot path of the node, which defaults to the
                     constructor's ``default_section``

        :return: the entry in :obj:`default_vars` if there is one, otherwise
                 the node found by :meth:`get_tree`, which is ``None`` when
                 there is no such node

        :raises ConfigurableError: if the node is a string

        """
        name = self.default_section if name is None else name
        if self.default_vars is not None and name in self.default_vars:
            return self.default_vars[name]
        node = self.get_tree(name)
        if isinstance(node, str):
            # a string is a leaf value rather than a group of options
            self._raise(f'No such option: {name}')
        return node

    def _get_at_depth(self, node: Any, s_level: int, level: int,
                      path: List[str]) -> Set[str]:
        """Collect the dot paths of the dictionaries found ``s_level`` levels
        under the top of the tree.

        :param node: the current node

        :param s_level: the level at which to collect

        :param level: the level of ``node``

        :param path: the keys leading to ``node``

        """
        def map_node(x: Tuple[str, Any]) -> str:
            k, v = x
            if isinstance(v, dict):
                if len(path) > 0:
                    k = '.'.join(path) + '.' + k
            else:
                # only nested dictionaries count
                k = None
            return k

        nodes: Set[str] = set()
        if isinstance(node, dict):
            if level < s_level:
                for k, child in node.items():
                    path.append(k)
                    ns = self._get_at_depth(child, s_level, level + 1, path)
                    path.pop()
                    nodes.update(ns)
            elif level == s_level:
                return set(filter(lambda x: x is not None,
                                  map(map_node, node.items())))
        return nodes

    def _find_sections(self) -> Set[str]:
        """Compute the sections from the ``<root>.<sections_name>`` option if
        there is one, otherwise from the top level dictionaries.

        """
        secs: Set[str]
        sec_key = f'{self.root}.{self.sections_name}'
        if self.has_option(sec_key):
            secs: Dict[str, Any] = self.get_tree(sec_key)
            if isinstance(secs, str):
                secs = self.get_option_list(sec_key)
            elif isinstance(secs, int):
                # an integer is the depth at which to look for sections
                secs = self._get_at_depth(self.get_tree(None), secs, 0, [])
            secs = frozenset(secs)
        else:
            secs = self._get_at_depth(self.get_tree(None), 0, 0, [])
        return secs

    @property
    def sections(self) -> Set[str]:
        """The sections by finding the :obj:`sections_name` based from the
        :obj:`root`.

        """
        if self._sections is None:
            self._sections = self._find_sections()
        return self._sections

    @sections.setter
    def sections(self, sections: Set[str]):
        self._sections = sections

    def write(self, depth: int = 0, writer: TextIOBase = sys.stdout):
        """Write :obj:`config` with ``_write_dict``.

        :param depth: the starting indentation depth

        :param writer: the data sink

        """
        self._write_dict(self.config, depth, writer)

    def asdict(self, *args, **kwargs) -> Dict[str, Any]:
        """Return :obj:`config` itself (not a copy).  The arguments are
        ignored.

        """
        return self.config