"""Contains access to the Zotero SQLite database.
"""
__author__ = 'Paul Landes'
from dataclasses import dataclass, field
import logging
from pathlib import Path
import sqlite3
from zensols.zotsite.domain import Collection, Library, Item, Note, Name
logger = logging.getLogger(__name__)
[docs]
@dataclass
class DatabaseReader(object):
"""Database access to Zotero store.
"""
data_dir: Path = field()
"""Directory containing the Zotero DB files (sqlite and collections)."""
collection_like: str = field(default='%')
"""The SQL pattern to match against subcollection names."""
library_id: int = field(default=1)
"""The DB ide of the library to export."""
def _collection_sql(self, whparams):
"""Create an SQL string to get collections rows."""
return """
select c.collectionId c_id, ci.itemId c_iid,
c.parentCollectionId c_pid, c.collectionName c_name
from collections c
left join collectionItems ci on c.collectionId = ci.collectionId
where c.libraryId = %(library_id)s and
c.collectionName like '%(coll_name)s'
""" % whparams
def _item_sql(self, whparams):
"""Create an SQL string to get items (attachments) rows."""
return """
select c.collectionId c_id, c.parentCollectionId c_pid,
c.collectionName c_name,
it.itemId i_id, ia.parentItemId i_pid, it.key, iy.typeName type,
ia.contentType content_type, ia.path,
itn.title n_title, itn.note n_note, itn.parentItemId n_pid
from items it, itemTypes iy
left join itemAttachments ia on it.itemId = ia.itemId
left join collectionItems ci on ci.itemId = it.itemId
left join collections c on c.collectionId = ci.collectionId
left join itemNotes itn on it.itemId = itn.itemId
where it.itemTypeId = iy.itemTypeId and
it.itemId not in (select itemId from deletedItems)
order by ci.orderIndex;
""" % whparams
def _item_meta_sql(self, whparams):
"""Create an SQL string to get items metadata rows."""
return """
select f.fieldName name, iv.value
from items i, itemTypes it, itemData id, itemDataValues iv, fields f
where i.itemTypeId = it.itemTypeId and
i.itemId = id.itemId and
id.valueId = iv.valueId and
id.fieldId = f.fieldId and
i.itemId = %(item_id)s and
i.itemId not in (select itemId from deletedItems)""" % whparams
def _item_creators_sql(self, whparams):
"""Return SQL for creators (authors) across several items"""
return """
select c.firstName, c.lastName
from itemCreators ic, creators c
where ic.creatorID = c.creatorID and
ic.itemID = %(item_id)s
order by ic.orderIndex""" % whparams
[docs]
def get_connection(self):
"""Return a database connection the SQLite database.
"""
def dict_factory(cursor, row):
d = {}
for idx, col in enumerate(cursor.description):
d[col[0]] = row[idx]
return d
db_file = Path(self.data_dir, 'zotero.sqlite')
logger.info(f'reading SQLite file: {db_file}')
if not db_file.exists():
raise OSError(f'no such data file: {db_file}')
conn = sqlite3.connect(db_file)
conn.row_factory = dict_factory
return conn
def _get_item_meta(self, item, conn, whparams):
"""Return the item metadata from the database.
:param item: the item to fetch data for
:param conn: the DB connection
:param whparams: dict of parameters used for the metadata SQL query
"""
whparams['item_id'] = item['i_id']
meta = {}
for row in conn.execute(self._item_meta_sql(whparams)):
meta[row['name']] = row['value']
return meta
def _get_item_creators(self, item, conn, whparams):
"""Return the item metadata from the database.
:param item: the item to fetch data for
:param conn: the DB connection
:param whparams: dict of parameters used for the metadata SQL query
"""
whparams['item_id'] = item['i_id']
creators = []
for row in conn.execute(self._item_creators_sql(whparams)):
name = Name(row['firstName'], row['lastName'])
creators.append(name)
if len(creators) > 0:
return creators
def _select_items(self, conn):
"""Return items from the database.
:param conn: the DB connection
"""
if logger.isEnabledFor(logging.DEBUG):
logger.debug(f'data_dir: {self.data_dir}')
wparams = {'library_id': self.library_id}
if logger.isEnabledFor(logging.DEBUG):
logger.debug('wparams: %s' % wparams)
items = {}
for item in conn.execute(self._item_sql(wparams)):
item['subs'] = []
if not item['i_pid'] and not item['c_pid']:
item['i_pid'] = item['n_pid']
iid = item['i_id']
if iid in items:
items[iid].append(item)
else:
items[iid] = [item]
for itemlst in items.values():
for item in itemlst:
meta = self._get_item_meta(item, conn, wparams)
item['meta'] = meta
creators = self._get_item_creators(item, conn, wparams)
item['creators'] = creators
for itemlst in items.values():
for item in itemlst:
i_pid = item['i_pid']
if i_pid in items:
for par in items[i_pid]:
par['subs'].append(item)
flst = []
for itemlst in items.values():
flst.extend(itemlst)
return flst
def _select_collections(self, conn):
"""Return items from the database.
:param conn: the DB connection
"""
if logger.isEnabledFor(logging.DEBUG):
logger.debug(f'data_dir: {self.data_dir} ' +
f'pattern: {self.collection_like}')
wparams = {'library_id': self.library_id,
'coll_name': self.collection_like}
if logger.isEnabledFor(logging.DEBUG):
logger.debug(f'wparams: {wparams}')
colls = {}
for row in conn.execute(self._collection_sql(wparams)):
row['subs'] = []
colls[row['c_id']] = row
for coll in colls.values():
c_pid = coll['c_pid']
if c_pid not in colls:
coll['c_pid'] = None
c_pid = None
if c_pid:
par = colls[c_pid]
par['subs'].append(coll)
return list(filter(lambda x: x['c_pid'] is None and x['c_id'],
colls.values()))
def _create_item(self, item):
"""Return a domain object that represents an item (i.e. PDF attachement,
link, note etc).
"""
children = list(map(lambda x: self._create_item(x), item['subs']))
if item['type'] == 'note':
item = Note(item)
else:
item = Item(item, children)
return item
def _create_collection(self, coll, by_cid):
"""Return a domain object that represents a Zotero DB (sub)collection.
:param conn: the DB connection
:param by_cid: parent to child collection IDs
"""
if logger.isEnabledFor(logging.DEBUG):
logger.debug('processing: {} ({}, {})'.
format(coll['c_name'], coll['c_id'], coll['c_iid']))
cid = coll['c_id']
items = []
if cid in by_cid:
toadd = by_cid[cid]
items.extend(toadd)
logger.debug('children items: %d' % len(toadd))
children = list(map(lambda x: self._create_collection(x, by_cid),
coll['subs']))
items = list(map(lambda x: self._create_item(x), items))
return Collection(coll, items, children)
def _create_library(self, colls, items) -> Library:
"""Return a domain object that represents a Zotero DB (sub)collection.
:param conn: the DB connection
:param by_cid: parent to child collection IDs
"""
by_cid = {}
for i in items:
cid = i['c_id']
if cid:
if cid in by_cid:
cid_lst = by_cid[cid]
else:
cid_lst = []
by_cid[cid] = cid_lst
cid_lst.append(i)
fcolls = []
for coll in colls:
fcoll = self._create_collection(coll, by_cid)
fcolls.append(fcoll)
return Library(self.data_dir, self.library_id, fcolls)
[docs]
def get_library(self) -> Library:
"""Get an object graph representing the data in the Zotero database.
"""
conn = self.get_connection()
try:
colls = self._select_collections(conn)
items = self._select_items(conn)
lib = self._create_library(colls, items)
finally:
conn.close()
return lib