"""Abstract base class for a configuration read from a file.
"""
from __future__ import annotations
__author__ = 'Paul Landes'
from typing import Dict, Set, Iterable, List, Any, Union, Optional, Type, Tuple
from abc import ABCMeta, abstractmethod
import sys
import logging
from collections import OrderedDict
import inspect
from pathlib import Path
from io import TextIOBase
from . import ConfigurationError, Serializer, Dictable, Settings
logger = logging.getLogger(__name__)
[docs]
class ConfigurableError(ConfigurationError):
"""Base class raised for any configuration based errors."""
pass
[docs]
class ConfigurableFileNotFoundError(ConfigurableError):
"""Raised when a configuration file is not found for those file based
instances of :class:`.Configurable`.
"""
[docs]
def __init__(self, path: Path, source: Union[Path, Any] = None):
msg = f"No such file: '{path}'"
if isinstance(source, Path):
msg += f' loading from {source}'
super().__init__(msg)
self.path = path
self.source = source
[docs]
class Configurable(Dictable, metaclass=ABCMeta):
"""An abstract base class that represents an application specific
configuration.
Note that many of the getters are implemented in ``configparser``.
However, they are reimplemented here for consistency among parser.
"""
[docs]
def __init__(self, default_section: str = None):
"""Initialize.
:param default_section: used as the default section when non given on
the get methds such as :meth:`get_option`;
which defaults to ``defualt``
"""
if default_section is None:
self.default_section = 'default'
else:
self.default_section = default_section
self.serializer = self._create_serializer()
def _create_serializer(self) -> Serializer:
return Serializer()
[docs]
@abstractmethod
def get_options(self, section: str = None) -> Dict[str, str]:
"""Get all options for a section. If ``opt_keys`` is given return only
options with those keys.
:param section: section in the ini file to fetch the value; defaults to
constructor's ``default_section``
"""
pass
[docs]
@abstractmethod
def has_option(self, name: str, section: str = None) -> bool:
pass
[docs]
def get_option(self, name: str, section: str = None) -> str:
"""Return an option from ``section`` with ``name``.
:param section: section in the ini file to fetch the value; defaults to
constructor's ``default_section``
:param vars: contains the defaults for missing values of ``name``
"""
val = None
opts = self.get_options(section or self.default_section)
if opts is not None:
val = opts.get(name)
if val is None:
raise ConfigurableError(
f"No option '{name}' found in section: {section}")
return val
[docs]
def reload(self):
"""Reload the configuration from the backing store.
"""
pass
[docs]
def get_option_list(self, name: str, section: str = None) -> List[str]:
"""Just like :meth:`get_option` but parse as a list using ``split``.
:param section: section in the ini file to fetch the value; defaults to
constructor's ``default_section``
"""
val = self.get_option(name, section)
return self.serializer.parse_list(val)
[docs]
def get_option_boolean(self, name: str, section: str = None) -> bool:
"""Just like :meth:`get_option` but parse as a boolean (any case
`true`).
:param section: section in the ini file to fetch the value; defaults to
constructor's ``default_section``
:param vars: contains the defaults for missing values of ``name``
"""
val = self.get_option(name, section)
val = val.lower() if val else 'false'
return val == 'true'
[docs]
def get_option_int(self, name: str, section: str = None):
"""Just like :meth:`get_option` but parse as an integer.
:param section: section in the ini file to fetch the value; defaults to
constructor's ``default_section``
"""
val = self.get_option(name, section)
if val:
return int(val)
[docs]
def get_option_float(self, name: str, section: str = None):
"""Just like :meth:`get_option` but parse as a float.
"""
val = self.get_option(name, section)
if val:
return float(val)
[docs]
def get_option_path(self, name: str, section: str = None):
"""Just like :meth:`get_option` but return a ``pathlib.Path`` object of
the string.
"""
val = self.get_option(name, section)
path = None
if val is not None:
path = Path(val)
return path
[docs]
def get_option_object(self, name: str, section: str = None):
"""Just like :meth:`get_option` but parse as an object per object syntax
rules.
:see: :meth:`.Serializer.parse_object`
"""
val = self.get_option(name, section)
if val:
return self.serializer.parse_object(val)
@property
def options(self) -> Dict[str, Any]:
"""All options from the default section.
"""
return self.get_options()
[docs]
def populate(self, obj: Any = None, section: str = None,
parse_types: bool = True) -> Union[dict, Settings]:
"""Set attributes in ``obj`` with ``setattr`` from the all values in
``section``.
"""
section = self.default_section if section is None else section
sec = self.get_options(section)
if sec is None:
# needed for the YamlConfig class
raise ConfigurableError(
f"No section from which to populate: '{section}'")
return self.serializer.populate_state(sec, obj, parse_types)
def __getitem__(self, section: str = None) -> Settings:
return self.populate(section=section)
@property
def sections(self) -> Set[str]:
"""All sections of the configuration file.
"""
return frozenset()
[docs]
def set_option(self, name: str, value: str, section: str = None):
"""Set an option on this configurable.
:param name: the name of the option
:param value: the value to set
:param section: the section (if applies) to add the option
:raises NotImplementedError: if this class does not support this
operation
"""
raise NotImplementedError()
[docs]
def copy_sections(self, to_populate: Configurable,
sections: Iterable[str] = None,
robust: bool = False) -> Exception:
"""Copy all sections from this configuruable to ``to_populate``.
:param to_populate: the target configuration object
:param sections: the sections to populate or ``None`` to copy allow
:param robust: if ``True``, when any exception occurs (namely
interplation exceptions), don't copy and remove the
section in the target configuraiton
:return: the last exception that occured while trying to copy the
properties
"""
last_ex = None
if sections is None:
sections = self.sections
for sec in sections:
if logger.isEnabledFor(logging.DEBUG):
logger.debug(f'copying section {sec}')
try:
opts: Dict[str, Any] = self.get_options(sec)
if opts is None:
raise ConfigurableError(f"No such section: '{sec}'")
for k, v in opts.items():
to_populate.set_option(k, v, sec)
# robust is needed by lib.ConfigurationImporter._load(); but deal
# only with interpolation errors
except ConfigurableError as e:
raise e
except Exception as e:
if not robust:
raise e
else:
to_populate.remove_section(sec)
last_ex = e
return last_ex
[docs]
def remove_section(self, section: str):
"""Remove a seciton with the given name."""
raise NotImplementedError()
[docs]
def merge(self, to_populate: Configurable):
"""Copy all data from this configuruable to ``to_populate``, and clobber
any overlapping properties in the process.
:param to_populate: the target configuration object
"""
to_populate.copy_sections(self, to_populate.sections)
def _get_calling_module(self, depth: int = 0):
"""Get the last module in the call stack that is not this module or
``None`` if the call originated from this module.
"""
for frame in inspect.stack():
mod = inspect.getmodule(frame[depth])
logger.debug(f'calling module: {mod}')
if mod is not None:
mod_name = mod.__name__
if mod_name != __name__:
return mod
[docs]
def resource_filename(self, resource_name: str, module_name: str = None):
"""Return a resource based on a file name. This uses the
``pkg_resources`` package first to find the resources. If it doesn't
find it, it returns a path on the file system.
:param: resource_name the file name of the resource to obtain (or name
if obtained from an installed module)
:param module_name: the name of the module to obtain the data, which
defaults to ``__name__``
:return: a path on the file system or resource of the installed module
"""
if module_name is None:
mod = self._get_calling_module()
logger.debug(f'calling module: {mod}')
if mod is not None:
module_name = mod.__name__
return self.serializer.resource_filename(resource_name, module_name)
[docs]
def write(self, depth: int = 0, writer: TextIOBase = sys.stdout):
for sec in sorted(self.sections):
self._write_line(sec, depth, writer)
opts: Dict[str, str] = self.get_options(sec)
if opts is None:
raise ConfigurationError(f'No such section: {sec}')
if not isinstance(opts, dict):
raise ConfigurationError(
f"Expecting dict but got {type(opts)} in section '{sec}'")
for k in sorted(opts.keys()):
v = opts[k]
self._write_line(f'{k}: {v}', depth + 1, writer)
[docs]
def asdict(self, *args, **kwargs) -> Dict[str, Any]:
secs = OrderedDict()
for sec in sorted(self.sections):
svs = OrderedDict()
secs[sec] = svs
opts = self.get_options(sec)
for k in sorted(opts.keys()):
svs[k] = opts[k]
return secs
[docs]
def as_deep_dict(self) -> Dict[str, Any]:
"""Return a deep :class:`builtins.dict` with the top level with section
names as keys and deep (i.e. ``json:``) values as nested dictionaries.
"""
secs = OrderedDict()
for sec in sorted(self.sections):
svs = OrderedDict()
secs[sec] = svs
self.populate(svs, sec)
return secs
[docs]
def as_one_tier_dict(self, *args, **kwargs) -> Dict[str, Any]:
"""Return a flat one-tier :class:`builtins.dict` with keys in
``<section>:<option>`` format.
"""
flat: Dict[str, Any] = {}
for sec, opts in self.asdict(False).items():
for k, v in opts.items():
flat[f'{sec}:{k}'] = v
return flat
def _get_section_short_str(self):
try:
return next(iter(self.sections))
except StopIteration:
return ''
def _get_short_str(self) -> str:
sec = self._get_section_short_str()
return f'{self.__class__.__name__}{{{sec}}}'
def _get_container_desc(self, include_type: bool = True,
max_path_len: int = 3) -> str:
return self.__class__.__name__
def _raise(self, msg: str, err: Exception = None):
config_file: Optional[Union[Path, str]] = None
if hasattr(self, 'config_file'):
config_file = self.config_file
if isinstance(config_file, str):
msg = f'{msg} in file {config_file}'
elif isinstance(config_file, Path):
msg = f'{msg} in file {config_file.absolute()}'
else:
msg = f'{msg} in {self._get_container_desc()}'
if err is None:
raise ConfigurableError(msg)
else:
raise ConfigurableError(msg) from err
def __str__(self):
return f'<{self._get_short_str()}>'
def __repr__(self):
return self.__str__()
[docs]
class TreeConfigurable(Configurable, metaclass=ABCMeta):
"""A hierarchical configuration. The sections are the root nodes, but each
section's values can be nested :class:`~builtins.dict` instances. These
values are traversable with a string dot path notation.
"""
[docs]
def __init__(self, default_section: str = None,
default_vars: Dict[str, Any] = None,
sections_name: str = 'sections',
sections: Set[str] = None):
"""Initialize.
:param default_section: used as the default section when non given on
the get methds such as :meth:`get_option`;
which defaults to ``defualt``
:param default_vars: used in place of missing variables duing value
interpolation; **deprecated**: this will go away in
a future release
:param sections_name: the dot notated path to the variable that has a
list of sections
:param sections: used as the set of sections for this instance
"""
super().__init__(default_section=default_section)
self.sections_name = sections_name
self.default_vars = default_vars if default_vars else {}
self._sections = sections
self._options = None
@abstractmethod
def _get_config(self) -> Dict[str, Any]:
pass
@abstractmethod
def _set_config(self, source: Dict[str, Any]):
pass
@property
def config(self) -> Dict[str, Any]:
"""The configuration as a nested set of :class:`~builtins.dict`.
:see: :meth:`invalidate`
"""
return self._get_config()
@config.setter
def config(self, source: Dict[str, Any]):
"""The configuration as a nested set of :class:`~builtins.dict`.
:see: :meth:`invalidate`
"""
self._set_config(source)
@property
def root(self) -> Optional[str]:
"""The root name of the configuration file, if one exists. If more than
one root exists, return the first.
"""
if not hasattr(self, '_root'):
root_keys: Iterable[str] = self.config.keys()
if len(root_keys) > 0:
self._root = next(iter(root_keys))
else:
self._root = None
return self._root
@classmethod
def _is_primitive(cls, obj) -> bool:
return isinstance(obj, (float, int, bool, str, set,
list, tuple, Type, Path))
def _flatten(self, context: Dict[str, Any], path: str,
n: Dict[str, Any], sep: str = '.'):
if logger.isEnabledFor(logging.DEBUG):
logger.debug(f'path: {path}, n: <{n}>, context: <{context}>')
if n is None:
context[path] = None
elif self._is_primitive(n):
context[path] = n
elif isinstance(n, (dict, Settings)):
for k, v in n.items():
k = path + sep + k if len(path) else k
self._flatten(context, k, v, sep)
else:
self._raise(f'Unknown yaml type {type(n)}: {n}')
[docs]
def invalidate(self):
"""This should be called when the underlying :obj:`config` object graph
changes *under the nose* of this instance.
"""
context = {}
context.update(self.default_vars)
self._flatten(context, '', self.config)
self._all_keys = set(context.keys())
self._sections = None
self._options = None
if hasattr(self, '_root'):
del self._root
def _find_node(self, n: Union[Dict, Any], path: str, name: str):
if logger.isEnabledFor(logging.DEBUG):
logger.debug(f'search: n={n}, path={path}, name={name}')
if path == name:
if logger.isEnabledFor(logging.DEBUG):
logger.debug(f'found: <{n}>')
return n
elif isinstance(n, dict):
for k, v in n.items():
k = path + '.' + k if len(path) else k
v = self._find_node(v, k, name)
if v is not None:
if logger.isEnabledFor(logging.DEBUG):
logger.debug(f'found {name} -> {v}')
return v
if logger.isEnabledFor(logging.DEBUG):
logger.debug('not found: {}'.format(name))
[docs]
def get_tree(self, name: Optional[str] = None) -> Dict[str, Any]:
"""Get the node in the configuration, which is a nested set
:class:`~builtins.dict` instances as an object graph.
:param name: the doted notation indicating which node in the tree to
retrieve
"""
if name is None:
return self.config
return self._find_node(self.config, '', name)
def _get_option(self, name: str) -> str:
node = self.get_tree(name)
if self._is_primitive(node):
return node
elif self.default_vars is not None and name in self.default_vars:
return self.default_vars[name]
elif node is None:
# values in YAML can be set to ``null``
return None
else:
self._raise(f'Unknown type or state: {name} ({type(node)})')
@property
def options(self) -> Dict[str, Any]:
if self._options is None:
self.config
self._options = {}
for k in self._all_keys:
self._options[k] = self._get_option(k)
return self._options
[docs]
def has_option(self, name: str, section: str = None) -> bool:
opts = self.options
return name in opts
[docs]
def get_option(self, name: str, section: str = None) -> str:
"""Return an option using a dot encoded path.
:param section: ignored
"""
if self.default_vars is not None and name in self.default_vars:
return self.default_vars[name]
else:
ops = self.options
if name in ops:
return ops[name]
else:
self._raise(f'No such option: {name}')
[docs]
def get_options(self, name: str = None) -> Dict[str, Any]:
name = self.default_section if name is None else name
if self.default_vars is not None and name in self.default_vars:
return self.default_vars[name]
else:
node = self.get_tree(name)
if not isinstance(node, str) or isinstance(node, list):
return node
elif name in self.default_vars:
return self.default_vars[name]
else:
self._raise(f'No such option: {name}')
def _get_at_depth(self, node: Any, s_level: int, level: int,
path: List[str]) -> Set[str]:
def map_node(x: Tuple[str, Any]) -> str:
k, v = x
if isinstance(v, dict):
if len(path) > 0:
k = '.'.join(path) + '.' + k
else:
k = None
return k
nodes: Set[str] = set()
if isinstance(node, dict):
if level < s_level:
for k, child in node.items():
path.append(k)
ns = self._get_at_depth(child, s_level, level + 1, path)
path.pop()
nodes.update(ns)
elif level == s_level:
return set(filter(lambda x: x is not None,
map(map_node, node.items())))
return nodes
def _find_sections(self) -> Set[str]:
secs: Set[str]
sec_key = f'{self.root}.{self.sections_name}'
if self.has_option(sec_key):
secs: Dict[str, Any] = self.get_tree(sec_key)
if isinstance(secs, str):
secs = self.get_option_list(sec_key)
elif isinstance(secs, int):
secs = self._get_at_depth(self.get_tree(None), secs, 0, [])
secs = frozenset(secs)
else:
secs = self._get_at_depth(self.get_tree(None), 0, 0, [])
return secs
@property
def sections(self) -> Set[str]:
"""The sections by finding the :obj:`section_name` based from the
:obj:`root`.
"""
if self._sections is None:
self._sections = self._find_sections()
return self._sections
@sections.setter
def sections(self, sections: Set[str]):
self._sections = sections
[docs]
def write(self, depth: int = 0, writer: TextIOBase = sys.stdout):
self._write_dict(self.config, depth, writer)
[docs]
def asdict(self, *args, **kwargs) -> Dict[str, Any]:
return self.config