#       m   
#      u    _test.py - Thu Sep 05 15:06 CEST 2013
#  SQLite   test suite
#    d      part of sqmediumlite
#   e       copyright (C): nobody
#  m        

"""
Test script covering network connection, apswdbapi2 and 
rowfactory. Started by "setup.py test".
"""

import random, os, sys, time, threading, subprocess, socket, socket
from unittest import \
        TestCase, TestSuite, TextTestRunner, makeSuite
from sqmedium._common import Socket, e_str, print23
import sqmedium as sqlite
from sqmedium.__main__ import Shell
try: from sqmedium import apswdbapi2
except ImportError: apswdbapi2 = None # APSW not installed
import sqlite3 as pysqlite

if sys.version < "2.7":
    baseTestCase = TestCase
    class TestCase (baseTestCase):
        def assertLessEqual (self, a, b, s=None):
            if not (a <= b):
                if s is None:
                    s = "not %s <= %s" % (a, b)
                self.fail (s)
        def assertGreaterEqual (self, a, b, s=None):
            if not (a >= b):
                if s is None:
                    s = "not %s >= %s" % (a, b)
                self.fail (s)

STARTDIR = os.path.abspath ('.')
TESTDIR = "tmp_sqmedium"
TESTDB = "tmp_test.db"
TESTPORT = 41525
SUBDIR = "subdir"

heavy_query = """
        select 'heavy', max (b2.cbig)
        from tbig b1, tbig b2
        where b1.rowid < 2000
        and b2.cbig = b1.cbig;
        """

def myfunction (x):
    return x + 1

def quicksleep ():
    time.sleep (.25)

def restart_backend ():
    if sqlite.status ():
        sqlite.stop ()
    sqlite.start ()

def reset_frontend (**extras):
    " set non-default port number for this test "
    assert os.path.abspath ('.') == os.path.join (STARTDIR, TESTDIR)
    for (d, dd, ff) in os.walk ('.'):
        for f in ff:
            if f.startswith ("sqmediumconf") and f.endswith (".pyc"):
                os.remove (os.path.join (d, f))
    f = open ("sqmediumconf.py", 'w')
    f.write ("port = %s\n" % TESTPORT)
    for i in extras.items ():
        if type (i[1]) is str:
            f.write ('%s = "%s"\n' % i)
        else:
            f.write ('%s = %s\n' % i)
    f.close ()
    " reload front-end settings "
    try: del sys.modules ['sqmediumconf']
    except KeyError: pass
    try: from sqmediumconf import host
    except ImportError: from sqmedium._conf import host
    try: from sqmediumconf import port
    except ImportError: from sqmedium._conf import port
    sqlite.host, sqlite.port = host, port

class Testthread (threading.Thread):
    """
    execute query in a thread
    """
    def __init__ (self, sqls, con = None):
        self.con = con
        self.sqls = sqls
        self.out = "thr%i.out" % id (self)
        open (self.out, 'w').close ()
        os.remove (self.out)
        threading.Thread.__init__ (
                self,
                target=self.__target,
                )
        threading.Thread.start (self)
        for retry in range (6): # 3 sec.
            time.sleep (2 ** retry * .05)
            if os.access (self.out, os.R_OK):
                break
        else:
            print23 ("Testthread: spoolfile does not appear")
        time.sleep (.10) # for the SQL to start
    def start (self):
        raise AssertionError ("Testthread uses no start method")
    def __target (self):
        if not self.con:
            con = sqlite.Connection (TESTDB, isolation_level = None)
        else:
            con = self.con
        f = open (self.out, 'w')
        try:
            try:
                shell = Shell (db=con, stdout=f, stderr=f)
                for s in self.sqls.split (';'):
                    shell.process_complete_line(s.strip())
            except Exception as e:
                f.write (e_str (e) + '\n')
        finally:
            f.close()
            if not self.con:
                con.close ()
    def join (self, timeout=None):
        if timeout is not None:
            threading.Thread.join (self, timeout)
        else:
            timeout = 7.5 # respond to keyboard interrupts
            while self.isAlive ():
                threading.Thread.join (self, timeout)
        return self
    def get_result (self):
        try:
            return open (self.out, 'r').read ().strip ()
        except IOError as e:
            return None

class Testproc:
    def __init__ (self, sqls, fname=TESTDB):
        self.out = "proc%i.out" % id (self)
        open (self.out, 'w').close ()
        os.remove (self.out)
        pargs = [
                sys.executable,
                "-c", "import sqmedium; sqmedium.main ()",
                fname,
                ".output " + self.out.replace ("\\", "\\\\"),
                ]
        for s in sqls.split (';'):
            pargs.append (s.strip())
        self._proc = subprocess.Popen (pargs)
        for retry in range (6): # 3 sec.
            time.sleep (2 ** retry * .05)
            if os.access (self.out, os.R_OK):
                break
        else:
            print23 ("Testproc: spoolfile does not appear")
        time.sleep (.05) # for the SQL to start TODO korter?
    def wait (self):
        self._proc.wait ()
        return self
    def terminate (self):
        try:
            self._proc.terminate ()
        except Exception as e:
            if "No such process" not in str (e):
                print23 ("Exception ignored:", e_str (e))
        return self._proc.wait ()
    def get_result (self):
        try:
            return open (self.out, 'r').read ().strip ()
        except IOError as e:
            return None


class Control (TestCase):
    def init (self): # executed as first test
        # run the test in a temporary directory
        try: os.mkdir (TESTDIR)
        except OSError: pass
        os.chdir (TESTDIR)
        try: os.mkdir (SUBDIR)
        except OSError: pass
        # find sqmediumconf after chdir
        sys.path.insert (0, '.')
        # same when starting a sub-process
        basePopen = subprocess.Popen
        def Popen (argv):
                argv = list (argv)
                argv [2] = "import sys; sys.path.insert (0, '" + \
                        STARTDIR + \
                        "'); " + \
                        "sys.path.insert (0, '.'); " + \
                        argv [2]
                return basePopen (argv)
        subprocess.Popen = Popen
        # use a non-default port number
        reset_frontend ()
        restart_backend ()
        # create some test data
        con = sqlite.Connection (TESTDB)
        cur = con.cursor ()
        cur.execute ("drop table if exists t")
        cur.execute ("create table t (c)")
        cur.execute ("insert into t values ('a')")
        cur.execute ("insert into t values ('b')")
        cur.execute ("insert into t values ('c')")
        cur.execute ("drop table if exists t2")
        cur.execute ("create table t2(c1, c2)")
        cur.execute ("insert into t values ('c')")
        cur.execute ("insert into t2 values ('a1', 'a2')")
        cur.execute ("insert into t2 values ('b1', 'b2')")
        cur.execute ("drop table if exists tbig")
        cur.execute ("create table tbig (cbig)")
        cur.execute ("insert into tbig values ('c')")
        for i in range (12):
            cur.execute ("insert into tbig select cbig || cbig from tbig")
        con.commit ()
        cur.close ()
        con.close ()
        return self
    def exit (self):
        self._clear ()
    def _clear (self):
        timeout = 7.5
        if sqlite.status ():
            sqlite.stop ()
        os.chdir (STARTDIR)
        for (d, dd, ff) in os.walk (TESTDIR, topdown=False):
            assert d.startswith (TESTDIR), d
            for f in list (ff) + [None]:
                while True:
                    try:
                        if f:
                            os.remove (os.path.join (d, f))
                        else:
                            os.rmdir (d)
                    except Exception as e:
                        if timeout > 0:
                            time.sleep (1)
                            timeout -= 1
                        else:
                            print23 ("ignored: " + e_str (e))
                            break
                    else:
                        break

class VersionCompat (TestCase):
    def setUp (self):
        self.con = sqlite.Connection ()
    def tearDown (self):
        try: self.con.close ()
        except: pass

    def j001VersionNotAvailable (self):
        " Back-end versions are available as connection attributes? "
        self.assertNotEqual (
                self.con.sqlite_version,
                None,
                )

    def j010MinimumWrapperVersion (self):
        " APSW/Pysqlite version up to date? "
        if not apswdbapi2:
            self.assertGreaterEqual (
                    self.con.apsw_version,
                    "2.4.1", # Pysqlite version in Python 2.6 and 3.1
                    )
        else:
            self.assertGreaterEqual (
                    self.con.apsw_version,
                    "3.6.21",
                    )
    def j020MinimumSqliteVersion (self):
        " SQLite version up to date? "
        self.assertGreaterEqual (
                self.con.sqlite_version,
                "3.5.9",
                )

    def j110BackendVersion (self):
        " back-end version is reported? "
        self.assertEqual (
                self.con.backend_version,
                sqlite.version,
                )

    def j120BackendApswVersion (self):
        " back-end wrapper version is reported? "
        if not apswdbapi2:
            self.assertEqual (
                    self.con.apsw_version,
                    pysqlite.version,
                    )
        else:
            self.assertEqual (
                    self.con.apsw_version,
                    apswdbapi2.apsw.apswversion(),
                    )


class baseAttributeCompat (TestCase):
    " applied to sqmediumlite front-end as well as to plain apsw_dbapi "
    sqlite = None
    module_ignores = ( # unsupported DB-API attributes
            "BINARY",
            "DATETIME",
            "Date",
            "DateFromTicks",
            "NUMBER",
            "ROWID",
            "STRING",
            "Time",
            "TimeFromTicks",
            "Timestamp",
            "TimestampFromTicks",
            ) + ( # Pysqlite attributes
            "Cache",
            "OptimizedUnicode",
            "PARSE_COLNAMES",
            "PARSE_DECLTYPES",
            "PrepareProtocol",
            "SQLITE_*",
            "Statement",
            "adapt",
            "adapters",
            "converters",
            "datetime",
            "enable_callback_tracebacks",
            "register_adapter",
            "register_converter",
            "sqlite_version_info",
            "time",
            "version_info",
            ) + tuple ( # sqlite3 constants
            k for k in dir (pysqlite) if k.startswith ("SQLITE_")
            ) + ( # strange attributes
            "",
            "dbapi2",
            "x",
            )
    connection_ignores = (
            "iterdump",
            "text_factory",
            ) + ( # Exceptions at connection level
            "DataError",
            "DatabaseError",
            "Error",
            "IntegrityError",
            "InterfaceError",
            "InternalError",
            "NotSupportedError",
            "OperationalError",
            "ProgrammingError",
            "Warning",
            )
    cursor_ignores = (
            )
    def setUp (self):
        self.con0 = pysqlite.Connection (TESTDB, isolation_level=None)
        self.con = self.sqlite.Connection (TESTDB, isolation_level=None)
        self.cur0 = self.con0.cursor ()
        self.cur = self.con.cursor ()
    def tearDown (self):
        self.cur0.close ()
        self.cur.close ()
        self.con0.close ()
        self.con.close ()
    def compare (self, inst1, inst2, ignores=()):
        errors = []
        for i in dir (inst1):
            if i in ignores or i [0] == '_':
                continue
            try:
                try:
                    v1 = getattr (inst1, i)
                    tp1 = type (getattr (inst1, i))
                except:
                    raise Exception ("Error getting attribute %s.%s" % (inst1, i))
                v2 = getattr (inst2, i)
                tp2 = type (v2)
                if tp2.__name__ != tp1.__name__ and \
                        tp2 != tp1 and not (
                           hasattr (v1, "__call__") and
                           hasattr (v2, "__call__")
                       ) and not (
                           isinstance (v1, dict) and
                           isinstance (v2, dict)
                       ) and not (
                           isinstance (v1, int) and
                           isinstance (v2, long)
                       ):
                    errors.append ("Type difference pysqlite/sqmediumlite: %s: %s / %s" % (i, tp1, tp2))
                elif tp1 in (str, int, float, bool) and \
                        v1 != v2 and "version" not in i and i != "threadsafety":
                    errors.append ("Value difference pysqlite/sqmediumlite: %s: %s / %s" % (i, v1, v2))
            except self.sqlite.NotSupportedError as e:
                pass
            except Exception as e:
                errors.append ("%s: %s" % (i, e_str (e)))
        if errors:
            self.fail ("\n".join (errors))

    def j010ModuleAttrs (self):
        " Module-level attributes correspond with Pysqlite? "
        self.compare (pysqlite, self.sqlite, ignores=self.module_ignores)

    def j020ConnectionAttrs (self):
        " Connection attributes correspond with Pysqlite? "
        self.compare (self.con0, self.con, ignores = self.connection_ignores)

    def j030CursorAttrs (self):
        " Cursor attributes correspond with Pysqlite? "
        # initialize rowcount and lastrowid
        for i in self.cur0, self.cur:
            i.execute ("delete from t where c = 5555")
            i.execute ("insert into t values (5555)")
        self.compare (self.cur0, self.cur, ignores = self.cursor_ignores)

    def j060ReadOnlyError (self):
        " Can not set read-only attribute? "
        self.assertRaises (
                (TypeError, AttributeError),
                setattr,
                self.cur, "rowcount", 10,
                )

    def j110ExecuteReturns (self):
        " Execute returns cursor? "
        self.assertEqual (
                type (self.cur.execute ("select 0")),
                self.sqlite.Cursor,
                )
        self.assertEqual (
                type (self.con.execute ("select 0")),
                self.sqlite.Cursor,
                )
        self.assertEqual (
                type (self.cur.executemany (
                    "insert into t values (?)", ('x', 'y', 'z'))),
                self.sqlite.Cursor,
                )
        self.assertEqual (
                type (self.cur.executescript (
                    "insert into t values ('x'); delete from t where c = 'x';")),
                self.sqlite.Cursor,
                )

    def j140PosArgs (self):
        " Connect accepts only one positional argument? "
        self.assertRaises (
                TypeError,
                self.sqlite.connect,
                TESTDB,
                None,
                None,
                )

    def j150KeywordArgs (self):
        " Connect accepts supported keyword parameters? "
        con = self.sqlite.Connection (
                TESTDB,
                timeout = 1,
                isolation_level = "EXCLUSIVE",
                check_same_thread = False,
                cached_statements = 10,
                ).close ()

    def j160WrongKeyword (self):
        " Connect does not accept unknown keyword parameter? "
        try:
            self.sqlite.connect (uisfddasfg=1).close ()
        except TypeError:
            pass
        else:
            self.fail ("Exception NOT raised")

if not apswdbapi2:
    class apswAttributeCompat (TestCase): pass
else:
    class apswAttributeCompat (baseAttributeCompat):
        sqlite = apswdbapi2

class AttributeCompat (baseAttributeCompat):
    sqlite = sqlite
    module_ignores = baseAttributeCompat.module_ignores + (
            "complete_statement",
            "sqlite_version",
            "sqlite_version_info",
            )
    if sys.version < "3":
        module_ignores += (
                "Binary",
                )
    connection_ignores = baseAttributeCompat.connection_ignores + (
            "create_aggregate",
            "create_collation",
            "create_function",
            "enable_load_extension",
            "load_extension",
            "set_authorizer",
            "set_progress_handler",
            "total_changes",
            )
    cursor_ignores = baseAttributeCompat.cursor_ignores + (
            "next",
            "__next__", # Python3
            )


class InterruptCompat (TestCase):
    def setUp (self):
        self.con = sqlite.Connection (TESTDB)
    def tearDown (self):
        try: self.con.close ()
        except: pass

    def j010InterruptQuery (self):
        " Can we interrupt a heavy query (not on my VPS)? "
        thr = Testthread (heavy_query, self.con)
        t0 = time.time ()
        try:
            self.con.interrupt ()
        finally:
            thr.join ()
        self.assertTrue (
                "interrupt" in thr.get_result(),
                thr.get_result(),
                )
        self.assertLessEqual (
                time.time () - t0,
                1.,
                )

    def j030InterruptClosed (self):
        " Interrupt on a closed connection raises error? "
        self.con.close ()
        quicksleep ()
        self.assertRaises (
                sqlite.Error,
                self.con.interrupt,
                )

    def j040InterruptDone (self):
        " No problem interrupting when execute has finished? "
        self.con.execute ("delete from t where c = 'done before interrupt'")
        self.con.interrupt ()
        self.assertEqual (
                self.con.execute ("select 123").fetchone (),
                (123, ),
                )


class SameThreadCompat (TestCase):
    def setUp (self):
        self.con = sqlite.Connection (TESTDB)
    def tearDown (self):
        try: self.con.close ()
        except: pass

    def j010CheckSameThread (self):
        " Parameter check_same_thread can not be True? "
        self.con.close ()
        self.con = sqlite.connect (TESTDB, check_same_thread = False)
        self.con.close ()
        try:
            self.con = sqlite.connect (TESTDB, check_same_thread = True)
        except sqlite.NotSupportedError:
            pass
        else:
            self.fail ("Exception not raised")

    def j020ThreadAllowed (self):
        " Multiple threads are allowed by default (threadsafety not zero)? "
        self.assertGreaterEqual (
                sqlite.threadsafety,
                1,
                )
        thr = Testthread ("SELECT 123").join ()
        self.assertEqual (
                thr.get_result (),
                "123",
                )

    def j030ExceptionAllowed (self):
        " Mutex is cleared after exception? "
        self.assertRaises (
                sqlite.PicklingError,
                self.con._do,
                "create_function", "f", myfunction,
                )
        thr = Testthread ("SELECT 123", self.con).join ()
        self.assertEqual (
                thr.get_result (),
                "123",
                )

    def j040Overlap (self):
        " Concurrent use from two threads gets synchronized? "
        thr1 = Testthread (heavy_query, self.con)
        thr2 = Testthread ("SELECT 9302", self.con)
        thr2.join () # implies thr1 is finished too
        self.assertEqual (
                thr1.get_result() [:5],
                "heavy",
                )
        self.assertEqual (
                thr2.get_result(),
                "9302",
                )


class TimeoutCompat (TestCase):
    def tearDown (self):
        self.con.close ()
        try: self.thr.join ()
        except AttributeError: pass

    def j010TimeoutSet (self):
        " Timeout set by default? "
        msec = 500 # only test .5 seconds
        self.con = sqlite.Connection (TESTDB)
        self.thr = Testthread ("""
                begin immediate; 
                .sleep %s; 
                rollback
                """ % (msec,))
        t0 = time.time ()
        self.con.execute ( "begin immediate")
        self.assertGreaterEqual (
                time.time () - t0,
                .001 * msec - .10,
                )
        self.assertLessEqual (
                time.time () - t0,
                .001 * msec + .25,
                )

    def j020TimeoutChanged (self):
        " Timeout set by connect parameter? "
        msec = 500
        timeout = .1
        self.con = sqlite.connect (TESTDB, timeout=timeout)
        self.thr = Testthread ("""
                begin immediate; 
                .sleep %s; 
                rollback
                """ % (msec,))
        t0 = time.time ()
        self.assertRaises (
                sqlite.OperationalError,
                self.con.execute,
                "begin immediate",
                )
        self.assertGreaterEqual (
                time.time () - t0,
                .5 * timeout)
        self.assertLessEqual (
                time.time () - t0,
                2 * timeout)

if not apswdbapi2 and sys.version < '3.2' :
    # Pysqlite in_transaction since 3.2
    class TransactionCompat (TestCase): pass
else:
    class TransactionCompat (TestCase):
        def setUp (self):
            self.con = sqlite.connect (TESTDB)
            self.con2 = sqlite.connect (TESTDB, timeout=0.)
            self.v = random.random () # unique
        def tearDown (self):
            try: self.con.close ()
            except: pass
            try: self.con2.close ()
            except: pass

        def j010DefaultIsolation (self):
            " Isolation is enabled by default? "
            self.assertFalse (self.con.in_transaction)
            self.con.execute ("INSERT INTO t (c) VALUES (22)")
            self.assertTrue (self.con.in_transaction)
            self.con.commit ()
            self.assertFalse (self.con.in_transaction)

        def j020IsolationNone (self):
            " Isolation_is disabled when isolation_level is None? "
            self.con = sqlite.connect (TESTDB, isolation_level=None)
            self.assertFalse (self.con.in_transaction)
            self.con.execute ("INSERT INTO t (c) VALUES (22)")
            self.assertFalse (self.con.in_transaction)

        def j010CloseImpliesRollback1 (self):
            " Implicit transaction is rolled back when con. is closed? "
            self.con.execute ("insert into t values (?)", (self.v,))
            self.con.close ()
            self.assertEqual (
                    self.con2.execute ("select count (*) from t where c = ?", (self.v,)).fetchone (),
                    (0,),
                    )

        def j020CloseImpliesRollback2 (self):
            " Explicit transaction is rolled back when con. is closed? "
            self.con.close ()
            self.con = sqlite.connect (TESTDB, isolation_level = None)
            self.con.execute ("begin")
            self.con.execute ("insert into t values (?)", (self.v,))
            self.con.close ()
            self.assertEqual (
                    self.con2.execute ("select count (*) from t where c = ?", (self.v,)).fetchone (),
                    (0,),
                    )

        def j030CommitMethod (self):
            " Commit and rollback methods do not fail if no changes? "
            self.con.commit ()
            self.con.rollback ()

        def j110NotIntrans (self):
            " Autocommit is initially True? "
            self.assertFalse (self.con.in_transaction)

        def j120Intrans (self):
            " Autocommit becomes False after DML or BEGIN? "
            for s in (
                    "INSERT INTO t (c) VALUES (2)",
                    "UPDATE t SET c = 2 WHERE c = 2",
                    "UpDate t SET c = 2 WHERE c = 2 -- mixed case",
                    """
                    UPDATE t SET c = 2 WHERE c = 2 -- incl newlines
                    """,
                    ###"""
                    ###-- incl comments
                    ###UPDATE t SET c = 2 WHERE c = 2
                    ###""",
                    "REPLACE INTO t SELECT 2",
                    "REPLACE INTO t SELECT 2",
                    "DELETE FROM t WHERE c = 2",
                    "BEGIN",
                    ):
                self.con.execute (s)
                self.assertTrue (
                        self.con.in_transaction,
                        "not in trans. after: " + s)
                self.con.commit ()

        def j130CommitMethod (self):
            " Autocommit is reset after commit or rollback method? "
            for i in (
                    self.con.commit,
                    self.con.rollback,
                    ):
                self.con.execute ("INSERT INTO t (c) VALUES (22)")
                i ()
                self.assertFalse (self.con.in_transaction,
                        "still in trans. after: " + i.__name__)

        def j150IsolationlevelNone (self):
            " Begin begins transaction when isolation_level is None? "
            self.con.close ()
            self.con = sqlite.connect (TESTDB, isolation_level= None)
            self.assertFalse (self.con.in_transaction)
            self.con.execute ("insert into t values (33)")
            self.assertFalse (self.con.in_transaction)
            self.con.execute ("begin")
            self.assertTrue (self.con.in_transaction)
            self.con.execute ("rollback")
            self.assertFalse (self.con.in_transaction)


class ExecuteCompat (TestCase):
    def setUp (self):
        self.con = sqlite.Connection (TESTDB)
        self.cur = sqlite.Cursor (self.con)
    def tearDown (self):
        self.con.close ()

    def j110ExecuteMany (self):
        " Can use executemany? "
        n0 = self.cur.execute ("select count (*) from t").fetchone()[0]
        self.cur.executemany (
                "insert into t (c) values (?)",
                [(1,), (2,), (3,), (4,), (5,)],
                )
        self.assertEqual (
                self.cur.execute ("select count (*) from t").fetchone()[0],
                n0 + 5)

    def j120ExecuteManyIterator (self):
        " Can use executemany with binding generator? "
        n0 = self.cur.execute ("select count (*) from t").fetchone()[0]
        self.cur.executemany (
                "insert into t (c) values (?)",
                ((i,) for i in range (5)),
                )
        self.assertEqual (
                self.cur.execute ("select count (*) from t").fetchone()[0],
                n0 + 5)

    if apswdbapi2:
        def j130FetchExecuteMany (self):
            " Can fetch rows from executemany (APSW feature)? "
            bindings = [(1,), (2,), (3,), (4,), (5,)]
            self.assertEqual (
                    self.cur.executemany ("select ?", bindings).fetchall (),
                    bindings
                    )


class baseFetchCompat (TestCase):
    " applied to sqmediumlite front-end as well as to plain apsw_dbapi "
    def setUp (self):
        self.con = self.sqlite.Connection (TESTDB)
        self.cur = self.con.execute ("select rowid-1 from tbig limit 88")
    def tearDown (self):
        self.con.close ()

    def j010FetchOne (self):
        " All rows returned using fetchone? "
        i = 0
        while i < 100:
            row = self.cur.fetchone ()
            if row is None:
                break
            self.assertEqual (row, (i,))
            i += 1
        self.assertEqual (i, 88)

    def j020FetchAll (self):
        " All rows returned using fetchall? "
        rows = self.cur.fetchall ()
        self.assertEqual (type (rows), list)
        self.assertEqual (len (rows), 88)
        for i in range (len (rows)):
            self.assertEqual (rows[i], (i,))

    def j040Iteration (self):
        " All rows returned using iteration? "
        i = 0
        for row in self.cur:
            self.assertEqual (row, (i,))
            i += 1
        self.assertEqual (i, 88)

    def j110FetchMany1 (self):
        " Fetchmany fetches specified size? "
        self.assertEqual (
                len (self.cur.fetchmany (20)),
                20,
                )
        self.assertEqual (
                len (self.cur.fetchall ()),
                88 - 20,
                )

    def j120FetchMany2 (self):
        " Fetchmany default size is one? "
        self.assertEqual (
                len (self.cur.fetchmany ()),
                1,
                )
        self.assertEqual (
                len (self.cur.fetchall ()),
                88 - 1,
                )

    def j130FetchMany3 (self):
        " Fetchmany with zero size does not fail? "
        n1 = len (self.cur.fetchmany (0))
        self.assertEqual (
                n1 + len (self.cur.fetchall ()),
                88,
                )

    def j140FetchMany4 (self):
        " Fetchmany with negeative size does not fail? "
        n1 = len (self.cur.fetchmany (-1))
        self.assertEqual (
                n1 + len (self.cur.fetchall ()),
                88,
                )


    def j150FetchMany5 (self):
        " Fetchmany fetches all if size is greater than rowcount? "
        self.assertEqual (
                len (self.cur.fetchmany (115)),
                88,
                )
        self.assertEqual (
                len (self.cur.fetchall ()),
                0,
                )

    def j160FetchMany6 (self):
        " Fetchmany can be repeated with different step size? "
        self.assertEqual (
                len (self.cur.fetchmany (15)),
                15,
                )
        self.assertEqual (
                len (self.cur.fetchmany (5)),
                5,
                )
        self.assertEqual (
                len (self.cur.fetchmany ()),
                1,
                )
        self.assertEqual (
                len (self.cur.fetchmany (100)),
                88 - 15 - 5 - 1,
                )
        self.assertEqual (
                len (self.cur.fetchmany ()),
                0,
                )

    def j170FetchMany7 (self):
        " Fetchmany uses arraysize as default size? "
        self.cur.arraysize = 8
        self.assertEqual (
                len (self.cur.fetchmany ()),
                8,
                )
        self.assertEqual (
                len (self.cur.fetchall ()),
                88 - 8,
                )

    def j110BinaryRoundtrip (self):
        " Save binary data as BLOB (NOTE: requires Python3)? "
        if sys.version < "3":
            b0 = "abc\xff\xfe"
        else: # Python3
            b0 = b"abc\xff\xfe"
        b1 = self.sqlite.Binary (b0)
        b2 = self.cur.execute ("select ?", (b1,)).fetchone () [0]
        self.assertEqual (len (b2), len (b1))
        self.assertEqual (b2, b1)


class FetchCompat (baseFetchCompat):
    sqlite = sqlite

if not apswdbapi2:
    class apswFetchCompat (TestCase): pass
else:
    class apswFetchCompat (baseFetchCompat):
        sqlite = apswdbapi2


class baseRowfactoryCompat (TestCase):
    " applied to sqmediumlite front-end as well as to plain apsw_dbapi "
    def setUp (self):
        self.con = self.sqlite.connect (TESTDB)
        self.data = self.con.execute ("SELECT * FROM t2").fetchall ()
        self.con.row_factory = self.sqlite.Row
        self.cur = self.con.cursor ()
        self.cur.execute ("SELECT * FROM t2")
    def tearDown (self):
        try: self.cur.close ()
        except: pass
        try: self.con.close ()
        except: pass

    def check_row (self, row, i):
        self.assertEqual (
                row.c2,
                self.data [i] [1],
                )
        self.assertEqual (
                row ["c2"],
                self.data [i] [1],
                )
        self.assertRaises (
                (KeyError, AttributeError),
                row.__getitem__, # columnnumber not supported
                1,
                )
        self.assertEqual (
                len (row),
                2,
                )

    def j005CheckData (self):
        " Test data has been set up? "
        self.assertTrue (1 < len (self.data) < 10)

    def j010FetchOne (self):
        " Fetchone yields proper Row instance? "
        i = 0
        while i < 100:
            row = self.cur.fetchone ()
            if row is None:
                break
            self.check_row (row, i)
            i += 1
        self.assertEqual (i, len (self.data))

    def j020FetchAll (self):
        " Fetchall yields proper Row instances? "
        rows = self.cur.fetchall ()
        self.assertEqual (type (rows), list)
        self.assertEqual (len (rows), len (self.data))
        for i in range (len (rows)):
            self.check_row (rows[i], i)


    def j040Iteration (self):
        " Iteration yields proper Row instances? "
        i = 0
        for row in self.cur:
            self.check_row (row, i)
            i += 1
        self.assertEqual (i, len (self.data))

    def j050FetchAndClose (self):
        " Description is saved with the Row instance? "
        row = self.cur.fetchone ()
        self.cur.execute ("SELECT c FROM t") # deallocate cursor
        self.check_row (row, 0)

    def j060CursorAttribute (self):
        " Cursor row_factory can be set independently? "
        self.cur.row_factory = None
        self.cur.execute ("SELECT * FROM t2")
        self.assertEqual (
                type (self.cur.fetchone ()),
                tuple,
                )

    def j070IndexError (self):
        " Wrong name raises row factory error? "
        row = self.cur.fetchone ()
        self.assertRaises (
                (AttributeError, IndexError, KeyError, ),
                getattr,
                row,
                "easymoney",
                )

    def j080ImplicitCursor (self):
        " Row factory applied to implicit cursor? "
        self.check_row (
                self.con.execute ("SELECT * FROM t2").fetchone (),
                0)

    def j090SetValue (self):
        " Columns can be assigned new value? "
        row = self.con.execute ("SELECT 44 as ff").fetchone ()
        row.ff += 1
        self.assertEqual (row.ff, 45)
        row["ff"] += 1
        self.assertEqual (row["ff"], 46)
        self.assertEqual (row.ff, 46)
        row.nn = 44 + 1 + 1
        self.assertEqual (row.nn, 46)
        self.assertEqual (row["nn"], 46)
        self.assertEqual (row.nn, row.ff)

    def j100UseAsBindVariable (self):
        " The row can be passed to a new query as dict. of bind variables? "
        for i in self.cur.execute (
                "SELECT rowid, c2 FROM t2 WHERE rowid <= 10"
                ):
            if self.sqlite is sqlite:
                i = dict (i)
            self.assertEqual (
                    len (self.con.execute (
                            "SELECT * FROM t2 WHERE rowid=:rowid",
                            i
                            ).fetchall ()),
                    1,
                    )

    def j200CompareRow (self):
        " Rows can be compared? "
        rows = self.con.execute (
                "SELECT 44 UNION ALL SELECT 44 UNION ALL SELECT 45"
                ).fetchall ()
        self.assertEqual (
                rows [0],
                rows [1],
                )
        self.assertNotEqual (
                rows [0],
                rows [2],
                )


    def j210FetchOneNoData (self):
        " Fetchone yields None if no data found? "
        self.cur.execute ("SELECT * FROM t2 WHERE 1 = 0")
        self.assertEqual (
                self.cur.fetchone (),
                None,
                )

    def j220FetchAllNoData (self):
        " Fetchall yields empty list if no data found? "
        self.cur.execute ("SELECT * FROM t2 WHERE 1 = 0")
        self.assertEqual (
                self.cur.fetchall (),
                [],
                )

    def j230IterationNoData (self):
        " Iteration yields no data if no data found? "
        self.cur.execute ("SELECT * FROM t2 WHERE 1 = 0")
        self.assertEqual (
                list (self.cur),
                [],
                )


class RowfactoryCompat (baseRowfactoryCompat):
    sqlite = sqlite

if not apswdbapi2:
    class apswRowfactoryCompat (TestCase): pass
else:
    class apswRowfactoryCompat (baseRowfactoryCompat):
        sqlite = apswdbapi2

class pysqliteRowfactoryCompat (baseRowfactoryCompat):
    sqlite = pysqlite
    from sqmedium.rowfactory import Row
    sqlite.Row = Row


class HeavyLoadCompat (TestCase):
    def setUp (self):
        self.con = sqlite.connect (TESTDB)
        self.cur = self.con.cursor ()
    def tearDown (self):
        try: self.cur.close ()
        except: pass
        try: self.con.close ()
        except: pass

    def j005Stress25 (self):
        " What if 25 jobs concurrently query the database? "
        ii = []
        results = []
        ncon = 25
        check = self.cur.execute ("SELECT MAX (cbig) FROM tbig").fetchone () [0]
        for i in range (ncon):
            ii.append (Testthread ("SELECT MAX (cbig) FROM tbig"))
        for i in ii:
            i.join ()
            results.append (i.get_result().strip())
        self.assertEqual (
                results,
                ncon * [check],
                )

    def j010MultiprocessInserts (self):
        " Handle transactions from multiple processes? "
        self.cur.execute ("DELETE FROM t WHERE c = 7711")
        self.con.commit ()
        ii = []
        ncon = 5
        ntrans = 10
        for i in range (ncon):
            ii.append (Testproc (
                    ntrans * """
                        INSERT INTO t (c) VALUES (7711);
                        """,
                    ))
        for i in ii:
            i.wait ()
            res = i.get_result ()
            for j in range (10):
                res = res.replace (str (j), "")
            res = res.strip ()
            self.assertEqual (
                res,
                ""
                )
        self.assertEqual (
            self.cur.execute ("SELECT COUNT (*) FROM t WHERE c == 7711").fetchone (),
            (ncon * ntrans,),
            )

    def j020MultithreadInserts (self):
        " Handle transactions from multiple threads? "
        self.cur.execute ("DELETE FROM t WHERE c = 7721")
        self.con.commit ()
        ii = []
        ncon = 10
        ntrans = 5
        for i in range (ncon):
            ii.append (Testthread (ntrans * """
                    INSERT INTO t (c) VALUES (7721);
                    """))
        for i in ii:
            i.join ()
            res = i.get_result ()
            for j in range (10):
                res = res.replace (str (j), "")
            res = res.strip ()
            self.assertEqual (
                res,
                ""
                )
        self.assertEqual (
            self.cur.execute ("SELECT COUNT (*) FROM t WHERE c == 7721").fetchone (),
            (ncon * ntrans,),
            )


class AttachCompat (TestCase):
    def setUp (self):
        self.con = sqlite.connect (TESTDB)
        self.cur = self.con.cursor ()
    def tearDown (self):
        try: self.cur.close ()
        except: pass
        try: self.con.close ()
        except: pass

    def j010AttachOK (self):
        " Attach allowed? "
        fname = "tmpj010attach.db"
        self.cur.execute ("attach '%s' as att1" % (fname,))
        self.cur.execute ("detach att1")

    def j020AttachSubdir (self):
        " Attach allowed to subdir? "
        fname = os.path.join (SUBDIR, "attach.db")
        self.cur.execute ("attach '%s' as att1" % fname)
        self.cur.execute ("detach att1")
        open (fname, 'r').close () # really exists

    def j030AttachMemory (self):
        " Attach allowed to :memory:? "
        fname = ":memory:"
        self.cur.execute ("attach '%s' as att1" % fname)
        self.cur.execute ("detach att1")
        self.assertRaises (
                IOError,
                open,
                fname,
                'r',
                )


class MiscCompat (TestCase):
    def tearDown (self):
        try: self.cur.close ()
        except: pass
        try: self.con.close ()
        except: pass

    def j030Subdir (self):
        " DB may include relative path? "
        fname = os.path.join (SUBDIR, "subdir.db")
        self.con = sqlite.connect (fname)
        open (fname, 'r').close () # really exists?

    def j050EmptyStatement (self):
        " Empty statement allowed, resets cursor? "
        self.con = sqlite.connect (TESTDB)
        self.cur = self.con.cursor ()
        self.cur.execute ("SELECT 729 FROM tbig")
        self.cur.execute ("")
        self.assertEqual (
                self.cur.fetchone (),
                None,
                )

    def j060InvalidDatabase (self):
        " Invalid database name is handled? "
        self.assertRaises (
                TypeError,
                sqlite.connect,
                None,
                )
        self.assertRaises (
                sqlite.OperationalError,
                sqlite.connect,
                "adssssae\\we45z/ewqfas\\deqva/dfdasewq",
                )

    def j070CursorClosedTwice (self):
        " It is not an error if cursor is closed twice? "
        self.con = sqlite.Connection (TESTDB)
        self.cur = sqlite.Cursor (self.con)
        self.cur.close ()
        self.cur.close ()

    def j080ConnectionClosedTwice (self):
        " It is not an error if a connection is closed twice? "
        self.con = sqlite.Connection (TESTDB)
        self.cur = sqlite.Cursor (self.con)
        self.con.close ()
        self.con.close ()

    def j090FetchAll (self):
        " Fetchall does not return iterator? "
        self.con = sqlite.connect (TESTDB)
        self.cur = self.con.cursor ()
        self.assertEqual (
                type (self.cur.execute ("SELECT * FROM t2 ").fetchall ()),
                list,
                )

    def j100FetchLarge (self):
        " Large amount of data is succesfully fetched? "
        self.con = sqlite.connect (TESTDB)
        self.cur = self.con.cursor ()
        self.assertEqual (
                len (self.cur.execute ("SELECT ?", (100000 * 'a',)).fetchone () [0]),
                100000,
                )


if not apswdbapi2:
    # transaction features only apply to apswdbapi2
    class TransactionFeatures (TestCase): pass
else:
    class TransactionFeatures (TestCase):
        def setUp (self):
            self.con = sqlite.connect (TESTDB)
            self.cur = self.con.cursor ()
            self.con2 = sqlite.connect (TESTDB, timeout=0, isolation_level=None)
        def tearDown (self):
            try: self.cur.close ()
            except: pass
            try: self.con.close ()
            except: pass
            try: self.con2.close ()
            except: pass

        def j020NoSharedLock (self):
            " Shared-lock released immediately while pre-fetch enabled? "
            self.cur.execute ("select rowid from tbig")
            self.con2.execute ("insert into t values (?)", ("dsfdf",))

        def j030DDLNotCommitted (self):
            " Create table does not imply commit (apswdbapi2 feature)? "
            self.con.execute ("DROP TABLE IF EXISTS t66")
            self.con.commit ()
            self.con.execute ("CREATE TABLE t66 (c)")
            self.assertRaises (
                    sqlite.OperationalError,
                    self.con2.execute,
                    "select * from t66",
                    )

        def j040BeginStatement (self):
            " Can use begin statement as well as implicit begin? "
            self.cur.execute ("begin")
            self.assertTrue (self.con.in_transaction)
            self.cur.execute ("insert into t values (?)", ("dsdkf",))
            self.assertTrue (self.con.in_transaction)
            self.con.commit ()
            self.assertFalse (self.con.in_transaction)

        def j050CommitStatement (self):
            " Can use commit statement as well as method (apswdbapi2 feature)? "
            self.cur.execute ("insert into t values (?)", ("dsdkf",))
            self.assertTrue (self.con.in_transaction)
            self.cur.execute ("commit")
            self.assertFalse (self.con.in_transaction)

        def j060CommitStatement (self):
            " Autocommit is reset after COMMIT or ROLLBACK (apswdbapi2 feature)? "
            for s in (
                    "rOlLbACK",
                    "cOMMIT",
                    "End",
                    ):
                self.con.execute ("INSERT INTO t (c) VALUES (22)")
                self.con.execute (s)
                self.assertFalse (self.con.in_transaction,
                        "still in trans. after: " + s)


class UnsupportedFeatures (TestCase):
    def setUp (self):
        self.con = sqlite.Connection (TESTDB)
        self.cur = self.con.cursor()
    def tearDown(self):
        try: self.cur.close ()
        except: pass
        try: self.con.close ()
        except: pass

    def j010UnsupportedErr1 (self):
        " Class instances can not be returned over network? "
        self.assertRaises (
                sqlite.PicklingError,
                self.con._do,
                "__getattribute__", "_pause"
                )

    def j030UnsupportedErr3 (self):
        " Functions can not be passed over the network? "
        self.assertRaises (
                sqlite.PicklingError,
                self.con._do,
                "create_function", "f", myfunction,
                )

    def j040UnsupportedErr4 (self):
        " A class-instance can not be passed over the network? "
        class C:
            " unmarshallable "
        self.assertRaises (
                sqlite.PicklingError,
                self.con.execute,
                C (),
                )

class AttachFeatures (TestCase):
    def tearDown (self):
        try: self.cur.close ()
        except: pass
        try: self.con.close ()
        except: pass

    def j040AttachError (self):
        " Attach database outside server home NOT allowed? "
        self.con = sqlite.connect (TESTDB)
        self.cur = self.con.cursor ()
        fname = os.path.join ("..", "attach.db")
        self.assertRaises (
                sqlite.Error,
                self.cur.execute,
                "attach '%s' as att1" % fname,
                )

class AddressFeatures (TestCase):
    def tearDown (self):
        try: self.con.close ()
        except: pass
        try: self.con2.close ()
        except: pass
        reset_frontend ()

    def j010SetAndReset (self):
        " Address is used for connect? "
        reset_frontend (host="localhost")
        sqlite.Connection (TESTDB).close ()
        reset_frontend (host="wronghost")
        self.failUnlessRaises (
                sqlite.ServerNotUp,
                sqlite.Connection,
                TESTDB,
                )

    def j020GetAddress (self):
        " Address can be seen? "
        self.failUnlessEqual (
                (sqlite.host, sqlite.port),
                ("127.0.0.1", TESTPORT),
                )

    def j030AfterConnect (self):
        " Address change does not affect exesting connection? "
        self.con = sqlite.connect (TESTDB)
        reset_frontend (
                host="127.0.0.12",
                port=TESTPORT + 2
                )
        self.con.execute ("select 1234")
        self.con.interrupt () # can still interrupt?

    def j040BeforeConnect (self):
        " Address change affects new connection? "
        reset_frontend (port=TESTPORT + 3)
        self.failUnlessRaises (
                sqlite.ServerNotUp,
                sqlite.Connection,
                TESTDB,
                )

    def j050SetBeforeStart (self):
        " Port can be set before start? "
        reset_frontend (port=TESTPORT + 1)
        restart_backend () # 2nd back-end
        try:
            self.con = sqlite.Connection (TESTDB)
            self.assertEqual (
                    sqlite.Connection ()._do ("modget", "port"),
                    TESTPORT + 1,
                    )
        finally:
            sqlite.stop () # 2nd back-end

    def j060MultipleBackends (self):
        " connect to two back-ends at different addresses? "
        self.con = sqlite.connect (TESTDB)
        reset_frontend (port=TESTPORT + 1)
        restart_backend () # 2nd back-end
        self.con2 = sqlite.Connection (TESTDB)
        try:
            self.assertEqual (
                    list (self.con.execute ("select 444")),
                    list (self.con2.execute ("select 444")),
                    )
        finally:
            sqlite.stop () # 2nd back-end

    def j100UriFilename (self):
        " Network location can be given as part of the filename? "
        self.con = sqlite.connect (
                2 * os.path.sep + "127.0.0.1" + os.path.sep + TESTDB)
        self.assertEqual (
                self.con.filename,
                TESTDB,
                )

    def j110UriWrongFilename (self):
        " Wrong network location can be given as part of the filename? "
        self.failUnlessRaises (
                sqlite.ServerNotUp,
                sqlite.Connection,
                2 * os.path.sep + "wronghost" + os.path.sep + TESTDB,
                )


class SharedCacheFeatures (TestCase):
    def tearDown (self):
        reset_frontend ()
        restart_backend ()

    def check_cachesharing (self):
        con = sqlite.connect (TESTDB)
        con2 = sqlite.connect (TESTDB, timeout=0.)
        try:
            con.execute ("pragma cache_size=200") # spill to disk soon
            con.execute ("insert into tbig select * from tbig")
            try:
                con2.execute ("select c2 from t2")
            except sqlite.OperationalError:
                return False # database locked
            else:
                return True
        finally:
            con.close ()
            con2.close ()

    def j010InitiallyDisabled (self):
        " Cache sharing is initially not enabled? "
        self.assertFalse (self.check_cachesharing())

    def j020SetCacheSharing (self):
        " Can not change setting through the front-end "
        self.failUnlessRaises (
                sqlite.Error,
                sqlite.enable_shared_cache,
                None,
                )

    def j025ConfCacheSharing (self):
        " Cache sharing can be enabled using conf-file? "
        reset_frontend (cachesharing = True)
        restart_backend ()
        self.assertTrue (self.check_cachesharing())

    def j030LockUncommitted (self):
        " Pragma read_uncommitted is initially not set? "
        self.assertEqual (
                sqlite.Connection (TESTDB).execute (
                    "pragma read_uncommitted"
                    ).fetchone (),
                    (0,)
                    )

    def j040ConfReadUncommitted (self):
        " Pragma read_uncommitted can be configured as default? "
        reset_frontend (
                cachesharing = True,
                initsql = "PRAGMA read_uncommitted=1",
                )
        restart_backend ()
        self.assertEqual (
                sqlite.Connection (TESTDB).execute (
                    "pragma read_uncommitted"
                    ).fetchone (),
                    (1,)
                    )


class MiscFeatures (TestCase):

    def j010HomeCheck (self):
        " Can not open a database outside home directory? "
        fname = os.path.join ("..", TESTDB)
        self.assertRaises (
                sqlite.Error,
                sqlite.connect,
                fname,
                )

    def j030StopBackend (self):
        " Stop back-end interrupts existing connections? "
        t0 = time.time ()
        thr = Testthread (heavy_query)
        try:
            sqlite.stop ()
            result = thr.join ().get_result ()
            if "interrupt" not in result and "ServerNotUp" not in result:
                self.fail ("not interrupted: " + result)
        finally:
            sqlite.start ()

    def j040DatabaseDump(self):
        " Is the dumped script identical to the creation script? "
        cx = sqlite.connect(":memory:", isolation_level=None)
        expected_sqls = """
-- SQLite dump (by APSW 3.7.8-r1)
-- SQLite version 3.7.9
-- Date: Sat Dec 17 21:12:32 2011
-- Tables like: (All)
-- Database: :memory:
-- User: ed @ mac-mini-van-edzard-pasma.local

-- The values of various per-database settings
-- PRAGMA page_size=1024;
-- PRAGMA encoding='UTF-8';
-- PRAGMA auto_vacuum=NONE;
-- PRAGMA max_page_count=1073741823;

-- This pragma turns off checking of foreign keys as tables would be
-- inconsistent while restoring.  It was introduced in SQLite 3.6.19.
PRAGMA foreign_keys=OFF;

BEGIN TRANSACTION;

-- Table  t1
DROP TABLE IF EXISTS t1;
CREATE TABLE t1(id integer primary key, s1 text, t1_i1 integer not null, i2 integer, unique (s1), constraint t1_idx1 unique (i2));
INSERT INTO t1 VALUES(1,'foo',10,20);
INSERT INTO t1 VALUES(2,'foo2',30,30);
"""
        if sys.version >= "3": # Python3
            expected_sqls += """
-- insert BLOB
INSERT INTO t1 VALUES(3,X'616263FFFE',40,40);
"""
        expected_sqls += """
-- Triggers and indices on  t1
CREATE TRIGGER trigger_1 update of t1_i1 on t1 begin update t2 set t2_i1 = new.t1_i1 where t2_i1 = old.t1_i1; end;

-- Table  t2
DROP TABLE IF EXISTS t2;
CREATE TABLE t2(id integer, t2_i1 integer, t2_i2 integer, primary key (id),foreign key(t2_i1) references t1(t1_i1));

-- Views
DROP VIEW IF EXISTS v1;
CREATE VIEW v1 as select * from t1 left join t2 using (id);

COMMIT TRANSACTION;

-- Restoring foreign key checking back on.  Note that SQLite 3.6.19 is off by
-- default
PRAGMA foreign_keys=ON;
"""
        list (map (cx.execute, expected_sqls.split ("\n")))
        with open ('dump.tmp', 'w') as f:
            cx.dump (f)
        with open ('dump.tmp', 'r') as f:
            actual_sqls = f.read()
        cx.close ()
        self.assertEqual (
            list (i for i in expected_sqls.split("\n") if i.strip () and not i.startswith ("--")),
            list (i for i in actual_sqls.split("\n") if i.strip () and not i.startswith ("--")),
            )

class RecyclingInternals (TestCase):
    def setUp (self):
        self.timeout = random.random () # unique
        self.con = sqlite.connect (TESTDB, timeout=self.timeout)
        self.name0 = self.con.name
    def tearDown (self):
        try: self.con.close ()
        except: pass
        try: self.con2.close ()
        except: pass
        if sqlite.connect ()._do ("modget", "initsql"):
            reset_frontend ()
            restart_backend ()

    def j005UniqueName (self):
        " Each open connection has a unique name? "
        self.con2 = sqlite.connect (TESTDB, timeout=self.timeout)
        self.assertNotEqual (
                self.con2.name,
                self.name0,
                )

    def j010Recycled (self):
        " SQLite connection is reused at DBAPI level? "
        self.con.close ()
        quicksleep ()
        self.con = sqlite.connect (TESTDB, timeout=self.timeout)
        self.assertEqual (
                self.con.name,
                self.name0,
                )

    def j020RecycledTwice (self):
        " SQLite connection is reused more then once? "
        self.con.close ()
        self.con = sqlite.connect (TESTDB, timeout=self.timeout)
        self.con.close ()
        quicksleep ()
        self.con = sqlite.connect (TESTDB, timeout=self.timeout)
        self.assertEqual (
                self.con.name,
                self.name0,
                )

    def j030NotRecycled1 (self):
        " Database name must match? "
        self.con.close ()
        quicksleep ()
        fname = os.path.join (SUBDIR, "othername")
        self.con = sqlite.connect (fname, timeout=self.timeout)
        self.assertNotEqual (
                self.con.name,
                self.name0,
                )

    def j040NotRecycled2 (self):
        " Connect keyword arguments must match? "
        self.con.close ()
        quicksleep ()
        self.con = sqlite.connect (TESTDB, timeout=self.timeout, isolation_level=None)
        self.assertNotEqual (
                self.con.name,
                self.name0,
                )

    def j060Recyclable (self):
        " Connection is still recyclable after DML? "
        cur = self.con.cursor ()
        cur.execute ("DROP TABLE IF EXISTS t3")
        cur.execute ("CREATE TABLE t3(c3)")
        cur.execute ("INSERT INTO t3 values (9837)")
        cur.execute ("SELECT c3 FROM t3").fetchall ()
        cur.execute ("PRAGMA page_size").fetchall ()
        cur.close ()
        self.con.close ()
        quicksleep ()
        self.con = sqlite.connect (TESTDB, timeout=self.timeout)
        self.assertEqual (
                self.con.name,
                self.name0,
                )

    def j080Pragma (self):
        " Pragmatized connection is not recycled? "
        self.con.execute ("PRAGMA case_sensitive_like=1")
        self.con.close ()
        quicksleep ()
        self.con = sqlite.connect (TESTDB, timeout=self.timeout)
        self.assertNotEqual (
                self.con.name,
                self.name0,
                )

    def j085InitPragma (self):
        " Connection is still recyclable if pragma set via initsql? "
        self.tearDown ()
        reset_frontend (initsql = "PRAGMA case_sensitive_like=1")
        restart_backend ()
        self.setUp ()
        self.con.close ()
        quicksleep ()
        self.con = sqlite.connect (TESTDB, timeout=self.timeout)
        self.assertEqual (
                self.con.name,
                self.name0,
                )

    def i090ExecuteScript (self):
        " Connection is still recyclable after executescript? "
        self.con.executescript ("SELECT 1; SELECT 2;")
        self.con.close ()
        quicksleep ()
        self.con = sqlite.connect (TESTDB, timeout=self.timeout)
        self.assertEqual (
                self.con.name,
                self.name0,
                )

    def i100ExecuteScript (self):
        " Connection is no longer recyclable after executescript? "
        self.con.executescript ("SELECT 1; PRAGMA case_sensitive_like=1")
        self.con.close ()
        quicksleep ()
        self.con = sqlite.connect (TESTDB, timeout=self.timeout)
        self.assertNotEqual (
                self.con.name,
                self.name0,
                )

    def j110Attach (self):
        " Connection is unrecyclable after attaching a database "
        self.con.execute ("attach 'attachdb' as att1")
        self.con.close ()
        quicksleep ()
        self.con = sqlite.connect (TESTDB, timeout=self.timeout)
        self.assertNotEqual (
                self.con.name,
                self.name0,
                )

    def j120MemRecyclable (self):
        " Memory-connection are recycled if no DDL is executed? "
        self.con.close ()
        self.con = sqlite.connect (":memory:", timeout=self.timeout)
        self.name0 = self.con.name
        self.con.close ()
        quicksleep ()
        self.con = sqlite.connect (":memory:", timeout=self.timeout)
        self.assertEqual (
                self.con.name,
                self.name0,
                )

    def j121MemNotRecyclable (self):
        " Memory-connection are not longer recycled after DDL? "
        self.con.close ()
        self.con = sqlite.connect ("", timeout=self.timeout)
        self.name0 = self.con.name
        self.con.execute ("create table tmp (c)")
        self.con.close ()
        quicksleep ()
        self.con = sqlite.connect ("", timeout=self.timeout)
        self.assertNotEqual (
                self.con.name,
                self.name0,
                )

    def j125ForceRecycling (self):
        " An unrecyclable connection can be forced to be recycled? "
        self.con.close ()
        self.con = sqlite.connect (":memory:", timeout=self.timeout)
        self.con._do ("__setattr__", "_unrecyclable", False,)
        self.name0 = self.con.name
        self.con.close ()
        quicksleep ()
        self.con = sqlite.connect (":memory:", timeout=self.timeout)
        self.assertEqual (
                self.con.name,
                self.name0,
                )

    def j135DoubleBooked (self):
        " Not recycled to two connections at the same time? "
        self.con.close ()
        quicksleep ()
        self.con = sqlite.connect (TESTDB, timeout=self.timeout)
        self.con2 = sqlite.connect (TESTDB, timeout=self.timeout)
        self.assertNotEqual (
                self.con2.name,
                self.con.name,
                )

    def j150Rollbacked (self):
        " Any changes are rolled back before recycling? "
        t0 = self.con.execute ("SELECT COUNT (*) FROM t WHERE c = 456").fetchone ()
        self.con.execute ("INSERT INTO t VALUES (456)")
        self.con.close ()
        quicksleep ()
        self.con = sqlite.connect (TESTDB, timeout=self.timeout)
        # must have matched
        self.assertEqual (
                self.con.name,
                self.name0,
                )
        # must been rolled back
        self.assertEqual (
                self.con.execute ("SELECT COUNT (*) FROM t WHERE c = 456").fetchone (),
                t0,
                )
        if apswdbapi2 or sys.version >= '3.2' : # Pysqlite in_transaction since 3.2
            self.assertFalse (self.con.in_transaction)

    def j160RecycledBetweenProcesses (self):
        " A connection can be recycled between different processes? "
        self.con.close ()
        fname = os.path.join (SUBDIR, "db" + str (random.randrange(10, 99)))
        # connect like shell (isolation_level None)
        self.con = sqlite.connect (fname, isolation_level=None)
        self.name0 = self.con.name
        self.con.close ()
        quicksleep ()
        prc = Testproc (
                "SELECT 93260; .sleep 500",
                fname = fname,
                )
        self.con = sqlite.connect (fname, isolation_level=None)
        self.assertNotEqual (
                self.con.name,
                self.name0,
                )
        prc.wait ()
        self.con2 = sqlite.connect (fname, isolation_level=None)
        self.assertEqual (
                self.con2.name,
                self.name0,
                )

    def j170NoLongerRecyclable (self):
        " No longer recyclable if changing isolation_level? "
        try:
            self.con.isolation_level = None
        except AttributeError:
            pass # OK, read-only attribute
        else:
            self.con.close ()
            quicksleep ()
            self.con = sqlite.connect (TESTDB, timeout=self.timeout)
            self.assertNotEqual (
                    self.con.name,
                    self.name0,
                    )

    if apswdbapi2:
        def j180StillChange (self):
            " Can change timeout after connect (APSW feature)? "
            self.con.setbusytimeout (551)
            self.con.close ()
            quicksleep ()
            self.con = sqlite.connect (TESTDB, timeout=self.timeout)
            self.assertNotEqual (
                    self.con.name,
                    self.name0,
                    )

    def j210MaxPoolSize1 (self):
        " The pool will not hold to many different databases? "
        for i in range (10):
            sqlite.connect ("tmp_test_%i.db" % i).close ()
            time.sleep (0.1)
        # count pooled conneection threads 
        self.assertLessEqual (
                len (self.con._do ("modstatus", True)),
                3,
                )

    def j220MaxPoolSize2 (self):
        " The pool may grow after many simultaneous connections ? "
        cons = []
        for i in range (10):
            cons.append (sqlite.connect (TESTDB))
        for con in cons:
            con.close ()
        for i in range (10):
            sqlite.connect (TESTDB).close ()
        # count pooled connection threads 
        self.assertGreaterEqual (
                len (tuple (i for i in self.con._do ("modstatus", True).values () if i[0] == TESTDB and not i[1])),
                5,
                )


class DeallocInternals (TestCase):
    """
    What happens if sqmediumlite objects are de-allocated.
    """
    def setUp (self):
        self.con0 = sqlite.Connection ()
        self.n0 = self.ncons ()
    def tearDown(self):
        self.con0.close ()
        try: self.cur.close ()
        except: pass
        try: self.con.close ()
        except: pass
        try: self.proc2.kill ()
        except AttributeError: pass

    def ncons (self):
        " Count connected connection-threads "
        quicksleep ()
        d = self.con0._do ("modstatus", False)
        return len (d)

    def j010CloseConnection (self):
        " Connection closed when manually closed? "
        self.con = sqlite.Connection (TESTDB)
        self.cur = sqlite.Cursor (self.con)
        self.cur.execute ("select 5341614")
        self.assertEqual (
                self.ncons (),
                self.n0 + 1)
        self.cur.close ()
        self.con.close ()
        self.assertEqual (
                self.ncons (),
                self.n0)

    def j020DelConnection (self):
        " Connection closed when deallocated? "
        self.con = sqlite.Connection (TESTDB)
        self.cur = sqlite.Cursor (self.con)
        self.cur.execute ("select 898931")
        self.assertEqual (
                self.ncons (),
                self.n0 + 1, )
        self.cur = None
        self.con = None
        self.assertEqual (
                self.ncons (),
                self.n0, )

    def j030DelModule (self):
        " Connection closed when process finished? "
        self.proc2 = Testproc (
                "SELECT 93260 FROM t; .sleep 1000",
                )
        self.assertEqual (
                self.ncons (),
                self.n0 + 1)
        self.proc2.wait ()
        self.assertEqual (
                self.ncons (),
                self.n0)

    def j050KillProcess (self):
        " Connection closed when process killed? "
        self.proc2 = Testproc (
                "SELECT 93270 FROM t; .sleep 1500",
                )
        self.assertEqual (
                self.ncons (),
                self.n0 + 1)
        self.proc2.terminate ()
        self.assertEqual (
                self.ncons (),
                self.n0)


class MiscInternals (TestCase):
    def setUp (self):
        self.con = sqlite.connect (TESTDB, check_same_thread=False)
        self.cur = self.con.cursor ()
    def tearDown (self):
        try: self.cur.close ()
        except: pass
        try: self.con.close ()
        except: pass

    def j020Prefetch (self):
        " All rows are prefetched upon execute? "
        n = self.cur.execute ("select count (*) from tbig").fetchone ()[0]
        self.cur.execute ("select 1872 from tbig")
        self.assertEqual (
                len (self.cur._rows),
                n,
                )

    def j030DoMethod (self):
        " Invalid _do parameter is handled? "
        self.assertRaises (
                AttributeError,
                self.con._do,
                "dgdfgkfag",
                )

    def j040Spam (self):
        " Back-end does not get upset after inproper connect? "
        Socket ().sconnect (("127.0.0.1", TESTPORT)).close ()
        self.assertEqual (
                list (self.con.execute ("select 453456")),
                [(453456,)],
                )

    if apswdbapi2:
        def j050ErrorMap (self):
            " Are all APSW exceptions mapped to DB-API ones? "
            for j in (getattr (apswdbapi2.apsw, i)
                    for i in dir (apswdbapi2.apsw)):
                if isinstance (j, type) and \
                        issubclass (j, apswdbapi2.apsw.Error):
                    self.assertTrue (
                            isinstance (apswdbapi2._maperr (j()), apswdbapi2.Error),
                            str (j),
                            )

    def j050SocketTimeout (self):
        " Socket timeout is set if and only if on Windows "
        self.assertEqual (
                sys.platform == "win32",
                socket.getdefaulttimeout() is not None,
                )


def main (verbosity=1):
    TextTestRunner (verbosity=verbosity).run (TestSuite ((
            makeSuite (Control , "init"),
            makeSuite (VersionCompat , "j"),
            makeSuite (AttributeCompat , "j"),
            makeSuite (apswAttributeCompat , "j"),
            makeSuite (InterruptCompat, "j"),
            makeSuite (SameThreadCompat, "j"),
            makeSuite (TimeoutCompat, "j"),
            makeSuite (TransactionCompat, "j"),
            makeSuite (ExecuteCompat, "j"),
            makeSuite (FetchCompat, "j"),
            makeSuite (apswFetchCompat, "j"),
            makeSuite (RowfactoryCompat, "j"),
            makeSuite (apswRowfactoryCompat, "j"),
            makeSuite (pysqliteRowfactoryCompat, "j"),
            makeSuite (AttachCompat, "j"),
            makeSuite (HeavyLoadCompat, "j"),
            makeSuite (MiscCompat, "j"),
            makeSuite (UnsupportedFeatures, "j"),
            makeSuite (AttachFeatures, "j"),
            makeSuite (AddressFeatures , "j"),
            makeSuite (SharedCacheFeatures, "j"),
            makeSuite (TransactionFeatures, "j"),
            makeSuite (MiscFeatures , "j"),
            makeSuite (RecyclingInternals, "j"),
            makeSuite (DeallocInternals, "j"),
            makeSuite (MiscInternals , "j"),
            makeSuite (Control , "exit"),
            )))

test = main

if __name__ == "__main__":
    main ()