Source code for zensols.grsync.mover

import logging
import itertools as it
import shutil
from pathlib import Path
from zensols.grsync import (
    Distribution,
    RepoSpec,
    FileEntry,
    FrozenRepo,
)

logger = logging.getLogger(__name__)


[docs] class DistributionMover(object): """This class moves thawed files that are defined in a distribution zip. If the file is not defined in the distribution then it doesn't move it. In situations where you've already deleted the distribution zip, you'll have to create a new distribution by freezing what you have. For this reason it is recommended that you always include the original `grsync.yml` configuration file in your distribution so it *migrates* along with each of your freeze/thaw iterations. """
[docs] def __init__(self, dist: Distribution, target_dir=None, destination_dir: Path = None, force_repo=False, force_dirs=False, dry_run=False): """Initialize. :param dist: the distrbution that represent the distribution zip :param target_dir: the directory with the thawed files :param destination_dir: where the thawed files/repos will be moved :param force_repo: if ``True`` move repositories even if they're dirty :param force_dirs: if ``True`` move directories even if they're not empty :param dry_run: don't actually do anything, but log like we are """ self.dist = dist self.target_dir = target_dir if destination_dir is None: destination_dir = Path('old_dist').absolute() self.destination_dir = destination_dir self.force_repo = force_repo self.force_dirs = force_dirs self.dry_run = dry_run
def _get_paths(self): dist = self.dist objs = (dist.links, dist.repos, dist.files, dist.empty_dirs) paths = it.chain(map(lambda x: (x.path, x), it.chain(*objs)), map(lambda l: (l.path, l), it.chain(*map(lambda r: r.links, dist.repos)))) return sorted(paths, key=lambda x: len(x[0].parts), reverse=True) def _dir_empty(self, path): return sum(map(lambda x: 1, path.iterdir())) == 0 def _get_moves(self): for src, obj in self._get_paths(): if not src.exists() and not src.is_symlink(): logger.warning(f'no longer exists: {src}') else: if isinstance(obj, FrozenRepo): try: grepo = obj.repo_spec.repo except Exception: # git.exc.InvalidGitRepositoryError logger.error(f'invalid repository: {obj}--skipping') continue if grepo.is_dirty(): name = obj.repo_spec.format(RepoSpec.SHORT_FORMAT) if self.force_repo: logger.warning(f'repo is dirty: {name}; moving anyway') else: logger.warning(f'repo is dirty: {name}--skipping') continue elif isinstance(obj, FileEntry) and src.is_dir() and not src.is_symlink(): if not self._dir_empty(src): if self.force_dirs: logger.warning(f'directory not empty: {src}; ' + 'moving anyway') else: logger.warning(f'directory not empty: {src}--skipping') continue dst = self.destination_dir / src.relative_to(self.target_dir) yield (src, dst.absolute())
[docs] def move(self): "Move the files over." logger.info(f'moving installed distribution to {self.destination_dir}') for src, dst in self._get_moves(): logger.info(f'move {src} -> {dst}') if not self.dry_run: if src.exists() or src.is_symlink(): dst.parent.mkdir(parents=True, exist_ok=True) shutil.move(str(src), str(dst)) else: logger.warning(f'no longer exists: {src}')
[docs] def dir_reduce(self, parent=None): "Remove empty directories recursively starting at ``parent``." try: if parent is None: parent = self.target_dir for child in parent.iterdir(): logger.debug(f'descending: {child}') if child.is_dir() and not child.is_symlink(): self.dir_reduce(child) if parent != self.target_dir and parent.is_dir(): if self._dir_empty(parent): logger.info(f'deleting empty directory: {parent}') if not self.dry_run: parent.rmdir() else: logger.info(f'skipping non-empty directory delete: {parent}') except Exception as e: # be robust logger.error(f"couldn't delete {parent}: {e}")