Source code for zensols.config.importini

"""Contains a class for importing child configurations.

"""
from __future__ import annotations
__author__ = 'Paul Landes'
from typing import Iterable, Tuple, List, Dict, Any, Set, Sequence, Union
import logging
from itertools import chain
from collections import ChainMap
from pathlib import Path
from configparser import (
    ConfigParser, ExtendedInterpolation, InterpolationMissingOptionError
)
from zensols.introspect import ClassImporterError
from . import (
    ConfigurableError, ConfigurableFileNotFoundError,
    Configurable, ConfigurableFactory, IniConfig, ImportYamlConfig, rawconfig,
)

logger = logging.getLogger(__name__)


class _ParserAdapter(object):
    """Adapts a :class:`~configparser.ConfigParser` to a :class:`.Configurable`.

    """
    def __init__(self, conf: Configurable, defs: Dict[str, str]):
        self.conf = conf
        self.defs = defs

    def get(self, section: str, option: str, *args, **kwags):
        if logger.isEnabledFor(logging.TRACE):
            logger.trace(
                f'get ({type(self.conf).__name__}): {section}:{option}')
        if self.conf.has_option(option, section):
            if logger.isEnabledFor(logging.TRACE):
                logger.trace('contains option')
            val = self.conf.get_option(option, section)
        else:
            if logger.isEnabledFor(logging.TRACE):
                logger.trace(f'option not found, trying defs: {self.defs}')
            val = self.defs.get(f'{section}:{option}')
            if logger.isEnabledFor(logging.TRACE):
                logger.trace(f'using defaults value: {val}')
            if val is None:
                # raise an InterpolationMissingOptionError
                try:
                    self.conf.get_option(option, section)
                except ConfigurableError as e:
                    raise ConfigurableError(
                        f'Can not get option {section}:{option}') from e
        return val

    def optionxform(self, option: str) -> str:
        return option.lower()

    def items(self, section: str, raw: bool = False):
        return self.conf.get_options(section)

    def __str__(self) -> str:
        return str(self.conf.__class__.__name__)

    def __repr__(self) -> str:
        return self.__str__()


class _SharedExtendedInterpolation(ExtendedInterpolation):
    """Adds other :class:`Configurable` instances to available parameter to
    substitute.

    """
    def __init__(self, children: Tuple[Configurable, ...],
                 robust: bool = False):
        super().__init__()
        defs = {}
        for child in children:
            with rawconfig(child):
                for sec in child.sections:
                    for k, v in child.get_options(sec).items():
                        defs[f'{sec}:{k}'] = v
        self.children = tuple(map(lambda c: _ParserAdapter(c, defs), children))
        self.robust = robust

    def before_get(self, parser: ConfigParser, section: str, option: str,
                   value: str, defaults: ChainMap):
        if logger.isEnabledFor(logging.TRACE):
            logger.trace(f'before_get: section: {section}:{option}: {value}')
        res = value
        last_ex = None
        parsers = tuple(chain.from_iterable([[parser], self.children]))
        if logger.isEnabledFor(logging.TRACE):
            logger.trace(f'defaults: {defaults}')
        for pa in parsers:
            try:
                if logger.isEnabledFor(logging.TRACE):
                    logger.trace(f'inter: {pa}: {section}:{option} = {value}')
                res = super().before_get(pa, section, option, value, defaults)
                last_ex = None
                break
            except InterpolationMissingOptionError as e:
                last_ex = e
                if logger.isEnabledFor(logging.TRACE):
                    logger.trace(f'missing option: {e}')
        if (not self.robust) and (last_ex is not None):
            msg = f'can not set {section}:{option} = {value}: {last_ex}'
            raise ConfigurableError(msg)
        return res


class _BootstrapConfig(IniConfig):
    """Configuration class extends using advanced interpolation with
    :class:`~configparser.ExtendedInterpolation`.  One of these is created
    every time an instance of :class:`.ImportIniConfig` is created, which
    includes nested configruation imports when we *descend* recursively.

    """
    def __init__(self, parent: IniConfig, children: Tuple[Configurable, ...]):
        """Initialize.

        :param parent: the initial config having only the import, load and
                       reference sections

        :param children: the children initialized with
                         :class:`.ImportIniConfig`, which are later used to
                         copy forward configuration as configurations are
                         loaded

        """
        super().__init__(parent, parent.default_section)
        self.children = [parent] + list(children)

    def append_child(self, child: Configurable):
        self.children.append(child)
        for c in self.children:
            with rawconfig(c):
                c.copy_sections(self)

    def _create_config_parser(self) -> ConfigParser:
        parser = ConfigParser(
            interpolation=_SharedExtendedInterpolation(self.children))
        with rawconfig(self.config_file):
            for sec in self.config_file.sections:
                parser.add_section(sec)
                for k, v in self.config_file.get_options(sec).items():
                    parser.set(sec, k, v)
        return parser

    def _create_and_load_parser(self, parser: ConfigParser):
        # skip reloading, as that was done when the parser was created
        pass


[docs] class ImportIniConfig(IniConfig): """A configuration that uses other :class:`.Configurable` classes to load other sections. A special ``import`` section is given that indicates what other sections to load as children configuration. Each of those indicated to import are processed in order by: 1. Creating the delegate child :class:`Configurable` given in the section. 2. Copying all sections from child instance to the parent. 3. Variable interpolation as a function of :class:`~configparser.ConfigParser` using :class:`~configparser.ExtendedInterpolation`. The ``import`` section has a ``sections`` entry as list of sections to load, a ``references`` entry indicating which sections to provide as children sections in child loaders, a ``config_file`` and ``config_files` entries to load as children directly. For example:: [import] references = list: default, package, env sections = list: imp_obj [imp_obj] type = importini config_file = resource: resources/obj.conf This configuration loads a resource import INI, which is an implementation of this class, and provides sections ``default``, ``package`` and ``env`` for any property string interpolation while loading ``obj.conf``. See the `API documentation <https://plandes.github.io/util/doc/config.html#import-ini-configuration>`_ for more information. """ IMPORT_SECTION = 'import' SECTIONS_SECTION = 'sections' SINGLE_CONFIG_FILE = ConfigurableFactory.SINGLE_CONFIG_FILE CONFIG_FILES = 'config_files' REFS_NAME = 'references' CLEANUPS_NAME = 'cleanups' TYPE_NAME = ConfigurableFactory.TYPE_NAME _IMPORT_SECTION_FIELDS = {SECTIONS_SECTION, SINGLE_CONFIG_FILE, CONFIG_FILES, REFS_NAME, CLEANUPS_NAME} _VISITED_FILES = None
[docs] def __init__(self, *args, config_section: str = IMPORT_SECTION, exclude_config_sections: bool = True, children: Tuple[Configurable, ...] = (), use_interpolation: bool = True, **kwargs): """Initialize. :param config_file: the configuration file path to read from :param default_section: default section (defaults to `default`) :param robust: if `True`, then don't raise an error when the configuration file is missing :param config_section: the name of the section that has the configuration (i.e. the ``sections`` entry) :param exclude_config_sections: if ``True``, the import and other configuration sections are removed :param children: additional configurations used both before and after bootstrapping :param use_interpolation: if ``True``, interpolate variables using :class:`~configparser.ExtendedInterpolation` """ super().__init__(*args, use_interpolation=use_interpolation, **kwargs) self.config_section = config_section self.exclude_config_sections = exclude_config_sections if children is None: self._raise('Missing importini children') self.children = children if exclude_config_sections and \ (self.default_section == self.config_section): self._raise('You must set exclude_config_sections to False ' + 'when the import and config section are the same')
def _get_bootstrap_config(self) -> _BootstrapConfig: """Create the config that is used to read only the sections needed to import/load other configuration. This adds the import section, any sections it *refers* to, and the sections it indicates to load. References are those needed to continue parsing the rest of the boot strap configuration for this instance. This usually includes a ``default`` section that might have a ``resources`` property used to populate a load section paths. """ if logger.isEnabledFor(logging.TRACE): logger.trace('creating bootstrap parser') conf_sec = self.config_section bs_config = IniConfig(self.config_file) cparser = bs_config.parser has_secs = bs_config.has_option(self.SECTIONS_SECTION, conf_sec) has_refs = bs_config.has_option(self.REFS_NAME, conf_sec) # add sections and references to the temporary config if has_secs or has_refs: secs = set() # add load sections if has_secs: sec_lst: List[Union[str, Path]] = self.serializer.parse_object( bs_config.get_option(self.SECTIONS_SECTION, conf_sec)) secs.update(set(sec_lst)) # add references if has_refs: refs: List[Union[str, Path]] = self.serializer.parse_object( bs_config.get_option(self.REFS_NAME, conf_sec)) secs.update(set(refs)) # add the import section itself, used later to load children config secs.add(conf_sec) # remove all sections but import, load and reference from the # parser to_remove = set(bs_config.sections) - secs for r in to_remove: cparser.remove_section(r) return _BootstrapConfig(bs_config, self.children) def _validate_bootstrap_config(self, config: Configurable): """Validate that the import section doesn't have bad configuration.""" conf_sec: str = self.config_section if conf_sec in config.sections: import_sec: Dict[str, str] = config.populate({}, conf_sec) import_props: Set[str] = set(import_sec.keys()) refs: List[str] = import_sec.get(self.REFS_NAME) file_props: Set[str] = {self.SINGLE_CONFIG_FILE, self.CONFIG_FILES} aliens = import_props - self._IMPORT_SECTION_FIELDS if len(aliens) > 0: props = ', '.join(map(lambda p: f"'{p}'", aliens)) self._raise(f"Invalid options in section '{conf_sec}'" + f": {props}") if len(file_props & import_props) == 2: self._raise( f"Cannot have both '{self.SINGLE_CONFIG_FILE}' " + f"and '{self.CONFIG_FILES}' in section '{conf_sec}'") if refs is not None: for ref in refs: if ref not in config.sections: self._raise( f"Reference '{ref}' in section '{conf_sec}' not " + f"found, got: {set(config.sections)}") def _create_config(self, section: str, params: Dict[str, Any]) -> Configurable: """Create a config from a section.""" return ConfigurableFactory.from_section(params, section) def _create_configs(self, section: str, params: Dict[str, Any], bs_config: _BootstrapConfig) -> List[Configurable]: """Create one or more :class:`~zensols.config.Configuration` instance depending on if one or more configuration files are given. Configurations are created with using a :class:`~zensols.config.ConfigurationFactory` in :meth:`_create_config`. This method is called once to create all configuration files for obj:`CONFIG_FILES` and again for each section for :obj:`SECTIONS_SECTION`. :param section: the import ini section to load :param params: the section options/properties :param bs_config: the bootstrap loader created in :meth:`_get_bootstrap_config` """ configs: List[Configurable] = [] conf_files: List[str] = params.get(self.CONFIG_FILES) if logger.isEnabledFor(logging.DEBUG): logger.debug(f'creating configs from section: [{section}]') if conf_files is None: try: # create a configuration from the section as a section load configs.append(self._create_config(section, params)) except ClassImporterError as e: raise ConfigurableError( f"Could not import section '{section}': {e}") from e else: # otherwise, synthesize a section load for each given config file sparams = dict(params) del sparams[self.CONFIG_FILES] try: for cf in conf_files: parsed_cf = self.serializer.parse_object(cf) # skip Nones substituted by introplation (like when # ConfigurationImporter subtitutues a missing config file) if parsed_cf is not None: sparams[self.SINGLE_CONFIG_FILE] = parsed_cf conf = self._create_config(section, sparams) configs.append(conf) except ClassImporterError as e: raise ConfigurableError( f"Could not import '{cf}' in section '{section}': {e}") \ from e # add configurations as children to the bootstrap config for config in configs: # recursively create new import ini configs and add the children # we've created thus far for forward interpolation capability if isinstance(config, (ImportIniConfig, ImportYamlConfig)): if logger.isEnabledFor(logging.INFO): logger.info(f'descending: {config.config_file}') if logger.isEnabledFor(logging.DEBUG): logger.debug(f'adding bootstrap {bs_config.children} + ' + f'self {self.children} to {config}') if self._VISITED_FILES is not None: self._VISITED_FILES.append(config.config_file) # add children bootstrap config that aren't add duplicates # children created with this instance ids: Set[int] = set(map(lambda c: id(c), bs_config.children)) new_children = list(bs_config.children) new_children.extend( tuple(filter(lambda c: id(c) not in ids, self.children))) config.children = tuple(new_children) # add the configurable to the bootstrap config bs_config.append_child(config) return configs def _get_children(self) -> Tuple[List[str], Iterable[Configurable]]: """"Get children used for this config instance. This is done by import each import section and files by delayed loaded for each. Order is important as each configuration can refer to previously loaded configurations. For this reason, the :class:`_ConfigLoader` is needed to defer loading: one for loading sections, and one for loading file. """ # guard on OS level config file since the super class allows different # types such as directory; we only deal with files in this class if isinstance(self.config_file, Path) and \ not self.config_file.is_file(): raise ConfigurableFileNotFoundError(self.config_file) # create the bootstrap config used to start the import process bs_config: _BootstrapConfig = self._get_bootstrap_config() conf_sec: str = self.config_section conf_secs: Set[str] = {conf_sec} if logger.isEnabledFor(logging.DEBUG): logger.debug(f'parsing section: {conf_sec}') # look for bad configuration in the import section self._validate_bootstrap_config(bs_config) if logger.isEnabledFor(logging.TRACE): logger.trace(f'creating children for: {conf_sec}') # first load files given in the import section if bs_config.has_option(self.SINGLE_CONFIG_FILE, conf_sec): fname: Union[Path, str] = self.serializer.parse_object( bs_config.get_option(self.SINGLE_CONFIG_FILE, conf_sec)) params = {self.SINGLE_CONFIG_FILE: fname} self._create_configs('<no section>', params, bs_config) elif bs_config.has_option(self.CONFIG_FILES, conf_sec): sec = bs_config.populate(section=conf_sec) fnames: List[str] = self.serializer.parse_object( bs_config.get_option(self.CONFIG_FILES, conf_sec)) for fname in fnames: # enable resource descriptors fname: Any = self.serializer.parse_object(fname) params = {self.SINGLE_CONFIG_FILE: fname} self._create_configs('<no section>', params, bs_config) # load each import section, again in order if bs_config.has_option(self.SECTIONS_SECTION, conf_sec): secs: List[Union[Path, str]] = self.serializer.parse_object( bs_config.get_option(self.SECTIONS_SECTION, conf_sec)) for sec in secs: if logger.isEnabledFor(logging.TRACE): logger.trace( f"populating section '{sec}', {bs_config.children}") conf_secs.add(sec) params = bs_config.populate({}, section=sec) self._create_configs(sec, params, bs_config) # allow the user to remove more sections after import if bs_config.has_option(self.CLEANUPS_NAME, conf_sec): cleanups: Sequence[str] = self.serializer.parse_object( bs_config.get_option(self.CLEANUPS_NAME, conf_sec)) conf_secs.update(cleanups) return conf_secs, bs_config.children def _load_imports(self, parser: ConfigParser): if logger.isEnabledFor(logging.DEBUG): logger.debug(f'importing {self._get_container_desc()}, ' + f'children={self.children}') csecs, children = self._get_children() overwrites: Set = set() # copy each configuration added to the bootstrap loader in the order we # added them. c: Configurable for c in children: if logger.isEnabledFor(logging.DEBUG): logger.debug(f'loading configuration {c} -> {self}') par_secs: List[str] = parser.sections() sec: str # copy every section from the child to target our new parser for sec in c.sections: if logger.isEnabledFor(logging.TRACE): logger.trace(f'importing section {c}:[{sec}]') if sec not in par_secs: parser.add_section(sec) # assume everything is resolvable as this is the last step in # the loading of this instance try: opts = c.get_options(sec) except InterpolationMissingOptionError as e: msg = f'Could not populate {c}:[{sec}]: {e}' self._raise(msg, e) for k, v in opts.items(): key = f'{sec}:{k}' has = parser.has_option(sec, k) fv = self._format_option(k, v, sec) # overwrite the option/property when not yet set or its # already by overwriten by a previous child; however, don't # set it when its new per this instance's import iteration if not has or key in overwrites: if logger.isEnabledFor(logging.TRACE): logger.trace(f'overwriting {sec}:{k}: {v} -> {fv}') parser.set(sec, k, fv) overwrites.add(key) if logger.isEnabledFor(logging.DEBUG): logger.debug(f'imported {len(children)} children to {self}') if self.exclude_config_sections: self._config_sections = csecs def _create_and_load_parser(self, parser: ConfigParser): if logger.isEnabledFor(logging.TRACE): logger.trace('creating and loading parser') super()._create_and_load_parser(parser) self._load_imports(parser) if hasattr(self, '_config_sections'): for sec in self._config_sections: parser.remove_section(sec) del self._config_sections del self.children return parser
[docs] def start_file_capture(self): self.__class__._VISITED_FILES = []
[docs] def stop_file_capture(self) -> List[Path]: files: List[Path] = self.__class__._VISITED_FILES self.__class__._VISITED_FILES = None return files