diff options
Diffstat (limited to 'Lib/sqlite3/test/userfunctions.py')
-rw-r--r-- | Lib/sqlite3/test/userfunctions.py | 163 |
1 files changed, 70 insertions, 93 deletions
diff --git a/Lib/sqlite3/test/userfunctions.py b/Lib/sqlite3/test/userfunctions.py index 9501f53..1d19151 100644 --- a/Lib/sqlite3/test/userfunctions.py +++ b/Lib/sqlite3/test/userfunctions.py @@ -1,4 +1,4 @@ -#-*- coding: iso-8859-1 -*- +#-*- coding: ISO-8859-1 -*- # pysqlite2/test/userfunctions.py: tests for user-defined functions and # aggregates. # @@ -23,13 +23,13 @@ # 3. This notice may not be removed or altered from any source distribution. import unittest -import unittest.mock import sqlite3 as sqlite +from test import test_support def func_returntext(): return "foo" def func_returnunicode(): - return "bar" + return u"bar" def func_returnint(): return 42 def func_returnfloat(): @@ -37,14 +37,15 @@ def func_returnfloat(): def func_returnnull(): return None def func_returnblob(): - return b"blob" + with test_support.check_py3k_warnings(): + return buffer("blob") def func_returnlonglong(): return 1<<31 def func_raiseexception(): - 5/0 + 5 // 0 def func_isstring(v): - return type(v) is str + return type(v) is unicode def func_isint(v): return type(v) is int def func_isfloat(v): @@ -52,12 +53,9 @@ def func_isfloat(v): def func_isnone(v): return type(v) is type(None) def func_isblob(v): - return isinstance(v, (bytes, memoryview)) + return type(v) is buffer def func_islonglong(v): - return isinstance(v, int) and v >= 1<<31 - -def func(*args): - return len(args) + return isinstance(v, (int, long)) and v >= 1<<31 class AggrNoStep: def __init__(self): @@ -75,7 +73,7 @@ class AggrNoFinalize: class AggrExceptionInInit: def __init__(self): - 5/0 + 5 // 0 def step(self, x): pass @@ -88,7 +86,7 @@ class AggrExceptionInStep: pass def step(self, x): - 5/0 + 5 // 0 def finalize(self): return 42 @@ -101,33 +99,19 @@ class AggrExceptionInFinalize: pass def finalize(self): - 5/0 + 5 // 0 class AggrCheckType: def __init__(self): self.val = None def step(self, whichType, val): - theType = {"str": str, "int": int, "float": float, "None": type(None), - "blob": bytes} + theType = {"str": unicode, "int": int, "float": float, "None": type(None), "blob": buffer} self.val = int(theType[whichType] is type(val)) def finalize(self): return self.val -class AggrCheckTypes: - def __init__(self): - self.val = 0 - - def step(self, whichType, *vals): - theType = {"str": str, "int": int, "float": float, "None": type(None), - "blob": bytes} - for val in vals: - self.val += int(theType[whichType] is type(val)) - - def finalize(self): - return self.val - class AggrSum: def __init__(self): self.val = 0.0 @@ -157,14 +141,16 @@ class FunctionTests(unittest.TestCase): self.con.create_function("isnone", 1, func_isnone) self.con.create_function("isblob", 1, func_isblob) self.con.create_function("islonglong", 1, func_islonglong) - self.con.create_function("spam", -1, func) def tearDown(self): self.con.close() def CheckFuncErrorOnCreate(self): - with self.assertRaises(sqlite.OperationalError): + try: self.con.create_function("bla", -100, lambda x: 2*x) + self.fail("should have raised an OperationalError") + except sqlite.OperationalError: + pass def CheckFuncRefCount(self): def getfunc(): @@ -182,15 +168,15 @@ class FunctionTests(unittest.TestCase): cur = self.con.cursor() cur.execute("select returntext()") val = cur.fetchone()[0] - self.assertEqual(type(val), str) + self.assertEqual(type(val), unicode) self.assertEqual(val, "foo") def CheckFuncReturnUnicode(self): cur = self.con.cursor() cur.execute("select returnunicode()") val = cur.fetchone()[0] - self.assertEqual(type(val), str) - self.assertEqual(val, "bar") + self.assertEqual(type(val), unicode) + self.assertEqual(val, u"bar") def CheckFuncReturnInt(self): cur = self.con.cursor() @@ -218,8 +204,9 @@ class FunctionTests(unittest.TestCase): cur = self.con.cursor() cur.execute("select returnblob()") val = cur.fetchone()[0] - self.assertEqual(type(val), bytes) - self.assertEqual(val, b"blob") + with test_support.check_py3k_warnings(): + self.assertEqual(type(val), buffer) + self.assertEqual(val, buffer("blob")) def CheckFuncReturnLongLong(self): cur = self.con.cursor() @@ -229,10 +216,12 @@ class FunctionTests(unittest.TestCase): def CheckFuncException(self): cur = self.con.cursor() - with self.assertRaises(sqlite.OperationalError) as cm: + try: cur.execute("select raiseexception()") cur.fetchone() - self.assertEqual(str(cm.exception), 'user-defined function raised exception') + self.fail("should have raised OperationalError") + except sqlite.OperationalError, e: + self.assertEqual(e.args[0], 'user-defined function raised exception') def CheckParamString(self): cur = self.con.cursor() @@ -260,7 +249,8 @@ class FunctionTests(unittest.TestCase): def CheckParamBlob(self): cur = self.con.cursor() - cur.execute("select isblob(?)", (memoryview(b"blob"),)) + with test_support.check_py3k_warnings(): + cur.execute("select isblob(?)", (buffer("blob"),)) val = cur.fetchone()[0] self.assertEqual(val, 1) @@ -270,35 +260,6 @@ class FunctionTests(unittest.TestCase): val = cur.fetchone()[0] self.assertEqual(val, 1) - def CheckAnyArguments(self): - cur = self.con.cursor() - cur.execute("select spam(?, ?)", (1, 2)) - val = cur.fetchone()[0] - self.assertEqual(val, 2) - - def CheckFuncNonDeterministic(self): - mock = unittest.mock.Mock(return_value=None) - self.con.create_function("deterministic", 0, mock, deterministic=False) - self.con.execute("select deterministic() = deterministic()") - self.assertEqual(mock.call_count, 2) - - @unittest.skipIf(sqlite.sqlite_version_info < (3, 8, 3), "deterministic parameter not supported") - def CheckFuncDeterministic(self): - mock = unittest.mock.Mock(return_value=None) - self.con.create_function("deterministic", 0, mock, deterministic=True) - self.con.execute("select deterministic() = deterministic()") - self.assertEqual(mock.call_count, 1) - - @unittest.skipIf(sqlite.sqlite_version_info >= (3, 8, 3), "SQLite < 3.8.3 needed") - def CheckFuncDeterministicNotSupported(self): - with self.assertRaises(sqlite.NotSupportedError): - self.con.create_function("deterministic", 0, int, deterministic=True) - - def CheckFuncDeterministicKeywordOnly(self): - with self.assertRaises(TypeError): - self.con.create_function("deterministic", 0, int, True) - - class AggregateTests(unittest.TestCase): def setUp(self): self.con = sqlite.connect(":memory:") @@ -312,8 +273,9 @@ class AggregateTests(unittest.TestCase): b blob ) """) - cur.execute("insert into test(t, i, f, n, b) values (?, ?, ?, ?, ?)", - ("foo", 5, 3.14, None, memoryview(b"blob"),)) + with test_support.check_py3k_warnings(): + cur.execute("insert into test(t, i, f, n, b) values (?, ?, ?, ?, ?)", + ("foo", 5, 3.14, None, buffer("blob"),)) self.con.create_aggregate("nostep", 1, AggrNoStep) self.con.create_aggregate("nofinalize", 1, AggrNoFinalize) @@ -321,7 +283,6 @@ class AggregateTests(unittest.TestCase): self.con.create_aggregate("excStep", 1, AggrExceptionInStep) self.con.create_aggregate("excFinalize", 1, AggrExceptionInFinalize) self.con.create_aggregate("checkType", 2, AggrCheckType) - self.con.create_aggregate("checkTypes", -1, AggrCheckTypes) self.con.create_aggregate("mysum", 1, AggrSum) def tearDown(self): @@ -330,42 +291,55 @@ class AggregateTests(unittest.TestCase): pass def CheckAggrErrorOnCreate(self): - with self.assertRaises(sqlite.OperationalError): + try: self.con.create_function("bla", -100, AggrSum) + self.fail("should have raised an OperationalError") + except sqlite.OperationalError: + pass def CheckAggrNoStep(self): cur = self.con.cursor() - with self.assertRaises(AttributeError) as cm: + try: cur.execute("select nostep(t) from test") - self.assertEqual(str(cm.exception), "'AggrNoStep' object has no attribute 'step'") + self.fail("should have raised an AttributeError") + except AttributeError, e: + self.assertEqual(e.args[0], "AggrNoStep instance has no attribute 'step'") def CheckAggrNoFinalize(self): cur = self.con.cursor() - with self.assertRaises(sqlite.OperationalError) as cm: + try: cur.execute("select nofinalize(t) from test") val = cur.fetchone()[0] - self.assertEqual(str(cm.exception), "user-defined aggregate's 'finalize' method raised error") + self.fail("should have raised an OperationalError") + except sqlite.OperationalError, e: + self.assertEqual(e.args[0], "user-defined aggregate's 'finalize' method raised error") def CheckAggrExceptionInInit(self): cur = self.con.cursor() - with self.assertRaises(sqlite.OperationalError) as cm: + try: cur.execute("select excInit(t) from test") val = cur.fetchone()[0] - self.assertEqual(str(cm.exception), "user-defined aggregate's '__init__' method raised error") + self.fail("should have raised an OperationalError") + except sqlite.OperationalError, e: + self.assertEqual(e.args[0], "user-defined aggregate's '__init__' method raised error") def CheckAggrExceptionInStep(self): cur = self.con.cursor() - with self.assertRaises(sqlite.OperationalError) as cm: + try: cur.execute("select excStep(t) from test") val = cur.fetchone()[0] - self.assertEqual(str(cm.exception), "user-defined aggregate's 'step' method raised error") + self.fail("should have raised an OperationalError") + except sqlite.OperationalError, e: + self.assertEqual(e.args[0], "user-defined aggregate's 'step' method raised error") def CheckAggrExceptionInFinalize(self): cur = self.con.cursor() - with self.assertRaises(sqlite.OperationalError) as cm: + try: cur.execute("select excFinalize(t) from test") val = cur.fetchone()[0] - self.assertEqual(str(cm.exception), "user-defined aggregate's 'finalize' method raised error") + self.fail("should have raised an OperationalError") + except sqlite.OperationalError, e: + self.assertEqual(e.args[0], "user-defined aggregate's 'finalize' method raised error") def CheckAggrCheckParamStr(self): cur = self.con.cursor() @@ -379,12 +353,6 @@ class AggregateTests(unittest.TestCase): val = cur.fetchone()[0] self.assertEqual(val, 1) - def CheckAggrCheckParamsInt(self): - cur = self.con.cursor() - cur.execute("select checkTypes('int', ?, ?)", (42, 24)) - val = cur.fetchone()[0] - self.assertEqual(val, 2) - def CheckAggrCheckParamFloat(self): cur = self.con.cursor() cur.execute("select checkType('float', ?)", (3.14,)) @@ -399,7 +367,8 @@ class AggregateTests(unittest.TestCase): def CheckAggrCheckParamBlob(self): cur = self.con.cursor() - cur.execute("select checkType('blob', ?)", (memoryview(b"blob"),)) + with test_support.check_py3k_warnings(): + cur.execute("select checkType('blob', ?)", (buffer("blob"),)) val = cur.fetchone()[0] self.assertEqual(val, 1) @@ -438,14 +407,22 @@ class AuthorizerTests(unittest.TestCase): pass def test_table_access(self): - with self.assertRaises(sqlite.DatabaseError) as cm: + try: self.con.execute("select * from t2") - self.assertIn('prohibited', str(cm.exception)) + except sqlite.DatabaseError, e: + if not e.args[0].endswith("prohibited"): + self.fail("wrong exception text: %s" % e.args[0]) + return + self.fail("should have raised an exception due to missing privileges") def test_column_access(self): - with self.assertRaises(sqlite.DatabaseError) as cm: + try: self.con.execute("select c2 from t1") - self.assertIn('prohibited', str(cm.exception)) + except sqlite.DatabaseError, e: + if not e.args[0].endswith("prohibited"): + self.fail("wrong exception text: %s" % e.args[0]) + return + self.fail("should have raised an exception due to missing privileges") class AuthorizerRaiseExceptionTests(AuthorizerTests): @staticmethod |