Source code for zensols.zotsite.db

"""Data access objects to the Zotero SQLite database.

"""
__author__ = 'Paul Landes'

from typing import Tuple, List, Iterable, Dict, Any, Union, Optional
from dataclasses import dataclass, field
import logging
import re
from pathlib import Path
from zensols.persist import persisted
from zensols.db import DbPersister
from . import (
    ZoteroApplicationError, ZoteroObject, Collection, Library, Item, Note, Name,
    CollectionVisitor, UnsortedWalker
)


logger = logging.getLogger(__name__)


@dataclass
class ZoteroDatabase(object):
    """Provides access to data from the Zotero database.

    Rows are read through the configured persister using named queries
    (``select_<name>``) and assembled into a :class:`.Library` object graph of
    collections, items and notes.

    """
    _persister: DbPersister = field()
    """The persister used to get the data."""

    _data_dir: Path = field()
    """The path to the ``zotero.sqlite`` database file."""

    _collection_like: str = field()
    """The SQL pattern to match against subcollection names."""

    _library_id: int = field()
    """The DB ID of the library to export."""

    @property
    def library_id(self) -> int:
        """The DB ID of the library to export."""
        return self._library_id

    def _select(self, name: str, *params) -> Tuple[Dict[str, Any], ...]:
        """Execute the named query ``select_<name>`` with ``params``.

        :param name: the suffix of the query name to execute

        :param params: the positional parameters given to the query

        :return: the resulting rows, requested from the persister with the
                 ``dict`` row factory

        """
        return self._persister.execute_by_name(
            name=f'select_{name}', params=params, row_factory='dict')

    def _select_collections(self) -> Tuple[Dict[str, Any], ...]:
        """Return the root collections of the library from the database.

        Every returned row has a ``subs`` list that holds its child collection
        rows (recursively).

        :return: the collection rows that have no parent in the result set

        """
        colls: Dict[Any, Dict[str, Any]] = {}
        rows: Tuple[Dict[str, Any], ...] = self._select(
            'collections', self._library_id, self._collection_like)
        row: Dict[str, Any]
        for row in rows:
            row['subs'] = []
            colls[row['c_id']] = row
        for coll in colls.values():
            c_pid = coll['c_pid']
            # a parent that was not selected makes this collection a root
            if c_pid not in colls:
                coll['c_pid'] = None
                c_pid = None
            if c_pid:
                colls[c_pid]['subs'].append(coll)
        return tuple(filter(
            lambda x: x['c_pid'] is None and x['c_id'], colls.values()))

    def _get_item_meta(self, item: Dict[str, Any]) -> Dict[str, Any]:
        """Return the item metadata from the database.

        :param item: the item row to fetch data for (its ``i_id`` is used)

        :return: the metadata as a ``name`` to ``value`` mapping

        """
        meta: Dict[str, Any] = {}
        row: Dict[str, Any]
        for row in self._select('item_metadata', item['i_id']):
            meta[row['name']] = row['value']
        return meta

    def _get_item_creators(self, item: Dict[str, Any]) -> \
            Optional[Tuple[Name, ...]]:
        """Return the creators (i.e. authors) of an item from the database.

        :param item: the item row to fetch data for (its ``i_id`` is used)

        :return: the creator names or ``None`` if the item has no creators

        """
        creators: List[Name] = []
        row: Dict[str, Any]
        for row in self._select('item_creators', item['i_id']):
            creators.append(Name(row['firstName'], row['lastName']))
        if len(creators) > 0:
            return tuple(creators)
        return None

    def _select_items(self) -> List[Dict[str, Any]]:
        """Return item rows from the database.

        Each row is given a ``subs`` list with its child rows, a ``meta``
        dictionary (see :meth:`_get_item_meta`) and a ``creators`` entry (see
        :meth:`_get_item_creators`).

        :return: all item rows, grouped by item ID in first seen order

        """
        # the same item ID may be returned in more than one row
        items: Dict[Any, List[Dict[str, Any]]] = {}
        item: Dict[str, Any]
        for item in self._select('items_attachments'):
            item['subs'] = []
            # rows with neither an item nor collection parent fall back to
            # the ``n_pid`` column for their parent
            if not item['i_pid'] and not item['c_pid']:
                item['i_pid'] = item['n_pid']
            items.setdefault(item['i_id'], []).append(item)
        for itemlst in items.values():
            for item in itemlst:
                item['meta'] = self._get_item_meta(item)
                item['creators'] = self._get_item_creators(item)
        # attach each row to every row of its parent item
        for itemlst in items.values():
            for item in itemlst:
                i_pid = item['i_pid']
                if i_pid in items:
                    for par in items[i_pid]:
                        par['subs'].append(item)
        return [item for itemlst in items.values() for item in itemlst]

    def _create_item(self, item: Dict[str, Any]) -> Union[Item, Note]:
        """Return a domain object that represents an item (i.e. PDF attachment,
        link, note etc).

        :param item: the item row with its children in ``subs``

        :return: a :class:`.Note` when the row ``type`` is ``note``, otherwise
                 an :class:`.Item` with its children

        """
        children = tuple(map(self._create_item, item['subs']))
        if item['type'] == 'note':
            return Note(item)
        return Item(item, children)

    def _create_collection(self, coll: Dict[str, Any],
                           by_cid: Dict[Any, List[Dict[str, Any]]]) -> \
            Collection:
        """Return a domain object that represents a Zotero DB (sub)collection.

        :param coll: the collection row with its children in ``subs``

        :param by_cid: collection IDs to the item rows in that collection

        """
        if logger.isEnabledFor(logging.DEBUG):
            logger.debug('processing: {} ({}, {})'.
                         format(coll['c_name'], coll['c_id'], coll['c_iid']))
        cid = coll['c_id']
        items: List[Dict[str, Any]] = []
        if cid in by_cid:
            toadd = by_cid[cid]
            items.extend(toadd)
            # let the logger format lazily so nothing is built when disabled
            logger.debug('children items: %d', len(toadd))
        children = [self._create_collection(sub, by_cid)
                    for sub in coll['subs']]
        return Collection(
            coll, [self._create_item(item) for item in items], children)

    def _create_library(self, colls, items) -> Library:
        """Return a domain object that represents the Zotero library.

        :param colls: the root collection rows

        :param items: the item rows, which are grouped by their ``c_id``

        """
        by_cid: Dict[Any, List[Dict[str, Any]]] = {}
        for item in items:
            cid = item['c_id']
            # rows without a collection ID are not directly in a collection
            if cid:
                by_cid.setdefault(cid, []).append(item)
        fcolls = [self._create_collection(coll, by_cid) for coll in colls]
        return Library(self._data_dir, self._library_id, fcolls)

    @persisted('_library')
    def get_library(self) -> Library:
        """Get an object graph representing the data in the Zotero database.

        """
        try:
            colls = self._select_collections()
            items = self._select_items()
            return self._create_library(colls, items)
        finally:
            # deallocate pooled connection (configured in ``obj.conf``)
            self._persister.conn_manager.dispose_all()

    def _get_items(self) -> Iterable[Item]:
        """Return the :class:`.Item` instances collected by walking the
        library.

        """
        def filter_items(obj: ZoteroObject):
            return isinstance(obj, Item)

        if logger.isEnabledFor(logging.DEBUG):
            logger.debug(f'collecting items for library: {self._library_id}')
        lib: Library = self.get_library()
        cv = CollectionVisitor(filter_items)
        walker = UnsortedWalker()
        walker.walk(lib, cv)
        return cv.collection

    @property
    def item_paths(self) -> Dict[str, Path]:
        """Get paths only for this library.

        The keys are taken from ``sel['key']`` of each item that has children
        and the values are the path of one of its children: the PDF when
        exactly one child is a PDF, otherwise the first child with a path.

        :see: :obj:`paths`

        """
        def find_child_path(parent: Item) -> Optional[Path]:
            # use the argument rather than the enclosing loop variable
            child_paths = tuple(
                child.path for child in parent.children
                if isinstance(child, Item) and child.path is not None)
            if len(child_paths) == 0:
                return None
            pdf_paths = tuple(filter(lambda p: p.suffix == '.pdf', child_paths))
            if len(pdf_paths) == 1:
                return pdf_paths[0]
            return child_paths[0]

        paths: Dict[str, Path] = {}
        item: Item
        for item in filter(lambda i: len(i.children) > 0, self._get_items()):
            path = find_child_path(item)
            if path is not None:
                paths[item.sel['key']] = path
        return paths