diff options
Diffstat (limited to 'Lib/sqlite3/test/dbapi.py')
| -rw-r--r-- | Lib/sqlite3/test/dbapi.py | 306 |
1 files changed, 146 insertions, 160 deletions
diff --git a/Lib/sqlite3/test/dbapi.py b/Lib/sqlite3/test/dbapi.py index 04649fc..7fb8d7e 100644 --- a/Lib/sqlite3/test/dbapi.py +++ b/Lib/sqlite3/test/dbapi.py @@ -122,11 +122,8 @@ class ConnectionTests(unittest.TestCase): def CheckFailedOpen(self): YOU_CANNOT_OPEN_THIS = "/foo/bar/bla/23534/mydb.db" - try: + with self.assertRaises(sqlite.OperationalError): con = sqlite.connect(YOU_CANNOT_OPEN_THIS) - except sqlite.OperationalError: - return - self.fail("should have raised an OperationalError") def CheckClose(self): self.cx.close() @@ -180,6 +177,12 @@ class ConnectionTests(unittest.TestCase): with self.assertRaises(sqlite.OperationalError): cx.execute('insert into test(id) values(1)') + @unittest.skipIf(sqlite.sqlite_version_info >= (3, 3, 1), + 'needs sqlite versions older than 3.3.1') + def CheckSameThreadErrorOnOldVersion(self): + with self.assertRaises(sqlite.NotSupportedError) as cm: + sqlite.connect(':memory:', check_same_thread=False) + self.assertEqual(str(cm.exception), 'shared connections not available') class CursorTests(unittest.TestCase): def setUp(self): @@ -196,22 +199,12 @@ class CursorTests(unittest.TestCase): self.cu.execute("delete from test") def CheckExecuteIllegalSql(self): - try: + with self.assertRaises(sqlite.OperationalError): self.cu.execute("select asdf") - self.fail("should have raised an OperationalError") - except sqlite.OperationalError: - return - except: - self.fail("raised wrong exception") def CheckExecuteTooMuchSql(self): - try: + with self.assertRaises(sqlite.Warning): self.cu.execute("select 5+4; select 4+5") - self.fail("should have raised a Warning") - except sqlite.Warning: - return - except: - self.fail("raised wrong exception") def CheckExecuteTooMuchSql2(self): self.cu.execute("select 5+4; -- foo bar") @@ -226,13 +219,8 @@ class CursorTests(unittest.TestCase): """) def CheckExecuteWrongSqlArg(self): - try: + with self.assertRaises(ValueError): self.cu.execute(42) - self.fail("should have raised a ValueError") - except ValueError: - return - except: - self.fail("raised wrong exception.") def CheckExecuteArgInt(self): self.cu.execute("insert into test(id) values (?)", (42,)) @@ -250,29 +238,25 @@ class CursorTests(unittest.TestCase): row = self.cu.fetchone() self.assertEqual(row[0], "Hu\x00go") + def CheckExecuteNonIterable(self): + with self.assertRaises(ValueError) as cm: + self.cu.execute("insert into test(id) values (?)", 42) + self.assertEqual(str(cm.exception), 'parameters are of unsupported type') + def CheckExecuteWrongNoOfArgs1(self): # too many parameters - try: + with self.assertRaises(sqlite.ProgrammingError): self.cu.execute("insert into test(id) values (?)", (17, "Egon")) - self.fail("should have raised ProgrammingError") - except sqlite.ProgrammingError: - pass def CheckExecuteWrongNoOfArgs2(self): # too little parameters - try: + with self.assertRaises(sqlite.ProgrammingError): self.cu.execute("insert into test(id) values (?)") - self.fail("should have raised ProgrammingError") - except sqlite.ProgrammingError: - pass def CheckExecuteWrongNoOfArgs3(self): # no parameters, parameters are needed - try: + with self.assertRaises(sqlite.ProgrammingError): self.cu.execute("insert into test(id) values (?)") - self.fail("should have raised ProgrammingError") - except sqlite.ProgrammingError: - pass def CheckExecuteParamList(self): self.cu.execute("insert into test(name) values ('foo')") @@ -311,27 +295,18 @@ class CursorTests(unittest.TestCase): def CheckExecuteDictMappingTooLittleArgs(self): self.cu.execute("insert into test(name) values ('foo')") - try: + with self.assertRaises(sqlite.ProgrammingError): self.cu.execute("select name from test where name=:name and id=:id", {"name": "foo"}) - self.fail("should have raised ProgrammingError") - except sqlite.ProgrammingError: - pass def CheckExecuteDictMappingNoArgs(self): self.cu.execute("insert into test(name) values ('foo')") - try: + with self.assertRaises(sqlite.ProgrammingError): self.cu.execute("select name from test where name=:name") - self.fail("should have raised ProgrammingError") - except sqlite.ProgrammingError: - pass def CheckExecuteDictMappingUnnamed(self): self.cu.execute("insert into test(name) values ('foo')") - try: + with self.assertRaises(sqlite.ProgrammingError): self.cu.execute("select name from test where name=?", {"name": "foo"}) - self.fail("should have raised ProgrammingError") - except sqlite.ProgrammingError: - pass def CheckClose(self): self.cu.close() @@ -360,8 +335,7 @@ class CursorTests(unittest.TestCase): def CheckTotalChanges(self): self.cu.execute("insert into test(name) values ('foo')") self.cu.execute("insert into test(name) values ('foo')") - if self.cx.total_changes < 2: - self.fail("total changes reported wrong value") + self.assertLess(2, self.cx.total_changes, msg='total changes reported wrong value') # Checks for executemany: # Sequences are required by the DB-API, iterators @@ -392,32 +366,16 @@ class CursorTests(unittest.TestCase): self.cu.executemany("insert into test(income) values (?)", mygen()) def CheckExecuteManyWrongSqlArg(self): - try: + with self.assertRaises(ValueError): self.cu.executemany(42, [(3,)]) - self.fail("should have raised a ValueError") - except ValueError: - return - except: - self.fail("raised wrong exception.") def CheckExecuteManySelect(self): - try: + with self.assertRaises(sqlite.ProgrammingError): self.cu.executemany("select ?", [(3,)]) - self.fail("should have raised a ProgrammingError") - except sqlite.ProgrammingError: - return - except: - self.fail("raised wrong exception.") def CheckExecuteManyNotIterable(self): - try: + with self.assertRaises(TypeError): self.cu.executemany("insert into test(income) values (?)", 42) - self.fail("should have raised a TypeError") - except TypeError: - return - except Exception as e: - print("raised", e.__class__) - self.fail("raised wrong exception.") def CheckFetchIter(self): # Optional DB-API extension. @@ -494,22 +452,15 @@ class CursorTests(unittest.TestCase): self.assertEqual(self.cu.connection, self.cx) def CheckWrongCursorCallable(self): - try: + with self.assertRaises(TypeError): def f(): pass cur = self.cx.cursor(f) - self.fail("should have raised a TypeError") - except TypeError: - return - self.fail("should have raised a ValueError") def CheckCursorWrongClass(self): class Foo: pass foo = Foo() - try: + with self.assertRaises(TypeError): cur = sqlite.Cursor(foo) - self.fail("should have raised a ValueError") - except TypeError: - pass @unittest.skipUnless(threading, 'This test requires threading.') class ThreadTests(unittest.TestCase): @@ -708,22 +659,21 @@ class ExtensionTests(unittest.TestCase): def CheckScriptSyntaxError(self): con = sqlite.connect(":memory:") cur = con.cursor() - raised = False - try: + with self.assertRaises(sqlite.OperationalError): cur.executescript("create table test(x); asdf; create table test2(x)") - except sqlite.OperationalError: - raised = True - self.assertEqual(raised, True, "should have raised an exception") def CheckScriptErrorNormal(self): con = sqlite.connect(":memory:") cur = con.cursor() - raised = False - try: + with self.assertRaises(sqlite.OperationalError): cur.executescript("create table test(sadfsadfdsa); select foo from hurz;") - except sqlite.OperationalError: - raised = True - self.assertEqual(raised, True, "should have raised an exception") + + def CheckCursorExecutescriptAsBytes(self): + con = sqlite.connect(":memory:") + cur = con.cursor() + with self.assertRaises(ValueError) as cm: + cur.executescript(b"create table test(foo); insert into test(foo) values (5);") + self.assertEqual(str(cm.exception), 'script argument must be unicode.') def CheckConnectionExecute(self): con = sqlite.connect(":memory:") @@ -745,68 +695,37 @@ class ExtensionTests(unittest.TestCase): self.assertEqual(result, 5, "Basic test of Connection.executescript") class ClosedConTests(unittest.TestCase): - def setUp(self): - pass - - def tearDown(self): - pass - def CheckClosedConCursor(self): con = sqlite.connect(":memory:") con.close() - try: + with self.assertRaises(sqlite.ProgrammingError): cur = con.cursor() - self.fail("Should have raised a ProgrammingError") - except sqlite.ProgrammingError: - pass - except: - self.fail("Should have raised a ProgrammingError") def CheckClosedConCommit(self): con = sqlite.connect(":memory:") con.close() - try: + with self.assertRaises(sqlite.ProgrammingError): con.commit() - self.fail("Should have raised a ProgrammingError") - except sqlite.ProgrammingError: - pass - except: - self.fail("Should have raised a ProgrammingError") def CheckClosedConRollback(self): con = sqlite.connect(":memory:") con.close() - try: + with self.assertRaises(sqlite.ProgrammingError): con.rollback() - self.fail("Should have raised a ProgrammingError") - except sqlite.ProgrammingError: - pass - except: - self.fail("Should have raised a ProgrammingError") def CheckClosedCurExecute(self): con = sqlite.connect(":memory:") cur = con.cursor() con.close() - try: + with self.assertRaises(sqlite.ProgrammingError): cur.execute("select 4") - self.fail("Should have raised a ProgrammingError") - except sqlite.ProgrammingError: - pass - except: - self.fail("Should have raised a ProgrammingError") def CheckClosedCreateFunction(self): con = sqlite.connect(":memory:") con.close() def f(x): return 17 - try: + with self.assertRaises(sqlite.ProgrammingError): con.create_function("foo", 1, f) - self.fail("Should have raised a ProgrammingError") - except sqlite.ProgrammingError: - pass - except: - self.fail("Should have raised a ProgrammingError") def CheckClosedCreateAggregate(self): con = sqlite.connect(":memory:") @@ -818,57 +737,31 @@ class ClosedConTests(unittest.TestCase): pass def finalize(self): return 17 - try: + with self.assertRaises(sqlite.ProgrammingError): con.create_aggregate("foo", 1, Agg) - self.fail("Should have raised a ProgrammingError") - except sqlite.ProgrammingError: - pass - except: - self.fail("Should have raised a ProgrammingError") def CheckClosedSetAuthorizer(self): con = sqlite.connect(":memory:") con.close() def authorizer(*args): return sqlite.DENY - try: + with self.assertRaises(sqlite.ProgrammingError): con.set_authorizer(authorizer) - self.fail("Should have raised a ProgrammingError") - except sqlite.ProgrammingError: - pass - except: - self.fail("Should have raised a ProgrammingError") def CheckClosedSetProgressCallback(self): con = sqlite.connect(":memory:") con.close() def progress(): pass - try: + with self.assertRaises(sqlite.ProgrammingError): con.set_progress_handler(progress, 100) - self.fail("Should have raised a ProgrammingError") - except sqlite.ProgrammingError: - pass - except: - self.fail("Should have raised a ProgrammingError") def CheckClosedCall(self): con = sqlite.connect(":memory:") con.close() - try: + with self.assertRaises(sqlite.ProgrammingError): con() - self.fail("Should have raised a ProgrammingError") - except sqlite.ProgrammingError: - pass - except: - self.fail("Should have raised a ProgrammingError") class ClosedCurTests(unittest.TestCase): - def setUp(self): - pass - - def tearDown(self): - pass - def CheckClosed(self): con = sqlite.connect(":memory:") cur = con.cursor() @@ -882,15 +775,103 @@ class ClosedCurTests(unittest.TestCase): else: params = [] - try: + with self.assertRaises(sqlite.ProgrammingError): method = getattr(cur, method_name) - method(*params) - self.fail("Should have raised a ProgrammingError: method " + method_name) - except sqlite.ProgrammingError: - pass - except: - self.fail("Should have raised a ProgrammingError: " + method_name) + + +class SqliteOnConflictTests(unittest.TestCase): + """ + Tests for SQLite's "insert on conflict" feature. + + See https://www.sqlite.org/lang_conflict.html for details. + """ + + def setUp(self): + self.cx = sqlite.connect(":memory:") + self.cu = self.cx.cursor() + self.cu.execute(""" + CREATE TABLE test( + id INTEGER PRIMARY KEY, name TEXT, unique_name TEXT UNIQUE + ); + """) + + def tearDown(self): + self.cu.close() + self.cx.close() + + def CheckOnConflictRollbackWithExplicitTransaction(self): + self.cx.isolation_level = None # autocommit mode + self.cu = self.cx.cursor() + # Start an explicit transaction. + self.cu.execute("BEGIN") + self.cu.execute("INSERT INTO test(name) VALUES ('abort_test')") + self.cu.execute("INSERT OR ROLLBACK INTO test(unique_name) VALUES ('foo')") + with self.assertRaises(sqlite.IntegrityError): + self.cu.execute("INSERT OR ROLLBACK INTO test(unique_name) VALUES ('foo')") + # Use connection to commit. + self.cx.commit() + self.cu.execute("SELECT name, unique_name from test") + # Transaction should have rolled back and nothing should be in table. + self.assertEqual(self.cu.fetchall(), []) + + def CheckOnConflictAbortRaisesWithExplicitTransactions(self): + # Abort cancels the current sql statement but doesn't change anything + # about the current transaction. + self.cx.isolation_level = None # autocommit mode + self.cu = self.cx.cursor() + # Start an explicit transaction. + self.cu.execute("BEGIN") + self.cu.execute("INSERT INTO test(name) VALUES ('abort_test')") + self.cu.execute("INSERT OR ABORT INTO test(unique_name) VALUES ('foo')") + with self.assertRaises(sqlite.IntegrityError): + self.cu.execute("INSERT OR ABORT INTO test(unique_name) VALUES ('foo')") + self.cx.commit() + self.cu.execute("SELECT name, unique_name FROM test") + # Expect the first two inserts to work, third to do nothing. + self.assertEqual(self.cu.fetchall(), [('abort_test', None), (None, 'foo',)]) + + def CheckOnConflictRollbackWithoutTransaction(self): + # Start of implicit transaction + self.cu.execute("INSERT INTO test(name) VALUES ('abort_test')") + self.cu.execute("INSERT OR ROLLBACK INTO test(unique_name) VALUES ('foo')") + with self.assertRaises(sqlite.IntegrityError): + self.cu.execute("INSERT OR ROLLBACK INTO test(unique_name) VALUES ('foo')") + self.cu.execute("SELECT name, unique_name FROM test") + # Implicit transaction is rolled back on error. + self.assertEqual(self.cu.fetchall(), []) + + def CheckOnConflictAbortRaisesWithoutTransactions(self): + # Abort cancels the current sql statement but doesn't change anything + # about the current transaction. + self.cu.execute("INSERT INTO test(name) VALUES ('abort_test')") + self.cu.execute("INSERT OR ABORT INTO test(unique_name) VALUES ('foo')") + with self.assertRaises(sqlite.IntegrityError): + self.cu.execute("INSERT OR ABORT INTO test(unique_name) VALUES ('foo')") + # Make sure all other values were inserted. + self.cu.execute("SELECT name, unique_name FROM test") + self.assertEqual(self.cu.fetchall(), [('abort_test', None), (None, 'foo',)]) + + def CheckOnConflictFail(self): + self.cu.execute("INSERT OR FAIL INTO test(unique_name) VALUES ('foo')") + with self.assertRaises(sqlite.IntegrityError): + self.cu.execute("INSERT OR FAIL INTO test(unique_name) VALUES ('foo')") + self.assertEqual(self.cu.fetchall(), []) + + def CheckOnConflictIgnore(self): + self.cu.execute("INSERT OR IGNORE INTO test(unique_name) VALUES ('foo')") + # Nothing should happen. + self.cu.execute("INSERT OR IGNORE INTO test(unique_name) VALUES ('foo')") + self.cu.execute("SELECT unique_name FROM test") + self.assertEqual(self.cu.fetchall(), [('foo',)]) + + def CheckOnConflictReplace(self): + self.cu.execute("INSERT OR REPLACE INTO test(name, unique_name) VALUES ('Data!', 'foo')") + # There shouldn't be an IntegrityError exception. + self.cu.execute("INSERT OR REPLACE INTO test(name, unique_name) VALUES ('Very different data!', 'foo')") + self.cu.execute("SELECT name, unique_name FROM test") + self.assertEqual(self.cu.fetchall(), [('Very different data!', 'foo')]) + def suite(): module_suite = unittest.makeSuite(ModuleTests, "Check") @@ -901,7 +882,12 @@ def suite(): ext_suite = unittest.makeSuite(ExtensionTests, "Check") closed_con_suite = unittest.makeSuite(ClosedConTests, "Check") closed_cur_suite = unittest.makeSuite(ClosedCurTests, "Check") - return unittest.TestSuite((module_suite, connection_suite, cursor_suite, thread_suite, constructor_suite, ext_suite, closed_con_suite, closed_cur_suite)) + on_conflict_suite = unittest.makeSuite(SqliteOnConflictTests, "Check") + return unittest.TestSuite(( + module_suite, connection_suite, cursor_suite, thread_suite, + constructor_suite, ext_suite, closed_con_suite, closed_cur_suite, + on_conflict_suite, + )) def test(): runner = unittest.TextTestRunner() |
