diff options
author | Serhiy Storchaka <storchaka@gmail.com> | 2021-08-08 05:49:44 (GMT) |
---|---|---|
committer | GitHub <noreply@github.com> | 2021-08-08 05:49:44 (GMT) |
commit | 0eec6276fdcdde5221370d92b50ea95851760c72 (patch) | |
tree | 35fa0e56f83f404eb2120ec1963c6b5e15813d34 /Lib/sqlite3 | |
parent | ebecffdb6d5fffa4249f9a813f1fc1915926feb5 (diff) | |
download | cpython-0eec6276fdcdde5221370d92b50ea95851760c72.zip cpython-0eec6276fdcdde5221370d92b50ea95851760c72.tar.gz cpython-0eec6276fdcdde5221370d92b50ea95851760c72.tar.bz2 |
bpo-44859: Improve error handling in sqlite3 and and raise more accurate exceptions. (GH-27654)
* MemoryError is now raised instead of sqlite3.Warning when
memory is not enough for encoding a statement to UTF-8
in Connection.__call__() and Cursor.execute().
* UnicodEncodeError is now raised instead of sqlite3.Warning when
the statement contains surrogate characters
in Connection.__call__() and Cursor.execute().
* TypeError is now raised instead of ValueError for non-string
script argument in Cursor.executescript().
* ValueError is now raised for script containing the null
character instead of truncating it in Cursor.executescript().
* Correctly handle exceptions raised when getting boolean value
of the result of the progress handler.
* Add many tests covering different corner cases.
Co-authored-by: Erlend Egeberg Aasland <erlend.aasland@innova.no>
Diffstat (limited to 'Lib/sqlite3')
-rw-r--r-- | Lib/sqlite3/test/dbapi.py | 33 | ||||
-rw-r--r-- | Lib/sqlite3/test/hooks.py | 29 | ||||
-rw-r--r-- | Lib/sqlite3/test/regression.py | 23 | ||||
-rw-r--r-- | Lib/sqlite3/test/types.py | 52 | ||||
-rw-r--r-- | Lib/sqlite3/test/userfunctions.py | 48 |
5 files changed, 166 insertions, 19 deletions
diff --git a/Lib/sqlite3/test/dbapi.py b/Lib/sqlite3/test/dbapi.py index 408f994..5d7e5bb 100644 --- a/Lib/sqlite3/test/dbapi.py +++ b/Lib/sqlite3/test/dbapi.py @@ -26,7 +26,7 @@ import sys import threading import unittest -from test.support import check_disallow_instantiation, threading_helper +from test.support import check_disallow_instantiation, threading_helper, bigmemtest from test.support.os_helper import TESTFN, unlink @@ -758,9 +758,35 @@ class ExtensionTests(unittest.TestCase): def test_cursor_executescript_as_bytes(self): con = sqlite.connect(":memory:") cur = con.cursor() - with self.assertRaises(ValueError) as cm: + with self.assertRaises(TypeError): cur.executescript(b"create table test(foo); insert into test(foo) values (5);") - self.assertEqual(str(cm.exception), 'script argument must be unicode.') + + def test_cursor_executescript_with_null_characters(self): + con = sqlite.connect(":memory:") + cur = con.cursor() + with self.assertRaises(ValueError): + cur.executescript(""" + create table a(i);\0 + insert into a(i) values (5); + """) + + def test_cursor_executescript_with_surrogates(self): + con = sqlite.connect(":memory:") + cur = con.cursor() + with self.assertRaises(UnicodeEncodeError): + cur.executescript(""" + create table a(s); + insert into a(s) values ('\ud8ff'); + """) + + @unittest.skipUnless(sys.maxsize > 2**32, 'requires 64bit platform') + @bigmemtest(size=2**31, memuse=3, dry_run=False) + def test_cursor_executescript_too_large_script(self, maxsize): + con = sqlite.connect(":memory:") + cur = con.cursor() + for size in 2**31-1, 2**31: + with self.assertRaises(sqlite.DataError): + cur.executescript("create table a(s);".ljust(size)) def test_connection_execute(self): con = sqlite.connect(":memory:") @@ -969,6 +995,7 @@ def suite(): CursorTests, ExtensionTests, ModuleTests, + OpenTests, SqliteOnConflictTests, ThreadTests, UninitialisedConnectionTests, diff --git a/Lib/sqlite3/test/hooks.py b/Lib/sqlite3/test/hooks.py index 1be6d38..43e3810 100644 --- a/Lib/sqlite3/test/hooks.py +++ b/Lib/sqlite3/test/hooks.py @@ -24,7 +24,7 @@ import unittest import sqlite3 as sqlite from test.support.os_helper import TESTFN, unlink - +from .userfunctions import with_tracebacks class CollationTests(unittest.TestCase): def test_create_collation_not_string(self): @@ -145,7 +145,6 @@ class ProgressTests(unittest.TestCase): """) self.assertTrue(progress_calls) - def test_opcode_count(self): """ Test that the opcode argument is respected. @@ -198,6 +197,32 @@ class ProgressTests(unittest.TestCase): con.execute("select 1 union select 2 union select 3").fetchall() self.assertEqual(action, 0, "progress handler was not cleared") + @with_tracebacks(['bad_progress', 'ZeroDivisionError']) + def test_error_in_progress_handler(self): + con = sqlite.connect(":memory:") + def bad_progress(): + 1 / 0 + con.set_progress_handler(bad_progress, 1) + with self.assertRaises(sqlite.OperationalError): + con.execute(""" + create table foo(a, b) + """) + + @with_tracebacks(['__bool__', 'ZeroDivisionError']) + def test_error_in_progress_handler_result(self): + con = sqlite.connect(":memory:") + class BadBool: + def __bool__(self): + 1 / 0 + def bad_progress(): + return BadBool() + con.set_progress_handler(bad_progress, 1) + with self.assertRaises(sqlite.OperationalError): + con.execute(""" + create table foo(a, b) + """) + + class TraceCallbackTests(unittest.TestCase): def test_trace_callback_used(self): """ diff --git a/Lib/sqlite3/test/regression.py b/Lib/sqlite3/test/regression.py index 6c093d7..ddf36e7 100644 --- a/Lib/sqlite3/test/regression.py +++ b/Lib/sqlite3/test/regression.py @@ -21,6 +21,7 @@ # 3. This notice may not be removed or altered from any source distribution. import datetime +import sys import unittest import sqlite3 as sqlite import weakref @@ -273,7 +274,7 @@ class RegressionTests(unittest.TestCase): Call a connection with a non-string SQL request: check error handling of the statement constructor. """ - self.assertRaises(TypeError, self.con, 1) + self.assertRaises(TypeError, self.con, b"select 1") def test_collation(self): def collation_cb(a, b): @@ -344,6 +345,26 @@ class RegressionTests(unittest.TestCase): self.assertRaises(ValueError, cur.execute, " \0select 2") self.assertRaises(ValueError, cur.execute, "select 2\0") + def test_surrogates(self): + con = sqlite.connect(":memory:") + self.assertRaises(UnicodeEncodeError, con, "select '\ud8ff'") + self.assertRaises(UnicodeEncodeError, con, "select '\udcff'") + cur = con.cursor() + self.assertRaises(UnicodeEncodeError, cur.execute, "select '\ud8ff'") + self.assertRaises(UnicodeEncodeError, cur.execute, "select '\udcff'") + + @unittest.skipUnless(sys.maxsize > 2**32, 'requires 64bit platform') + @support.bigmemtest(size=2**31, memuse=4, dry_run=False) + def test_large_sql(self, maxsize): + # Test two cases: size+1 > INT_MAX and size+1 <= INT_MAX. + for size in (2**31, 2**31-2): + con = sqlite.connect(":memory:") + sql = "select 1".ljust(size) + self.assertRaises(sqlite.DataError, con, sql) + cur = con.cursor() + self.assertRaises(sqlite.DataError, cur.execute, sql) + del sql + def test_commit_cursor_reset(self): """ Connection.commit() did reset cursors, which made sqlite3 diff --git a/Lib/sqlite3/test/types.py b/Lib/sqlite3/test/types.py index 4f0e4f6..b8926ff 100644 --- a/Lib/sqlite3/test/types.py +++ b/Lib/sqlite3/test/types.py @@ -23,11 +23,14 @@ import datetime import unittest import sqlite3 as sqlite +import sys try: import zlib except ImportError: zlib = None +from test import support + class SqliteTypeTests(unittest.TestCase): def setUp(self): @@ -45,6 +48,12 @@ class SqliteTypeTests(unittest.TestCase): row = self.cur.fetchone() self.assertEqual(row[0], "Österreich") + def test_string_with_null_character(self): + self.cur.execute("insert into test(s) values (?)", ("a\0b",)) + self.cur.execute("select s from test") + row = self.cur.fetchone() + self.assertEqual(row[0], "a\0b") + def test_small_int(self): self.cur.execute("insert into test(i) values (?)", (42,)) self.cur.execute("select i from test") @@ -52,7 +61,7 @@ class SqliteTypeTests(unittest.TestCase): self.assertEqual(row[0], 42) def test_large_int(self): - num = 2**40 + num = 123456789123456789 self.cur.execute("insert into test(i) values (?)", (num,)) self.cur.execute("select i from test") row = self.cur.fetchone() @@ -78,6 +87,45 @@ class SqliteTypeTests(unittest.TestCase): row = self.cur.fetchone() self.assertEqual(row[0], "Österreich") + def test_too_large_int(self): + for value in 2**63, -2**63-1, 2**64: + with self.assertRaises(OverflowError): + self.cur.execute("insert into test(i) values (?)", (value,)) + self.cur.execute("select i from test") + row = self.cur.fetchone() + self.assertIsNone(row) + + def test_string_with_surrogates(self): + for value in 0xd8ff, 0xdcff: + with self.assertRaises(UnicodeEncodeError): + self.cur.execute("insert into test(s) values (?)", (chr(value),)) + self.cur.execute("select s from test") + row = self.cur.fetchone() + self.assertIsNone(row) + + @unittest.skipUnless(sys.maxsize > 2**32, 'requires 64bit platform') + @support.bigmemtest(size=2**31, memuse=4, dry_run=False) + def test_too_large_string(self, maxsize): + with self.assertRaises(sqlite.InterfaceError): + self.cur.execute("insert into test(s) values (?)", ('x'*(2**31-1),)) + with self.assertRaises(OverflowError): + self.cur.execute("insert into test(s) values (?)", ('x'*(2**31),)) + self.cur.execute("select 1 from test") + row = self.cur.fetchone() + self.assertIsNone(row) + + @unittest.skipUnless(sys.maxsize > 2**32, 'requires 64bit platform') + @support.bigmemtest(size=2**31, memuse=3, dry_run=False) + def test_too_large_blob(self, maxsize): + with self.assertRaises(sqlite.InterfaceError): + self.cur.execute("insert into test(s) values (?)", (b'x'*(2**31-1),)) + with self.assertRaises(OverflowError): + self.cur.execute("insert into test(s) values (?)", (b'x'*(2**31),)) + self.cur.execute("select 1 from test") + row = self.cur.fetchone() + self.assertIsNone(row) + + class DeclTypesTests(unittest.TestCase): class Foo: def __init__(self, _val): @@ -163,7 +211,7 @@ class DeclTypesTests(unittest.TestCase): def test_large_int(self): # default - num = 2**40 + num = 123456789123456789 self.cur.execute("insert into test(i) values (?)", (num,)) self.cur.execute("select i from test") row = self.cur.fetchone() diff --git a/Lib/sqlite3/test/userfunctions.py b/Lib/sqlite3/test/userfunctions.py index 9681dbd..b4d5181 100644 --- a/Lib/sqlite3/test/userfunctions.py +++ b/Lib/sqlite3/test/userfunctions.py @@ -33,28 +33,37 @@ import sqlite3 as sqlite from test.support import bigmemtest -def with_tracebacks(strings): +def with_tracebacks(strings, traceback=True): """Convenience decorator for testing callback tracebacks.""" - strings.append('Traceback') + if traceback: + strings.append('Traceback') def decorator(func): @functools.wraps(func) def wrapper(self, *args, **kwargs): # First, run the test with traceback enabled. - sqlite.enable_callback_tracebacks(True) - buf = io.StringIO() - with contextlib.redirect_stderr(buf): + with check_tracebacks(self, strings): func(self, *args, **kwargs) - tb = buf.getvalue() - for s in strings: - self.assertIn(s, tb) # Then run the test with traceback disabled. - sqlite.enable_callback_tracebacks(False) func(self, *args, **kwargs) return wrapper return decorator +@contextlib.contextmanager +def check_tracebacks(self, strings): + """Convenience context manager for testing callback tracebacks.""" + sqlite.enable_callback_tracebacks(True) + try: + buf = io.StringIO() + with contextlib.redirect_stderr(buf): + yield + tb = buf.getvalue() + for s in strings: + self.assertIn(s, tb) + finally: + sqlite.enable_callback_tracebacks(False) + def func_returntext(): return "foo" def func_returntextwithnull(): @@ -408,9 +417,26 @@ class FunctionTests(unittest.TestCase): del x,y gc.collect() + def test_func_return_too_large_int(self): + cur = self.con.cursor() + for value in 2**63, -2**63-1, 2**64: + self.con.create_function("largeint", 0, lambda value=value: value) + with check_tracebacks(self, ['OverflowError']): + with self.assertRaises(sqlite.DataError): + cur.execute("select largeint()") + + def test_func_return_text_with_surrogates(self): + cur = self.con.cursor() + self.con.create_function("pychr", 1, chr) + for value in 0xd8ff, 0xdcff: + with check_tracebacks(self, + ['UnicodeEncodeError', 'surrogates not allowed']): + with self.assertRaises(sqlite.OperationalError): + cur.execute("select pychr(?)", (value,)) + @unittest.skipUnless(sys.maxsize > 2**32, 'requires 64bit platform') @bigmemtest(size=2**31, memuse=3, dry_run=False) - def test_large_text(self, size): + def test_func_return_too_large_text(self, size): cur = self.con.cursor() for size in 2**31-1, 2**31: self.con.create_function("largetext", 0, lambda size=size: "b" * size) @@ -419,7 +445,7 @@ class FunctionTests(unittest.TestCase): @unittest.skipUnless(sys.maxsize > 2**32, 'requires 64bit platform') @bigmemtest(size=2**31, memuse=2, dry_run=False) - def test_large_blob(self, size): + def test_func_return_too_large_blob(self, size): cur = self.con.cursor() for size in 2**31-1, 2**31: self.con.create_function("largeblob", 0, lambda size=size: b"b" * size) |