Source code for zensols.zotsite.domain

"""Contains domain and visitor (GoF pattern) classes.

__author__ = 'Paul Landes'

from typing import Callable
import logging
from abc import ABC, abstractmethod
import re
from io import TextIOBase
from pathlib import Path
from zensols.cli import ApplicationError
from zensols.persist import persisted

logger = logging.getLogger(__name__)

[docs] class ZoteroApplicationError(ApplicationError): """Thrown for application errors meant to be reported by the command line. """ pass
[docs] class ZoteroObject(ABC): """Represents any collection, item etc. Zotero data object. """
[docs] def __init__(self, children): self._children = children
@property def children(self): return self._children
[docs] @abstractmethod def get_id(self): pass
@property def id(self): return self.get_id() def __str__(self): return '{} ({})'.format(self.__format_zobj__(), def __repr__(self): return self.__str__() def __format_zobj__(self): return @property def title(self): return @property def note(self): if hasattr(self, 'sel') and 'n_note' in self.sel: return self.sel['n_note']
[docs] def short_title(self, str_len): """Return the short name of this object.""" lstr = self.title return (lstr[:str_len] + '...') if len(lstr) > str_len else lstr
@property def type(self): """Return the type this item is.""" if hasattr(self, 'sel') and 'type' in self.sel: return self.sel['type']
[docs] class Note(ZoteroObject): """Represents a note Zotero data object. """
[docs] def __init__(self, sel): self.sel = sel super().__init__([])
[docs] def get_id(self): return 'n' + str(self.sel['i_id'])
@property def title(self): return self.sel['n_title'] @property def name(self): return '<{}> [note]'.format(self.title)
[docs] class Name(object):
[docs] def __init__(self, first: str, last: str): self.first = first self.last = last
def __str__(self): return f'{self.first} {self.last}' def __repr__(self): return self.__str__()
[docs] class Item(ZoteroObject): """Represents an attachement object, like PDFs, site links etc. """
[docs] def __init__(self, sel, children): self.sel = sel super().__init__(children) self.storage_pat = re.compile('^(?:storage|attachments):(.+)$')
[docs] def get_db_id(self): return self.sel['i_id']
[docs] def get_id(self): if not hasattr(self, '_id'): self._id = 'i' + str(self.get_db_id()) return self._id
[docs] def set_id(self, id): self._id = id
@property def name(self): meta = self.sel['meta'] name = 'none' for k in 'shortTitle title publicationTitle'.split(' '): if k in meta: name = meta[k] break return name @property def metadata(self): return self.sel.get('meta') @property def creators(self) -> (list, Name): return self.sel.get('creators') @property @persisted('_path') def path(self): abs_path = None path = self.sel['path'] if path is not None: m = self.storage_pat.match(path) if m is None: # assume ZoteroFile is used abs_path = Path(path) if not abs_path.exists(): raise ValueError(f'unknown storage and not a file: {path}') else: pdir = self.sel['key'] fpart = abs_path = self.lib.get_storage_path() / f'{pdir}/{fpart}' if logger.isEnabledFor(logging.DEBUG): logger.debug(f'pdir={pdir}, fpart={fpart}, abs={abs_path}') return abs_path def __format_zobj__(self): abs_path = self.path its = self.sel.copy() its.update({'name':, 'abs_path': abs_path}) return '{name} [{type}]{abs_path}'.format(**its)
[docs] class Container(ZoteroObject): """Container class holds items and sub-collections. """
[docs] def __init__(self, items, collections): self.items = items self.collections = collections super().__init__(None)
@property def children(self): ret = [] ret.extend(self.collections) ret.extend(self.items) return ret
[docs] class Collection(Container): """Represents a (sub)collection, which is a container for other collections and items. """
[docs] def __init__(self, sel, items, collections): self.sel = sel super().__init__(items, collections)
[docs] def get_id(self): return 'c{},i{}'.format(self.sel['c_id'], self.sel['c_iid'])
@property def name(self): return self.sel['c_name']
[docs] class Library(Container): """Represents the top level object that contains the root level collections. """
[docs] def __init__(self, data_dir, library_id, collections): self.data_dir = data_dir self.library_id = library_id self.storage_dirname = 'storage' super().__init__([], collections) for c in collections: self._init_child(c)
def _init_child(self, parent): if isinstance(parent, Item): parent.lib = self for c in parent.children: self._init_child(c)
[docs] def get_storage_path(self, fname=None): path = Path(self.data_dir, self.storage_dirname) if fname: path = Path(path, fname) return path
[docs] def get_id(self): return 'l' + str(self.library_id)
[docs] def attachment_resource(self, item): if item.type == 'attachment': return f'{self.storage_dirname}/{item.path}'
@property def name(self): return 'lib' @property def title(self): if self.library_id == 1: return 'Personal Library' else: return 'Library'
[docs] class Visitor(ABC): """The visitor in the GoF *visitor pattern*. """
[docs] @abstractmethod def enter_parent(self, parent: ZoteroObject): """Template method for traversing down/into a node.""" pass
[docs] @abstractmethod def visit_child(self, child: ZoteroObject): """Template method for visiting a node.""" pass
[docs] @abstractmethod def leave_parent(self, parent: ZoteroObject): """Template method for traversing up/out of a node.""" pass
[docs] class PrintVisitor(Visitor): """A visitor that prints items for debugging. """
[docs] def __init__(self, writer: TextIOBase): self.writer = writer self.depth = 0
[docs] def enter_parent(self, parent: ZoteroObject): self.writer.write(f"{' ' * (self.depth * 4)}{str(parent)} " + f'({parent.__class__.__name__})\n') self.depth += 1
[docs] def visit_child(self, child: ZoteroObject): pass
[docs] def leave_parent(self, parent: ZoteroObject): self.depth -= 1
[docs] class Walker(ABC): """Iterates the Zotero data and calls the visitor for each node. """
[docs] @abstractmethod def walk(self, parent: ZoteroObject, visitor: Visitor): """Recursively traverse the object graph.""" pass
[docs] class UnsortedWalker(Walker): """Iterates through the Zotero visiting children in whatever order is provided by the database. """
[docs] def walk(self, parent: ZoteroObject, visitor: Visitor): visitor.enter_parent(parent) for c in parent.children: visitor.visit_child(c) self.walk(c, visitor) visitor.leave_parent(parent)
[docs] class SortedWalker(Walker): """Iterates through the Zotero visiting children in sorted order. """
[docs] def __init__(self, key_fn: Callable = None, reverse: bool = False): """Initialize. :param key_fn: a function/callable used to sort the data that takes a single argument to access compared data, which defaults to :function:`str` :param reverse: whether or not to reverse the visited results """ if key_fn is None: self.key_fn = str else: self.key_fn = key_fn self.reverse = reverse
[docs] def walk(self, parent: ZoteroObject, visitor: Visitor): visitor.enter_parent(parent) kids = sorted(parent.children, key=self.key_fn, reverse=self.reverse) for c in kids: visitor.visit_child(c) self.walk(c, visitor) visitor.leave_parent(parent)