"""
Back-end process for the network interface. The main thread of
this process waits for connections over a TCP socket interface.
Connections are further handled in seperate threads. The process
is started through the front-end module and may run either as a
service in Windows or as s detached process in Unix.
"""
from os import walk
from os.path import isabs
from threading import Thread, ThreadError, Lock, enumerate
from sqmedium._common import Error, ServerNotUp, PicklingError, \
Socket, e_str, print23, version
try:
from sqmedium.apswdbapi2 import __name__ as wrapper_name, \
Connection, Cursor, NotSupportedError, Error as sqliteError, \
complete_statement, enable_shared_cache, sqlite_version, \
version as apsw_version
except ImportError:
from sqlite3 import __name__ as wrapper_name, \
Connection, Cursor, NotSupportedError, Error as sqliteError, \
complete_statement, enable_shared_cache, sqlite_version, \
version as apsw_version
try: from sqmediumconf import port
except ImportError: from sqmedium._conf import port
try: from sqmediumconf import initsql
except ImportError: from sqmedium._conf import initsql
try: from sqmediumconf import cachesharing
except ImportError: from sqmedium._conf import cachesharing
try: from sqmediumconf import allowedhosts
except ImportError: from sqmedium._conf import allowedhosts
else: import re
class Bconnection (Connection, Thread):
_unrecyclable = None
filename = None
def __init__ (self, fname, kkvv):
self.filename = fname
self._kkvv = kkvv
checkrelative (fname)
if "check_same_thread" in kkvv and kkvv.pop ("check_same_thread"):
raise NotSupportedError ("check_same_thread can not be True")
Connection.__init__ (self, fname, check_same_thread=False, **kkvv)
Thread.__init__ (self)
self.set_authorizer (self._authorizer)
self._pause = Lock ();
self._pause.acquire ()
self._cursor = Cursor (self)
if initsql:
self.executescript (initsql)
def __enter__ (self):
Connection.__enter__ (self)
def execute (self, *args):
return self._cursor.execute (*args).description, \
self._cursor.fetchall()
def executemany (self, *args):
return self._cursor.executemany (*args).description, \
self._cursor.fetchall()
def executescript (self, *args):
if wrapper_name == "sqlite3":
if args [1:] and not args [1]:
args = args [:1]
if not args [0].endswith (';'):
args = (args [0] + ';',) + args [1:]
return self._cursor.executescript (*args).description, \
self._cursor.fetchall()
def setbusytimeout (self, busytimeout):
Connection.setbusytimeout (self, busytimeout)
self._unrecyclable = True
def last_insert_rowid (self):
return self._cursor.lastrowid
def changes (self):
return self._cursor.rowcount
def modget (self, k):
" get module-level attribute "
return globals() [k]
def moddo (self, meth, *args):
" do module-level request "
return globals() [meth] (*args)
def modstatus (self, pooled=None):
" return list of connections "
return dict ((i.name, (i.filename, i._kkvv, i._socket and i._socket.sgetpeername ()[0]))
for i in connections (pooled))
def modstop (self):
if self._socket.sgetpeername ()[0] != "127.0.0.1":
raise Error ("Should only stop from local host")
self._socket.close ()
mainsocket.close ()
Socket ().sconnect (("127.0.0.1", port)).close()
def _authorizer(self, op, p1, p2, p3, p4):
" only called when SQL statement is compiled (OK for me) "
if op == 21:
pass
elif op in (2, 8, 29):
if self.filename in ("", ":memory:"):
self._unrecyclable = True
elif op == 19:
if p2 is not None:
if self.is_alive ():
self._unrecyclable = True
elif op == 24:
checkrelative (p1)
self._unrecyclable = True
return 0
def run (self):
while True:
try:
while True:
req = self._socket.srecv ()
try:
ret = getattr (self, req [0]) (*req [1:])
self._socket.ssend ((ret, None))
except Exception as e:
self._socket.ssend ((None, dumperr (e)))
except ServerNotUp:
pass
if self._unrecyclable:
break
self._socket = None
self.rollback ()
self._pause.acquire ()
if not self._socket:
break
self._cursor.close ()
self.close ()
def interrupt (name):
for i in connections (False):
if i.name == name:
i.interrupt ()
break
else:
raise Error ("no such connection: " + str (name))
def dir (path="."):
checkrelative (path)
for i in walk (path):
return i [1] + i [2]
def connections (pooled=None):
return [i for i in enumerate ()
if type (i) is Bconnection and \
bool (i._socket or i._unrecyclable) is not pooled]
def dumperr (e):
return e.__class__.__name__, e.args
def checkrelative (s):
if ".." in s or isabs (s):
raise Error ("db name not relative: %s" % (s,))
def main ():
global mainsocket
mainsocket = Socket ().sbindlisten (("0.0.0.0", port))
enable_shared_cache (cachesharing)
try:
while True:
socket = mainsocket.saccept ()
try:
if allowedhosts and not re.match (
"127.0.0.1|" + allowedhosts, socket.sgetpeername ()[0]):
socket.close ()
continue
fname, kkvv = socket.srecv ()
pool = connections (True)
for i in pool:
if i.filename == fname and i._kkvv == kkvv:
final_target = i._pause.release
break
else:
i = Bconnection (fname, kkvv)
final_target = i.start
if len (pool) > 2:
pool [0]._unrecyclable = True
pool [0]._pause.release ()
socket.ssend ((i.name, None))
except Exception as e:
try: socket.ssend ((None, dumperr (e)))
except ServerNotUp: pass
else:
i._socket = socket
final_target ()
except ServerNotUp:
pass
except Exception as e:
print23 ("main loop error " + e_str (e))
mainsocket.close ()
for i in connections ():
i._unrecyclable = True
try:
i._pause.release ()
i.interrupt ()
i._socket.sshutdown ()
i._socket.close ()
except (ThreadError, sqliteError, AttributeError, ServerNotUp):
pass
for i in connections ():
i.join (timeout = 7.5)
if i.isAlive (): print23 ("failed to join %s" % i)
if __name__ == "__main__":
main ()