"""
A Pysqlite-style interface to SQLite, front-end of the
sqmediumlite network connection. Requires a back-end
process to be started on the target network location.
Includes special methods to be used on the back-end side,
also available as commands in the interactive shell:
service (on Windows only)
start
status
stop
Mandatory DB-API attributes that are NOT included:
BINARY
Binary (is included with Python 3 only)
DATETIME
Date
DateFromTicks
NUMBER
ROWID
STRING
Time
TimeFromTicks
Timestamp
TimestampFromTicks
Connection.*Error (references to module exceptions)
Cursor.next/__next__
Pysqlite attributes not included:
Cache
OptimizedUnicode
PARSE_COLNAMES
PARSE_DECLTYPES
PrepareProtocol
SQLITE_* (SQLite3 constants)
Statement
adapt
adapters
complete_statement (moved to Connection)
converters
datetime
enable_callback_tracebacks
register_adapter
register_converter
sqlite_version (moved to Connection)
sqlite_version_info
time
version_info
Connection.create_aggregate
Connection.create_collation
Connection.create_function
Connection.enable_load_extension
Connection.iterdump (but it has 'dump')
Connection.load_extension
Connection.set_authorizer
Connection.set_progress_handler
Connection.text_factory
Connection.total_changes
APSW methods that are added to support the interactive shell:
main (interactive shell)
Connection.filename
Connection.setbusytimeout
Cursor.getdescription
Cursor.setexectrace
Known limitations:
Saving data as BLOB requires Python3
"""
import sys
from threading import Lock
from time import sleep
from sqmedium._common import Error, PicklingError, ServerNotUp, \
Socket, version
from sqmedium.rowfactory import Row
try: from sqmediumconf import host
except ImportError: from sqmedium._conf import host
try: from sqmediumconf import port
except ImportError: from sqmedium._conf import port
# DB-API 2.0 (PEP 249) module globals.
apilevel = "2.0"
# Level 3 in PEP 249 terms: threads may share the module, connections and
# cursors.
threadsafety = 3
# SQL parameters are written as question marks.
paramstyle = "qmark"
class Warning(Exception):
    """DB-API Warning; derives directly from Exception."""


class InterfaceError(Error):
    """DB-API InterfaceError; derives from the package Error."""


class DatabaseError(Error):
    """DB-API DatabaseError; base of the database-related errors below."""


class InternalError(DatabaseError):
    """DB-API InternalError; a DatabaseError."""


class OperationalError(DatabaseError):
    """DB-API OperationalError; a DatabaseError."""


class ProgrammingError(DatabaseError):
    """DB-API ProgrammingError; a DatabaseError."""


class IntegrityError(DatabaseError):
    """DB-API IntegrityError; a DatabaseError."""


class DataError(DatabaseError):
    """DB-API DataError; a DatabaseError."""


class NotSupportedError(DatabaseError):
    """DB-API NotSupportedError; a DatabaseError."""
# DB-API "Binary" constructor, provided under Python 3 only.
# Compare the numeric version tuple: comparing the sys.version string is
# lexicographic and would misjudge a major version of 10 or higher.
if sys.version_info[0] >= 3:
    Binary = memoryview
class Connection(object):
    """
    Front-end side of a connection to the sqmediumlite back-end.

    Every operation is sent to the back-end as a tuple over a socket and
    answered with a (result, error) pair; see _do.
    """

    # Copied by each new Cursor and, when set, applied to every row that
    # the cursor receives.
    row_factory = None

    # Attributes that are looked up on the back-end side on every access.
    isolation_level = property(
        lambda self: self._do("__getattribute__", "isolation_level"))
    in_transaction = property(
        lambda self: self._do("__getattribute__", "in_transaction"))
    filename = property(
        lambda self: self._do("__getattribute__", "filename"))

    # Module-level values of the back-end (moved here from module level).
    sqlite_version = property(
        lambda self: self._do("modget", "sqlite_version"))
    apsw_version = property(
        lambda self: self._do("modget", "apsw_version"))
    backend_version = property(
        lambda self: self._do("modget", "version"))

    def __init__(self, fname="", **kwargs):
        """
        Connect to the back-end and announce fname and kwargs to it.

        Understands //host/fname and //host:port/fname (also written with
        backslashes) to connect to another host; any other fname goes to
        the configured host and port.  The reply to the opening message is
        kept as self.name.

        Raises ValueError when a //host prefix is not followed by a
        separator and file part, or when the port is not an integer.
        """
        nhost, nport, fname = self._split_target(fname)
        address = (host if nhost is None else nhost,
                   port if nport is None else nport)
        self._socket = Socket().sconnect(address)
        self._active = Lock()
        try:
            self.name = self._do(fname, kwargs)
        except BaseException:
            # Do not leave a connected socket behind when the opening
            # exchange fails; the caller never gets an object to close.
            self._socket.close()
            raise

    @staticmethod
    def _split_target(fname):
        """
        Split a connect target into (host, port, fname).

        host and port are None where the target does not name them, so
        that the caller can fall back on the configured values.
        """
        if fname[:2] not in ("//", "\\\\"):
            return None, None, fname
        # The separator is whichever slash kind introduced the host part.
        nhost, fname = fname[2:].split(fname[0], 1)
        nport = None
        if ":" in nhost:
            nhost, nport = nhost.split(":")
            nport = int(nport)
        return nhost, nport, fname

    def cursor(self):
        """Return a new Cursor on this connection."""
        return Cursor(self)

    def execute(self, *args):
        """Shortcut: create a cursor and call its execute method."""
        return Cursor(self).execute(*args)

    def executemany(self, *args):
        """Shortcut: create a cursor and call its executemany method."""
        return Cursor(self).executemany(*args)

    def executescript(self, *args):
        """Shortcut: create a cursor and call its executescript method."""
        return Cursor(self).executescript(*args)

    def commit(self):
        """Send "commit" to the back-end."""
        self._do("commit")

    def rollback(self):
        """Send "rollback" to the back-end."""
        self._do("rollback")

    def interrupt(self):
        """
        Ask the back-end to interrupt this connection, identified by name.

        The request travels over a second, short-lived connection to the
        same peer (presumably because this connection's own socket is busy
        with the statement to be interrupted - verify against back-end).
        """
        helper = Connection("//%s:%s/" % self._socket.sgetpeername())
        try:
            helper._do("moddo", "interrupt", self.name)
        finally:
            # The helper served its purpose; release its socket right away
            # instead of leaving that to garbage collection.
            helper.close()

    def dump(self, stdout=sys.stdout):
        """Alternative for Pysqlite iterdump: run the shell's .dump."""
        from sqmedium.__main__ import Shell
        Shell(db=self, stdout=stdout).process_command(".dump")

    def __enter__(self):
        """Forward __enter__ to the back-end and return this connection."""
        self._do("__enter__")
        return self

    def __exit__(self, *args):
        """Forward __exit__ with its arguments to the back-end."""
        return self._do("__exit__", *args)

    def complete_statement(self, sql):
        """Let the back-end module decide whether sql is complete."""
        return self._do("moddo", "complete_statement", sql)

    def setbusytimeout(self, v):
        """Forward setbusytimeout (APSW-style) to the back-end."""
        return self._do("setbusytimeout", v)

    def _do(self, *args):
        """
        Interface from front- to back-end.

        Sends args and returns the result part of the reply.  When the
        reply carries an error, the matching exception is built by
        _loaderr and raised here.  The lock keeps one request/reply
        exchange on the socket at a time.
        """
        with self._active:
            self._socket.ssend(args)
            ret, err = self._socket.srecv()
        if err:
            raise _loaderr(*err)
        return ret

    def close(self):
        """Close the socket to the back-end."""
        self._socket.close()
class Cursor(object):
    """
    DB-API style cursor on a Connection.

    A statement is executed in one round trip to the back-end; the
    complete result set is received at once and kept in a local list from
    which the fetch methods and the iterator take their rows.
    """

    # Both are asked from the back-end on every access.
    lastrowid = property(
        lambda self: self.connection._do("last_insert_rowid"))
    rowcount = property(lambda self: self.connection._do("changes"))

    # Optional callback, see setexectrace.
    exectrace = None
    # None until a statement has been executed (as DB-API prescribes),
    # so the attribute can always be read.
    description = None
    # Default number of rows returned by fetchmany.
    arraysize = 1

    def __init__(self, connection):
        """Bind the cursor to connection and take over its row_factory."""
        self.connection = connection
        self.row_factory = connection.row_factory
        self._rows = []

    def execute(self, operation, parameters=(), meth="execute"):
        """
        Run operation on the back-end and store its description and rows.

        meth names the back-end method to use; executemany and
        executescript go through here with their own name.  The exectrace
        callback, if set, is called first.  Returns the cursor itself.

        Raises ProgrammingError on a closed cursor.  Any other failure is
        passed on after the pending rows have been dropped.
        """
        if self.exectrace:
            self.exectrace(self, operation, parameters)
        if not self.connection:
            raise ProgrammingError("Cannot operate on a closed cursor")
        try:
            self.description, self._rows = \
                self.connection._do(meth, operation, parameters)
        except BaseException:
            # Rows of an earlier statement must not outlive a failed one.
            self._rows = []
            raise
        if self.row_factory:
            self._rows = [self.row_factory(self, row) for row in self._rows]
        return self

    def executemany(self, operation, seq_of_parameters):
        """Run operation for each parameter set; see execute."""
        # Materialized as a list before being handed to the connection.
        return self.execute(operation, list(seq_of_parameters),
                            meth="executemany")

    def executescript(self, operation):
        """Run a script of statements; see execute."""
        return self.execute(operation, meth="executescript")

    def fetchone(self):
        """Return the next row, or None when no rows are left."""
        if self._rows:
            return self._rows.pop(0)

    def fetchall(self):
        """Return all remaining rows as a list, leaving none behind."""
        rows, self._rows = self._rows, []
        return rows

    def fetchmany(self, size=None):
        """Return up to size remaining rows (default: arraysize)."""
        if size is None:
            size = self.arraysize
        rows, self._rows = self._rows[:size], self._rows[size:]
        return rows

    def setinputsizes(self, *args):
        """DB-API method without effect here; returns the cursor."""
        return self

    def setoutputsize(self, *args):
        """DB-API method without effect here; returns the cursor."""
        return self

    def __iter__(self):
        """Yield the remaining rows, consuming them one by one."""
        while self._rows:
            yield self._rows.pop(0)

    def setexectrace(self, callback):
        """
        Set the callback that execute calls before each operation.

        Calls back only once when executing a script (a.o.t. APSW).
        """
        self.exectrace = callback

    def getdescription(self):
        """
        Iterate over the first two fields of each description entry.

        Yields nothing when there is no description (yet).
        """
        return (entry[:2] for entry in (self.description or ()))

    def close(self):
        """Drop pending rows and detach from the connection."""
        self._rows = []
        self.connection = None
# DB-API module-level constructor: connect (...) creates a Connection.
connect = Connection
def enable_shared_cache(b):
    """
    Placeholder for the Pysqlite function of the same name.

    The argument is not used; the call always raises Error, pointing to
    the sqmediumconf file as the place for this setting.
    """
    message = "cache sharing must be set in sqmediumconf file"
    raise Error(message)
def status():
    """
    See if back-end exists and list connections (at least 1).

    Returns the back-end's answer to "modstatus", or None when the
    back-end is not up.
    """
    try:
        probe = Connection()
    except ServerNotUp:
        return None
    try:
        return probe._do("modstatus")
    except ServerNotUp:
        return None
    finally:
        # status is polled in loops by start and stop; close each probe
        # connection instead of leaving it to garbage collection.
        probe.close()
def start(foreground=None):
    """
    Start back-end process in the current working directory.

    Intended for Unix, where this starts a detached process.
    On Windows, the service method is preferred.

    With foreground set, the back-end runs inside the calling process
    instead.  Raises Error when the configured host is not 127.0.0.1 or
    when a back-end is already running, and ServerNotUp when a detached
    back-end does not answer within the polling period (about 6 seconds).
    """
    if host != "127.0.0.1":
        raise Error("Can only start on local host")
    if status():
        raise Error("No need to start twice")

    if foreground:
        from sqmedium import _backend
        _backend.main()
        return

    from subprocess import Popen
    command = (sys.executable, "-c",
               "from sqmedium._backend import main; main ()")
    Popen(command)
    # Poll with doubling intervals: 0.05, 0.1, ... 3.2 seconds.
    for attempt in range(7):
        sleep(2 ** attempt * .05)
        if status():
            return
    raise ServerNotUp("back-end process not started")
def stop():
    """
    Stop back-end process.

    Sends "modstop" and then polls with doubling intervals until the
    back-end no longer answers.  Raises ServerNotUp when there is no
    back-end to stop, and Error when it is still there after the polling
    period (about 6 seconds).
    """
    if not status():
        raise ServerNotUp("back-end process does not exist")
    try:
        conn = Connection()
    except ServerNotUp:
        conn = None
    if conn is not None:
        try:
            conn._do("modstop")
        except ServerNotUp:
            # Tolerated as before - presumably the back-end can go down
            # before its reply arrives; the polling below decides.
            pass
        finally:
            # Release the socket explicitly rather than by garbage
            # collection.
            conn.close()
    for attempt in range(7):
        sleep(2 ** attempt * .05)
        if not status():
            break
    else:
        raise Error("back-end process not stopped")
def service(*args):
    """
    Manage back-end process as a service on Windows.

    Options are install, start, stop and remove.  They are handed to the
    service module as an argv list with None in the program-name slot.
    """
    from sqmedium import _service
    argv = [None]
    argv.extend(args)
    _service.main(argv=argv)
def main():
    """Run the interactive shell of the sqmedium package."""
    from sqmedium.__main__ import main as run_shell
    run_shell()
def test(**kwargs):
    """Run the integrated test suite, passing on all keyword arguments."""
    from sqmedium._test import main as run_tests
    run_tests(**kwargs)
def dir(path=".", fname=""):
    """
    List database files.

    Asks the back-end module for "dir" of path over a connection opened
    with fname (which may carry a //host prefix to address another
    back-end) and returns the answer.
    """
    conn = Connection(fname)
    try:
        return conn._do("moddo", "dir", path)
    finally:
        # The connection exists for this one request only; close it
        # rather than leaving an open socket per call.
        conn.close()
def _loaderr(k, v):
    """
    Rebuild the exception returned from the back-end.

    k is the name of the exception class and v the arguments to create it
    with.  The class is looked up in this module first and among the
    built-in exceptions next.  Returns the exception instance (the caller
    raises it), or a ValueError when k does not name an exception class.
    """
    # Numeric comparison; comparing the sys.version string would misjudge
    # a major version of 10 or higher.
    if sys.version_info[0] < 3:
        import exceptions as builtins
    else:
        import builtins
    for namespace in (globals(), vars(builtins)):
        candidate = namespace.get(k)
        # Accept exception classes only: the name comes in over the
        # network and must never select an arbitrary callable such as
        # eval, open or a function of this module.
        if isinstance(candidate, type) and issubclass(candidate, BaseException):
            return candidate(*v)
    return ValueError("failed to load %s: %s" % (k, v))