diff options
Diffstat (limited to 'Lib/sqlite3/test/regression.py')
-rw-r--r-- | Lib/sqlite3/test/regression.py | 194 |
1 files changed, 111 insertions, 83 deletions
diff --git a/Lib/sqlite3/test/regression.py b/Lib/sqlite3/test/regression.py index c714116..42fc7c2 100644 --- a/Lib/sqlite3/test/regression.py +++ b/Lib/sqlite3/test/regression.py @@ -1,7 +1,7 @@ #-*- coding: iso-8859-1 -*- # pysqlite2/test/regression.py: pysqlite regression tests # -# Copyright (C) 2006-2010 Gerhard Häring <gh@ghaering.de> +# Copyright (C) 2006-2007 Gerhard Häring <gh@ghaering.de> # # This file is part of pysqlite. # @@ -25,7 +25,6 @@ import datetime import unittest import sqlite3 as sqlite import weakref -import functools from test import support class RegressionTests(unittest.TestCase): @@ -55,10 +54,10 @@ class RegressionTests(unittest.TestCase): # reset before a rollback, but only those that are still in the # statement cache. The others are not accessible from the connection object. con = sqlite.connect(":memory:", cached_statements=5) - cursors = [con.cursor() for x in range(5)] + cursors = [con.cursor() for x in xrange(5)] cursors[0].execute("create table test(x)") for i in range(10): - cursors[0].executemany("insert into test(x) values (?)", [(x,) for x in range(10)]) + cursors[0].executemany("insert into test(x) values (?)", [(x,) for x in xrange(10)]) for i in range(5): cursors[i].execute(" " * i + "select x from test") @@ -87,8 +86,9 @@ class RegressionTests(unittest.TestCase): cur.execute("select 1 x union select " + str(i)) con.close() - @unittest.skipIf(sqlite.sqlite_version_info < (3, 2, 2), 'needs sqlite 3.2.2 or newer') def CheckOnConflictRollback(self): + if sqlite.sqlite_version_info < (3, 2, 2): + return con = sqlite.connect(":memory:") con.execute("create table foo(x, unique(x) on conflict rollback)") con.execute("insert into foo(x) values (1)") @@ -118,6 +118,18 @@ class RegressionTests(unittest.TestCase): """ self.con.execute("") + def CheckUnicodeConnect(self): + """ + With pysqlite 2.4.0 you needed to use a string or an APSW connection + object for opening database connections. + + Formerly, both bytestrings and unicode strings used to work. + + Let's make sure unicode strings work in the future. + """ + con = sqlite.connect(u":memory:") + con.close() + def CheckTypeMapUsage(self): """ pysqlite until 2.4.1 did not rebuild the row_cast_map when recompiling @@ -133,15 +145,6 @@ class RegressionTests(unittest.TestCase): con.execute("insert into foo(bar) values (5)") con.execute(SELECT) - def CheckErrorMsgDecodeError(self): - # When porting the module to Python 3.0, the error message about - # decoding errors disappeared. This verifies they're back again. - with self.assertRaises(sqlite.OperationalError) as cm: - self.con.execute("select 'xxx' || ? || 'yyy' colname", - (bytes(bytearray([250])),)).fetchone() - msg = "Could not decode to UTF-8 column 'colname' with text 'xxx" - self.assertIn(msg, str(cm.exception)) - def CheckRegisterAdapter(self): """ See issue 3312. @@ -149,34 +152,12 @@ class RegressionTests(unittest.TestCase): self.assertRaises(TypeError, sqlite.register_adapter, {}, None) def CheckSetIsolationLevel(self): - # See issue 27881. - class CustomStr(str): - def upper(self): - return None - def __del__(self): - con.isolation_level = "" - + """ + See issue 3312. + """ con = sqlite.connect(":memory:") - con.isolation_level = None - for level in "", "DEFERRED", "IMMEDIATE", "EXCLUSIVE": - with self.subTest(level=level): - con.isolation_level = level - con.isolation_level = level.lower() - con.isolation_level = level.capitalize() - con.isolation_level = CustomStr(level) - - # setting isolation_level failure should not alter previous state - con.isolation_level = None - con.isolation_level = "DEFERRED" - pairs = [ - (1, TypeError), (b'', TypeError), ("abc", ValueError), - ("IMMEDIATE\0EXCLUSIVE", ValueError), ("\xe9", ValueError), - ] - for value, exc in pairs: - with self.subTest(level=value): - with self.assertRaises(exc): - con.isolation_level = value - self.assertEqual(con.isolation_level, "DEFERRED") + self.assertRaises(UnicodeEncodeError, setattr, con, + "isolation_level", u"\xe9") def CheckCursorConstructorCallCheck(self): """ @@ -189,19 +170,17 @@ class RegressionTests(unittest.TestCase): con = sqlite.connect(":memory:") cur = Cursor(con) - with self.assertRaises(sqlite.ProgrammingError): + try: cur.execute("select 4+5").fetchall() - with self.assertRaisesRegex(sqlite.ProgrammingError, - r'^Base Cursor\.__init__ not called\.$'): + self.fail("should have raised ProgrammingError") + except sqlite.ProgrammingError: + pass + except: + self.fail("should have raised ProgrammingError") + with self.assertRaisesRegexp(sqlite.ProgrammingError, + r'^Base Cursor\.__init__ not called\.$'): cur.close() - def CheckStrSubclass(self): - """ - The Python 3.0 port of the module didn't cope with values of subclasses of str. - """ - class MyStr(str): pass - self.con.execute("select ?", (MyStr("abc"),)) - def CheckConnectionConstructorCallCheck(self): """ Verifies that connection methods check whether base class __init__ was @@ -212,8 +191,13 @@ class RegressionTests(unittest.TestCase): pass con = Connection(":memory:") - with self.assertRaises(sqlite.ProgrammingError): + try: cur = con.cursor() + self.fail("should have raised ProgrammingError") + except sqlite.ProgrammingError: + pass + except: + self.fail("should have raised ProgrammingError") def CheckCursorRegistration(self): """ @@ -234,8 +218,13 @@ class RegressionTests(unittest.TestCase): cur.executemany("insert into foo(x) values (?)", [(3,), (4,), (5,)]) cur.execute("select x from foo") con.rollback() - with self.assertRaises(sqlite.InterfaceError): + try: cur.fetchall() + self.fail("should have raised InterfaceError") + except sqlite.InterfaceError: + pass + except: + self.fail("should have raised InterfaceError") def CheckAutoCommit(self): """ @@ -262,14 +251,7 @@ class RegressionTests(unittest.TestCase): Call a connection with a non-string SQL request: check error handling of the statement constructor. """ - self.assertRaises(TypeError, self.con, 1) - - def CheckCollation(self): - def collation_cb(a, b): - return 1 - self.assertRaises(sqlite.ProgrammingError, self.con.create_collation, - # Lone surrogate cannot be encoded to the default encoding (utf8) - "\uDC80", collation_cb) + self.assertRaises(sqlite.Warning, self.con, 1) def CheckRecursiveCursorUse(self): """ @@ -352,16 +334,15 @@ class RegressionTests(unittest.TestCase): counter = 0 for i, row in enumerate(con.execute("select c from t")): - with self.subTest(i=i, row=row): - con.execute("insert into t2(c) values (?)", (i,)) - con.commit() - if counter == 0: - self.assertEqual(row[0], 0) - elif counter == 1: - self.assertEqual(row[0], 1) - elif counter == 2: - self.assertEqual(row[0], 2) - counter += 1 + con.execute("insert into t2(c) values (?)", (i,)) + con.commit() + if counter == 0: + self.assertEqual(row[0], 0) + elif counter == 1: + self.assertEqual(row[0], 1) + elif counter == 2: + self.assertEqual(row[0], 2) + counter += 1 self.assertEqual(counter, 3, "should have returned exactly three rows") def CheckBpo31770(self): @@ -384,26 +365,73 @@ class RegressionTests(unittest.TestCase): with self.assertRaises(AttributeError): del self.con.isolation_level - def CheckBpo37347(self): - class Printer: - def log(self, *args): - return sqlite.SQLITE_OK - for method in [self.con.set_trace_callback, - functools.partial(self.con.set_progress_handler, n=1), - self.con.set_authorizer]: - printer_instance = Printer() - method(printer_instance.log) - method(printer_instance.log) - self.con.execute("select 1") # trigger seg fault - method(None) +class UnhashableFunc: + def __hash__(self): + raise TypeError('unhashable type') + + def __init__(self, return_value=None): + self.calls = 0 + self.return_value = return_value + + def __call__(self, *args, **kwargs): + self.calls += 1 + return self.return_value + + +class UnhashableCallbacksTestCase(unittest.TestCase): + """ + https://bugs.python.org/issue34052 + + Registering unhashable callbacks raises TypeError, callbacks are not + registered in SQLite after such registration attempt. + """ + def setUp(self): + self.con = sqlite.connect(':memory:') + + def tearDown(self): + self.con.close() + def test_progress_handler(self): + f = UnhashableFunc(return_value=0) + with self.assertRaisesRegexp(TypeError, 'unhashable type'): + self.con.set_progress_handler(f, 1) + self.con.execute('SELECT 1') + self.assertFalse(f.calls) + + def test_func(self): + func_name = 'func_name' + f = UnhashableFunc() + with self.assertRaisesRegexp(TypeError, 'unhashable type'): + self.con.create_function(func_name, 0, f) + msg = 'no such function: %s' % func_name + with self.assertRaisesRegexp(sqlite.OperationalError, msg): + self.con.execute('SELECT %s()' % func_name) + self.assertFalse(f.calls) + + def test_authorizer(self): + f = UnhashableFunc(return_value=sqlite.SQLITE_DENY) + with self.assertRaisesRegexp(TypeError, 'unhashable type'): + self.con.set_authorizer(f) + self.con.execute('SELECT 1') + self.assertFalse(f.calls) + + def test_aggr(self): + class UnhashableType(type): + __hash__ = None + aggr_name = 'aggr_name' + with self.assertRaisesRegexp(TypeError, 'unhashable type'): + self.con.create_aggregate(aggr_name, 0, UnhashableType('Aggr', (), {})) + msg = 'no such function: %s' % aggr_name + with self.assertRaisesRegexp(sqlite.OperationalError, msg): + self.con.execute('SELECT %s()' % aggr_name) def suite(): regression_suite = unittest.makeSuite(RegressionTests, "Check") return unittest.TestSuite(( regression_suite, + unittest.makeSuite(UnhashableCallbacksTestCase), )) def test(): |