summaryrefslogtreecommitdiffstats
path: root/Lib/sqlite3/test/regression.py
diff options
context:
space:
mode:
Diffstat (limited to 'Lib/sqlite3/test/regression.py')
-rw-r--r--Lib/sqlite3/test/regression.py194
1 files changed, 111 insertions, 83 deletions
diff --git a/Lib/sqlite3/test/regression.py b/Lib/sqlite3/test/regression.py
index c714116..42fc7c2 100644
--- a/Lib/sqlite3/test/regression.py
+++ b/Lib/sqlite3/test/regression.py
@@ -1,7 +1,7 @@
#-*- coding: iso-8859-1 -*-
# pysqlite2/test/regression.py: pysqlite regression tests
#
-# Copyright (C) 2006-2010 Gerhard Häring <gh@ghaering.de>
+# Copyright (C) 2006-2007 Gerhard Häring <gh@ghaering.de>
#
# This file is part of pysqlite.
#
@@ -25,7 +25,6 @@ import datetime
import unittest
import sqlite3 as sqlite
import weakref
-import functools
from test import support
class RegressionTests(unittest.TestCase):
@@ -55,10 +54,10 @@ class RegressionTests(unittest.TestCase):
# reset before a rollback, but only those that are still in the
# statement cache. The others are not accessible from the connection object.
con = sqlite.connect(":memory:", cached_statements=5)
- cursors = [con.cursor() for x in range(5)]
+ cursors = [con.cursor() for x in xrange(5)]
cursors[0].execute("create table test(x)")
for i in range(10):
- cursors[0].executemany("insert into test(x) values (?)", [(x,) for x in range(10)])
+ cursors[0].executemany("insert into test(x) values (?)", [(x,) for x in xrange(10)])
for i in range(5):
cursors[i].execute(" " * i + "select x from test")
@@ -87,8 +86,9 @@ class RegressionTests(unittest.TestCase):
cur.execute("select 1 x union select " + str(i))
con.close()
- @unittest.skipIf(sqlite.sqlite_version_info < (3, 2, 2), 'needs sqlite 3.2.2 or newer')
def CheckOnConflictRollback(self):
+ if sqlite.sqlite_version_info < (3, 2, 2):
+ return
con = sqlite.connect(":memory:")
con.execute("create table foo(x, unique(x) on conflict rollback)")
con.execute("insert into foo(x) values (1)")
@@ -118,6 +118,18 @@ class RegressionTests(unittest.TestCase):
"""
self.con.execute("")
+ def CheckUnicodeConnect(self):
+ """
+ With pysqlite 2.4.0 you needed to use a string or an APSW connection
+ object for opening database connections.
+
+ Formerly, both bytestrings and unicode strings used to work.
+
+ Let's make sure unicode strings work in the future.
+ """
+ con = sqlite.connect(u":memory:")
+ con.close()
+
def CheckTypeMapUsage(self):
"""
pysqlite until 2.4.1 did not rebuild the row_cast_map when recompiling
@@ -133,15 +145,6 @@ class RegressionTests(unittest.TestCase):
con.execute("insert into foo(bar) values (5)")
con.execute(SELECT)
- def CheckErrorMsgDecodeError(self):
- # When porting the module to Python 3.0, the error message about
- # decoding errors disappeared. This verifies they're back again.
- with self.assertRaises(sqlite.OperationalError) as cm:
- self.con.execute("select 'xxx' || ? || 'yyy' colname",
- (bytes(bytearray([250])),)).fetchone()
- msg = "Could not decode to UTF-8 column 'colname' with text 'xxx"
- self.assertIn(msg, str(cm.exception))
-
def CheckRegisterAdapter(self):
"""
See issue 3312.
@@ -149,34 +152,12 @@ class RegressionTests(unittest.TestCase):
self.assertRaises(TypeError, sqlite.register_adapter, {}, None)
def CheckSetIsolationLevel(self):
- # See issue 27881.
- class CustomStr(str):
- def upper(self):
- return None
- def __del__(self):
- con.isolation_level = ""
-
+ """
+ See issue 3312.
+ """
con = sqlite.connect(":memory:")
- con.isolation_level = None
- for level in "", "DEFERRED", "IMMEDIATE", "EXCLUSIVE":
- with self.subTest(level=level):
- con.isolation_level = level
- con.isolation_level = level.lower()
- con.isolation_level = level.capitalize()
- con.isolation_level = CustomStr(level)
-
- # setting isolation_level failure should not alter previous state
- con.isolation_level = None
- con.isolation_level = "DEFERRED"
- pairs = [
- (1, TypeError), (b'', TypeError), ("abc", ValueError),
- ("IMMEDIATE\0EXCLUSIVE", ValueError), ("\xe9", ValueError),
- ]
- for value, exc in pairs:
- with self.subTest(level=value):
- with self.assertRaises(exc):
- con.isolation_level = value
- self.assertEqual(con.isolation_level, "DEFERRED")
+ self.assertRaises(UnicodeEncodeError, setattr, con,
+ "isolation_level", u"\xe9")
def CheckCursorConstructorCallCheck(self):
"""
@@ -189,19 +170,17 @@ class RegressionTests(unittest.TestCase):
con = sqlite.connect(":memory:")
cur = Cursor(con)
- with self.assertRaises(sqlite.ProgrammingError):
+ try:
cur.execute("select 4+5").fetchall()
- with self.assertRaisesRegex(sqlite.ProgrammingError,
- r'^Base Cursor\.__init__ not called\.$'):
+ self.fail("should have raised ProgrammingError")
+ except sqlite.ProgrammingError:
+ pass
+ except:
+ self.fail("should have raised ProgrammingError")
+ with self.assertRaisesRegexp(sqlite.ProgrammingError,
+ r'^Base Cursor\.__init__ not called\.$'):
cur.close()
- def CheckStrSubclass(self):
- """
- The Python 3.0 port of the module didn't cope with values of subclasses of str.
- """
- class MyStr(str): pass
- self.con.execute("select ?", (MyStr("abc"),))
-
def CheckConnectionConstructorCallCheck(self):
"""
Verifies that connection methods check whether base class __init__ was
@@ -212,8 +191,13 @@ class RegressionTests(unittest.TestCase):
pass
con = Connection(":memory:")
- with self.assertRaises(sqlite.ProgrammingError):
+ try:
cur = con.cursor()
+ self.fail("should have raised ProgrammingError")
+ except sqlite.ProgrammingError:
+ pass
+ except:
+ self.fail("should have raised ProgrammingError")
def CheckCursorRegistration(self):
"""
@@ -234,8 +218,13 @@ class RegressionTests(unittest.TestCase):
cur.executemany("insert into foo(x) values (?)", [(3,), (4,), (5,)])
cur.execute("select x from foo")
con.rollback()
- with self.assertRaises(sqlite.InterfaceError):
+ try:
cur.fetchall()
+ self.fail("should have raised InterfaceError")
+ except sqlite.InterfaceError:
+ pass
+ except:
+ self.fail("should have raised InterfaceError")
def CheckAutoCommit(self):
"""
@@ -262,14 +251,7 @@ class RegressionTests(unittest.TestCase):
Call a connection with a non-string SQL request: check error handling
of the statement constructor.
"""
- self.assertRaises(TypeError, self.con, 1)
-
- def CheckCollation(self):
- def collation_cb(a, b):
- return 1
- self.assertRaises(sqlite.ProgrammingError, self.con.create_collation,
- # Lone surrogate cannot be encoded to the default encoding (utf8)
- "\uDC80", collation_cb)
+ self.assertRaises(sqlite.Warning, self.con, 1)
def CheckRecursiveCursorUse(self):
"""
@@ -352,16 +334,15 @@ class RegressionTests(unittest.TestCase):
counter = 0
for i, row in enumerate(con.execute("select c from t")):
- with self.subTest(i=i, row=row):
- con.execute("insert into t2(c) values (?)", (i,))
- con.commit()
- if counter == 0:
- self.assertEqual(row[0], 0)
- elif counter == 1:
- self.assertEqual(row[0], 1)
- elif counter == 2:
- self.assertEqual(row[0], 2)
- counter += 1
+ con.execute("insert into t2(c) values (?)", (i,))
+ con.commit()
+ if counter == 0:
+ self.assertEqual(row[0], 0)
+ elif counter == 1:
+ self.assertEqual(row[0], 1)
+ elif counter == 2:
+ self.assertEqual(row[0], 2)
+ counter += 1
self.assertEqual(counter, 3, "should have returned exactly three rows")
def CheckBpo31770(self):
@@ -384,26 +365,73 @@ class RegressionTests(unittest.TestCase):
with self.assertRaises(AttributeError):
del self.con.isolation_level
- def CheckBpo37347(self):
- class Printer:
- def log(self, *args):
- return sqlite.SQLITE_OK
- for method in [self.con.set_trace_callback,
- functools.partial(self.con.set_progress_handler, n=1),
- self.con.set_authorizer]:
- printer_instance = Printer()
- method(printer_instance.log)
- method(printer_instance.log)
- self.con.execute("select 1") # trigger seg fault
- method(None)
+class UnhashableFunc:
+ def __hash__(self):
+ raise TypeError('unhashable type')
+
+ def __init__(self, return_value=None):
+ self.calls = 0
+ self.return_value = return_value
+
+ def __call__(self, *args, **kwargs):
+ self.calls += 1
+ return self.return_value
+
+
+class UnhashableCallbacksTestCase(unittest.TestCase):
+ """
+ https://bugs.python.org/issue34052
+
+ Registering unhashable callbacks raises TypeError, callbacks are not
+ registered in SQLite after such registration attempt.
+ """
+ def setUp(self):
+ self.con = sqlite.connect(':memory:')
+
+ def tearDown(self):
+ self.con.close()
+ def test_progress_handler(self):
+ f = UnhashableFunc(return_value=0)
+ with self.assertRaisesRegexp(TypeError, 'unhashable type'):
+ self.con.set_progress_handler(f, 1)
+ self.con.execute('SELECT 1')
+ self.assertFalse(f.calls)
+
+ def test_func(self):
+ func_name = 'func_name'
+ f = UnhashableFunc()
+ with self.assertRaisesRegexp(TypeError, 'unhashable type'):
+ self.con.create_function(func_name, 0, f)
+ msg = 'no such function: %s' % func_name
+ with self.assertRaisesRegexp(sqlite.OperationalError, msg):
+ self.con.execute('SELECT %s()' % func_name)
+ self.assertFalse(f.calls)
+
+ def test_authorizer(self):
+ f = UnhashableFunc(return_value=sqlite.SQLITE_DENY)
+ with self.assertRaisesRegexp(TypeError, 'unhashable type'):
+ self.con.set_authorizer(f)
+ self.con.execute('SELECT 1')
+ self.assertFalse(f.calls)
+
+ def test_aggr(self):
+ class UnhashableType(type):
+ __hash__ = None
+ aggr_name = 'aggr_name'
+ with self.assertRaisesRegexp(TypeError, 'unhashable type'):
+ self.con.create_aggregate(aggr_name, 0, UnhashableType('Aggr', (), {}))
+ msg = 'no such function: %s' % aggr_name
+ with self.assertRaisesRegexp(sqlite.OperationalError, msg):
+ self.con.execute('SELECT %s()' % aggr_name)
def suite():
regression_suite = unittest.makeSuite(RegressionTests, "Check")
return unittest.TestSuite((
regression_suite,
+ unittest.makeSuite(UnhashableCallbacksTestCase),
))
def test():