Source code for zensols.config.iniconfig

"""Implementation classes that are used as application configuration containers
parsed from files.

"""
__author__ = 'Paul Landes'

from typing import Set, Dict, List, Union
from abc import ABCMeta, abstractmethod
import logging
import os
from io import TextIOBase, StringIO
from pathlib import Path
from copy import deepcopy
from configparser import ConfigParser, ExtendedInterpolation
from ..persist.domain import Primeable
from . import ConfigurableFileNotFoundError, ConfigurableError, Configurable

logger = logging.getLogger(__name__)


[docs] class IniConfig(Configurable, Primeable): """Application configuration utility. This reads from a configuration and returns sets or subsets of options. """
[docs] def __init__(self, config_file: Union[Path, TextIOBase] = None, default_section: str = None, use_interpolation: bool = False): """Create with a configuration file path. :param config_file: the configuration file path to read from; if the type is an instance of :class:`io.TextIOBase`, then read it as a file object :param default_section: default section (defaults to `default`) :param use_interpolation: if ``True``, interpolate variables using :class:`~configparser.ExtendedInterpolation` :param robust: if `True`, then don't raise an error when the configuration file is missing """ super().__init__(default_section) if isinstance(config_file, str): self.config_file = Path(config_file).expanduser() else: self.config_file = config_file self.use_interpolation = use_interpolation self.nascent = deepcopy(self.__dict__) self._cached_sections = {} self._raw = False self._conf = None
def _create_config_parser(self) -> ConfigParser: "Factory method to create the ConfigParser." if self.use_interpolation: parser = ConfigParser(interpolation=ExtendedInterpolation()) else: parser = ConfigParser() return parser def _read_config_content(self, cpath: Path, parser: ConfigParser): if cpath.is_file(): with open(cpath) as f: parser.read_file(f) elif cpath.is_dir(): writer = StringIO() for fpath in cpath.iterdir(): if fpath.is_file(): with open(fpath) as f: writer.write(f.read()) writer.write('\n') writer.seek(0) parser.read_file(writer) def _create_and_load_parser_from_file(self, cpath: Path, parser: ConfigParser): if logger.isEnabledFor(logging.INFO): logger.info(f'{self.__class__.__name__}: loading config: {cpath}') if not cpath.exists(): raise ConfigurableFileNotFoundError(cpath) elif cpath.is_file() or cpath.is_dir(): self._read_config_content(cpath, parser) else: raise ConfigurableError(f'Unknown file type: {cpath}') return parser def _create_and_load_parser(self, parser: ConfigParser): if isinstance(self.config_file, (str, Path)): self._create_and_load_parser_from_file(self.config_file, parser) elif isinstance(self.config_file, TextIOBase): writer = self.config_file writer.seek(0) parser.read_file(writer) writer.seek(0) elif isinstance(self.config_file, Configurable): is_ini = isinstance(self.config_file, IniConfig) src: Configurable = self.config_file sec: str = None if is_ini: self.config_file._raw = True try: for sec in src.sections: parser.add_section(sec) for k, v in src.get_options(sec).items(): parser.set(sec, k, v) finally: if is_ini: self.config_file._raw = False elif self.config_file is None: pass else: raise ConfigurableError( f'Unknown create type: {type(self.config_file)}') @property def parser(self) -> ConfigParser: """Load the configuration file. """ if self._conf is None: parser: ConfigParser = self._create_config_parser() self._create_and_load_parser(parser) self._conf = parser return self._conf
[docs] def reload(self): self._conf = None
[docs] def has_option(self, name: str, section: str = None) -> bool: section = self.default_section if section is None else section conf = self.parser if conf.has_section(section): return conf.has_option(section, name) else: return False
[docs] def get_options(self, section: str = None) -> Dict[str, str]: opts = None section = self.default_section if section is None else section conf: ConfigParser = self.parser if conf is None: if not self.robust: raise self._raise('No configuration given') elif conf.has_section(section): opts = dict(conf.items(section, raw=self._raw)) if opts is None: self._raise(f"No section: '{section}'") return opts
[docs] def get_option(self, name: str, section: str = None) -> str: opt = None section = self.default_section if section is None else section conf: ConfigParser = self.parser if conf is None: if not self.robust: self._raise('No configuration given') elif conf.has_option(section, name): opt = conf.get(section, name, raw=self._raw) if opt is None: if not conf.has_section(section): self._raise(f"No section: '{section}'") self._raise(f"No option: '{section}:{name}'") return opt
@property def sections(self) -> Set[str]: """All sections of the INI file. """ return frozenset(self.parser.sections() or ()) def _format_option(self, name: str, value: str, section: str) -> str: try: value = self.serializer.format_option(value) except TypeError as e: raise ConfigurableError( f'Can not serialize {section}:{name}: {e}') from e return value
[docs] def set_option(self, name: str, value: str, section: str = None): section = self.default_section if section is None else section value = self._format_option(name, value, section) if logger.isEnabledFor(logging.DEBUG): logger.debug(f'setting option {name}: {section}:{value}') if not self.parser.has_section(section): self.parser.add_section(section) try: self.parser.set(section, name, value) except Exception as e: raise ConfigurableError( f'Cannot set {section}:{name} = {value}: {e}') from e
[docs] def remove_section(self, section: str): self.parser.remove_section(section)
[docs] def get_raw_str(self) -> str: """"Return the contents of the configuration parser with no interpolated values. """ sio = StringIO() self.parser.write(sio) return sio.getvalue()
[docs] def derive_from_resource(self, path: str, copy_sections=()) -> \ Configurable: """Derive a new configuration from the resource file name ``path``. :param path: a resource file (i.e. ``resources/app.conf``) :param copy_sections: a list of sections to copy from this to the derived configuration """ kwargs = deepcopy(self.nascent) kwargs['config_file'] = path conf = self.__class__(**kwargs) self.copy_sections(conf, copy_sections) return conf
[docs] def prime(self): self.parser
def _get_container_desc(self, include_type: bool = True, max_path_len: int = 3) -> str: mod = '' if isinstance(self.config_file, (str, Path)): parts = self.config_file.parts path = Path(*parts[max(0, len(parts) - max_path_len):]) tpe = 'f=' if include_type else '' mod = f'{tpe}{path}' elif isinstance(self.config_file, Configurable): tpe = 'c=' if include_type else '' mod = f'{tpe}[{self.config_file}]' return mod def _get_section_short_str(self) -> str: if self._conf is None: # getting sections invokes parsing, which causes issues if used in # a debugging statement when we're not yet ready to parse return '' secs = tuple(self.parser.sections()) if len(secs) > 0: return secs[0] return '' def _get_short_str(self) -> str: sec: str = self._get_section_short_str() cname: str = self.__class__.__name__ return f'{cname}({self._get_container_desc()}){{{sec}}}'
[docs] class rawconfig(object): """Treat all option fetching on ``config`` as raw, or without interpolation. This is usually used when ``config`` is the target of section copying with :meth:`.Configuration.copy_sections`, """
[docs] def __init__(self, config: Configurable): self.config = config if isinstance(config, IniConfig) else None
def __enter__(self): if self.config is not None: self.config._raw = True def __exit__(self, type, value, traceback): if self.config is not None: self.config._raw = False
[docs] class ExtendedInterpolationConfig(IniConfig): """Configuration class extends using advanced interpolation with :class:`~configparser.ExtendedInterpolation`. """
[docs] def __init__(self, *args, **kwargs): kwargs['use_interpolation'] = True super().__init__(*args, **kwargs)
[docs] class ExtendedInterpolationEnvConfig(ExtendedInterpolationConfig): """An :class:`.IniConfig` implementation that creates a section called ``env`` with environment variables passed. """
[docs] def __init__(self, *args, remove_vars: List[str] = None, env: dict = None, env_sec: str = 'env', **kwargs): self.remove_vars = remove_vars if env is None: env = {} for k, v in os.environ.items(): env[k] = v.replace('$', '$$') self.env = env else: self.env = env self.env_sec = env_sec super().__init__(*args, **kwargs)
def _munge_default_vars(self, vars): if vars is not None and self.remove_vars is not None: for n in self.remove_vars: if n in vars: del vars[n] return vars def _create_config_parser(self) -> ConfigParser: parser = super()._create_config_parser() sec = self.env_sec parser.add_section(sec) for k, v in self.env.items(): logger.debug(f'adding env section {sec}: {k} -> {v}') v = self._format_option(k, v, sec) parser.set(sec, k, v) return parser
[docs] class CommandLineConfig(IniConfig, metaclass=ABCMeta): """A configuration object that allows creation by using command line arguments as defaults when the configuration file is missing. Sub classes must implement the ``set_defaults`` method. All defaults set in this method are then created in the default section of the configuration when created with the static method ``from_args``, which is called with the parsed command line arguments (usually from some instance or instance of subclass :class:`.SimpleActionCli`. """
[docs] def set_default(self, name: str, value: str, clobber: bool = None): """Set a default value in the ``default`` section of the configuration. """ if clobber is not None: self.set_option(name, clobber, self.default_section) elif name not in self.options and value is not None: self.set_option(name, value, self.default_section)
[docs] @abstractmethod def set_defaults(self, *args, **kwargs): pass
[docs] @classmethod def from_args(cls, config=None, *args, **kwargs): if config is None: self = cls() self._conf = self._create_config_parser() self.parser.add_section(self.default_section) else: self = config self.set_defaults(*args, **kwargs) return self