Source code for zensols.install.resource

"""Installable resource.

"""

from __future__ import annotations
__author__ = 'Paul Landes'
from typing import List, Sequence, Union, ClassVar
from dataclasses import dataclass, field
from abc import ABCMeta, abstractmethod
import logging
import re
import urllib
import shutil
from pathlib import Path
from urllib.parse import ParseResult
import patoolib
from zensols.config import Dictable
from . import InstallError

logger = logging.getLogger(__name__)


@dataclass
class FileSystemUpdateContext(Dictable):
    """Bundles the data handed to a :class:`.FileSystemUpdate` when it is
    invoked.

    """
    resource: Resource = field()
    """The :class:`.Resource` that created this context and is updating the
    file system.

    """
    check_path: Path = field()
    """The installer relative :obj:`.Resource.check_path`."""

    target: Path = field()
    """The installer relative target path from :class:`.Resource`."""
@dataclass
class FileSystemUpdate(Dictable, metaclass=ABCMeta):
    """Base class of the commands (GoF pattern) that update the file system
    after a resource has decompressed a file.

    A useful workflow is to first experiment with :class:`.ListUpdate`, then
    configure the corresponding command with :obj:`dry_run` turned on, and
    turn it off only once you have validated it is doing the right thing.

    Path fields (i.e. :obj:`.ListUpdate.path`) are formatted with the
    dictionary version of :class:`.FileSystemUpdateContext` and also a
    ``target`` property with the uncompressed path.

    """
    dry_run: bool = field()
    """If ``True`` don't do anything, just act like it."""

    def _format_str(self, context: FileSystemUpdateContext, val: str) -> str:
        """Interpolate ``val`` using the dictionary form of ``context`` as the
        :meth:`str.format` keyword arguments.

        """
        params = context.asdict()
        return val.format(**params)

    def _format_path(self, context: FileSystemUpdateContext, attr: str,
                     make_path: bool = True) -> Union[Path, str]:
        """Format the string stored in the attribute named ``attr``.

        :param context: supplies the values substituted in to the string

        :param attr: the name of the attribute on this instance to format

        :param make_path: whether to wrap the result in a
                          :class:`~pathlib.Path`

        :return: the formatted value as a path, or as a string if
                 ``make_path`` is ``False``

        """
        template: str = getattr(self, attr)
        formatted: str = self._format_str(context, template)
        if logger.isEnabledFor(logging.DEBUG):
            logger.debug(f'{attr}({template}) -> {formatted}')
        if not make_path:
            return formatted
        return Path(formatted)

    @abstractmethod
    def invoke(self, context: FileSystemUpdateContext):
        """Apply this update; implemented by each concrete command.

        :param context: the context created by the :class:`.Resource`

        """
        pass
@dataclass
class ListUpdate(FileSystemUpdate):
    """Lists the contents of :obj:`path`.

    """
    path: str = field()
    """A file or directory to list."""

    def invoke(self, context: FileSystemUpdateContext):
        """Log the formatted :obj:`path` followed by each entry it contains.

        The direct children of a directory are each logged.  A regular file
        is logged as a single entry.  Nothing on the file system is modified.

        :param context: supplies the values used to format :obj:`path`

        """
        pdir: Path = self._format_path(context, 'path')
        if logger.isEnabledFor(logging.INFO):
            logger.info(f'listing {pdir}')
        # a regular file has no children to iterate (``iterdir`` would raise
        # on it), so it is its own listing; every other case is left to
        # ``iterdir``, which still raises for a path that does not exist
        entries = (pdir,) if pdir.is_file() else pdir.iterdir()
        for path in entries:
            logger.info(f'list: {path.absolute()}')
@dataclass
class MoveUpdate(FileSystemUpdate):
    """Move file(s) from :obj:`source` to :obj:`target`.

    """
    source: str = field()
    """The source glob (i.e. ``{target}/../*``) from where to move the
    files.

    """
    target: str = field()
    """The target directory where the files end up."""

    def invoke(self, context: FileSystemUpdateContext):
        """Move every path matching the :obj:`source` glob to :obj:`target`.

        The final component of the formatted :obj:`source` is used as the glob
        pattern and is matched in its parent directory.  When :obj:`dry_run`
        is set the moves are only logged.

        :param context: supplies the values used to format :obj:`source` and
                        :obj:`target`

        """
        pattern: Path = \
            self._format_path(context, 'source').resolve().absolute()
        target: Path = \
            self._format_path(context, 'target').resolve().absolute()
        if logger.isEnabledFor(logging.INFO):
            logger.info(f'moving {pattern} -> {target}')
        # collect the matches up front so the directory is not being scanned
        # while entries are moved out of it
        matches = list(pattern.parent.glob(pattern.name))
        for match in matches:
            src: Path = match.resolve().absolute()
            if logger.isEnabledFor(logging.INFO):
                logger.info(f'renaming {src} -> {target}')
            if not self.dry_run:
                # pass strings: path-like arguments to ``shutil.move`` are
                # only documented as supported from Python 3.9
                shutil.move(str(src), str(target))
@dataclass
class RemoveUpdate(FileSystemUpdate):
    """Deletes files and directories to help keep the file system "clean".

    """
    paths: Sequence[str] = field()
    """The path format strings of the files or directories to remove.  For
    example ``{target}/../__MACOSX``.

    """
    def _rm_path(self, path: Path):
        """Remove ``path`` whether it is a directory tree or a file, and only
        log when it is neither.  Nothing is deleted when :obj:`dry_run` is
        set.

        """
        log_info: bool = logger.isEnabledFor(logging.INFO)
        if path.is_dir():
            if log_info:
                logger.info(f'removing clean up dir: {path}')
            if not self.dry_run:
                shutil.rmtree(path)
            return
        if path.is_file():
            if log_info:
                logger.info(f'removing clean up file: {path}')
            if not self.dry_run:
                path.unlink()
            return
        if log_info:
            logger.info(f'skipping non-existant clean up dir: {path}')

    def invoke(self, context: FileSystemUpdateContext):
        """Format each entry of :obj:`paths` and remove what it points to.

        :param context: supplies the values used to format each path

        """
        for template in self.paths:
            formatted: str = self._format_str(context, template)
            self._rm_path(Path(formatted).resolve().absolute())
@dataclass
class Resource(Dictable):
    """A resource that is installed by downloading from the Internet and then
    optionally uncompressed.  Once the file is downloaded, it is only
    uncompressed if it is an archive file.  This is determined by the file
    extension.

    """
    _DICTABLE_ATTRIBUTES: ClassVar[List[str]] = \
        'remote_name is_compressed compressed_name'.split()
    _FILE_REGEX: ClassVar[re.Pattern] = re.compile(
        r'^(.+)\.(tar\.gz|tgz|tar\.bz2|gz|bz2|' +
        '|'.join(patoolib.ArchiveFormats) + ')$')
    _NO_FILE_REGEX: ClassVar[re.Pattern] = re.compile(r'^(?:.+/)?(.+?)\.(.+)?$')

    url: str = field()
    """The URL that locates the file to install."""

    name: str = field(default=None)
    """Used for local file naming."""

    remote_name: str = field(default=None)
    """The name of extracted file (or root directory if a compressed file)
    after being downloaded.  If this isn't set, it is taken from the file name
    portion of the path of the URL.

    """
    is_compressed: bool = field(default=None)
    """Whether or not the file is compressed.  If this isn't set, it is
    derived from the file name.

    """
    rename: bool = field(default=True)
    """If ``True`` then rename the directory to the :obj:`name`."""

    check_path: str = field(default=None)
    """The file to check for existence before doing uncompressing."""

    sub_path: Path = field(default=None)
    """The path to a file in the compressed file after it is extracted.  This
    is only used to obtain the file name in :meth:`get_file_name` when used to
    locate the uncompressed resource file.

    """
    clean_up: bool = field(default=True)
    """Whether or not to remove the downloaded compressed after finished."""

    updates: Sequence[FileSystemUpdate] = field(default=())
    """The file system updates to apply after the file has been decompressed.

    """
    def __post_init__(self):
        """Fill in :obj:`name`, :obj:`remote_name` and :obj:`is_compressed`
        from the file name portion of :obj:`url` when they were not given, and
        record the archive extension (if any) for :obj:`compressed_name`.

        """
        url: ParseResult = urllib.parse.urlparse(self.url)
        remote_path: Path = Path(url.path)
        remote_name: str
        m = self._FILE_REGEX.match(remote_path.name)
        if m is None:
            # not a known archive extension: no extension is recorded, and
            # the local name defaults to the full remote file name
            m = self._NO_FILE_REGEX.match(remote_path.name)
            self._extension = None
            if m is None:
                remote_name = remote_path.name
            else:
                remote_name = m.group(1)
            if self.name is None:
                self.name = remote_path.name
        else:
            # known archive extension: split it from the base name, which is
            # also the default local name
            remote_name, self._extension = m.groups()
            if self.name is None:
                self.name = remote_name
        if self.remote_name is None:
            self.remote_name = remote_name
        if self.is_compressed is None:
            self.is_compressed = self._extension is not None

    def uncompress(self, path: Path = None, out_dir: Path = None) -> bool:
        """Uncompress the file.

        The archive is extracted only when the check path does not already
        exist.  Afterward the extracted path is renamed (if :obj:`rename`),
        the archive is removed (if :obj:`clean_up`) and each of the
        :obj:`updates` is invoked.

        :param path: the file to uncompress; if ``None``,
                     :obj:`compressed_name` is used and files are extracted
                     in to the current directory

        :param out_dir: where the uncompressed files are extracted, which
                        defaults to the parent directory of ``path``

        :return: ``True`` if the archive was extracted by this call

        :raises InstallError: if renaming is needed but the expected
                              extracted path does not exist

        """
        uncompressed = False
        if path is None:
            src = Path(self.compressed_name)
            out_dir = Path('.')
        else:
            src = path
            if out_dir is None:
                out_dir = path.parent
        # the target is the name we want after the process completes
        target: Path = out_dir / self.name
        # this is the name of the resulting file of what we expect, or the
        # user can override it if they know what the real resulting file is
        if self.check_path is None:
            check_path = target
        else:
            check_path = out_dir / self.check_path
        if logger.isEnabledFor(logging.DEBUG):
            logger.debug(f'check path: {check_path}')
        # uncompress if we can't find where the output is suppose to go
        if not check_path.exists():
            if logger.isEnabledFor(logging.INFO):
                logger.info(f'uncompressing {src} to {out_dir}')
            out_dir.mkdir(parents=True, exist_ok=True)
            patoolib.extract_archive(str(src), outdir=str(out_dir))
            uncompressed = True
        if logger.isEnabledFor(logging.DEBUG):
            logger.debug(f'rename: {self.rename}, ' +
                         f'path ({check_path}) exists: {check_path.exists()}')
        # the extracted data can either be a file (gz/bz2) or a directory;
        # compare to what we want to rename the target directory
        #
        # note: the check path has to be what extracts as, otherwise it will
        # unextract it again next time it checks; if the directory extracts as
        # something other than the file name, set both the name and the check
        # path to whatever that path is
        if self.rename and not check_path.exists():
            # the source is where it was extracted
            extracted: Path = out_dir / self.remote_name
            if not extracted.exists():
                raise InstallError(f'Trying to create {check_path} but ' +
                                   f'missing extracted path: {extracted}')
            if logger.isEnabledFor(logging.INFO):
                logger.info(f'renaming {extracted} to {target}')
            extracted.rename(target)
        # the archive may already be gone (i.e. removed by a previous call
        # that extracted it), so only remove it when it is still there
        if self.clean_up and src.exists():
            if logger.isEnabledFor(logging.INFO):
                logger.info(f'cleaning up downloaded file: {src}')
            src.unlink()
        update_context = FileSystemUpdateContext(self, check_path, target)
        update: FileSystemUpdate
        for update in self.updates:
            logger.info(f'updating: {update}')
            update.invoke(update_context)
        return uncompressed

    @property
    def compressed_name(self) -> str:
        """The file name with the extension and used to uncompress.  If the
        resource isn't compressed, just the name is returned.

        """
        if self.is_compressed:
            name = f'{self.name}'
            # the extension is only known when it was parsed from the URL
            if self._extension is not None:
                name = f'{name}.{self._extension}'
        else:
            name = self.name
        return name

    def get_file_name(self, compressed: bool = False) -> str:
        """Return the path where a resource is installed.

        :param compressed: if ``True``, return the path where its compressed
                           file (if any) lives

        :return: the path of the resource

        """
        fname = self.compressed_name if compressed else self.name
        if fname is None:
            fname = self.remote_name
        if not compressed and self.sub_path is not None:
            fname = str(Path(fname, self.sub_path))
        return fname

    def __str__(self) -> str:
        """Return the :obj:`name` and :obj:`url` separated by a colon."""
        return f'{self.name}: {self.url}'