# Source code for zensols.db.connpool
"""Connection pool.
"""
__author__ = 'Paul Landes'
from typing import List, Any
from dataclasses import dataclass, field
import logging
from .conn import AbstractDbPersister, ConnectionManager
logger = logging.getLogger(__name__)
# [docs]
@dataclass
class PooledConnectionManager(ConnectionManager):
    """Pools database connections.

    Connections are created and destroyed by :obj:`delegate`.  This class
    keeps up to :obj:`size` released connections in an internal list and hands
    them back out from :meth:`create` before asking the delegate for a new
    one.

    """
    delegate: ConnectionManager = field()
    """The delegate manager that controls the lifecycle of pooled connections.
    """

    size: int = field(default=1)
    """The size of the pool, which is the maximum number of released
    connections retained for reuse.

    """

    def __post_init__(self):
        super().__post_init__()
        # released connections available for reuse; ``create`` pops from the
        # end and ``dispose`` appends to the end
        self._pool: List[Any] = []

    def register_persister(self, persister: AbstractDbPersister):
        """Register the persister with both this manager and the delegate.

        :param persister: the persister to register

        """
        super().register_persister(persister)
        self.delegate.register_persister(persister)

    @property
    def is_empty(self) -> bool:
        """Whether the pool currently holds no connections."""
        return len(self) == 0

    @property
    def is_full(self) -> bool:
        """Whether the pool holds at least :obj:`size` connections."""
        return len(self) >= self.size

    def create(self) -> Any:
        """Return a connection, reusing a pooled one when available.

        :return: a connection popped from the pool, or a new one created by
                 the delegate when the pool is empty

        """
        conn: Any
        if self.is_empty:
            conn = self.delegate.create()
            if logger.isEnabledFor(logging.DEBUG):
                logger.debug(f'creating connection: size now: {len(self)}')
        else:
            conn = self._pool.pop()
            if logger.isEnabledFor(logging.DEBUG):
                logger.debug(f'popped connection, size now: {len(self)}')
        return conn

    def dispose(self, conn: Any):
        """Release a connection, caching it unless the pool is full.

        :param conn: the connection to release; it is handed to the delegate
                     for disposal when the pool is full, otherwise it is
                     added to the pool

        """
        if self.is_full:
            self.delegate.dispose(conn)
            if logger.isEnabledFor(logging.DEBUG):
                logger.debug(f'disposed connection: size now: {len(self)}')
        else:
            self._pool.append(conn)
            if logger.isEnabledFor(logging.DEBUG):
                logger.debug(f'caching connection: size now: {len(self)}')

    def dispose_all(self):
        """Dispose of all connections."""
        conn: Any
        for conn in self._pool:
            self.delegate.dispose(conn)
        self._pool.clear()

    def drop(self) -> bool:
        """Forward the drop request to the delegate.

        :return: whatever the delegate's ``drop`` returns

        """
        # propagate the delegate's result; previously it was discarded and
        # this method always returned ``None`` despite the ``bool`` annotation
        return self.delegate.drop()

    def __len__(self) -> int:
        """Return the number of connections currently held in the pool."""
        return len(self._pool)