summaryrefslogtreecommitdiffstats
path: root/Lib/sqlite3/test
diff options
context:
space:
mode:
authorGerhard Häring <gh@ghaering.de>2006-06-13 22:24:47 (GMT)
committerGerhard Häring <gh@ghaering.de>2006-06-13 22:24:47 (GMT)
commit1541ef08afda1da09ec99cfb5305fdc0af04ef40 (patch)
tree5ece2f7b0508920b06f4f6c001066418c483cb87 /Lib/sqlite3/test
parentea3912b0da71e16b8a37e04fcf3969dc85c27fa1 (diff)
downloadcpython-1541ef08afda1da09ec99cfb5305fdc0af04ef40.zip
cpython-1541ef08afda1da09ec99cfb5305fdc0af04ef40.tar.gz
cpython-1541ef08afda1da09ec99cfb5305fdc0af04ef40.tar.bz2
Merged changes from external pysqlite 2.3.0 release. Documentation updates will
follow in a few hours at the latest. Then we should be ready for beta1.
Diffstat (limited to 'Lib/sqlite3/test')
-rw-r--r--Lib/sqlite3/test/regression.py8
-rw-r--r--Lib/sqlite3/test/types.py30
-rw-r--r--Lib/sqlite3/test/userfunctions.py108
3 files changed, 109 insertions, 37 deletions
diff --git a/Lib/sqlite3/test/regression.py b/Lib/sqlite3/test/regression.py
index 25e4b63..c8733b9 100644
--- a/Lib/sqlite3/test/regression.py
+++ b/Lib/sqlite3/test/regression.py
@@ -61,6 +61,14 @@ class RegressionTests(unittest.TestCase):
con.rollback()
+ def CheckColumnNameWithSpaces(self):
+ cur = self.con.cursor()
+ cur.execute('select 1 as "foo bar [datetime]"')
+ self.failUnlessEqual(cur.description[0][0], "foo bar")
+
+ cur.execute('select 1 as "foo baz"')
+ self.failUnlessEqual(cur.description[0][0], "foo baz")
+
def suite():
regression_suite = unittest.makeSuite(RegressionTests, "Check")
return unittest.TestSuite((regression_suite,))
diff --git a/Lib/sqlite3/test/types.py b/Lib/sqlite3/test/types.py
index e49f7dd..c238ef9 100644
--- a/Lib/sqlite3/test/types.py
+++ b/Lib/sqlite3/test/types.py
@@ -101,16 +101,16 @@ class DeclTypesTests(unittest.TestCase):
self.cur.execute("create table test(i int, s str, f float, b bool, u unicode, foo foo, bin blob)")
# override float, make them always return the same number
- sqlite.converters["float"] = lambda x: 47.2
+ sqlite.converters["FLOAT"] = lambda x: 47.2
# and implement two custom ones
- sqlite.converters["bool"] = lambda x: bool(int(x))
- sqlite.converters["foo"] = DeclTypesTests.Foo
+ sqlite.converters["BOOL"] = lambda x: bool(int(x))
+ sqlite.converters["FOO"] = DeclTypesTests.Foo
def tearDown(self):
- del sqlite.converters["float"]
- del sqlite.converters["bool"]
- del sqlite.converters["foo"]
+ del sqlite.converters["FLOAT"]
+ del sqlite.converters["BOOL"]
+ del sqlite.converters["FOO"]
self.cur.close()
self.con.close()
@@ -208,14 +208,14 @@ class ColNamesTests(unittest.TestCase):
self.cur = self.con.cursor()
self.cur.execute("create table test(x foo)")
- sqlite.converters["foo"] = lambda x: "[%s]" % x
- sqlite.converters["bar"] = lambda x: "<%s>" % x
- sqlite.converters["exc"] = lambda x: 5/0
+ sqlite.converters["FOO"] = lambda x: "[%s]" % x
+ sqlite.converters["BAR"] = lambda x: "<%s>" % x
+ sqlite.converters["EXC"] = lambda x: 5/0
def tearDown(self):
- del sqlite.converters["foo"]
- del sqlite.converters["bar"]
- del sqlite.converters["exc"]
+ del sqlite.converters["FOO"]
+ del sqlite.converters["BAR"]
+ del sqlite.converters["EXC"]
self.cur.close()
self.con.close()
@@ -231,12 +231,6 @@ class ColNamesTests(unittest.TestCase):
val = self.cur.fetchone()[0]
self.failUnlessEqual(val, None)
- def CheckExc(self):
- # Exceptions in type converters result in returned Nones
- self.cur.execute('select 5 as "x [exc]"')
- val = self.cur.fetchone()[0]
- self.failUnlessEqual(val, None)
-
def CheckColName(self):
self.cur.execute("insert into test(x) values (?)", ("xxx",))
self.cur.execute('select x as "x [bar]" from test')
diff --git a/Lib/sqlite3/test/userfunctions.py b/Lib/sqlite3/test/userfunctions.py
index 78656e7..bcc5ae3 100644
--- a/Lib/sqlite3/test/userfunctions.py
+++ b/Lib/sqlite3/test/userfunctions.py
@@ -55,6 +55,9 @@ class AggrNoStep:
def __init__(self):
pass
+ def finalize(self):
+ return 1
+
class AggrNoFinalize:
def __init__(self):
pass
@@ -144,9 +147,12 @@ class FunctionTests(unittest.TestCase):
def CheckFuncRefCount(self):
def getfunc():
def f():
- return val
+ return 1
return f
- self.con.create_function("reftest", 0, getfunc())
+ f = getfunc()
+ globals()["foo"] = f
+ # self.con.create_function("reftest", 0, getfunc())
+ self.con.create_function("reftest", 0, f)
cur = self.con.cursor()
cur.execute("select reftest()")
@@ -195,9 +201,12 @@ class FunctionTests(unittest.TestCase):
def CheckFuncException(self):
cur = self.con.cursor()
- cur.execute("select raiseexception()")
- val = cur.fetchone()[0]
- self.failUnlessEqual(val, None)
+ try:
+ cur.execute("select raiseexception()")
+ cur.fetchone()
+ self.fail("should have raised OperationalError")
+ except sqlite.OperationalError, e:
+ self.failUnlessEqual(e.args[0], 'user-defined function raised exception')
def CheckParamString(self):
cur = self.con.cursor()
@@ -267,31 +276,47 @@ class AggregateTests(unittest.TestCase):
def CheckAggrNoStep(self):
cur = self.con.cursor()
- cur.execute("select nostep(t) from test")
+ try:
+ cur.execute("select nostep(t) from test")
+ self.fail("should have raised an AttributeError")
+ except AttributeError, e:
+ self.failUnlessEqual(e.args[0], "AggrNoStep instance has no attribute 'step'")
def CheckAggrNoFinalize(self):
cur = self.con.cursor()
- cur.execute("select nofinalize(t) from test")
- val = cur.fetchone()[0]
- self.failUnlessEqual(val, None)
+ try:
+ cur.execute("select nofinalize(t) from test")
+ val = cur.fetchone()[0]
+ self.fail("should have raised an OperationalError")
+ except sqlite.OperationalError, e:
+ self.failUnlessEqual(e.args[0], "user-defined aggregate's 'finalize' method raised error")
def CheckAggrExceptionInInit(self):
cur = self.con.cursor()
- cur.execute("select excInit(t) from test")
- val = cur.fetchone()[0]
- self.failUnlessEqual(val, None)
+ try:
+ cur.execute("select excInit(t) from test")
+ val = cur.fetchone()[0]
+ self.fail("should have raised an OperationalError")
+ except sqlite.OperationalError, e:
+ self.failUnlessEqual(e.args[0], "user-defined aggregate's '__init__' method raised error")
def CheckAggrExceptionInStep(self):
cur = self.con.cursor()
- cur.execute("select excStep(t) from test")
- val = cur.fetchone()[0]
- self.failUnlessEqual(val, 42)
+ try:
+ cur.execute("select excStep(t) from test")
+ val = cur.fetchone()[0]
+ self.fail("should have raised an OperationalError")
+ except sqlite.OperationalError, e:
+ self.failUnlessEqual(e.args[0], "user-defined aggregate's 'step' method raised error")
def CheckAggrExceptionInFinalize(self):
cur = self.con.cursor()
- cur.execute("select excFinalize(t) from test")
- val = cur.fetchone()[0]
- self.failUnlessEqual(val, None)
+ try:
+ cur.execute("select excFinalize(t) from test")
+ val = cur.fetchone()[0]
+ self.fail("should have raised an OperationalError")
+ except sqlite.OperationalError, e:
+ self.failUnlessEqual(e.args[0], "user-defined aggregate's 'finalize' method raised error")
def CheckAggrCheckParamStr(self):
cur = self.con.cursor()
@@ -331,10 +356,55 @@ class AggregateTests(unittest.TestCase):
val = cur.fetchone()[0]
self.failUnlessEqual(val, 60)
+def authorizer_cb(action, arg1, arg2, dbname, source):
+ if action != sqlite.SQLITE_SELECT:
+ return sqlite.SQLITE_DENY
+ if arg2 == 'c2' or arg1 == 't2':
+ return sqlite.SQLITE_DENY
+ return sqlite.SQLITE_OK
+
+class AuthorizerTests(unittest.TestCase):
+ def setUp(self):
+ sqlite.enable_callback_tracebacks(1)
+ self.con = sqlite.connect(":memory:")
+ self.con.executescript("""
+ create table t1 (c1, c2);
+ create table t2 (c1, c2);
+ insert into t1 (c1, c2) values (1, 2);
+ insert into t2 (c1, c2) values (4, 5);
+ """)
+
+ # For our security test:
+ self.con.execute("select c2 from t2")
+
+ self.con.set_authorizer(authorizer_cb)
+
+ def tearDown(self):
+ pass
+
+ def CheckTableAccess(self):
+ try:
+ self.con.execute("select * from t2")
+ except sqlite.DatabaseError, e:
+ if not e.args[0].endswith("prohibited"):
+ self.fail("wrong exception text: %s" % e.args[0])
+ return
+ self.fail("should have raised an exception due to missing privileges")
+
+ def CheckColumnAccess(self):
+ try:
+ self.con.execute("select c2 from t1")
+ except sqlite.DatabaseError, e:
+ if not e.args[0].endswith("prohibited"):
+ self.fail("wrong exception text: %s" % e.args[0])
+ return
+ self.fail("should have raised an exception due to missing privileges")
+
def suite():
function_suite = unittest.makeSuite(FunctionTests, "Check")
aggregate_suite = unittest.makeSuite(AggregateTests, "Check")
- return unittest.TestSuite((function_suite, aggregate_suite))
+ authorizer_suite = unittest.makeSuite(AuthorizerTests, "Check")
+ return unittest.TestSuite((function_suite, aggregate_suite, authorizer_suite))
def test():
runner = unittest.TextTestRunner()