"""Abstract base class for a configuration read from a file.
"""
from __future__ import annotations
__author__ = 'Paul Landes'
from typing import (
Tuple, Sequence, Dict, Set, Iterable, List, Any, Union, Optional, Type
)
from abc import ABCMeta, abstractmethod
import sys
import logging
from collections import OrderedDict
import inspect
from pathlib import Path
from io import TextIOBase
from . import ConfigurationError, Serializer, Dictable, Settings
logger = logging.getLogger(__name__)
class ConfigurableError(ConfigurationError):
    """Root of the exception hierarchy raised for configuration based errors.

    It adds nothing to :class:`.ConfigurationError`; it exists so callers can
    catch errors raised by :class:`.Configurable` instances specifically.

    """
class ConfigurableFileNotFoundError(ConfigurableError):
    """Raised when a configuration file is not found for those file based
    instances of :class:`.Configurable`.

    """
    def __init__(self, path: Path, source: Union[Path, Any] = None):
        """Initialize.

        :param path: the file that could not be found

        :param source: where ``path`` was referenced from; it is only added to
                       the error message when it is a :class:`~pathlib.Path`

        """
        # only a path-like source is meaningful to report to the user
        origin = f' loading from {source}' if isinstance(source, Path) else ''
        super().__init__(f"No such file: '{path}'" + origin)
        self.path = path
        self.source = source
class Configurable(Dictable, metaclass=ABCMeta):
    """An abstract base class that represents an application specific
    configuration.

    Note that many of the getters are implemented in ``configparser``.
    However, they are reimplemented here for consistency among parsers.

    """
    __slots__ = ('default_section', '_parent', 'serializer')

    def __init__(self, default_section: str = None, *,
                 parent: Configurable = None):
        """Initialize.

        :param default_section: used as the default section when none is given
                                on the get methods such as :meth:`get_option`;
                                defaults to ``default``

        :param parent: the instance returned by :obj:`parent`

        """
        if default_section is None:
            self.default_section = 'default'
        else:
            self.default_section = default_section
        self.serializer = self._create_serializer()
        self._parent = parent

    def _create_serializer(self) -> Serializer:
        """Create the serializer used to parse option values."""
        return Serializer()

    @property
    def parent(self) -> Optional[Configurable]:
        """The configurable that creates this instance."""
        return self._parent

    def _get_children(self, name: str = None, section: str = None) -> \
            List[Configurable]:
        """Collect this instance and the configurables reachable from it.

        The traversal adds the current node, then recurses into the node's
        ``children`` attribute (when it has one) and then into its
        :obj:`parent`.

        :param name: when given together with ``section``, keep only the
                     configurables that have this option in ``section``

        :param section: when given, keep only initialized configurables that
                        have this section (or the option ``name`` in it);
                        when ``None`` no filtering is done and ``name`` is
                        ignored

        :return: the collected (and optionally filtered) configurables

        """
        # NOTE(review): no visited set is kept, so a child whose ``parent``
        # links back to a node that lists it in ``children`` would recurse
        # without bound -- confirm callers never build such a cycle
        def collect(parent: Configurable, coll: List[Configurable]):
            coll.append(parent)
            if hasattr(parent, 'children'):
                for c in parent.children:
                    collect(c, coll)
            if parent.parent is not None:
                collect(parent.parent, coll)

        children: List[Configurable] = []
        collect(self, children)
        if section is not None:
            keep: List[Configurable] = []
            for c in children:
                # otherwise infinite recursive loop
                if not c._is_initialized():
                    continue
                if name is None:
                    matched = section in c.sections
                else:
                    matched = c.has_option(name, section)
                if matched:
                    keep.append(c)
            children = keep
        return children

    @abstractmethod
    def get_options(self, section: str = None) -> Dict[str, str]:
        """Get all options for a section.

        :param section: section in the ini file to fetch the value; defaults to
                        constructor's ``default_section``

        """
        pass

    @abstractmethod
    def has_option(self, name: str, section: str = None) -> bool:
        """Return whether option ``name`` exists in ``section``."""
        pass

    def get_option(self, name: str, section: str = None) -> str:
        """Return an option from ``section`` with ``name``.

        :param name: the name of the option to fetch

        :param section: section in the ini file to fetch the value; defaults to
                        constructor's ``default_section``

        :raises ConfigurableError: if the section has no such option

        """
        # resolve once so the error message names the section actually used
        section = section or self.default_section
        opts = self.get_options(section)
        val = None if opts is None else opts.get(name)
        if val is None:
            raise ConfigurableError(
                f"No option '{name}' found in section: {section}")
        return val

    def reload(self):
        """Reload the configuration from the backing store.  This
        implementation does nothing.

        """
        pass

    def get_option_list(self, name: str, section: str = None) -> List[str]:
        """Just like :meth:`get_option` but parse as a list using the
        serializer's ``parse_list``.

        :param section: section in the ini file to fetch the value; defaults to
                        constructor's ``default_section``

        """
        val = self.get_option(name, section)
        return self.serializer.parse_list(val)

    def get_option_boolean(self, name: str, section: str = None) -> bool:
        """Just like :meth:`get_option` but parse as a boolean (any case
        ``true``).  An empty value is ``False``.

        :param section: section in the ini file to fetch the value; defaults to
                        constructor's ``default_section``

        """
        val = self.get_option(name, section)
        val = val.lower() if val else 'false'
        return val == 'true'

    def get_option_int(self, name: str, section: str = None) -> \
            Optional[int]:
        """Just like :meth:`get_option` but parse as an integer.

        :param section: section in the ini file to fetch the value; defaults to
                        constructor's ``default_section``

        :return: the parsed integer or ``None`` if the value is empty

        """
        val = self.get_option(name, section)
        if val:
            return int(val)
        return None

    def get_option_float(self, name: str, section: str = None) -> \
            Optional[float]:
        """Just like :meth:`get_option` but parse as a float.

        :return: the parsed float or ``None`` if the value is empty

        """
        val = self.get_option(name, section)
        if val:
            return float(val)
        return None

    def get_option_path(self, name: str, section: str = None) -> \
            Optional[Path]:
        """Just like :meth:`get_option` but return a ``pathlib.Path`` object of
        the string.

        :return: the path or ``None`` if :meth:`get_option` returned ``None``

        """
        val = self.get_option(name, section)
        path = None
        if val is not None:
            path = Path(val)
        return path

    def get_option_object(self, name: str, section: str = None) -> Any:
        """Just like :meth:`get_option` but parse as an object per object
        syntax rules.

        :return: the parsed object or ``None`` if the value is empty

        :see: :meth:`.Serializer.parse_object`

        """
        val = self.get_option(name, section)
        if val:
            return self.serializer.parse_object(val)
        return None

    @property
    def options(self) -> Dict[str, Any]:
        """All options from the default section."""
        return self.get_options()

    def populate(self, obj: Union[Dict[str, str], Any] = None,
                 section: str = None, parse_types: bool = True) -> \
            Union[Dict[str, Any], Settings]:
        """Populate ``obj`` from all the values in ``section`` using the
        serializer's ``populate_state``.

        :param obj: the object to populate

        :param section: the section with the source data used to populate;
                        defaults to constructor's ``default_section``

        :param parse_types: whether to parse string values into Python types

        :return: whatever ``populate_state`` returns; per the original
                 documentation this is ``obj`` when given as non-``None``,
                 otherwise a newly created container

        :raises ConfigurableError: if the section does not exist

        """
        section = self.default_section if section is None else section
        sec = self.get_options(section)
        if sec is None:
            # needed for the YamlConfig class
            raise ConfigurableError(
                f"No section from which to populate: '{section}'")
        return self.serializer.populate_state(sec, obj, parse_types)

    def __getitem__(self, section: str = None) -> Settings:
        """Return the populated settings of ``section``."""
        return self.populate(section=section)

    def __getstate__(self) -> Tuple[Any, ...]:
        """Return the parent class's state as a tuple with every
        :class:`.Configurable` entry replaced by ``None``.

        """
        # null out parent
        return tuple(map(lambda x: None if isinstance(x, Configurable) else x,
                         super().__getstate__()))

    @property
    def sections(self) -> Set[str]:
        """All sections of the configuration file.  This implementation
        returns an empty set.

        """
        return frozenset()

    def set_option(self, name: str, value: str, section: str = None):
        """Set an option on this configurable.

        :param name: the name of the option

        :param value: the value to set

        :param section: the section (if applies) to add the option

        :raises NotImplementedError: if this class does not support this
                                     operation

        """
        raise NotImplementedError()

    def copy_sections(self, to_populate: Configurable,
                      sections: Iterable[str] = None,
                      robust: bool = False) -> Optional[Exception]:
        """Copy all sections from this configurable to ``to_populate``.

        :param to_populate: the target configuration object

        :param sections: the sections to populate or ``None`` to copy all

        :param robust: if ``True``, when any exception other than
                       :class:`.ConfigurableError` occurs (namely
                       interpolation exceptions), don't copy and remove the
                       section in the target configuration

        :return: the last exception that occurred while trying to copy the
                 properties, or ``None`` if there was none

        :raises ConfigurableError: if a section does not exist; this is raised
                                   regardless of ``robust``

        """
        last_ex: Optional[Exception] = None
        if sections is None:
            sections = self.sections
        for sec in sections:
            if logger.isEnabledFor(logging.DEBUG):
                logger.debug(f'copying section {sec}')
            try:
                opts: Dict[str, Any] = self.get_options(sec)
                if opts is None:
                    raise ConfigurableError(f"No such section: '{sec}'")
                for k, v in opts.items():
                    to_populate.set_option(k, v, sec)
            # robust is needed by lib.ConfigurationImporter._load(); but deal
            # only with interpolation errors
            except ConfigurableError:
                raise
            except Exception as e:
                if not robust:
                    raise
                to_populate.remove_section(sec)
                last_ex = e
        return last_ex

    def _copy_import_section(self, sections: Sequence[str],
                             target: Configurable = None):
        """Copy import section definitions defined in this instance to
        ``target``.

        :param sections: the name of the sections that have import definitions
                         (per :meth:`.ConfigurableFactory.from_section`)

        :param target: the configurable to populate, which defaults to this
                       instance

        :raises ConfigurableError: if any of ``sections`` does not exist

        """
        # imported here rather than at module level; presumably to avoid a
        # circular import -- confirm against the package layout
        from . import ConfigurableFactory

        target = self if target is None else target
        secs: Dict[str, Dict[str, Any]] = {
            s: self.get_options(s) for s in sections}
        sec_name: str
        params: Dict[str, Any]
        for sec_name, params in secs.items():
            if params is None:
                raise ConfigurableError(f"No section '{sec_name}' in {target}")
            if logger.isEnabledFor(logging.DEBUG):
                logger.debug(f'import sec: {sec_name}')
            src: Configurable = ConfigurableFactory.from_section(
                params, sec_name, parent=self)
            src.copy_sections(target)

    def remove_section(self, section: str):
        """Remove a section with the given name.

        :raises NotImplementedError: if this class does not support this
                                     operation

        """
        raise NotImplementedError()

    def merge(self, to_populate: Configurable):
        """Copy every section of ``to_populate`` into this instance, clobbering
        any overlapping properties in the process.

        :param to_populate: the configuration whose sections are read

        """
        # NOTE(review): the previous docstring described the opposite
        # direction (this instance -> ``to_populate``); the call below reads
        # from ``to_populate`` and writes to ``self`` -- confirm which is
        # intended before changing either
        to_populate.copy_sections(self, to_populate.sections)

    def _get_calling_module(self, depth: int = 0):
        """Get the nearest module in the call stack that is not this module or
        ``None`` if the call originated from this module.

        :param depth: the index into each :class:`inspect.FrameInfo` record
                      passed to :func:`inspect.getmodule`; ``0`` selects the
                      frame object

        """
        for frame in inspect.stack():
            mod = inspect.getmodule(frame[depth])
            if logger.isEnabledFor(logging.DEBUG):
                logger.debug(f'calling module: {mod}')
            if mod is not None and mod.__name__ != __name__:
                return mod
        return None

    def resource_filename(self, resource_name: str, module_name: str = None):
        """Return a resource based on a file name.  This delegates to the
        serializer's ``resource_filename`` to find the resource.

        :param resource_name: the file name of the resource to obtain (or name
                              if obtained from an installed module)

        :param module_name: the name of the module to obtain the data, which
                            defaults to the name of the first calling module
                            outside of this one

        :return: a path on the file system or resource of the installed module

        """
        if module_name is None:
            mod = self._get_calling_module()
            if logger.isEnabledFor(logging.DEBUG):
                logger.debug(f'calling module: {mod}')
            if mod is not None:
                module_name = mod.__name__
        return self.serializer.resource_filename(resource_name, module_name)

    def write(self, depth: int = 0, writer: TextIOBase = sys.stdout):
        """Write each section name followed by its options, both sorted, using
        ``_write_line``.

        :param depth: the starting indentation depth

        :param writer: the writer to dump the content of this writable

        :raises ConfigurationError: if a section is missing or is not a
                                    :class:`dict`

        """
        for sec in sorted(self.sections):
            self._write_line(sec, depth, writer)
            opts: Dict[str, str] = self.get_options(sec)
            if opts is None:
                raise ConfigurationError(f'No such section: {sec}')
            if not isinstance(opts, dict):
                raise ConfigurationError(
                    f"Expecting dict but got {type(opts)} in section '{sec}'")
            for k in sorted(opts.keys()):
                v = opts[k]
                self._write_line(f'{k}: {v}', depth + 1, writer)

    def asdict(self, *args, **kwargs) -> Dict[str, Any]:
        """Return the sections and their options as nested ordered
        dictionaries, both sorted by key.  The arguments are ignored.

        :raises ConfigurationError: if a section listed in :obj:`sections` has
                                    no options mapping

        """
        secs = OrderedDict()
        for sec in sorted(self.sections):
            opts = self.get_options(sec)
            if opts is None:
                # same failure mode as ``write`` rather than an opaque
                # AttributeError on ``None``
                raise ConfigurationError(f'No such section: {sec}')
            secs[sec] = OrderedDict((k, opts[k]) for k in sorted(opts.keys()))
        return secs

    def as_deep_dict(self) -> Dict[str, Any]:
        """Return a deep :class:`builtins.dict` with the top level with section
        names as keys and deep (i.e. ``json:``) values as nested dictionaries.

        """
        secs = OrderedDict()
        for sec in sorted(self.sections):
            svs = OrderedDict()
            secs[sec] = svs
            self.populate(svs, sec)
        return secs

    def as_one_tier_dict(self, *args, **kwargs) -> Dict[str, Any]:
        """Return a flat one-tier :class:`builtins.dict` with keys in
        ``<section>:<option>`` format.  The arguments are ignored.

        """
        flat: Dict[str, Any] = {}
        for sec, opts in self.asdict(False).items():
            for k, v in opts.items():
                flat[f'{sec}:{k}'] = v
        return flat

    def _get_short_str(self) -> str:
        """Return the short description used by :meth:`__str__`."""
        return self.__class__.__name__

    def _get_container_desc(self, include_type: bool = True,
                            max_path_len: int = 3) -> str:
        """Return a description of where the configuration lives, used in
        error messages.  This implementation ignores both parameters and
        returns the class name.

        """
        return self.__class__.__name__

    def _raise(self, msg: str, err: Exception = None):
        """Raise a :class:`.ConfigurableError` with the location of this
        configuration appended to ``msg``.

        :param msg: the error message

        :param err: the causing exception, which is chained when given

        :raises ConfigurableError: always

        """
        config_file: Optional[Union[Path, str]] = None
        if hasattr(self, 'config_file'):
            config_file = self.config_file
        if isinstance(config_file, str):
            msg = f'{msg} in file {config_file}'
        elif isinstance(config_file, Path):
            msg = f'{msg} in file {config_file.absolute()}'
        else:
            msg = f'{msg} in {self._get_container_desc()}'
        if err is None:
            raise ConfigurableError(msg)
        else:
            raise ConfigurableError(msg) from err

    def __str__(self):
        return f'<{self._get_short_str()}>'

    def __repr__(self):
        return self.__str__()
class TreeConfigurable(Configurable, metaclass=ABCMeta):
    """A hierarchical configuration.  The sections are the root nodes, but each
    section's values can be nested :class:`~builtins.dict` instances.  These
    values are traversable with a string dot path notation.

    """
    def __init__(self, default_section: str = None,
                 default_vars: Dict[str, Any] = None,
                 sections_name: str = 'sections',
                 sections: Set[str] = None,
                 parent: Configurable = None):
        """Initialize.

        :param default_section: used as the default section when none is given
                                on the get methods such as :meth:`get_option`;
                                defaults to ``default``

        :param default_vars: used in place of missing variables during value
                             interpolation; **deprecated**: this will go away
                             in a future release

        :param sections_name: the dot notated path to the variable that has a
                              list of sections

        :param sections: used as the set of sections for this instance

        :param parent: the instance returned by :obj:`parent`

        """
        super().__init__(default_section=default_section, parent=parent)
        self.sections_name = sections_name
        # normalized to a dict so membership tests never see ``None``
        self.default_vars = default_vars if default_vars else {}
        self._sections = sections
        self._options = None

    @abstractmethod
    def _get_config(self) -> Dict[str, Any]:
        """Return the configuration as nested dictionaries."""
        pass

    @abstractmethod
    def _set_config(self, source: Dict[str, Any]):
        """Set the configuration from nested dictionaries."""
        pass

    @abstractmethod
    def _is_initialized(self) -> bool:
        """Return whether the configuration is initialized with data."""
        pass

    @property
    def config(self) -> Dict[str, Any]:
        """The configuration as a nested set of :class:`~builtins.dict`.

        :see: :meth:`invalidate`

        """
        return self._get_config()

    @config.setter
    def config(self, source: Dict[str, Any]):
        """The configuration as a nested set of :class:`~builtins.dict`.

        :see: :meth:`invalidate`

        """
        self._set_config(source)

    @property
    def root(self) -> Optional[str]:
        """The root name of the configuration file, if one exists.  If more
        than one root exists, return the first.  The value is cached until
        :meth:`invalidate` is called.

        """
        if not hasattr(self, '_root'):
            self._root = next(iter(self.config.keys()), None)
        return self._root

    @classmethod
    def _is_primitive(cls, obj) -> bool:
        """Return whether ``obj`` is treated as a leaf value rather than a
        node to descend into.

        """
        return isinstance(obj, (float, int, bool, str, set,
                                list, tuple, Type, Path))

    def _flatten(self, context: Dict[str, Any], path: str,
                 n: Dict[str, Any], sep: str = '.'):
        """Recursively add the leaves of ``n`` to ``context`` keyed by their
        ``sep`` separated path.

        :param context: the flat dictionary to populate

        :param path: the path of ``n``, which is empty for the top level

        :param n: the node to flatten

        :param sep: the string used to join path components

        :raises ConfigurableError: if a node is not ``None``, a primitive, a
                                   :class:`dict` or a :class:`.Settings`

        """
        if logger.isEnabledFor(logging.DEBUG):
            logger.debug(f'path: {path}, n: <{n}>, context: <{context}>')
        if n is None:
            context[path] = None
        elif self._is_primitive(n):
            context[path] = n
        elif isinstance(n, (dict, Settings)):
            for k, v in n.items():
                k = path + sep + k if len(path) else k
                self._flatten(context, k, v, sep)
        else:
            self._raise(f'Unknown yaml type {type(n)}: {n}')

    def invalidate(self):
        """This should be called when the underlying :obj:`config` object graph
        changes *under the nose* of this instance.  It recomputes the set of
        known option keys and clears the cached sections, options and root.

        """
        context = {}
        context.update(self.default_vars)
        self._flatten(context, '', self.config)
        self._all_keys = set(context.keys())
        self._sections = None
        self._options = None
        if hasattr(self, '_root'):
            del self._root

    def _find_node(self, n: Union[Dict, Any], path: str, name: str):
        """Search depth first for the node whose dot path equals ``name``.

        :param n: the node to search

        :param path: the dot path of ``n``

        :param name: the dot path to find

        :return: the node or ``None`` if not found; a node whose value is
                 ``None`` is indistinguishable from one that is missing

        """
        if logger.isEnabledFor(logging.DEBUG):
            logger.debug(f'search: n={n}, path={path}, name={name}')
        if path == name:
            if logger.isEnabledFor(logging.DEBUG):
                logger.debug(f'found: <{n}>')
            return n
        elif isinstance(n, dict):
            for k, v in n.items():
                k = path + '.' + k if len(path) else k
                v = self._find_node(v, k, name)
                if v is not None:
                    if logger.isEnabledFor(logging.DEBUG):
                        logger.debug(f'found {name} -> {v}')
                    return v
            if logger.isEnabledFor(logging.DEBUG):
                logger.debug(f'not found: {name}')
        return None

    def get_tree(self, name: Optional[str] = None) -> Dict[str, Any]:
        """Get the node in the configuration, which is a nested set of
        :class:`~builtins.dict` instances as an object graph.

        :param name: the dotted notation indicating which node in the tree to
                     retrieve; ``None`` returns the whole :obj:`config`

        """
        if name is None:
            return self.config
        return self._find_node(self.config, '', name)

    def _get_option(self, name: str) -> Any:
        """Return the value at dot path ``name``, falling back to
        :obj:`default_vars`.

        :raises ConfigurableError: if the node is neither a primitive, a
                                   default variable nor ``None``

        """
        node = self.get_tree(name)
        if self._is_primitive(node):
            return node
        elif self.default_vars is not None and name in self.default_vars:
            return self.default_vars[name]
        elif node is None:
            # values in YAML can be set to ``null``
            return None
        else:
            self._raise(f'Unknown type or state: {name} ({type(node)})')

    @property
    def options(self) -> Dict[str, Any]:
        """A flat dictionary of every known dot path to its value, which is
        cached until :meth:`invalidate` is called.

        """
        if self._options is None:
            # accessed only for its side effect; presumably this makes the
            # subclass load its data so ``_all_keys`` exists -- TODO confirm
            self.config
            self._options = {}
            for k in self._all_keys:
                self._options[k] = self._get_option(k)
        return self._options

    def has_option(self, name: str, section: str = None) -> bool:
        """Return whether the dot path ``name`` is a known option.

        :param section: ignored

        """
        opts = self.options
        return name in opts

    def get_option(self, name: str, section: str = None) -> str:
        """Return an option using a dot encoded path.  Entries in
        :obj:`default_vars` take precedence.

        :param section: ignored

        :raises ConfigurableError: if there is no such option

        """
        if self.default_vars is not None and name in self.default_vars:
            return self.default_vars[name]
        else:
            ops = self.options
            if name in ops:
                return ops[name]
            else:
                self._raise(f"No such option: '{name}'")

    def get_options(self, name: str = None) -> Dict[str, Any]:
        """Return the node at dot path ``name``.  Entries in
        :obj:`default_vars` take precedence.

        :param name: the dot path of the node, which defaults to
                     constructor's ``default_section``

        :return: the node, which is ``None`` when nothing is found

        :raises ConfigurableError: if the node is a string

        """
        name = self.default_section if name is None else name
        if self.default_vars is not None and name in self.default_vars:
            return self.default_vars[name]
        node = self.get_tree(name)
        # a bare string is a leaf value, not a collection of options
        if not isinstance(node, str):
            return node
        self._raise(f'No such option: {name}')

    def _get_at_depth(self, node: Any, s_level: int, level: int,
                      path: List[str]) -> Set[str]:
        """Return the dot paths of the :class:`dict` valued entries found
        ``s_level`` levels below the node the search started from.

        :param node: the node currently being visited

        :param s_level: the level at which to collect entries

        :param level: the level of ``node``

        :param path: the keys leading to ``node``, which is modified during
                     the traversal and restored before returning

        """
        def map_node(x: Tuple[str, Any]) -> Optional[str]:
            # only entries that are themselves dictionaries count
            k, v = x
            if isinstance(v, dict):
                if len(path) > 0:
                    k = '.'.join(path) + '.' + k
            else:
                k = None
            return k

        nodes: Set[str] = set()
        if isinstance(node, dict):
            if level < s_level:
                for k, child in node.items():
                    path.append(k)
                    ns = self._get_at_depth(child, s_level, level + 1, path)
                    path.pop()
                    nodes.update(ns)
            elif level == s_level:
                return set(filter(lambda x: x is not None,
                                  map(map_node, node.items())))
        return nodes

    def _find_sections(self) -> Set[str]:
        """Compute the sections of this configuration.

        When the option ``<root>.<sections_name>`` exists, a string value is
        parsed as a list of section names, an integer is used as the depth at
        which to collect sections, and any other value is used as the
        collection of names itself.  Otherwise the top level dictionary
        valued keys are the sections.

        """
        sec_key = f'{self.root}.{self.sections_name}'
        if not self.has_option(sec_key):
            return self._get_at_depth(self.get_tree(None), 0, 0, [])
        declared: Any = self.get_tree(sec_key)
        names: Iterable[str]
        if isinstance(declared, str):
            names = self.get_option_list(sec_key)
        elif isinstance(declared, int):
            names = self._get_at_depth(self.get_tree(None), declared, 0, [])
        else:
            names = declared
        return frozenset(names)

    @property
    def sections(self) -> Set[str]:
        """The sections by finding the :obj:`sections_name` based from the
        :obj:`root`.  The value is cached until :meth:`invalidate` is called
        or it is set.

        """
        if self._sections is None:
            self._sections = self._find_sections()
        return self._sections

    @sections.setter
    def sections(self, sections: Set[str]):
        self._sections = sections

    def write(self, depth: int = 0, writer: TextIOBase = sys.stdout):
        """Write :obj:`config` using ``_write_dict``.

        :param depth: the starting indentation depth

        :param writer: the writer to dump the content of this writable

        """
        self._write_dict(self.config, depth, writer)

    def asdict(self, *args, **kwargs) -> Dict[str, Any]:
        """Return :obj:`config` itself (not a copy).  The arguments are
        ignored.

        """
        return self.config