summaryrefslogtreecommitdiffstats
path: root/Lib/sqlite3/test/userfunctions.py
diff options
context:
space:
mode:
Diffstat (limited to 'Lib/sqlite3/test/userfunctions.py')
-rw-r--r--Lib/sqlite3/test/userfunctions.py163
1 files changed, 70 insertions, 93 deletions
diff --git a/Lib/sqlite3/test/userfunctions.py b/Lib/sqlite3/test/userfunctions.py
index 9501f53..1d19151 100644
--- a/Lib/sqlite3/test/userfunctions.py
+++ b/Lib/sqlite3/test/userfunctions.py
@@ -1,4 +1,4 @@
-#-*- coding: iso-8859-1 -*-
+#-*- coding: ISO-8859-1 -*-
# pysqlite2/test/userfunctions.py: tests for user-defined functions and
# aggregates.
#
@@ -23,13 +23,13 @@
# 3. This notice may not be removed or altered from any source distribution.
import unittest
-import unittest.mock
import sqlite3 as sqlite
+from test import test_support
def func_returntext():
return "foo"
def func_returnunicode():
- return "bar"
+ return u"bar"
def func_returnint():
return 42
def func_returnfloat():
@@ -37,14 +37,15 @@ def func_returnfloat():
def func_returnnull():
return None
def func_returnblob():
- return b"blob"
+ with test_support.check_py3k_warnings():
+ return buffer("blob")
def func_returnlonglong():
return 1<<31
def func_raiseexception():
- 5/0
+ 5 // 0
def func_isstring(v):
- return type(v) is str
+ return type(v) is unicode
def func_isint(v):
return type(v) is int
def func_isfloat(v):
@@ -52,12 +53,9 @@ def func_isfloat(v):
def func_isnone(v):
return type(v) is type(None)
def func_isblob(v):
- return isinstance(v, (bytes, memoryview))
+ return type(v) is buffer
def func_islonglong(v):
- return isinstance(v, int) and v >= 1<<31
-
-def func(*args):
- return len(args)
+ return isinstance(v, (int, long)) and v >= 1<<31
class AggrNoStep:
def __init__(self):
@@ -75,7 +73,7 @@ class AggrNoFinalize:
class AggrExceptionInInit:
def __init__(self):
- 5/0
+ 5 // 0
def step(self, x):
pass
@@ -88,7 +86,7 @@ class AggrExceptionInStep:
pass
def step(self, x):
- 5/0
+ 5 // 0
def finalize(self):
return 42
@@ -101,33 +99,19 @@ class AggrExceptionInFinalize:
pass
def finalize(self):
- 5/0
+ 5 // 0
class AggrCheckType:
def __init__(self):
self.val = None
def step(self, whichType, val):
- theType = {"str": str, "int": int, "float": float, "None": type(None),
- "blob": bytes}
+ theType = {"str": unicode, "int": int, "float": float, "None": type(None), "blob": buffer}
self.val = int(theType[whichType] is type(val))
def finalize(self):
return self.val
-class AggrCheckTypes:
- def __init__(self):
- self.val = 0
-
- def step(self, whichType, *vals):
- theType = {"str": str, "int": int, "float": float, "None": type(None),
- "blob": bytes}
- for val in vals:
- self.val += int(theType[whichType] is type(val))
-
- def finalize(self):
- return self.val
-
class AggrSum:
def __init__(self):
self.val = 0.0
@@ -157,14 +141,16 @@ class FunctionTests(unittest.TestCase):
self.con.create_function("isnone", 1, func_isnone)
self.con.create_function("isblob", 1, func_isblob)
self.con.create_function("islonglong", 1, func_islonglong)
- self.con.create_function("spam", -1, func)
def tearDown(self):
self.con.close()
def CheckFuncErrorOnCreate(self):
- with self.assertRaises(sqlite.OperationalError):
+ try:
self.con.create_function("bla", -100, lambda x: 2*x)
+ self.fail("should have raised an OperationalError")
+ except sqlite.OperationalError:
+ pass
def CheckFuncRefCount(self):
def getfunc():
@@ -182,15 +168,15 @@ class FunctionTests(unittest.TestCase):
cur = self.con.cursor()
cur.execute("select returntext()")
val = cur.fetchone()[0]
- self.assertEqual(type(val), str)
+ self.assertEqual(type(val), unicode)
self.assertEqual(val, "foo")
def CheckFuncReturnUnicode(self):
cur = self.con.cursor()
cur.execute("select returnunicode()")
val = cur.fetchone()[0]
- self.assertEqual(type(val), str)
- self.assertEqual(val, "bar")
+ self.assertEqual(type(val), unicode)
+ self.assertEqual(val, u"bar")
def CheckFuncReturnInt(self):
cur = self.con.cursor()
@@ -218,8 +204,9 @@ class FunctionTests(unittest.TestCase):
cur = self.con.cursor()
cur.execute("select returnblob()")
val = cur.fetchone()[0]
- self.assertEqual(type(val), bytes)
- self.assertEqual(val, b"blob")
+ with test_support.check_py3k_warnings():
+ self.assertEqual(type(val), buffer)
+ self.assertEqual(val, buffer("blob"))
def CheckFuncReturnLongLong(self):
cur = self.con.cursor()
@@ -229,10 +216,12 @@ class FunctionTests(unittest.TestCase):
def CheckFuncException(self):
cur = self.con.cursor()
- with self.assertRaises(sqlite.OperationalError) as cm:
+ try:
cur.execute("select raiseexception()")
cur.fetchone()
- self.assertEqual(str(cm.exception), 'user-defined function raised exception')
+ self.fail("should have raised OperationalError")
+ except sqlite.OperationalError, e:
+ self.assertEqual(e.args[0], 'user-defined function raised exception')
def CheckParamString(self):
cur = self.con.cursor()
@@ -260,7 +249,8 @@ class FunctionTests(unittest.TestCase):
def CheckParamBlob(self):
cur = self.con.cursor()
- cur.execute("select isblob(?)", (memoryview(b"blob"),))
+ with test_support.check_py3k_warnings():
+ cur.execute("select isblob(?)", (buffer("blob"),))
val = cur.fetchone()[0]
self.assertEqual(val, 1)
@@ -270,35 +260,6 @@ class FunctionTests(unittest.TestCase):
val = cur.fetchone()[0]
self.assertEqual(val, 1)
- def CheckAnyArguments(self):
- cur = self.con.cursor()
- cur.execute("select spam(?, ?)", (1, 2))
- val = cur.fetchone()[0]
- self.assertEqual(val, 2)
-
- def CheckFuncNonDeterministic(self):
- mock = unittest.mock.Mock(return_value=None)
- self.con.create_function("deterministic", 0, mock, deterministic=False)
- self.con.execute("select deterministic() = deterministic()")
- self.assertEqual(mock.call_count, 2)
-
- @unittest.skipIf(sqlite.sqlite_version_info < (3, 8, 3), "deterministic parameter not supported")
- def CheckFuncDeterministic(self):
- mock = unittest.mock.Mock(return_value=None)
- self.con.create_function("deterministic", 0, mock, deterministic=True)
- self.con.execute("select deterministic() = deterministic()")
- self.assertEqual(mock.call_count, 1)
-
- @unittest.skipIf(sqlite.sqlite_version_info >= (3, 8, 3), "SQLite < 3.8.3 needed")
- def CheckFuncDeterministicNotSupported(self):
- with self.assertRaises(sqlite.NotSupportedError):
- self.con.create_function("deterministic", 0, int, deterministic=True)
-
- def CheckFuncDeterministicKeywordOnly(self):
- with self.assertRaises(TypeError):
- self.con.create_function("deterministic", 0, int, True)
-
-
class AggregateTests(unittest.TestCase):
def setUp(self):
self.con = sqlite.connect(":memory:")
@@ -312,8 +273,9 @@ class AggregateTests(unittest.TestCase):
b blob
)
""")
- cur.execute("insert into test(t, i, f, n, b) values (?, ?, ?, ?, ?)",
- ("foo", 5, 3.14, None, memoryview(b"blob"),))
+ with test_support.check_py3k_warnings():
+ cur.execute("insert into test(t, i, f, n, b) values (?, ?, ?, ?, ?)",
+ ("foo", 5, 3.14, None, buffer("blob"),))
self.con.create_aggregate("nostep", 1, AggrNoStep)
self.con.create_aggregate("nofinalize", 1, AggrNoFinalize)
@@ -321,7 +283,6 @@ class AggregateTests(unittest.TestCase):
self.con.create_aggregate("excStep", 1, AggrExceptionInStep)
self.con.create_aggregate("excFinalize", 1, AggrExceptionInFinalize)
self.con.create_aggregate("checkType", 2, AggrCheckType)
- self.con.create_aggregate("checkTypes", -1, AggrCheckTypes)
self.con.create_aggregate("mysum", 1, AggrSum)
def tearDown(self):
@@ -330,42 +291,55 @@ class AggregateTests(unittest.TestCase):
pass
def CheckAggrErrorOnCreate(self):
- with self.assertRaises(sqlite.OperationalError):
+ try:
self.con.create_function("bla", -100, AggrSum)
+ self.fail("should have raised an OperationalError")
+ except sqlite.OperationalError:
+ pass
def CheckAggrNoStep(self):
cur = self.con.cursor()
- with self.assertRaises(AttributeError) as cm:
+ try:
cur.execute("select nostep(t) from test")
- self.assertEqual(str(cm.exception), "'AggrNoStep' object has no attribute 'step'")
+ self.fail("should have raised an AttributeError")
+ except AttributeError, e:
+ self.assertEqual(e.args[0], "AggrNoStep instance has no attribute 'step'")
def CheckAggrNoFinalize(self):
cur = self.con.cursor()
- with self.assertRaises(sqlite.OperationalError) as cm:
+ try:
cur.execute("select nofinalize(t) from test")
val = cur.fetchone()[0]
- self.assertEqual(str(cm.exception), "user-defined aggregate's 'finalize' method raised error")
+ self.fail("should have raised an OperationalError")
+ except sqlite.OperationalError, e:
+ self.assertEqual(e.args[0], "user-defined aggregate's 'finalize' method raised error")
def CheckAggrExceptionInInit(self):
cur = self.con.cursor()
- with self.assertRaises(sqlite.OperationalError) as cm:
+ try:
cur.execute("select excInit(t) from test")
val = cur.fetchone()[0]
- self.assertEqual(str(cm.exception), "user-defined aggregate's '__init__' method raised error")
+ self.fail("should have raised an OperationalError")
+ except sqlite.OperationalError, e:
+ self.assertEqual(e.args[0], "user-defined aggregate's '__init__' method raised error")
def CheckAggrExceptionInStep(self):
cur = self.con.cursor()
- with self.assertRaises(sqlite.OperationalError) as cm:
+ try:
cur.execute("select excStep(t) from test")
val = cur.fetchone()[0]
- self.assertEqual(str(cm.exception), "user-defined aggregate's 'step' method raised error")
+ self.fail("should have raised an OperationalError")
+ except sqlite.OperationalError, e:
+ self.assertEqual(e.args[0], "user-defined aggregate's 'step' method raised error")
def CheckAggrExceptionInFinalize(self):
cur = self.con.cursor()
- with self.assertRaises(sqlite.OperationalError) as cm:
+ try:
cur.execute("select excFinalize(t) from test")
val = cur.fetchone()[0]
- self.assertEqual(str(cm.exception), "user-defined aggregate's 'finalize' method raised error")
+ self.fail("should have raised an OperationalError")
+ except sqlite.OperationalError, e:
+ self.assertEqual(e.args[0], "user-defined aggregate's 'finalize' method raised error")
def CheckAggrCheckParamStr(self):
cur = self.con.cursor()
@@ -379,12 +353,6 @@ class AggregateTests(unittest.TestCase):
val = cur.fetchone()[0]
self.assertEqual(val, 1)
- def CheckAggrCheckParamsInt(self):
- cur = self.con.cursor()
- cur.execute("select checkTypes('int', ?, ?)", (42, 24))
- val = cur.fetchone()[0]
- self.assertEqual(val, 2)
-
def CheckAggrCheckParamFloat(self):
cur = self.con.cursor()
cur.execute("select checkType('float', ?)", (3.14,))
@@ -399,7 +367,8 @@ class AggregateTests(unittest.TestCase):
def CheckAggrCheckParamBlob(self):
cur = self.con.cursor()
- cur.execute("select checkType('blob', ?)", (memoryview(b"blob"),))
+ with test_support.check_py3k_warnings():
+ cur.execute("select checkType('blob', ?)", (buffer("blob"),))
val = cur.fetchone()[0]
self.assertEqual(val, 1)
@@ -438,14 +407,22 @@ class AuthorizerTests(unittest.TestCase):
pass
def test_table_access(self):
- with self.assertRaises(sqlite.DatabaseError) as cm:
+ try:
self.con.execute("select * from t2")
- self.assertIn('prohibited', str(cm.exception))
+ except sqlite.DatabaseError, e:
+ if not e.args[0].endswith("prohibited"):
+ self.fail("wrong exception text: %s" % e.args[0])
+ return
+ self.fail("should have raised an exception due to missing privileges")
def test_column_access(self):
- with self.assertRaises(sqlite.DatabaseError) as cm:
+ try:
self.con.execute("select c2 from t1")
- self.assertIn('prohibited', str(cm.exception))
+ except sqlite.DatabaseError, e:
+ if not e.args[0].endswith("prohibited"):
+ self.fail("wrong exception text: %s" % e.args[0])
+ return
+ self.fail("should have raised an exception due to missing privileges")
class AuthorizerRaiseExceptionTests(AuthorizerTests):
@staticmethod