diff options
Diffstat (limited to 'Lib/sqlite3')
-rw-r--r-- | Lib/sqlite3/test/dbapi.py | 51 | ||||
-rw-r--r-- | Lib/sqlite3/test/factory.py | 2 | ||||
-rw-r--r-- | Lib/sqlite3/test/hooks.py | 75 | ||||
-rw-r--r-- | Lib/sqlite3/test/regression.py | 62 | ||||
-rw-r--r-- | Lib/sqlite3/test/transactions.py | 19 | ||||
-rw-r--r-- | Lib/sqlite3/test/types.py | 15 |
6 files changed, 199 insertions, 25 deletions
diff --git a/Lib/sqlite3/test/dbapi.py b/Lib/sqlite3/test/dbapi.py index 6d4c4fe..8327aa1 100644 --- a/Lib/sqlite3/test/dbapi.py +++ b/Lib/sqlite3/test/dbapi.py @@ -1,7 +1,7 @@ #-*- coding: ISO-8859-1 -*- # pysqlite2/test/dbapi.py: tests for DB-API compliance # -# Copyright (C) 2004-2005 Gerhard Häring <gh@ghaering.de> +# Copyright (C) 2004-2007 Gerhard Häring <gh@ghaering.de> # # This file is part of pysqlite. # @@ -223,12 +223,41 @@ class CursorTests(unittest.TestCase): except sqlite.ProgrammingError: pass + def CheckExecuteParamList(self): + self.cu.execute("insert into test(name) values ('foo')") + self.cu.execute("select name from test where name=?", ["foo"]) + row = self.cu.fetchone() + self.failUnlessEqual(row[0], "foo") + + def CheckExecuteParamSequence(self): + class L(object): + def __len__(self): + return 1 + def __getitem__(self, x): + assert x == 0 + return "foo" + + self.cu.execute("insert into test(name) values ('foo')") + self.cu.execute("select name from test where name=?", L()) + row = self.cu.fetchone() + self.failUnlessEqual(row[0], "foo") + def CheckExecuteDictMapping(self): self.cu.execute("insert into test(name) values ('foo')") self.cu.execute("select name from test where name=:name", {"name": "foo"}) row = self.cu.fetchone() self.failUnlessEqual(row[0], "foo") + def CheckExecuteDictMapping_Mapping(self): + class D(dict): + def __missing__(self, key): + return "foo" + + self.cu.execute("insert into test(name) values ('foo')") + self.cu.execute("select name from test where name=:name", D()) + row = self.cu.fetchone() + self.failUnlessEqual(row[0], "foo") + def CheckExecuteDictMappingTooLittleArgs(self): self.cu.execute("insert into test(name) values ('foo')") try: @@ -378,6 +407,12 @@ class CursorTests(unittest.TestCase): res = self.cu.fetchmany(100) self.failUnlessEqual(res, []) + def CheckFetchmanyKwArg(self): + """Checks if fetchmany works with keyword arguments""" + self.cu.execute("select name from test") + res = self.cu.fetchmany(size=100) + self.failUnlessEqual(len(res), 1) + def CheckFetchall(self): self.cu.execute("select name from test") res = self.cu.fetchall() @@ -609,20 +644,6 @@ class ExtensionTests(unittest.TestCase): res = cur.fetchone()[0] self.failUnlessEqual(res, 5) - def CheckScriptStringUnicode(self): - con = sqlite.connect(":memory:") - cur = con.cursor() - cur.executescript(""" - create table a(i); - insert into a(i) values (5); - select i from a; - delete from a; - insert into a(i) values (6); - """) - cur.execute("select i from a") - res = cur.fetchone()[0] - self.failUnlessEqual(res, 6) - def CheckScriptErrorIncomplete(self): con = sqlite.connect(":memory:") cur = con.cursor() diff --git a/Lib/sqlite3/test/factory.py b/Lib/sqlite3/test/factory.py index a9a828f..bc56caa 100644 --- a/Lib/sqlite3/test/factory.py +++ b/Lib/sqlite3/test/factory.py @@ -1,7 +1,7 @@ #-*- coding: ISO-8859-1 -*- # pysqlite2/test/factory.py: tests for the various factories in pysqlite # -# Copyright (C) 2005 Gerhard Häring <gh@ghaering.de> +# Copyright (C) 2005-2007 Gerhard Häring <gh@ghaering.de> # # This file is part of pysqlite. # diff --git a/Lib/sqlite3/test/hooks.py b/Lib/sqlite3/test/hooks.py index 28f2404..6872fd6 100644 --- a/Lib/sqlite3/test/hooks.py +++ b/Lib/sqlite3/test/hooks.py @@ -1,7 +1,7 @@ #-*- coding: ISO-8859-1 -*- # pysqlite2/test/hooks.py: tests for various SQLite-specific hooks # -# Copyright (C) 2006 Gerhard Häring <gh@ghaering.de> +# Copyright (C) 2006-2007 Gerhard Häring <gh@ghaering.de> # # This file is part of pysqlite. # @@ -105,9 +105,80 @@ class CollationTests(unittest.TestCase): if not e.args[0].startswith("no such collation sequence"): self.fail("wrong OperationalError raised") +class ProgressTests(unittest.TestCase): + def CheckProgressHandlerUsed(self): + """ + Test that the progress handler is invoked once it is set. + """ + con = sqlite.connect(":memory:") + progress_calls = [] + def progress(): + progress_calls.append(None) + return 0 + con.set_progress_handler(progress, 1) + con.execute(""" + create table foo(a, b) + """) + self.failUnless(progress_calls) + + + def CheckOpcodeCount(self): + """ + Test that the opcode argument is respected. + """ + con = sqlite.connect(":memory:") + progress_calls = [] + def progress(): + progress_calls.append(None) + return 0 + con.set_progress_handler(progress, 1) + curs = con.cursor() + curs.execute(""" + create table foo (a, b) + """) + first_count = len(progress_calls) + progress_calls = [] + con.set_progress_handler(progress, 2) + curs.execute(""" + create table bar (a, b) + """) + second_count = len(progress_calls) + self.failUnless(first_count > second_count) + + def CheckCancelOperation(self): + """ + Test that returning a non-zero value stops the operation in progress. + """ + con = sqlite.connect(":memory:") + progress_calls = [] + def progress(): + progress_calls.append(None) + return 1 + con.set_progress_handler(progress, 1) + curs = con.cursor() + self.assertRaises( + sqlite.OperationalError, + curs.execute, + "create table bar (a, b)") + + def CheckClearHandler(self): + """ + Test that setting the progress handler to None clears the previously set handler. + """ + con = sqlite.connect(":memory:") + action = 0 + def progress(): + action = 1 + return 0 + con.set_progress_handler(progress, 1) + con.set_progress_handler(None, 1) + con.execute("select 1 union select 2 union select 3").fetchall() + self.failUnlessEqual(action, 0, "progress handler was not cleared") + def suite(): collation_suite = unittest.makeSuite(CollationTests, "Check") - return unittest.TestSuite((collation_suite,)) + progress_suite = unittest.makeSuite(ProgressTests, "Check") + return unittest.TestSuite((collation_suite, progress_suite)) def test(): runner = unittest.TextTestRunner() diff --git a/Lib/sqlite3/test/regression.py b/Lib/sqlite3/test/regression.py index 4a68d9d..5e89a6c 100644 --- a/Lib/sqlite3/test/regression.py +++ b/Lib/sqlite3/test/regression.py @@ -21,6 +21,7 @@ # misrepresented as being the original software. # 3. This notice may not be removed or altered from any source distribution. +import datetime import unittest import sqlite3 as sqlite @@ -79,6 +80,67 @@ class RegressionTests(unittest.TestCase): cur.fetchone() cur.fetchone() + def CheckStatementFinalizationOnCloseDb(self): + # pysqlite versions <= 2.3.3 only finalized statements in the statement + # cache when closing the database. statements that were still + # referenced in cursors weren't closed an could provoke " + # "OperationalError: Unable to close due to unfinalised statements". + con = sqlite.connect(":memory:") + cursors = [] + # default statement cache size is 100 + for i in range(105): + cur = con.cursor() + cursors.append(cur) + cur.execute("select 1 x union select " + str(i)) + con.close() + + def CheckOnConflictRollback(self): + if sqlite.sqlite_version_info < (3, 2, 2): + return + con = sqlite.connect(":memory:") + con.execute("create table foo(x, unique(x) on conflict rollback)") + con.execute("insert into foo(x) values (1)") + try: + con.execute("insert into foo(x) values (1)") + except sqlite.DatabaseError: + pass + con.execute("insert into foo(x) values (2)") + try: + con.commit() + except sqlite.OperationalError: + self.fail("pysqlite knew nothing about the implicit ROLLBACK") + + def CheckWorkaroundForBuggySqliteTransferBindings(self): + """ + pysqlite would crash with older SQLite versions unless + a workaround is implemented. + """ + self.con.execute("create table foo(bar)") + self.con.execute("drop table foo") + self.con.execute("create table foo(bar)") + + def CheckEmptyStatement(self): + """ + pysqlite used to segfault with SQLite versions 3.5.x. These return NULL + for "no-operation" statements + """ + self.con.execute("") + + def CheckTypeMapUsage(self): + """ + pysqlite until 2.4.1 did not rebuild the row_cast_map when recompiling + a statement. This test exhibits the problem. + """ + SELECT = "select * from foo" + con = sqlite.connect(":memory:",detect_types=sqlite.PARSE_DECLTYPES) + con.execute("create table foo(bar timestamp)") + con.execute("insert into foo(bar) values (?)", (datetime.datetime.now(),)) + con.execute(SELECT) + con.execute("drop table foo") + con.execute("create table foo(bar integer)") + con.execute("insert into foo(bar) values (5)") + con.execute(SELECT) + def CheckErrorMsgDecodeError(self): # When porting the module to Python 3.0, the error message about # decoding errors disappeared. This verifies they're back again. diff --git a/Lib/sqlite3/test/transactions.py b/Lib/sqlite3/test/transactions.py index 9edc4ac..da5bd21 100644 --- a/Lib/sqlite3/test/transactions.py +++ b/Lib/sqlite3/test/transactions.py @@ -1,7 +1,7 @@ #-*- coding: ISO-8859-1 -*- # pysqlite2/test/transactions.py: tests transactions # -# Copyright (C) 2005 Gerhard Häring <gh@ghaering.de> +# Copyright (C) 2005-2007 Gerhard Häring <gh@ghaering.de> # # This file is part of pysqlite. # @@ -122,6 +122,23 @@ class TransactionTests(unittest.TestCase): except: self.fail("should have raised an OperationalError") + def CheckLocking(self): + """ + This tests the improved concurrency with pysqlite 2.3.4. You needed + to roll back con2 before you could commit con1. + """ + self.cur1.execute("create table test(i)") + self.cur1.execute("insert into test(i) values (5)") + try: + self.cur2.execute("insert into test(i) values (5)") + self.fail("should have raised an OperationalError") + except sqlite.OperationalError: + pass + except: + self.fail("should have raised an OperationalError") + # NO self.con2.rollback() HERE!!! + self.con1.commit() + class SpecialCommandTests(unittest.TestCase): def setUp(self): self.con = sqlite.connect(":memory:") diff --git a/Lib/sqlite3/test/types.py b/Lib/sqlite3/test/types.py index 46bed7d..ce740a5 100644 --- a/Lib/sqlite3/test/types.py +++ b/Lib/sqlite3/test/types.py @@ -21,7 +21,7 @@ # misrepresented as being the original software. # 3. This notice may not be removed or altered from any source distribution. -import bz2, datetime +import zlib, datetime import unittest import sqlite3 as sqlite @@ -221,11 +221,13 @@ class ColNamesTests(unittest.TestCase): self.cur = self.con.cursor() self.cur.execute("create table test(x foo)") - sqlite.converters["BAR"] = lambda x: b"<" + x + b">" + sqlite.converters["FOO"] = lambda x: "[%s]" % x.decode("ascii") + sqlite.converters["BAR"] = lambda x: "<%s>" % x.decode("ascii") sqlite.converters["EXC"] = lambda x: 5/0 sqlite.converters["B1B1"] = lambda x: "MARKER" def tearDown(self): + del sqlite.converters["FOO"] del sqlite.converters["BAR"] del sqlite.converters["EXC"] del sqlite.converters["B1B1"] @@ -252,7 +254,7 @@ class ColNamesTests(unittest.TestCase): self.cur.execute("insert into test(x) values (?)", ("xxx",)) self.cur.execute('select x as "x [bar]" from test') val = self.cur.fetchone()[0] - self.failUnlessEqual(val, b"<xxx>") + self.failUnlessEqual(val, "<xxx>") # Check if the stripping of colnames works. Everything after the first # whitespace should be stripped. @@ -297,7 +299,7 @@ class ObjectAdaptationTests(unittest.TestCase): class BinaryConverterTests(unittest.TestCase): def convert(s): - return bz2.decompress(s) + return zlib.decompress(s) convert = staticmethod(convert) def setUp(self): @@ -309,7 +311,7 @@ class BinaryConverterTests(unittest.TestCase): def CheckBinaryInputForConverter(self): testdata = b"abcdefg" * 10 - result = self.con.execute('select ? as "x [bin]"', (memoryview(bz2.compress(testdata)),)).fetchone()[0] + result = self.con.execute('select ? as "x [bin]"', (memoryview(zlib.compress(testdata)),)).fetchone()[0] self.failUnlessEqual(testdata, result) class DateTimeTests(unittest.TestCase): @@ -341,7 +343,8 @@ class DateTimeTests(unittest.TestCase): if sqlite.sqlite_version_info < (3, 1): return - now = datetime.datetime.utcnow() + # SQLite's current_timestamp uses UTC time, while datetime.datetime.now() uses local time. + now = datetime.datetime.now() self.cur.execute("insert into test(ts) values (current_timestamp)") self.cur.execute("select ts from test") ts = self.cur.fetchone()[0] |