# Source code for wheezy.core.db

""" ``session`` module.
"""

import warnings

from wheezy.core.introspection import import_name
from wheezy.core.uuid import shrink_uuid

uuid4 = import_name('uuid.uuid4')

# Session life-cycle states shared by all session classes below.
SESSION_STATUS_IDLE = 0  # not inside a ``with`` scope
SESSION_STATUS_ENTERED = 1  # scope entered, nothing pending
SESSION_STATUS_ACTIVE = 2  # connection acquired / transaction begun


class Session(object):
    """ Session works with a pool of database connections. Database
        connection must be implemented per Database API Specification
        v2.0 (see `PEP0249 <http://www.python.org/dev/peps/pep-0249/>`_).
    """
    __slots__ = ('pool', 'status', '__connection')

    def __init__(self, pool):
        """ Initialize a new instance of database session.

            The *pool* argument is an object that implement pooling
            interface (acquire/get_back).
        """
        self.pool = pool
        self.status = SESSION_STATUS_IDLE
        self.__connection = None

    def __enter__(self):
        """ Enter the session scope; the session must be idle.
        """
        assert self.status == SESSION_STATUS_IDLE
        self.status = SESSION_STATUS_ENTERED
        return self

    @property
    def connection(self):
        """ Return the session connection. Not intended to be used
            directly, use `cursor` method instead.

            The connection is acquired from the pool on first access
            within an entered scope and reused until `commit` or scope
            exit.
        """
        # Compare with None explicitly: a connection object that happens
        # to evaluate false must still be treated as acquired.
        if self.__connection is not None:
            return self.__connection
        assert self.status == SESSION_STATUS_ENTERED
        self.__connection = connection = self.pool.acquire()
        self.status = SESSION_STATUS_ACTIVE
        self.on_active(connection)
        return connection

    def on_active(self, connection):
        """ Hook called right after *connection* has been acquired
            from the pool. Does nothing by default.
        """
        pass

    def cursor(self, *args, **kwargs):
        """ Return a new cursor object using the session connection.
        """
        return self.connection.cursor(*args, **kwargs)

    def commit(self):
        """ Commit any pending transaction to the database.

            Does nothing if no connection has been acquired. The
            connection is returned to the pool even if commit fails.
        """
        assert self.status != SESSION_STATUS_IDLE
        if self.status != SESSION_STATUS_ACTIVE:
            return
        self.status = SESSION_STATUS_ENTERED
        connection = self.__connection
        self.__connection = None
        try:
            connection.commit()
        finally:
            self.pool.get_back(connection)

    def __exit__(self, exc_type, exc_value, traceback):
        """ Leave the session scope: roll back whatever was not
            committed and return the connection to the pool.
        """
        self.status = SESSION_STATUS_IDLE
        connection = self.__connection
        # Explicit None check so a connection that evaluates false is
        # still rolled back and handed back to the pool.
        if connection is not None:
            self.__connection = None
            try:
                connection.rollback()
            finally:
                self.pool.get_back(connection)


class TPCSession(object):
    """ Two-Phase Commit protocol session that works with a pool of
        database connections. Database connection must be implemented
        per Database API Specification v2.0 (see
        `PEP0249 <http://www.python.org/dev/peps/pep-0249/>`_).
    """
    __slots__ = ('format_id', 'global_transaction_id', 'branch_qualifier',
                 'enlised_sessions', 'status')

    def __init__(self, format_id=7, global_transaction_id=None,
                 branch_qualifier=''):
        """ Initialize a new instance of Two-Phase Commit protocol
            database session.
        """
        self.format_id = format_id
        self.global_transaction_id = global_transaction_id
        self.branch_qualifier = branch_qualifier
        self.enlised_sessions = []
        self.status = SESSION_STATUS_IDLE

    def __enter__(self):
        """ Enter the scope; the session must be idle with nothing
            enlisted.
        """
        assert self.status == SESSION_STATUS_IDLE
        assert not self.enlised_sessions
        self.status = SESSION_STATUS_ENTERED
        return self

    def enlist(self, session):
        """ Begins a TPC transaction with the given session.
        """
        assert session
        assert self.status != SESSION_STATUS_IDLE
        # Register first so the session is cleaned up on scope exit
        # even if entering it or beginning the transaction fails.
        self.enlised_sessions.append(session)
        session.__enter__()
        connection = session.connection
        # A transaction id is generated per enlisted session when no
        # explicit global transaction id was supplied.
        xid = connection.xid(
            self.format_id,
            self.global_transaction_id or shrink_uuid(uuid4()),
            self.branch_qualifier)
        connection.tpc_begin(xid)
        self.status = SESSION_STATUS_ACTIVE

    def commit(self):
        """ Commit any pending transaction to the database.

            All active connections are prepared first, then committed,
            then every enlisted session is exited.
        """
        assert self.status != SESSION_STATUS_IDLE
        if self.status != SESSION_STATUS_ACTIVE:
            return
        sessions = self.enlised_sessions
        connections = [s.connection for s in sessions
                       if s.status == SESSION_STATUS_ACTIVE]
        for connection in connections:
            connection.tpc_prepare()
        for connection in connections:
            connection.tpc_commit()
        for s in sessions:
            s.__exit__(None, None, None)
        self.enlised_sessions = []
        self.status = SESSION_STATUS_ENTERED

    def __exit__(self, exc_type, exc_value, traceback):
        """ Leave the scope: roll back active two-phase transactions
            and exit every enlisted session.

            Every session is exited even if an earlier one fails; the
            first failure is re-raised once all have been processed.
        """
        sessions = self.enlised_sessions
        self.status = SESSION_STATUS_IDLE
        self.enlised_sessions = []
        first_error = None
        for s in sessions:
            if s.status == SESSION_STATUS_ACTIVE:
                try:
                    s.connection.tpc_rollback()
                except Exception:
                    warnings.warn('An error occured while rolling back '
                                  'two phase transaction.')
            try:
                s.__exit__(exc_type, exc_value, traceback)
            except Exception as error:
                # Keep going so the remaining sessions still return
                # their connections to the pool.
                if first_error is None:
                    first_error = error
        if first_error is not None:
            raise first_error


class NullSession(object):
    """ Null session is supposed to be used in mock scenarios.
    """

    def __init__(self):
        """ Initialize the session in idle state.
        """
        self.status = SESSION_STATUS_IDLE

    def __enter__(self):
        """ Enter the session scope; the session must be idle.
        """
        assert self.status == SESSION_STATUS_IDLE
        self.status = SESSION_STATUS_ENTERED
        return self

    @property
    def connection(self):
        """ Always raises: there is no real connection behind a null
            session.
        """
        raise AssertionError('Not intended to be used directly. '
                             'Use cursor() method instead.')

    def cursor(self, *args, **kwargs):
        """ Ensure session is entered.
        """
        assert self.status == SESSION_STATUS_ENTERED

    def commit(self):
        """ Simulates commit. Asserts the session is used in scope.
        """
        assert self.status != SESSION_STATUS_IDLE
        self.status = SESSION_STATUS_ENTERED

    def __exit__(self, exc_type, exc_value, traceback):
        """ Leave the session scope; the session must be entered.
        """
        assert self.status == SESSION_STATUS_ENTERED
        self.status = SESSION_STATUS_IDLE


class NullTPCSession(object):
    """ Null TPC session is supposed to be used in mock scenarios.
    """

    def __init__(self):
        """ Initialize the session in idle state.
        """
        self.status = SESSION_STATUS_IDLE

    def __enter__(self):
        """ Enter the session scope; the session must be idle.
        """
        assert self.status == SESSION_STATUS_IDLE
        self.status = SESSION_STATUS_ENTERED
        return self

    def enlist(self, session):
        """ Ensure session is entered.
        """
        assert session
        assert self.status != SESSION_STATUS_IDLE
        self.status = SESSION_STATUS_ACTIVE

    def commit(self):
        """ Simulates commit. Asserts the session is used in scope.
        """
        assert self.status != SESSION_STATUS_IDLE
        self.status = SESSION_STATUS_ENTERED

    def __exit__(self, exc_type, exc_value, traceback):
        """ Leave the session scope; the session must not be idle.
        """
        assert self.status != SESSION_STATUS_IDLE
        self.status = SESSION_STATUS_IDLE