summaryrefslogtreecommitdiffstats
path: root/Lib/sqlite3/test/userfunctions.py
diff options
context:
space:
mode:
Diffstat (limited to 'Lib/sqlite3/test/userfunctions.py')
-rw-r--r--Lib/sqlite3/test/userfunctions.py93
1 files changed, 49 insertions, 44 deletions
diff --git a/Lib/sqlite3/test/userfunctions.py b/Lib/sqlite3/test/userfunctions.py
index 69e2ec2..4075045 100644
--- a/Lib/sqlite3/test/userfunctions.py
+++ b/Lib/sqlite3/test/userfunctions.py
@@ -55,6 +55,9 @@ def func_isblob(v):
def func_islonglong(v):
return isinstance(v, int) and v >= 1<<31
+def func(*args):
+ return len(args)
+
class AggrNoStep:
def __init__(self):
pass
@@ -111,6 +114,19 @@ class AggrCheckType:
def finalize(self):
return self.val
+class AggrCheckTypes:
+ def __init__(self):
+ self.val = 0
+
+ def step(self, whichType, *vals):
+ theType = {"str": str, "int": int, "float": float, "None": type(None),
+ "blob": bytes}
+ for val in vals:
+ self.val += int(theType[whichType] is type(val))
+
+ def finalize(self):
+ return self.val
+
class AggrSum:
def __init__(self):
self.val = 0.0
@@ -140,16 +156,14 @@ class FunctionTests(unittest.TestCase):
self.con.create_function("isnone", 1, func_isnone)
self.con.create_function("isblob", 1, func_isblob)
self.con.create_function("islonglong", 1, func_islonglong)
+ self.con.create_function("spam", -1, func)
def tearDown(self):
self.con.close()
def CheckFuncErrorOnCreate(self):
- try:
+ with self.assertRaises(sqlite.OperationalError):
self.con.create_function("bla", -100, lambda x: 2*x)
- self.fail("should have raised an OperationalError")
- except sqlite.OperationalError:
- pass
def CheckFuncRefCount(self):
def getfunc():
@@ -214,12 +228,10 @@ class FunctionTests(unittest.TestCase):
def CheckFuncException(self):
cur = self.con.cursor()
- try:
+ with self.assertRaises(sqlite.OperationalError) as cm:
cur.execute("select raiseexception()")
cur.fetchone()
- self.fail("should have raised OperationalError")
- except sqlite.OperationalError as e:
- self.assertEqual(e.args[0], 'user-defined function raised exception')
+ self.assertEqual(str(cm.exception), 'user-defined function raised exception')
def CheckParamString(self):
cur = self.con.cursor()
@@ -257,6 +269,13 @@ class FunctionTests(unittest.TestCase):
val = cur.fetchone()[0]
self.assertEqual(val, 1)
+ def CheckAnyArguments(self):
+ cur = self.con.cursor()
+ cur.execute("select spam(?, ?)", (1, 2))
+ val = cur.fetchone()[0]
+ self.assertEqual(val, 2)
+
+
class AggregateTests(unittest.TestCase):
def setUp(self):
self.con = sqlite.connect(":memory:")
@@ -279,6 +298,7 @@ class AggregateTests(unittest.TestCase):
self.con.create_aggregate("excStep", 1, AggrExceptionInStep)
self.con.create_aggregate("excFinalize", 1, AggrExceptionInFinalize)
self.con.create_aggregate("checkType", 2, AggrCheckType)
+ self.con.create_aggregate("checkTypes", -1, AggrCheckTypes)
self.con.create_aggregate("mysum", 1, AggrSum)
def tearDown(self):
@@ -287,55 +307,42 @@ class AggregateTests(unittest.TestCase):
pass
def CheckAggrErrorOnCreate(self):
- try:
+ with self.assertRaises(sqlite.OperationalError):
self.con.create_function("bla", -100, AggrSum)
- self.fail("should have raised an OperationalError")
- except sqlite.OperationalError:
- pass
def CheckAggrNoStep(self):
cur = self.con.cursor()
- try:
+ with self.assertRaises(AttributeError) as cm:
cur.execute("select nostep(t) from test")
- self.fail("should have raised an AttributeError")
- except AttributeError as e:
- self.assertEqual(e.args[0], "'AggrNoStep' object has no attribute 'step'")
+ self.assertEqual(str(cm.exception), "'AggrNoStep' object has no attribute 'step'")
def CheckAggrNoFinalize(self):
cur = self.con.cursor()
- try:
+ with self.assertRaises(sqlite.OperationalError) as cm:
cur.execute("select nofinalize(t) from test")
val = cur.fetchone()[0]
- self.fail("should have raised an OperationalError")
- except sqlite.OperationalError as e:
- self.assertEqual(e.args[0], "user-defined aggregate's 'finalize' method raised error")
+ self.assertEqual(str(cm.exception), "user-defined aggregate's 'finalize' method raised error")
def CheckAggrExceptionInInit(self):
cur = self.con.cursor()
- try:
+ with self.assertRaises(sqlite.OperationalError) as cm:
cur.execute("select excInit(t) from test")
val = cur.fetchone()[0]
- self.fail("should have raised an OperationalError")
- except sqlite.OperationalError as e:
- self.assertEqual(e.args[0], "user-defined aggregate's '__init__' method raised error")
+ self.assertEqual(str(cm.exception), "user-defined aggregate's '__init__' method raised error")
def CheckAggrExceptionInStep(self):
cur = self.con.cursor()
- try:
+ with self.assertRaises(sqlite.OperationalError) as cm:
cur.execute("select excStep(t) from test")
val = cur.fetchone()[0]
- self.fail("should have raised an OperationalError")
- except sqlite.OperationalError as e:
- self.assertEqual(e.args[0], "user-defined aggregate's 'step' method raised error")
+ self.assertEqual(str(cm.exception), "user-defined aggregate's 'step' method raised error")
def CheckAggrExceptionInFinalize(self):
cur = self.con.cursor()
- try:
+ with self.assertRaises(sqlite.OperationalError) as cm:
cur.execute("select excFinalize(t) from test")
val = cur.fetchone()[0]
- self.fail("should have raised an OperationalError")
- except sqlite.OperationalError as e:
- self.assertEqual(e.args[0], "user-defined aggregate's 'finalize' method raised error")
+ self.assertEqual(str(cm.exception), "user-defined aggregate's 'finalize' method raised error")
def CheckAggrCheckParamStr(self):
cur = self.con.cursor()
@@ -349,6 +356,12 @@ class AggregateTests(unittest.TestCase):
val = cur.fetchone()[0]
self.assertEqual(val, 1)
+ def CheckAggrCheckParamsInt(self):
+ cur = self.con.cursor()
+ cur.execute("select checkTypes('int', ?, ?)", (42, 24))
+ val = cur.fetchone()[0]
+ self.assertEqual(val, 2)
+
def CheckAggrCheckParamFloat(self):
cur = self.con.cursor()
cur.execute("select checkType('float', ?)", (3.14,))
@@ -402,22 +415,14 @@ class AuthorizerTests(unittest.TestCase):
pass
def test_table_access(self):
- try:
+ with self.assertRaises(sqlite.DatabaseError) as cm:
self.con.execute("select * from t2")
- except sqlite.DatabaseError as e:
- if not e.args[0].endswith("prohibited"):
- self.fail("wrong exception text: %s" % e.args[0])
- return
- self.fail("should have raised an exception due to missing privileges")
+ self.assertIn('prohibited', str(cm.exception))
def test_column_access(self):
- try:
+ with self.assertRaises(sqlite.DatabaseError) as cm:
self.con.execute("select c2 from t1")
- except sqlite.DatabaseError as e:
- if not e.args[0].endswith("prohibited"):
- self.fail("wrong exception text: %s" % e.args[0])
- return
- self.fail("should have raised an exception due to missing privileges")
+ self.assertIn('prohibited', str(cm.exception))
class AuthorizerRaiseExceptionTests(AuthorizerTests):
@staticmethod