summaryrefslogtreecommitdiffstats
path: root/Lib/test/test_sqlite3/test_hooks.py
diff options
context:
space:
mode:
authorErlend Egeberg Aasland <erlend.aasland@innova.no>2021-10-29 22:08:19 (GMT)
committerGitHub <noreply@github.com>2021-10-29 22:08:19 (GMT)
commit62bf263a775f4444d8b5d5841cc09be3bd53e933 (patch)
treed92d1e79ebb70b5b5f92ab034aa45fecd2a368e4 /Lib/test/test_sqlite3/test_hooks.py
parentc2d0ba722a7b3839685af968cf0c304a24cdf525 (diff)
downloadcpython-62bf263a775f4444d8b5d5841cc09be3bd53e933.zip
cpython-62bf263a775f4444d8b5d5841cc09be3bd53e933.tar.gz
cpython-62bf263a775f4444d8b5d5841cc09be3bd53e933.tar.bz2
bpo-10572: Move `sqlite3` tests to `Lib/test` (GH-29304)
Automerge-Triggered-By: GH:brettcannon
Diffstat (limited to 'Lib/test/test_sqlite3/test_hooks.py')
-rw-r--r--Lib/test/test_sqlite3/test_hooks.py294
1 files changed, 294 insertions, 0 deletions
diff --git a/Lib/test/test_sqlite3/test_hooks.py b/Lib/test/test_sqlite3/test_hooks.py
new file mode 100644
index 0000000..bf454b2
--- /dev/null
+++ b/Lib/test/test_sqlite3/test_hooks.py
@@ -0,0 +1,294 @@
+# pysqlite2/test/hooks.py: tests for various SQLite-specific hooks
+#
+# Copyright (C) 2006-2007 Gerhard Häring <gh@ghaering.de>
+#
+# This file is part of pysqlite.
+#
+# This software is provided 'as-is', without any express or implied
+# warranty. In no event will the authors be held liable for any damages
+# arising from the use of this software.
+#
+# Permission is granted to anyone to use this software for any purpose,
+# including commercial applications, and to alter it and redistribute it
+# freely, subject to the following restrictions:
+#
+# 1. The origin of this software must not be misrepresented; you must not
+# claim that you wrote the original software. If you use this software
+# in a product, an acknowledgment in the product documentation would be
+# appreciated but is not required.
+# 2. Altered source versions must be plainly marked as such, and must not be
+# misrepresented as being the original software.
+# 3. This notice may not be removed or altered from any source distribution.
+
+import unittest
+import sqlite3 as sqlite
+
+from test.support.os_helper import TESTFN, unlink
+from .test_userfunctions import with_tracebacks
+
+class CollationTests(unittest.TestCase):
+ def test_create_collation_not_string(self):
+ con = sqlite.connect(":memory:")
+ with self.assertRaises(TypeError):
+ con.create_collation(None, lambda x, y: (x > y) - (x < y))
+
+ def test_create_collation_not_callable(self):
+ con = sqlite.connect(":memory:")
+ with self.assertRaises(TypeError) as cm:
+ con.create_collation("X", 42)
+ self.assertEqual(str(cm.exception), 'parameter must be callable')
+
+ def test_create_collation_not_ascii(self):
+ con = sqlite.connect(":memory:")
+ con.create_collation("collä", lambda x, y: (x > y) - (x < y))
+
+ def test_create_collation_bad_upper(self):
+ class BadUpperStr(str):
+ def upper(self):
+ return None
+ con = sqlite.connect(":memory:")
+ mycoll = lambda x, y: -((x > y) - (x < y))
+ con.create_collation(BadUpperStr("mycoll"), mycoll)
+ result = con.execute("""
+ select x from (
+ select 'a' as x
+ union
+ select 'b' as x
+ ) order by x collate mycoll
+ """).fetchall()
+ self.assertEqual(result[0][0], 'b')
+ self.assertEqual(result[1][0], 'a')
+
+ def test_collation_is_used(self):
+ def mycoll(x, y):
+ # reverse order
+ return -((x > y) - (x < y))
+
+ con = sqlite.connect(":memory:")
+ con.create_collation("mycoll", mycoll)
+ sql = """
+ select x from (
+ select 'a' as x
+ union
+ select 'b' as x
+ union
+ select 'c' as x
+ ) order by x collate mycoll
+ """
+ result = con.execute(sql).fetchall()
+ self.assertEqual(result, [('c',), ('b',), ('a',)],
+ msg='the expected order was not returned')
+
+ con.create_collation("mycoll", None)
+ with self.assertRaises(sqlite.OperationalError) as cm:
+ result = con.execute(sql).fetchall()
+ self.assertEqual(str(cm.exception), 'no such collation sequence: mycoll')
+
+ def test_collation_returns_large_integer(self):
+ def mycoll(x, y):
+ # reverse order
+ return -((x > y) - (x < y)) * 2**32
+ con = sqlite.connect(":memory:")
+ con.create_collation("mycoll", mycoll)
+ sql = """
+ select x from (
+ select 'a' as x
+ union
+ select 'b' as x
+ union
+ select 'c' as x
+ ) order by x collate mycoll
+ """
+ result = con.execute(sql).fetchall()
+ self.assertEqual(result, [('c',), ('b',), ('a',)],
+ msg="the expected order was not returned")
+
+ def test_collation_register_twice(self):
+ """
+ Register two different collation functions under the same name.
+ Verify that the last one is actually used.
+ """
+ con = sqlite.connect(":memory:")
+ con.create_collation("mycoll", lambda x, y: (x > y) - (x < y))
+ con.create_collation("mycoll", lambda x, y: -((x > y) - (x < y)))
+ result = con.execute("""
+ select x from (select 'a' as x union select 'b' as x) order by x collate mycoll
+ """).fetchall()
+ self.assertEqual(result[0][0], 'b')
+ self.assertEqual(result[1][0], 'a')
+
+ def test_deregister_collation(self):
+ """
+ Register a collation, then deregister it. Make sure an error is raised if we try
+ to use it.
+ """
+ con = sqlite.connect(":memory:")
+ con.create_collation("mycoll", lambda x, y: (x > y) - (x < y))
+ con.create_collation("mycoll", None)
+ with self.assertRaises(sqlite.OperationalError) as cm:
+ con.execute("select 'a' as x union select 'b' as x order by x collate mycoll")
+ self.assertEqual(str(cm.exception), 'no such collation sequence: mycoll')
+
+class ProgressTests(unittest.TestCase):
+ def test_progress_handler_used(self):
+ """
+ Test that the progress handler is invoked once it is set.
+ """
+ con = sqlite.connect(":memory:")
+ progress_calls = []
+ def progress():
+ progress_calls.append(None)
+ return 0
+ con.set_progress_handler(progress, 1)
+ con.execute("""
+ create table foo(a, b)
+ """)
+ self.assertTrue(progress_calls)
+
+ def test_opcode_count(self):
+ """
+ Test that the opcode argument is respected.
+ """
+ con = sqlite.connect(":memory:")
+ progress_calls = []
+ def progress():
+ progress_calls.append(None)
+ return 0
+ con.set_progress_handler(progress, 1)
+ curs = con.cursor()
+ curs.execute("""
+ create table foo (a, b)
+ """)
+ first_count = len(progress_calls)
+ progress_calls = []
+ con.set_progress_handler(progress, 2)
+ curs.execute("""
+ create table bar (a, b)
+ """)
+ second_count = len(progress_calls)
+ self.assertGreaterEqual(first_count, second_count)
+
+ def test_cancel_operation(self):
+ """
+ Test that returning a non-zero value stops the operation in progress.
+ """
+ con = sqlite.connect(":memory:")
+ def progress():
+ return 1
+ con.set_progress_handler(progress, 1)
+ curs = con.cursor()
+ self.assertRaises(
+ sqlite.OperationalError,
+ curs.execute,
+ "create table bar (a, b)")
+
+ def test_clear_handler(self):
+ """
+ Test that setting the progress handler to None clears the previously set handler.
+ """
+ con = sqlite.connect(":memory:")
+ action = 0
+ def progress():
+ nonlocal action
+ action = 1
+ return 0
+ con.set_progress_handler(progress, 1)
+ con.set_progress_handler(None, 1)
+ con.execute("select 1 union select 2 union select 3").fetchall()
+ self.assertEqual(action, 0, "progress handler was not cleared")
+
+ @with_tracebacks(['bad_progress', 'ZeroDivisionError'])
+ def test_error_in_progress_handler(self):
+ con = sqlite.connect(":memory:")
+ def bad_progress():
+ 1 / 0
+ con.set_progress_handler(bad_progress, 1)
+ with self.assertRaises(sqlite.OperationalError):
+ con.execute("""
+ create table foo(a, b)
+ """)
+
+ @with_tracebacks(['__bool__', 'ZeroDivisionError'])
+ def test_error_in_progress_handler_result(self):
+ con = sqlite.connect(":memory:")
+ class BadBool:
+ def __bool__(self):
+ 1 / 0
+ def bad_progress():
+ return BadBool()
+ con.set_progress_handler(bad_progress, 1)
+ with self.assertRaises(sqlite.OperationalError):
+ con.execute("""
+ create table foo(a, b)
+ """)
+
+
+class TraceCallbackTests(unittest.TestCase):
+ def test_trace_callback_used(self):
+ """
+ Test that the trace callback is invoked once it is set.
+ """
+ con = sqlite.connect(":memory:")
+ traced_statements = []
+ def trace(statement):
+ traced_statements.append(statement)
+ con.set_trace_callback(trace)
+ con.execute("create table foo(a, b)")
+ self.assertTrue(traced_statements)
+ self.assertTrue(any("create table foo" in stmt for stmt in traced_statements))
+
+ def test_clear_trace_callback(self):
+ """
+ Test that setting the trace callback to None clears the previously set callback.
+ """
+ con = sqlite.connect(":memory:")
+ traced_statements = []
+ def trace(statement):
+ traced_statements.append(statement)
+ con.set_trace_callback(trace)
+ con.set_trace_callback(None)
+ con.execute("create table foo(a, b)")
+ self.assertFalse(traced_statements, "trace callback was not cleared")
+
+ def test_unicode_content(self):
+ """
+ Test that the statement can contain unicode literals.
+ """
+ unicode_value = '\xf6\xe4\xfc\xd6\xc4\xdc\xdf\u20ac'
+ con = sqlite.connect(":memory:")
+ traced_statements = []
+ def trace(statement):
+ traced_statements.append(statement)
+ con.set_trace_callback(trace)
+ con.execute("create table foo(x)")
+ con.execute("insert into foo(x) values ('%s')" % unicode_value)
+ con.commit()
+ self.assertTrue(any(unicode_value in stmt for stmt in traced_statements),
+ "Unicode data %s garbled in trace callback: %s"
+ % (ascii(unicode_value), ', '.join(map(ascii, traced_statements))))
+
+ def test_trace_callback_content(self):
+ # set_trace_callback() shouldn't produce duplicate content (bpo-26187)
+ traced_statements = []
+ def trace(statement):
+ traced_statements.append(statement)
+
+ queries = ["create table foo(x)",
+ "insert into foo(x) values(1)"]
+ self.addCleanup(unlink, TESTFN)
+ con1 = sqlite.connect(TESTFN, isolation_level=None)
+ con2 = sqlite.connect(TESTFN)
+ try:
+ con1.set_trace_callback(trace)
+ cur = con1.cursor()
+ cur.execute(queries[0])
+ con2.execute("create table bar(x)")
+ cur.execute(queries[1])
+ finally:
+ con1.close()
+ con2.close()
+ self.assertEqual(traced_statements, queries)
+
+
+if __name__ == "__main__":
+ unittest.main()