"""
DB-API style interface to SQLite3, based on APSW "Another Python
Sqlite Wrapper". Makes it possible to easily switch between the
standard Python sqlite3 module (Pysqlite) and APSW. Advantages
are stricter transactions, increased speed, apswtrace-ability and
customization in Python.
Mandatory DB-API attributes NOT included:
BINARY
DATETIME
Date
DateFromTicks
NUMBER
ROWID
STRING
Time
TimeFromTicks
Timestamp
TimestampFromTicks
Connection.*Error (references to module exceptions)
Pysqlite attributes not included:
Cache
OptimizedUnicode
PARSE_COLNAMES
PARSE_DECLTYPES
PrepareProtocol
SQLITE_* (SQLite3 constants)
Statement
adapt
adapters
converters
datetime
enable_callback_tracebacks
register_adapter
register_converter
sqlite_version_info
time
version_info
Connection.iterdump (but it knows dump)
Connection.text_factory
Operational differences:
Connection.rollback fails if there are unfinished cursors.
Cursor.rowcount is undefined after non-DML queries.
Missing bind variables are not detected if supplied in a dict
(see APSW issue 126).
APSW itself can be accessed directly via the attributes:
apsw
Connection.* (from super class)
Cursor.apswcursor
"""
import sys
import apsw
def connect(fname="", **kwargs):
    """
    Open a database and return a new ``Connection``.

    *fname* and every keyword argument are handed unchanged to the
    ``Connection`` constructor.
    """
    connection = Connection(fname=fname, **kwargs)
    return connection
# DB-API 2.0 module globals.
apilevel = "2.0"
threadsafety = 3
paramstyle = "qmark"

# Binary constructor: ``buffer`` on Python 2, ``memoryview`` otherwise.
# The numeric major version is compared instead of the version *string*:
# a lexicographic test such as ``sys.version < "3"`` would misfire for any
# major version whose text sorts before "3" (for example "10").
# ``buffer`` is only evaluated on Python 2, where the builtin exists.
Binary = buffer if sys.version_info[0] < 3 else memoryview  # noqa: F821
class Warning(Exception):
    """DB-API warning category; deliberately not derived from ``Error``."""


class Error(Exception):
    """Root of every error exception defined by this module."""


class InterfaceError(Error):
    """Error category for the database interface itself."""


class DatabaseError(Error):
    """Error category for the database; parent of the classes below."""


class DataError(DatabaseError):
    """``DatabaseError`` sub-category: data errors."""


class OperationalError(DatabaseError):
    """``DatabaseError`` sub-category: operational errors."""


class IntegrityError(DatabaseError):
    """``DatabaseError`` sub-category: integrity errors."""


class InternalError(DatabaseError):
    """``DatabaseError`` sub-category: internal errors."""


class ProgrammingError(DatabaseError):
    """``DatabaseError`` sub-category: programming errors."""


class NotSupportedError(DatabaseError):
    """``DatabaseError`` sub-category: unsupported operation or option."""
# Version strings as returned by apsw.apswversion() and
# apsw.sqlitelibversion(), evaluated once at import time.
version = apsw.apswversion ()
sqlite_version = apsw.sqlitelibversion ()
class Connection(apsw.Connection, object):
    """
    DB-API flavoured sub-class of the APSW ``Connection``.

    Pysqlite-style method names are offered as thin wrappers around the
    corresponding APSW methods. APSW errors raised inside those wrappers
    are converted with ``_maperr`` before they reach the caller.
    """

    # Pysqlite-compatible defaults. ``isolation_level`` can be replaced
    # per instance through the constructor keyword of the same name.
    row_factory = None
    isolation_level = ""

    def __init__(self, fname="", **kwargs):
        """
        Open the database *fname*.

        The Pysqlite keywords below are translated; every other keyword
        is passed on to ``apsw.Connection.__init__`` untouched:

        isolation_level    stored on the instance
        timeout            seconds, converted to a busy timeout in ms
                           (5000 ms when not given)
        cached_statements  forwarded as ``statementcachesize``
        check_same_thread  must be false, else NotSupportedError
        """
        timeout_ms = 5000
        translated = ("isolation_level", "timeout",
                      "cached_statements", "check_same_thread")
        # Walk a snapshot of the keys: the dict is modified in the loop.
        for option in list(kwargs):
            if option not in translated:
                continue
            value = kwargs.pop(option)
            if option == "isolation_level":
                self.isolation_level = value
            elif option == "timeout":
                timeout_ms = int(1000 * value)
            elif option == "cached_statements":
                kwargs["statementcachesize"] = value
            elif value:
                # Only "check_same_thread" can reach this branch.
                raise NotSupportedError(option + " can not be True")
        try:
            apsw.Connection.__init__(self, fname, **kwargs)
        except apsw.Error as err:
            raise _maperr(err)
        apsw.Connection.setbusytimeout(self, timeout_ms)

    def close(self):
        """Close the connection via APSW, mapping APSW errors."""
        try:
            apsw.Connection.close(self)
        except apsw.Error as err:
            raise _maperr(err)

    def commit(self):
        """Execute COMMIT when a transaction is open; else do nothing."""
        if not self.in_transaction:
            return
        self.execute("COMMIT")

    def rollback(self):
        """
        Execute ROLLBACK when a transaction is open; else do nothing.

        Unlike Pysqlite, this module does not try to reset unfinished
        cursors, so be prepared for:
        'cannot rollback transaction - SQL statements in progress'.
        Pysqlite does not succeed here either if a cursor is in use by
        another thread.
        """
        if not self.in_transaction:
            return
        self.execute("ROLLBACK")

    def cursor(self):
        """Return a new ``Cursor`` bound to this connection."""
        return Cursor(self)

    @property
    def total_changes(self):
        """Value of the APSW ``totalchanges()`` call."""
        return self.totalchanges()

    def create_function(self, name, numargs, callable):
        """Register a scalar function (Pysqlite argument order)."""
        try:
            self.createscalarfunction(name, callable, numargs)
        except apsw.Error as err:
            raise _maperr(err)

    def create_aggregate(self, name, numargs, aggcls):
        """
        Register the Pysqlite-style aggregate class *aggcls*.

        APSW is given a factory that returns a fresh *aggcls* instance
        together with the class's ``step`` and ``finalize`` attributes.
        """
        def factory():
            return aggcls(), aggcls.step, aggcls.finalize

        try:
            self.createaggregatefunction(name, factory, numargs)
        except apsw.Error as err:
            raise _maperr(err)

    def create_collation(self, *args, **kwargs):
        """Forward to APSW ``createcollation``, mapping APSW errors."""
        try:
            self.createcollation(*args, **kwargs)
        except apsw.Error as err:
            raise _maperr(err)

    def set_authorizer(self, callable):
        """Forward to APSW ``setauthorizer``, mapping APSW errors."""
        try:
            self.setauthorizer(callable)
        except apsw.Error as err:
            raise _maperr(err)

    def set_progress_handler(self, callable, nsteps=20):
        """Forward to APSW ``setprogresshandler``, mapping APSW errors."""
        try:
            self.setprogresshandler(callable, nsteps)
        except apsw.Error as err:
            raise _maperr(err)

    def enable_load_extension(self, b):
        """Forward to APSW ``enableloadextension``, mapping APSW errors."""
        try:
            self.enableloadextension(b)
        except apsw.Error as err:
            raise _maperr(err)

    def load_extension(self, filename, *args):
        """Forward to APSW ``loadextension``, mapping APSW errors."""
        try:
            self.loadextension(filename, *args)
        except apsw.Error as err:
            raise _maperr(err)

    def _get_in_transaction(self):
        """Return the negation of APSW ``getautocommit()``."""
        try:
            autocommit = self.getautocommit()
        except apsw.Error as err:
            raise _maperr(err)
        return not autocommit

    in_transaction = property(_get_in_transaction)

    def interrupt(self):
        """Forward to APSW ``interrupt``, mapping APSW errors."""
        try:
            apsw.Connection.interrupt(self)
        except apsw.Error as err:
            raise _maperr(err)

    def execute(self, *args):
        """Run ``execute`` on a fresh ``Cursor`` and return its result."""
        fresh = Cursor(self)
        return fresh.execute(*args)

    # Scripts take the same path as single statements.
    executescript = execute

    def executemany(self, *args):
        """Run ``executemany`` on a fresh ``Cursor``; return its result."""
        fresh = Cursor(self)
        return fresh.executemany(*args)

    def dump(self, stdout=None):
        """
        Alternative for Pysqlite ``iterdump``.

        Runs the ".dump" command of an ``apsw.Shell`` created for this
        connection with the given *stdout*.
        """
        shell = apsw.Shell(db=self, stdout=stdout)
        shell.process_command(".dump")

    def begin(self):
        """Execute BEGIN unless a transaction is already open."""
        if self.in_transaction:
            return
        self.execute("BEGIN")
class Cursor(object):
    """
    DB-API style cursor that adapts an APSW cursor.

    The fetch* methods do not raise any error if the previous call to
    .execute*() did not produce a result set, or if no call was issued
    yet. This deviates from the DB-API specification but conforms to
    Pysqlite.

    The wrapped APSW cursor is available as ``apswcursor``.
    """

    # Defaults until overwritten on the instance.
    description = None
    arraysize = 1

    def __init__(self, connection):
        """
        Create an APSW cursor on *connection* and wrap it.

        A truthy ``connection.row_factory`` is installed on the new
        cursor. APSW errors are mapped with ``_maperr``.
        """
        self.connection = connection
        try:
            self.apswcursor = apsw.Connection.cursor(connection)
        except apsw.Error as e:
            raise _maperr(e)
        if connection.row_factory:
            self.row_factory = connection.row_factory

    # Reads the change count of the owning connection, not of this cursor.
    rowcount = property(
        lambda self: Connection.changes(self.connection))

    def close(self):
        """Close the APSW cursor, passing True as its close() argument."""
        # NOTE(review): True is presumably APSW's "force" flag - confirm.
        try:
            self.apswcursor.close(True)
        except apsw.Error as e:
            raise _maperr(e)

    def execute(self, operation, parameters=(), many=None):
        """
        Execute *operation* and return this cursor.

        Injects a 'BEGIN <isolation_level>' first when the connection's
        isolation level is not None, the connection is in autocommit
        mode and the statement starts with one of the prefixes listed
        below. The mechanism is blind for statements that begin with a
        comment (same as in Pysqlite) and for statements in the middle
        of a script. A perfect solution could involve an authorizer
        function, but that is only invoked once and not when a statement
        is reused from the APSW statement cache.

        operation   SQL text
        parameters  bind values (a sequence of them when *many* is true)
        many        truthy: use the APSW ``executemany`` instead of
                    ``execute``

        ``description`` is set from the APSW cursor. It is None when
        APSW raises ExecutionCompleteError for the description request.
        Any other APSW error is mapped with ``_maperr``.
        """
        con = self.connection
        # Reset first: otherwise the description of an earlier statement
        # survives when the new one yields ExecutionCompleteError below.
        self.description = None
        try:
            if (con.isolation_level is not None
                    and operation.lstrip()[:3].lower() in (
                        "cre", "del", "dro", "ins", "rep", "upd")
                    and con.getautocommit()):
                self.apswcursor.execute("BEGIN " + con.isolation_level)
            if many:
                run = self.apswcursor.executemany
            else:
                run = self.apswcursor.execute
            self.description = run(operation, parameters).getdescription()
        except apsw.ExecutionCompleteError:
            # No description available; keep the None assigned above.
            pass
        except apsw.Error as e:
            raise _maperr(e)
        return self

    def executemany(self, operation, seq_of_parameters):
        """Execute *operation* once per entry of *seq_of_parameters*."""
        return self.execute(operation, seq_of_parameters, True)

    def fetchone(self):
        """Return the next row, or None when the iteration is exhausted."""
        for row in self:
            return row
        return None

    def fetchmany(self, size=None):
        """
        Return a list of at most *size* rows (default ``arraysize``).
        """
        if size is None:
            size = self.arraysize
        # range() comes first so that zip() stops before pulling a row
        # it would then have to throw away.
        return [row for _, row in zip(range(size), self)]

    def fetchall(self):
        """Return the APSW cursor's ``fetchall()`` result."""
        try:
            return self.apswcursor.fetchall()
        except apsw.Error as e:
            raise _maperr(e)

    # Compare the numeric major version; comparing the version string
    # lexicographically would misfire for a major version such as "10".
    if sys.version_info[0] < 3:
        def next(self):
            """Return the next row of the APSW cursor (Python 2)."""
            return self.apswcursor.next()
    else:
        def __next__(self):
            """Return the next row of the APSW cursor."""
            return self.apswcursor.__next__()

    def __iter__(self):
        """Return the iterator of the APSW cursor."""
        try:
            return self.apswcursor.__iter__()
        except apsw.Error as e:
            raise _maperr(e)

    def setinputsizes(self, *args):
        """Accepted for DB-API compatibility; does nothing."""
        return self

    def setoutputsize(self, *args):
        """Accepted for DB-API compatibility; does nothing."""
        return self

    # Scripts take the same path as single statements.
    executescript = execute

    def _set_row_factory(self, row_factory):
        """
        Install *row_factory* as the APSW row tracer, or clear the tracer
        when *row_factory* is falsy.

        The factory is called with this wrapper cursor (not the APSW
        cursor) and the row values.
        """
        if row_factory:
            self.apswcursor.setrowtrace(
                lambda cur, values: row_factory(self, values))
        else:
            self.apswcursor.setrowtrace(None)

    # Reading returns whatever the APSW cursor reports as its row tracer,
    # i.e. the adapter installed above rather than the factory itself.
    row_factory = property(
        lambda self: self.apswcursor.getrowtrace(),
        _set_row_factory,
    )

    # Reads the last insert rowid of the owning connection.
    lastrowid = property(
        lambda self: Connection.last_insert_rowid(self.connection))

    def setexectrace(self, callback):
        """Forward to the APSW cursor's ``setexectrace``."""
        return self.apswcursor.setexectrace(callback)

    def getdescription(self):
        """Forward to the APSW cursor's ``getdescription``."""
        return self.apswcursor.getdescription()
# Optional row factory: look for a top-level ``rowfactory`` module first,
# then for the one inside the ``sqmedium`` package.
try:
    from rowfactory import Row
except ImportError:
    try:
        from sqmedium.rowfactory import Row
    except ImportError:
        # Neither location is importable; the module then has no ``Row``.
        pass
# Pysqlite-style names bound directly to the APSW module functions
# ``enablesharedcache`` and ``complete``.
enable_shared_cache = apsw.enablesharedcache
complete_statement = apsw.complete
def _maperr(e):
    """
    Build the module-level exception for the APSW error *e*.

    Every error becomes an ``OperationalError`` whose message is the
    class name of *e*, a colon and the text of *e*. The exception is
    returned, not raised; callers raise it themselves.
    """
    kind = e.__class__.__name__
    return OperationalError("%s: %s" % (kind, e))