diff options
Diffstat (limited to 'Lib/sqlite3/test/userfunctions.py')
-rw-r--r-- | Lib/sqlite3/test/userfunctions.py | 93 |
1 files changed, 49 insertions, 44 deletions
diff --git a/Lib/sqlite3/test/userfunctions.py b/Lib/sqlite3/test/userfunctions.py index 69e2ec2..4075045 100644 --- a/Lib/sqlite3/test/userfunctions.py +++ b/Lib/sqlite3/test/userfunctions.py @@ -55,6 +55,9 @@ def func_isblob(v): def func_islonglong(v): return isinstance(v, int) and v >= 1<<31 +def func(*args): + return len(args) + class AggrNoStep: def __init__(self): pass @@ -111,6 +114,19 @@ class AggrCheckType: def finalize(self): return self.val +class AggrCheckTypes: + def __init__(self): + self.val = 0 + + def step(self, whichType, *vals): + theType = {"str": str, "int": int, "float": float, "None": type(None), + "blob": bytes} + for val in vals: + self.val += int(theType[whichType] is type(val)) + + def finalize(self): + return self.val + class AggrSum: def __init__(self): self.val = 0.0 @@ -140,16 +156,14 @@ class FunctionTests(unittest.TestCase): self.con.create_function("isnone", 1, func_isnone) self.con.create_function("isblob", 1, func_isblob) self.con.create_function("islonglong", 1, func_islonglong) + self.con.create_function("spam", -1, func) def tearDown(self): self.con.close() def CheckFuncErrorOnCreate(self): - try: + with self.assertRaises(sqlite.OperationalError): self.con.create_function("bla", -100, lambda x: 2*x) - self.fail("should have raised an OperationalError") - except sqlite.OperationalError: - pass def CheckFuncRefCount(self): def getfunc(): @@ -214,12 +228,10 @@ class FunctionTests(unittest.TestCase): def CheckFuncException(self): cur = self.con.cursor() - try: + with self.assertRaises(sqlite.OperationalError) as cm: cur.execute("select raiseexception()") cur.fetchone() - self.fail("should have raised OperationalError") - except sqlite.OperationalError as e: - self.assertEqual(e.args[0], 'user-defined function raised exception') + self.assertEqual(str(cm.exception), 'user-defined function raised exception') def CheckParamString(self): cur = self.con.cursor() @@ -257,6 +269,13 @@ class FunctionTests(unittest.TestCase): val = cur.fetchone()[0] self.assertEqual(val, 1) + def CheckAnyArguments(self): + cur = self.con.cursor() + cur.execute("select spam(?, ?)", (1, 2)) + val = cur.fetchone()[0] + self.assertEqual(val, 2) + + class AggregateTests(unittest.TestCase): def setUp(self): self.con = sqlite.connect(":memory:") @@ -279,6 +298,7 @@ class AggregateTests(unittest.TestCase): self.con.create_aggregate("excStep", 1, AggrExceptionInStep) self.con.create_aggregate("excFinalize", 1, AggrExceptionInFinalize) self.con.create_aggregate("checkType", 2, AggrCheckType) + self.con.create_aggregate("checkTypes", -1, AggrCheckTypes) self.con.create_aggregate("mysum", 1, AggrSum) def tearDown(self): @@ -287,55 +307,42 @@ class AggregateTests(unittest.TestCase): pass def CheckAggrErrorOnCreate(self): - try: + with self.assertRaises(sqlite.OperationalError): self.con.create_function("bla", -100, AggrSum) - self.fail("should have raised an OperationalError") - except sqlite.OperationalError: - pass def CheckAggrNoStep(self): cur = self.con.cursor() - try: + with self.assertRaises(AttributeError) as cm: cur.execute("select nostep(t) from test") - self.fail("should have raised an AttributeError") - except AttributeError as e: - self.assertEqual(e.args[0], "'AggrNoStep' object has no attribute 'step'") + self.assertEqual(str(cm.exception), "'AggrNoStep' object has no attribute 'step'") def CheckAggrNoFinalize(self): cur = self.con.cursor() - try: + with self.assertRaises(sqlite.OperationalError) as cm: cur.execute("select nofinalize(t) from test") val = cur.fetchone()[0] - self.fail("should have raised an OperationalError") - except sqlite.OperationalError as e: - self.assertEqual(e.args[0], "user-defined aggregate's 'finalize' method raised error") + self.assertEqual(str(cm.exception), "user-defined aggregate's 'finalize' method raised error") def CheckAggrExceptionInInit(self): cur = self.con.cursor() - try: + with self.assertRaises(sqlite.OperationalError) as cm: cur.execute("select excInit(t) from test") val = cur.fetchone()[0] - self.fail("should have raised an OperationalError") - except sqlite.OperationalError as e: - self.assertEqual(e.args[0], "user-defined aggregate's '__init__' method raised error") + self.assertEqual(str(cm.exception), "user-defined aggregate's '__init__' method raised error") def CheckAggrExceptionInStep(self): cur = self.con.cursor() - try: + with self.assertRaises(sqlite.OperationalError) as cm: cur.execute("select excStep(t) from test") val = cur.fetchone()[0] - self.fail("should have raised an OperationalError") - except sqlite.OperationalError as e: - self.assertEqual(e.args[0], "user-defined aggregate's 'step' method raised error") + self.assertEqual(str(cm.exception), "user-defined aggregate's 'step' method raised error") def CheckAggrExceptionInFinalize(self): cur = self.con.cursor() - try: + with self.assertRaises(sqlite.OperationalError) as cm: cur.execute("select excFinalize(t) from test") val = cur.fetchone()[0] - self.fail("should have raised an OperationalError") - except sqlite.OperationalError as e: - self.assertEqual(e.args[0], "user-defined aggregate's 'finalize' method raised error") + self.assertEqual(str(cm.exception), "user-defined aggregate's 'finalize' method raised error") def CheckAggrCheckParamStr(self): cur = self.con.cursor() @@ -349,6 +356,12 @@ class AggregateTests(unittest.TestCase): val = cur.fetchone()[0] self.assertEqual(val, 1) + def CheckAggrCheckParamsInt(self): + cur = self.con.cursor() + cur.execute("select checkTypes('int', ?, ?)", (42, 24)) + val = cur.fetchone()[0] + self.assertEqual(val, 2) + def CheckAggrCheckParamFloat(self): cur = self.con.cursor() cur.execute("select checkType('float', ?)", (3.14,)) @@ -402,22 +415,14 @@ class AuthorizerTests(unittest.TestCase): pass def test_table_access(self): - try: + with self.assertRaises(sqlite.DatabaseError) as cm: self.con.execute("select * from t2") - except sqlite.DatabaseError as e: - if not e.args[0].endswith("prohibited"): - self.fail("wrong exception text: %s" % e.args[0]) - return - self.fail("should have raised an exception due to missing privileges") + self.assertIn('prohibited', str(cm.exception)) def test_column_access(self): - try: + with self.assertRaises(sqlite.DatabaseError) as cm: self.con.execute("select c2 from t1") - except sqlite.DatabaseError as e: - if not e.args[0].endswith("prohibited"): - self.fail("wrong exception text: %s" % e.args[0]) - return - self.fail("should have raised an exception due to missing privileges") + self.assertIn('prohibited', str(cm.exception)) class AuthorizerRaiseExceptionTests(AuthorizerTests): @staticmethod |