summaryrefslogtreecommitdiffstats
path: root/Lib/test/test_dbm_sqlite3.py
diff options
context:
space:
mode:
authorErlend E. Aasland <erlend@python.org>2024-02-14 11:14:56 (GMT)
committerGitHub <noreply@github.com>2024-02-14 11:14:56 (GMT)
commitdd5e4d90789b3a065290e264122629f31cb0b547 (patch)
tree5325d301b0da1b62ddbe52f0c922b57d2eea8de6 /Lib/test/test_dbm_sqlite3.py
parent57e4c81ae1cd605efa173885574aedc3fded4b8b (diff)
downloadcpython-dd5e4d90789b3a065290e264122629f31cb0b547.zip
cpython-dd5e4d90789b3a065290e264122629f31cb0b547.tar.gz
cpython-dd5e4d90789b3a065290e264122629f31cb0b547.tar.bz2
gh-100414: Add SQLite backend to dbm (#114481)
Co-authored-by: Raymond Hettinger <rhettinger@users.noreply.github.com> Co-authored-by: Serhiy Storchaka <storchaka@gmail.com> Co-authored-by: Mariusz Felisiak <felisiak.mariusz@gmail.com>
Diffstat (limited to 'Lib/test/test_dbm_sqlite3.py')
-rw-r--r--Lib/test/test_dbm_sqlite3.py308
1 files changed, 308 insertions, 0 deletions
diff --git a/Lib/test/test_dbm_sqlite3.py b/Lib/test/test_dbm_sqlite3.py
new file mode 100644
index 0000000..7bc2a03
--- /dev/null
+++ b/Lib/test/test_dbm_sqlite3.py
@@ -0,0 +1,308 @@
+import sqlite3
+import sys
+import test.support
+import unittest
+from contextlib import closing
+from functools import partial
+from pathlib import Path
+from test.support import cpython_only, import_helper, os_helper
+
+
+dbm_sqlite3 = import_helper.import_module("dbm.sqlite3")
+from dbm.sqlite3 import _normalize_uri
+
+
+class _SQLiteDbmTests(unittest.TestCase):
+
+ def setUp(self):
+ self.filename = os_helper.TESTFN
+ db = dbm_sqlite3.open(self.filename, "c")
+ db.close()
+
+ def tearDown(self):
+ for suffix in "", "-wal", "-shm":
+ os_helper.unlink(self.filename + suffix)
+
+
+class URI(unittest.TestCase):
+
+ def test_uri_substitutions(self):
+ dataset = (
+ ("/absolute/////b/c", "/absolute/b/c"),
+ ("PRE#MID##END", "PRE%23MID%23%23END"),
+ ("%#?%%#", "%25%23%3F%25%25%23"),
+ )
+ for path, normalized in dataset:
+ with self.subTest(path=path, normalized=normalized):
+ self.assertTrue(_normalize_uri(path).endswith(normalized))
+
+ @unittest.skipUnless(sys.platform == "win32", "requires Windows")
+ def test_uri_windows(self):
+ dataset = (
+ # Relative subdir.
+ (r"2018\January.xlsx",
+ "2018/January.xlsx"),
+ # Absolute with drive letter.
+ (r"C:\Projects\apilibrary\apilibrary.sln",
+ "/C:/Projects/apilibrary/apilibrary.sln"),
+ # Relative with drive letter.
+ (r"C:Projects\apilibrary\apilibrary.sln",
+ "/C:Projects/apilibrary/apilibrary.sln"),
+ )
+ for path, normalized in dataset:
+ with self.subTest(path=path, normalized=normalized):
+ if not Path(path).is_absolute():
+ self.skipTest(f"skipping relative path: {path!r}")
+ self.assertTrue(_normalize_uri(path).endswith(normalized))
+
+
+class ReadOnly(_SQLiteDbmTests):
+
+ def setUp(self):
+ super().setUp()
+ with dbm_sqlite3.open(self.filename, "w") as db:
+ db[b"key1"] = "value1"
+ db[b"key2"] = "value2"
+ self.db = dbm_sqlite3.open(self.filename, "r")
+
+ def tearDown(self):
+ self.db.close()
+ super().tearDown()
+
+ def test_readonly_read(self):
+ self.assertEqual(self.db[b"key1"], b"value1")
+ self.assertEqual(self.db[b"key2"], b"value2")
+
+ def test_readonly_write(self):
+ with self.assertRaises(dbm_sqlite3.error):
+ self.db[b"new"] = "value"
+
+ def test_readonly_delete(self):
+ with self.assertRaises(dbm_sqlite3.error):
+ del self.db[b"key1"]
+
+ def test_readonly_keys(self):
+ self.assertEqual(self.db.keys(), [b"key1", b"key2"])
+
+ def test_readonly_iter(self):
+ self.assertEqual([k for k in self.db], [b"key1", b"key2"])
+
+
+class ReadWrite(_SQLiteDbmTests):
+
+ def setUp(self):
+ super().setUp()
+ self.db = dbm_sqlite3.open(self.filename, "w")
+
+ def tearDown(self):
+ self.db.close()
+ super().tearDown()
+
+ def db_content(self):
+ with closing(sqlite3.connect(self.filename)) as cx:
+ keys = [r[0] for r in cx.execute("SELECT key FROM Dict")]
+ vals = [r[0] for r in cx.execute("SELECT value FROM Dict")]
+ return keys, vals
+
+ def test_readwrite_unique_key(self):
+ self.db["key"] = "value"
+ self.db["key"] = "other"
+ keys, vals = self.db_content()
+ self.assertEqual(keys, [b"key"])
+ self.assertEqual(vals, [b"other"])
+
+ def test_readwrite_delete(self):
+ self.db["key"] = "value"
+ self.db["new"] = "other"
+
+ del self.db[b"new"]
+ keys, vals = self.db_content()
+ self.assertEqual(keys, [b"key"])
+ self.assertEqual(vals, [b"value"])
+
+ del self.db[b"key"]
+ keys, vals = self.db_content()
+ self.assertEqual(keys, [])
+ self.assertEqual(vals, [])
+
+ def test_readwrite_null_key(self):
+ with self.assertRaises(dbm_sqlite3.error):
+ self.db[None] = "value"
+
+ def test_readwrite_null_value(self):
+ with self.assertRaises(dbm_sqlite3.error):
+ self.db[b"key"] = None
+
+
+class Misuse(_SQLiteDbmTests):
+
+ def setUp(self):
+ super().setUp()
+ self.db = dbm_sqlite3.open(self.filename, "w")
+
+ def tearDown(self):
+ self.db.close()
+ super().tearDown()
+
+ def test_misuse_double_create(self):
+ self.db["key"] = "value"
+ with dbm_sqlite3.open(self.filename, "c") as db:
+ self.assertEqual(db[b"key"], b"value")
+
+ def test_misuse_double_close(self):
+ self.db.close()
+
+ def test_misuse_invalid_flag(self):
+ regex = "must be.*'r'.*'w'.*'c'.*'n', not 'invalid'"
+ with self.assertRaisesRegex(ValueError, regex):
+ dbm_sqlite3.open(self.filename, flag="invalid")
+
+ def test_misuse_double_delete(self):
+ self.db["key"] = "value"
+ del self.db[b"key"]
+ with self.assertRaises(KeyError):
+ del self.db[b"key"]
+
+ def test_misuse_invalid_key(self):
+ with self.assertRaises(KeyError):
+ self.db[b"key"]
+
+ def test_misuse_iter_close1(self):
+ self.db["1"] = 1
+ it = iter(self.db)
+ self.db.close()
+ with self.assertRaises(dbm_sqlite3.error):
+ next(it)
+
+ def test_misuse_iter_close2(self):
+ self.db["1"] = 1
+ self.db["2"] = 2
+ it = iter(self.db)
+ next(it)
+ self.db.close()
+ with self.assertRaises(dbm_sqlite3.error):
+ next(it)
+
+ def test_misuse_use_after_close(self):
+ self.db.close()
+ with self.assertRaises(dbm_sqlite3.error):
+ self.db[b"read"]
+ with self.assertRaises(dbm_sqlite3.error):
+ self.db[b"write"] = "value"
+ with self.assertRaises(dbm_sqlite3.error):
+ del self.db[b"del"]
+ with self.assertRaises(dbm_sqlite3.error):
+ len(self.db)
+ with self.assertRaises(dbm_sqlite3.error):
+ self.db.keys()
+
+ def test_misuse_reinit(self):
+ with self.assertRaises(dbm_sqlite3.error):
+ self.db.__init__("new.db", flag="n", mode=0o666)
+
+ def test_misuse_empty_filename(self):
+ for flag in "r", "w", "c", "n":
+ with self.assertRaises(dbm_sqlite3.error):
+ db = dbm_sqlite3.open("", flag="c")
+
+
+class DataTypes(_SQLiteDbmTests):
+
+ dataset = (
+ # (raw, coerced)
+ (42, b"42"),
+ (3.14, b"3.14"),
+ ("string", b"string"),
+ (b"bytes", b"bytes"),
+ )
+
+ def setUp(self):
+ super().setUp()
+ self.db = dbm_sqlite3.open(self.filename, "w")
+
+ def tearDown(self):
+ self.db.close()
+ super().tearDown()
+
+ def test_datatypes_values(self):
+ for raw, coerced in self.dataset:
+ with self.subTest(raw=raw, coerced=coerced):
+ self.db["key"] = raw
+ self.assertEqual(self.db[b"key"], coerced)
+
+ def test_datatypes_keys(self):
+ for raw, coerced in self.dataset:
+ with self.subTest(raw=raw, coerced=coerced):
+ self.db[raw] = "value"
+ self.assertEqual(self.db[coerced], b"value")
+ # Raw keys are silently coerced to bytes.
+ self.assertEqual(self.db[raw], b"value")
+ del self.db[raw]
+
+ def test_datatypes_replace_coerced(self):
+ self.db["10"] = "value"
+ self.db[b"10"] = "value"
+ self.db[10] = "value"
+ self.assertEqual(self.db.keys(), [b"10"])
+
+
+class CorruptDatabase(_SQLiteDbmTests):
+ """Verify that database exceptions are raised as dbm.sqlite3.error."""
+
+ def setUp(self):
+ super().setUp()
+ with closing(sqlite3.connect(self.filename)) as cx:
+ with cx:
+ cx.execute("DROP TABLE IF EXISTS Dict")
+ cx.execute("CREATE TABLE Dict (invalid_schema)")
+
+ def check(self, flag, fn, should_succeed=False):
+ with closing(dbm_sqlite3.open(self.filename, flag)) as db:
+ with self.assertRaises(dbm_sqlite3.error):
+ fn(db)
+
+ @staticmethod
+ def read(db):
+ return db["key"]
+
+ @staticmethod
+ def write(db):
+ db["key"] = "value"
+
+ @staticmethod
+ def iter(db):
+ next(iter(db))
+
+ @staticmethod
+ def keys(db):
+ db.keys()
+
+ @staticmethod
+ def del_(db):
+ del db["key"]
+
+ @staticmethod
+ def len_(db):
+ len(db)
+
+ def test_corrupt_readwrite(self):
+ for flag in "r", "w", "c":
+ with self.subTest(flag=flag):
+ check = partial(self.check, flag=flag)
+ check(fn=self.read)
+ check(fn=self.write)
+ check(fn=self.iter)
+ check(fn=self.keys)
+ check(fn=self.del_)
+ check(fn=self.len_)
+
+ def test_corrupt_force_new(self):
+ with closing(dbm_sqlite3.open(self.filename, "n")) as db:
+ db["foo"] = "write"
+ _ = db[b"foo"]
+ next(iter(db))
+ del db[b"foo"]
+
+
+if __name__ == "__main__":
+ unittest.main()