diff options
Diffstat (limited to 'Lib/sqlite3/test/dbapi.py')
-rw-r--r-- | Lib/sqlite3/test/dbapi.py | 442 |
1 files changed, 194 insertions, 248 deletions
diff --git a/Lib/sqlite3/test/dbapi.py b/Lib/sqlite3/test/dbapi.py index be11337..615ebf5 100644 --- a/Lib/sqlite3/test/dbapi.py +++ b/Lib/sqlite3/test/dbapi.py @@ -1,4 +1,4 @@ -#-*- coding: iso-8859-1 -*- +#-*- coding: ISO-8859-1 -*- # pysqlite2/test/dbapi.py: tests for DB-API compliance # # Copyright (C) 2004-2010 Gerhard Häring <gh@ghaering.de> @@ -21,12 +21,14 @@ # misrepresented as being the original software. # 3. This notice may not be removed or altered from any source distribution. -import threading import unittest +import sys import sqlite3 as sqlite - -from test.support import TESTFN, unlink - +from test import test_support +try: + import threading +except ImportError: + threading = None class ModuleTests(unittest.TestCase): def CheckAPILevel(self): @@ -43,12 +45,12 @@ class ModuleTests(unittest.TestCase): sqlite.paramstyle) def CheckWarning(self): - self.assertTrue(issubclass(sqlite.Warning, Exception), - "Warning is not a subclass of Exception") + self.assertTrue(issubclass(sqlite.Warning, StandardError), + "Warning is not a subclass of StandardError") def CheckError(self): - self.assertTrue(issubclass(sqlite.Error, Exception), - "Error is not a subclass of Exception") + self.assertTrue(issubclass(sqlite.Error, StandardError), + "Error is not a subclass of StandardError") def CheckInterfaceError(self): self.assertTrue(issubclass(sqlite.InterfaceError, sqlite.Error), @@ -84,7 +86,6 @@ class ModuleTests(unittest.TestCase): "NotSupportedError is not a subclass of DatabaseError") class ConnectionTests(unittest.TestCase): - def setUp(self): self.cx = sqlite.connect(":memory:") cu = self.cx.cursor() @@ -119,8 +120,11 @@ class ConnectionTests(unittest.TestCase): def CheckFailedOpen(self): YOU_CANNOT_OPEN_THIS = "/foo/bar/bla/23534/mydb.db" - with self.assertRaises(sqlite.OperationalError): + try: con = sqlite.connect(YOU_CANNOT_OPEN_THIS) + except sqlite.OperationalError: + return + self.fail("should have raised an OperationalError") def CheckClose(self): self.cx.close() @@ -138,68 +142,11 @@ class ConnectionTests(unittest.TestCase): self.assertEqual(self.cx.ProgrammingError, sqlite.ProgrammingError) self.assertEqual(self.cx.NotSupportedError, sqlite.NotSupportedError) - def CheckInTransaction(self): - # Can't use db from setUp because we want to test initial state. - cx = sqlite.connect(":memory:") - cu = cx.cursor() - self.assertEqual(cx.in_transaction, False) - cu.execute("create table transactiontest(id integer primary key, name text)") - self.assertEqual(cx.in_transaction, False) - cu.execute("insert into transactiontest(name) values (?)", ("foo",)) - self.assertEqual(cx.in_transaction, True) - cu.execute("select name from transactiontest where name=?", ["foo"]) - row = cu.fetchone() - self.assertEqual(cx.in_transaction, True) - cx.commit() - self.assertEqual(cx.in_transaction, False) - cu.execute("select name from transactiontest where name=?", ["foo"]) - row = cu.fetchone() - self.assertEqual(cx.in_transaction, False) - - def CheckInTransactionRO(self): - with self.assertRaises(AttributeError): - self.cx.in_transaction = True - - def CheckOpenWithPathLikeObject(self): - """ Checks that we can successfully connect to a database using an object that - is PathLike, i.e. has __fspath__(). """ - self.addCleanup(unlink, TESTFN) - class Path: - def __fspath__(self): - return TESTFN - path = Path() - with sqlite.connect(path) as cx: - cx.execute('create table test(id integer)') - - def CheckOpenUri(self): - if sqlite.sqlite_version_info < (3, 7, 7): - with self.assertRaises(sqlite.NotSupportedError): - sqlite.connect(':memory:', uri=True) - return - self.addCleanup(unlink, TESTFN) - with sqlite.connect(TESTFN) as cx: - cx.execute('create table test(id integer)') - with sqlite.connect('file:' + TESTFN, uri=True) as cx: - cx.execute('insert into test(id) values(0)') - with sqlite.connect('file:' + TESTFN + '?mode=ro', uri=True) as cx: - with self.assertRaises(sqlite.OperationalError): - cx.execute('insert into test(id) values(1)') - - @unittest.skipIf(sqlite.sqlite_version_info >= (3, 3, 1), - 'needs sqlite versions older than 3.3.1') - def CheckSameThreadErrorOnOldVersion(self): - with self.assertRaises(sqlite.NotSupportedError) as cm: - sqlite.connect(':memory:', check_same_thread=False) - self.assertEqual(str(cm.exception), 'shared connections not available') - class CursorTests(unittest.TestCase): def setUp(self): self.cx = sqlite.connect(":memory:") self.cu = self.cx.cursor() - self.cu.execute( - "create table test(id integer primary key, name text, " - "income number, unique_test text unique)" - ) + self.cu.execute("create table test(id integer primary key, name text, income number)") self.cu.execute("insert into test(name) values (?)", ("foo",)) def tearDown(self): @@ -210,12 +157,22 @@ class CursorTests(unittest.TestCase): self.cu.execute("delete from test") def CheckExecuteIllegalSql(self): - with self.assertRaises(sqlite.OperationalError): + try: self.cu.execute("select asdf") + self.fail("should have raised an OperationalError") + except sqlite.OperationalError: + return + except: + self.fail("raised wrong exception") def CheckExecuteTooMuchSql(self): - with self.assertRaises(sqlite.Warning): + try: self.cu.execute("select 5+4; select 4+5") + self.fail("should have raised a Warning") + except sqlite.Warning: + return + except: + self.fail("raised wrong exception") def CheckExecuteTooMuchSql2(self): self.cu.execute("select 5+4; -- foo bar") @@ -230,8 +187,13 @@ class CursorTests(unittest.TestCase): """) def CheckExecuteWrongSqlArg(self): - with self.assertRaises(TypeError): + try: self.cu.execute(42) + self.fail("should have raised a ValueError") + except ValueError: + return + except: + self.fail("raised wrong exception.") def CheckExecuteArgInt(self): self.cu.execute("insert into test(id) values (?)", (42,)) @@ -249,25 +211,29 @@ class CursorTests(unittest.TestCase): row = self.cu.fetchone() self.assertEqual(row[0], "Hu\x00go") - def CheckExecuteNonIterable(self): - with self.assertRaises(ValueError) as cm: - self.cu.execute("insert into test(id) values (?)", 42) - self.assertEqual(str(cm.exception), 'parameters are of unsupported type') - def CheckExecuteWrongNoOfArgs1(self): # too many parameters - with self.assertRaises(sqlite.ProgrammingError): + try: self.cu.execute("insert into test(id) values (?)", (17, "Egon")) + self.fail("should have raised ProgrammingError") + except sqlite.ProgrammingError: + pass def CheckExecuteWrongNoOfArgs2(self): # too little parameters - with self.assertRaises(sqlite.ProgrammingError): + try: self.cu.execute("insert into test(id) values (?)") + self.fail("should have raised ProgrammingError") + except sqlite.ProgrammingError: + pass def CheckExecuteWrongNoOfArgs3(self): # no parameters, parameters are needed - with self.assertRaises(sqlite.ProgrammingError): + try: self.cu.execute("insert into test(id) values (?)") + self.fail("should have raised ProgrammingError") + except sqlite.ProgrammingError: + pass def CheckExecuteParamList(self): self.cu.execute("insert into test(name) values ('foo')") @@ -295,6 +261,10 @@ class CursorTests(unittest.TestCase): self.assertEqual(row[0], "foo") def CheckExecuteDictMapping_Mapping(self): + # Test only works with Python 2.5 or later + if sys.version_info < (2, 5, 0): + return + class D(dict): def __missing__(self, key): return "foo" @@ -306,18 +276,27 @@ class CursorTests(unittest.TestCase): def CheckExecuteDictMappingTooLittleArgs(self): self.cu.execute("insert into test(name) values ('foo')") - with self.assertRaises(sqlite.ProgrammingError): + try: self.cu.execute("select name from test where name=:name and id=:id", {"name": "foo"}) + self.fail("should have raised ProgrammingError") + except sqlite.ProgrammingError: + pass def CheckExecuteDictMappingNoArgs(self): self.cu.execute("insert into test(name) values ('foo')") - with self.assertRaises(sqlite.ProgrammingError): + try: self.cu.execute("select name from test where name=:name") + self.fail("should have raised ProgrammingError") + except sqlite.ProgrammingError: + pass def CheckExecuteDictMappingUnnamed(self): self.cu.execute("insert into test(name) values ('foo')") - with self.assertRaises(sqlite.ProgrammingError): + try: self.cu.execute("select name from test where name=?", {"name": "foo"}) + self.fail("should have raised ProgrammingError") + except sqlite.ProgrammingError: + pass def CheckClose(self): self.cu.close() @@ -346,7 +325,8 @@ class CursorTests(unittest.TestCase): def CheckTotalChanges(self): self.cu.execute("insert into test(name) values ('foo')") self.cu.execute("insert into test(name) values ('foo')") - self.assertLess(2, self.cx.total_changes, msg='total changes reported wrong value') + if self.cx.total_changes < 2: + self.fail("total changes reported wrong value") # Checks for executemany: # Sequences are required by the DB-API, iterators @@ -360,7 +340,7 @@ class CursorTests(unittest.TestCase): def __init__(self): self.value = 5 - def __next__(self): + def next(self): if self.value == 10: raise StopIteration else: @@ -377,16 +357,32 @@ class CursorTests(unittest.TestCase): self.cu.executemany("insert into test(income) values (?)", mygen()) def CheckExecuteManyWrongSqlArg(self): - with self.assertRaises(TypeError): + try: self.cu.executemany(42, [(3,)]) + self.fail("should have raised a ValueError") + except ValueError: + return + except: + self.fail("raised wrong exception.") def CheckExecuteManySelect(self): - with self.assertRaises(sqlite.ProgrammingError): + try: self.cu.executemany("select ?", [(3,)]) + self.fail("should have raised a ProgrammingError") + except sqlite.ProgrammingError: + return + except: + self.fail("raised wrong exception.") def CheckExecuteManyNotIterable(self): - with self.assertRaises(TypeError): + try: self.cu.executemany("insert into test(income) values (?)", 42) + self.fail("should have raised a TypeError") + except TypeError: + return + except Exception, e: + print "raised", e.__class__ + self.fail("raised wrong exception.") def CheckFetchIter(self): # Optional DB-API extension. @@ -463,54 +459,24 @@ class CursorTests(unittest.TestCase): self.assertEqual(self.cu.connection, self.cx) def CheckWrongCursorCallable(self): - with self.assertRaises(TypeError): + try: def f(): pass cur = self.cx.cursor(f) + self.fail("should have raised a TypeError") + except TypeError: + return + self.fail("should have raised a ValueError") def CheckCursorWrongClass(self): class Foo: pass foo = Foo() - with self.assertRaises(TypeError): + try: cur = sqlite.Cursor(foo) + self.fail("should have raised a ValueError") + except TypeError: + pass - def CheckLastRowIDOnReplace(self): - """ - INSERT OR REPLACE and REPLACE INTO should produce the same behavior. - """ - sql = '{} INTO test(id, unique_test) VALUES (?, ?)' - for statement in ('INSERT OR REPLACE', 'REPLACE'): - with self.subTest(statement=statement): - self.cu.execute(sql.format(statement), (1, 'foo')) - self.assertEqual(self.cu.lastrowid, 1) - - def CheckLastRowIDOnIgnore(self): - self.cu.execute( - "insert or ignore into test(unique_test) values (?)", - ('test',)) - self.assertEqual(self.cu.lastrowid, 2) - self.cu.execute( - "insert or ignore into test(unique_test) values (?)", - ('test',)) - self.assertEqual(self.cu.lastrowid, 2) - - def CheckLastRowIDInsertOR(self): - results = [] - for statement in ('FAIL', 'ABORT', 'ROLLBACK'): - sql = 'INSERT OR {} INTO test(unique_test) VALUES (?)' - with self.subTest(statement='INSERT OR {}'.format(statement)): - self.cu.execute(sql.format(statement), (statement,)) - results.append((statement, self.cu.lastrowid)) - with self.assertRaises(sqlite.IntegrityError): - self.cu.execute(sql.format(statement), (statement,)) - results.append((statement, self.cu.lastrowid)) - expected = [ - ('FAIL', 2), ('FAIL', 2), - ('ABORT', 3), ('ABORT', 3), - ('ROLLBACK', 4), ('ROLLBACK', 4), - ] - self.assertEqual(results, expected) - - +@unittest.skipUnless(threading, 'This test requires threading.') class ThreadTests(unittest.TestCase): def setUp(self): self.con = sqlite.connect(":memory:") @@ -688,7 +654,8 @@ class ConstructorTests(unittest.TestCase): ts = sqlite.TimestampFromTicks(42) def CheckBinary(self): - b = sqlite.Binary(b"\0'") + with test_support.check_py3k_warnings(): + b = sqlite.Binary(chr(0) + "'") class ExtensionTests(unittest.TestCase): def CheckScriptStringSql(self): @@ -704,24 +671,39 @@ class ExtensionTests(unittest.TestCase): res = cur.fetchone()[0] self.assertEqual(res, 5) + def CheckScriptStringUnicode(self): + con = sqlite.connect(":memory:") + cur = con.cursor() + cur.executescript(u""" + create table a(i); + insert into a(i) values (5); + select i from a; + delete from a; + insert into a(i) values (6); + """) + cur.execute("select i from a") + res = cur.fetchone()[0] + self.assertEqual(res, 6) + def CheckScriptSyntaxError(self): con = sqlite.connect(":memory:") cur = con.cursor() - with self.assertRaises(sqlite.OperationalError): + raised = False + try: cur.executescript("create table test(x); asdf; create table test2(x)") + except sqlite.OperationalError: + raised = True + self.assertEqual(raised, True, "should have raised an exception") def CheckScriptErrorNormal(self): con = sqlite.connect(":memory:") cur = con.cursor() - with self.assertRaises(sqlite.OperationalError): + raised = False + try: cur.executescript("create table test(sadfsadfdsa); select foo from hurz;") - - def CheckCursorExecutescriptAsBytes(self): - con = sqlite.connect(":memory:") - cur = con.cursor() - with self.assertRaises(ValueError) as cm: - cur.executescript(b"create table test(foo); insert into test(foo) values (5);") - self.assertEqual(str(cm.exception), 'script argument must be unicode.') + except sqlite.OperationalError: + raised = True + self.assertEqual(raised, True, "should have raised an exception") def CheckConnectionExecute(self): con = sqlite.connect(":memory:") @@ -743,37 +725,68 @@ class ExtensionTests(unittest.TestCase): self.assertEqual(result, 5, "Basic test of Connection.executescript") class ClosedConTests(unittest.TestCase): + def setUp(self): + pass + + def tearDown(self): + pass + def CheckClosedConCursor(self): con = sqlite.connect(":memory:") con.close() - with self.assertRaises(sqlite.ProgrammingError): + try: cur = con.cursor() + self.fail("Should have raised a ProgrammingError") + except sqlite.ProgrammingError: + pass + except: + self.fail("Should have raised a ProgrammingError") def CheckClosedConCommit(self): con = sqlite.connect(":memory:") con.close() - with self.assertRaises(sqlite.ProgrammingError): + try: con.commit() + self.fail("Should have raised a ProgrammingError") + except sqlite.ProgrammingError: + pass + except: + self.fail("Should have raised a ProgrammingError") def CheckClosedConRollback(self): con = sqlite.connect(":memory:") con.close() - with self.assertRaises(sqlite.ProgrammingError): + try: con.rollback() + self.fail("Should have raised a ProgrammingError") + except sqlite.ProgrammingError: + pass + except: + self.fail("Should have raised a ProgrammingError") def CheckClosedCurExecute(self): con = sqlite.connect(":memory:") cur = con.cursor() con.close() - with self.assertRaises(sqlite.ProgrammingError): + try: cur.execute("select 4") + self.fail("Should have raised a ProgrammingError") + except sqlite.ProgrammingError: + pass + except: + self.fail("Should have raised a ProgrammingError") def CheckClosedCreateFunction(self): con = sqlite.connect(":memory:") con.close() def f(x): return 17 - with self.assertRaises(sqlite.ProgrammingError): + try: con.create_function("foo", 1, f) + self.fail("Should have raised a ProgrammingError") + except sqlite.ProgrammingError: + pass + except: + self.fail("Should have raised a ProgrammingError") def CheckClosedCreateAggregate(self): con = sqlite.connect(":memory:") @@ -785,31 +798,57 @@ class ClosedConTests(unittest.TestCase): pass def finalize(self): return 17 - with self.assertRaises(sqlite.ProgrammingError): + try: con.create_aggregate("foo", 1, Agg) + self.fail("Should have raised a ProgrammingError") + except sqlite.ProgrammingError: + pass + except: + self.fail("Should have raised a ProgrammingError") def CheckClosedSetAuthorizer(self): con = sqlite.connect(":memory:") con.close() def authorizer(*args): return sqlite.DENY - with self.assertRaises(sqlite.ProgrammingError): + try: con.set_authorizer(authorizer) + self.fail("Should have raised a ProgrammingError") + except sqlite.ProgrammingError: + pass + except: + self.fail("Should have raised a ProgrammingError") def CheckClosedSetProgressCallback(self): con = sqlite.connect(":memory:") con.close() def progress(): pass - with self.assertRaises(sqlite.ProgrammingError): + try: con.set_progress_handler(progress, 100) + self.fail("Should have raised a ProgrammingError") + except sqlite.ProgrammingError: + pass + except: + self.fail("Should have raised a ProgrammingError") def CheckClosedCall(self): con = sqlite.connect(":memory:") con.close() - with self.assertRaises(sqlite.ProgrammingError): + try: con() + self.fail("Should have raised a ProgrammingError") + except sqlite.ProgrammingError: + pass + except: + self.fail("Should have raised a ProgrammingError") class ClosedCurTests(unittest.TestCase): + def setUp(self): + pass + + def tearDown(self): + pass + def CheckClosed(self): con = sqlite.connect(":memory:") cur = con.cursor() @@ -823,103 +862,15 @@ class ClosedCurTests(unittest.TestCase): else: params = [] - with self.assertRaises(sqlite.ProgrammingError): + try: method = getattr(cur, method_name) - method(*params) - - -class SqliteOnConflictTests(unittest.TestCase): - """ - Tests for SQLite's "insert on conflict" feature. - - See https://www.sqlite.org/lang_conflict.html for details. - """ - - def setUp(self): - self.cx = sqlite.connect(":memory:") - self.cu = self.cx.cursor() - self.cu.execute(""" - CREATE TABLE test( - id INTEGER PRIMARY KEY, name TEXT, unique_name TEXT UNIQUE - ); - """) - - def tearDown(self): - self.cu.close() - self.cx.close() - - def CheckOnConflictRollbackWithExplicitTransaction(self): - self.cx.isolation_level = None # autocommit mode - self.cu = self.cx.cursor() - # Start an explicit transaction. - self.cu.execute("BEGIN") - self.cu.execute("INSERT INTO test(name) VALUES ('abort_test')") - self.cu.execute("INSERT OR ROLLBACK INTO test(unique_name) VALUES ('foo')") - with self.assertRaises(sqlite.IntegrityError): - self.cu.execute("INSERT OR ROLLBACK INTO test(unique_name) VALUES ('foo')") - # Use connection to commit. - self.cx.commit() - self.cu.execute("SELECT name, unique_name from test") - # Transaction should have rolled back and nothing should be in table. - self.assertEqual(self.cu.fetchall(), []) - - def CheckOnConflictAbortRaisesWithExplicitTransactions(self): - # Abort cancels the current sql statement but doesn't change anything - # about the current transaction. - self.cx.isolation_level = None # autocommit mode - self.cu = self.cx.cursor() - # Start an explicit transaction. - self.cu.execute("BEGIN") - self.cu.execute("INSERT INTO test(name) VALUES ('abort_test')") - self.cu.execute("INSERT OR ABORT INTO test(unique_name) VALUES ('foo')") - with self.assertRaises(sqlite.IntegrityError): - self.cu.execute("INSERT OR ABORT INTO test(unique_name) VALUES ('foo')") - self.cx.commit() - self.cu.execute("SELECT name, unique_name FROM test") - # Expect the first two inserts to work, third to do nothing. - self.assertEqual(self.cu.fetchall(), [('abort_test', None), (None, 'foo',)]) - - def CheckOnConflictRollbackWithoutTransaction(self): - # Start of implicit transaction - self.cu.execute("INSERT INTO test(name) VALUES ('abort_test')") - self.cu.execute("INSERT OR ROLLBACK INTO test(unique_name) VALUES ('foo')") - with self.assertRaises(sqlite.IntegrityError): - self.cu.execute("INSERT OR ROLLBACK INTO test(unique_name) VALUES ('foo')") - self.cu.execute("SELECT name, unique_name FROM test") - # Implicit transaction is rolled back on error. - self.assertEqual(self.cu.fetchall(), []) - - def CheckOnConflictAbortRaisesWithoutTransactions(self): - # Abort cancels the current sql statement but doesn't change anything - # about the current transaction. - self.cu.execute("INSERT INTO test(name) VALUES ('abort_test')") - self.cu.execute("INSERT OR ABORT INTO test(unique_name) VALUES ('foo')") - with self.assertRaises(sqlite.IntegrityError): - self.cu.execute("INSERT OR ABORT INTO test(unique_name) VALUES ('foo')") - # Make sure all other values were inserted. - self.cu.execute("SELECT name, unique_name FROM test") - self.assertEqual(self.cu.fetchall(), [('abort_test', None), (None, 'foo',)]) - - def CheckOnConflictFail(self): - self.cu.execute("INSERT OR FAIL INTO test(unique_name) VALUES ('foo')") - with self.assertRaises(sqlite.IntegrityError): - self.cu.execute("INSERT OR FAIL INTO test(unique_name) VALUES ('foo')") - self.assertEqual(self.cu.fetchall(), []) - - def CheckOnConflictIgnore(self): - self.cu.execute("INSERT OR IGNORE INTO test(unique_name) VALUES ('foo')") - # Nothing should happen. - self.cu.execute("INSERT OR IGNORE INTO test(unique_name) VALUES ('foo')") - self.cu.execute("SELECT unique_name FROM test") - self.assertEqual(self.cu.fetchall(), [('foo',)]) - - def CheckOnConflictReplace(self): - self.cu.execute("INSERT OR REPLACE INTO test(name, unique_name) VALUES ('Data!', 'foo')") - # There shouldn't be an IntegrityError exception. - self.cu.execute("INSERT OR REPLACE INTO test(name, unique_name) VALUES ('Very different data!', 'foo')") - self.cu.execute("SELECT name, unique_name FROM test") - self.assertEqual(self.cu.fetchall(), [('Very different data!', 'foo')]) + method(*params) + self.fail("Should have raised a ProgrammingError: method " + method_name) + except sqlite.ProgrammingError: + pass + except: + self.fail("Should have raised a ProgrammingError: " + method_name) def suite(): module_suite = unittest.makeSuite(ModuleTests, "Check") @@ -930,12 +881,7 @@ def suite(): ext_suite = unittest.makeSuite(ExtensionTests, "Check") closed_con_suite = unittest.makeSuite(ClosedConTests, "Check") closed_cur_suite = unittest.makeSuite(ClosedCurTests, "Check") - on_conflict_suite = unittest.makeSuite(SqliteOnConflictTests, "Check") - return unittest.TestSuite(( - module_suite, connection_suite, cursor_suite, thread_suite, - constructor_suite, ext_suite, closed_con_suite, closed_cur_suite, - on_conflict_suite, - )) + return unittest.TestSuite((module_suite, connection_suite, cursor_suite, thread_suite, constructor_suite, ext_suite, closed_con_suite, closed_cur_suite)) def test(): runner = unittest.TextTestRunner() |