From bc7c7cd18a4ae015449f95454a762a7276585bb8 Mon Sep 17 00:00:00 2001 From: Erlend Egeberg Aasland Date: Mon, 1 Aug 2022 12:25:16 +0200 Subject: gh-77617: Add sqlite3 command-line interface (#95026) Co-authored-by: Serhiy Storchaka --- Doc/library/sqlite3.rst | 20 +++ Doc/whatsnew/3.12.rst | 7 + Lib/sqlite3/__main__.py | 97 +++++++++++++ Lib/test/test_sqlite3/test_cli.py | 155 +++++++++++++++++++++ .../2022-07-20-00-23-58.gh-issue-77617.XGaqSQ.rst | 2 + 5 files changed, 281 insertions(+) create mode 100644 Lib/sqlite3/__main__.py create mode 100644 Lib/test/test_sqlite3/test_cli.py create mode 100644 Misc/NEWS.d/next/Library/2022-07-20-00-23-58.gh-issue-77617.XGaqSQ.rst diff --git a/Doc/library/sqlite3.rst b/Doc/library/sqlite3.rst index 35eb404..28bd7ce 100644 --- a/Doc/library/sqlite3.rst +++ b/Doc/library/sqlite3.rst @@ -1442,6 +1442,26 @@ and you can let the ``sqlite3`` module convert SQLite types to Python types via :ref:`converters `. +.. _sqlite3-cli: + +Command-line interface +^^^^^^^^^^^^^^^^^^^^^^ + +The ``sqlite3`` module can be invoked as a script +in order to provide a simple SQLite shell. +Type ``.quit`` or CTRL-D to exit the shell. + +.. program:: python -m sqlite3 [-h] [-v] [filename] [sql] + +.. option:: -h, --help + Print CLI help. + +.. option:: -v, --version + Print underlying SQLite library version. + +.. versionadded:: 3.12 + + .. _sqlite3-howtos: How-to guides diff --git a/Doc/whatsnew/3.12.rst b/Doc/whatsnew/3.12.rst index 0c53bc0..67396f8 100644 --- a/Doc/whatsnew/3.12.rst +++ b/Doc/whatsnew/3.12.rst @@ -112,6 +112,13 @@ os (Contributed by Kumar Aditya in :gh:`93312`.) +sqlite3 +------- + +* Add a :ref:`command-line interface `. + (Contributed by Erlend E. Aasland in :gh:`77617`.) + + Optimizations ============= diff --git a/Lib/sqlite3/__main__.py b/Lib/sqlite3/__main__.py new file mode 100644 index 0000000..c62fad8 --- /dev/null +++ b/Lib/sqlite3/__main__.py @@ -0,0 +1,97 @@ +import sqlite3 +import sys + +from argparse import ArgumentParser +from code import InteractiveConsole +from textwrap import dedent + + +def execute(c, sql, suppress_errors=True): + try: + for row in c.execute(sql): + print(row) + except sqlite3.Error as e: + tp = type(e).__name__ + try: + print(f"{tp} ({e.sqlite_errorname}): {e}", file=sys.stderr) + except AttributeError: + print(f"{tp}: {e}", file=sys.stderr) + if not suppress_errors: + sys.exit(1) + + +class SqliteInteractiveConsole(InteractiveConsole): + + def __init__(self, connection): + super().__init__() + self._con = connection + self._cur = connection.cursor() + + def runsource(self, source, filename="", symbol="single"): + match source: + case ".version": + print(f"{sqlite3.sqlite_version}") + case ".help": + print("Enter SQL code and press enter.") + case ".quit": + sys.exit(0) + case _: + if not sqlite3.complete_statement(source): + return True + execute(self._cur, source) + return False + + +def main(): + parser = ArgumentParser( + description="Python sqlite3 CLI", + prog="python -m sqlite3", + ) + parser.add_argument( + "filename", type=str, default=":memory:", nargs="?", + help=( + "SQLite database to open (defaults to ':memory:'). " + "A new database is created if the file does not previously exist." + ), + ) + parser.add_argument( + "sql", type=str, nargs="?", + help=( + "An SQL query to execute. " + "Any returned rows are printed to stdout." + ), + ) + parser.add_argument( + "-v", "--version", action="version", + version=f"SQLite version {sqlite3.sqlite_version}", + help="Print underlying SQLite library version", + ) + args = parser.parse_args() + + if args.filename == ":memory:": + db_name = "a transient in-memory database" + else: + db_name = repr(args.filename) + + banner = dedent(f""" + sqlite3 shell, running on SQLite version {sqlite3.sqlite_version} + Connected to {db_name} + + Each command will be run using execute() on the cursor. + Type ".help" for more information; type ".quit" or CTRL-D to quit. + """).strip() + sys.ps1 = "sqlite> " + sys.ps2 = " ... " + + con = sqlite3.connect(args.filename, isolation_level=None) + try: + if args.sql: + execute(con, args.sql, suppress_errors=False) + else: + console = SqliteInteractiveConsole(con) + console.interact(banner, exitmsg="") + finally: + con.close() + + +main() diff --git a/Lib/test/test_sqlite3/test_cli.py b/Lib/test/test_sqlite3/test_cli.py new file mode 100644 index 0000000..d374f8e --- /dev/null +++ b/Lib/test/test_sqlite3/test_cli.py @@ -0,0 +1,155 @@ +"""sqlite3 CLI tests.""" + +import sqlite3 as sqlite +import subprocess +import sys +import unittest + +from test.support import SHORT_TIMEOUT, requires_subprocess +from test.support.os_helper import TESTFN, unlink + + +@requires_subprocess() +class CommandLineInterface(unittest.TestCase): + + def _do_test(self, *args, expect_success=True): + with subprocess.Popen( + [sys.executable, "-Xutf8", "-m", "sqlite3", *args], + encoding="utf-8", + bufsize=0, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + ) as proc: + proc.wait() + if expect_success == bool(proc.returncode): + self.fail("".join(proc.stderr)) + stdout = proc.stdout.read() + stderr = proc.stderr.read() + if expect_success: + self.assertEqual(stderr, "") + else: + self.assertEqual(stdout, "") + return stdout, stderr + + def expect_success(self, *args): + out, _ = self._do_test(*args) + return out + + def expect_failure(self, *args): + _, err = self._do_test(*args, expect_success=False) + return err + + def test_cli_help(self): + out = self.expect_success("-h") + self.assertIn("usage: python -m sqlite3", out) + + def test_cli_version(self): + out = self.expect_success("-v") + self.assertIn(sqlite.sqlite_version, out) + + def test_cli_execute_sql(self): + out = self.expect_success(":memory:", "select 1") + self.assertIn("(1,)", out) + + def test_cli_execute_too_much_sql(self): + stderr = self.expect_failure(":memory:", "select 1; select 2") + err = "ProgrammingError: You can only execute one statement at a time" + self.assertIn(err, stderr) + + def test_cli_execute_incomplete_sql(self): + stderr = self.expect_failure(":memory:", "sel") + self.assertIn("OperationalError (SQLITE_ERROR)", stderr) + + def test_cli_on_disk_db(self): + self.addCleanup(unlink, TESTFN) + out = self.expect_success(TESTFN, "create table t(t)") + self.assertEqual(out, "") + out = self.expect_success(TESTFN, "select count(t) from t") + self.assertIn("(0,)", out) + + +@requires_subprocess() +class InteractiveSession(unittest.TestCase): + TIMEOUT = SHORT_TIMEOUT / 10. + MEMORY_DB_MSG = "Connected to a transient in-memory database" + PS1 = "sqlite> " + PS2 = "... " + + def start_cli(self, *args): + return subprocess.Popen( + [sys.executable, "-Xutf8", "-m", "sqlite3", *args], + encoding="utf-8", + bufsize=0, + stdin=subprocess.PIPE, + # Note: the banner is printed to stderr, the prompt to stdout. + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + ) + + def expect_success(self, proc): + proc.wait() + if proc.returncode: + self.fail("".join(proc.stderr)) + + def test_interact(self): + with self.start_cli() as proc: + out, err = proc.communicate(timeout=self.TIMEOUT) + self.assertIn(self.MEMORY_DB_MSG, err) + self.assertIn(self.PS1, out) + self.expect_success(proc) + + def test_interact_quit(self): + with self.start_cli() as proc: + out, err = proc.communicate(input=".quit", timeout=self.TIMEOUT) + self.assertIn(self.MEMORY_DB_MSG, err) + self.assertIn(self.PS1, out) + self.expect_success(proc) + + def test_interact_version(self): + with self.start_cli() as proc: + out, err = proc.communicate(input=".version", timeout=self.TIMEOUT) + self.assertIn(self.MEMORY_DB_MSG, err) + self.assertIn(sqlite.sqlite_version, out) + self.expect_success(proc) + + def test_interact_valid_sql(self): + with self.start_cli() as proc: + out, err = proc.communicate(input="select 1;", + timeout=self.TIMEOUT) + self.assertIn(self.MEMORY_DB_MSG, err) + self.assertIn("(1,)", out) + self.expect_success(proc) + + def test_interact_valid_multiline_sql(self): + with self.start_cli() as proc: + out, err = proc.communicate(input="select 1\n;", + timeout=self.TIMEOUT) + self.assertIn(self.MEMORY_DB_MSG, err) + self.assertIn(self.PS2, out) + self.assertIn("(1,)", out) + self.expect_success(proc) + + def test_interact_invalid_sql(self): + with self.start_cli() as proc: + out, err = proc.communicate(input="sel;", timeout=self.TIMEOUT) + self.assertIn(self.MEMORY_DB_MSG, err) + self.assertIn("OperationalError (SQLITE_ERROR)", err) + self.expect_success(proc) + + def test_interact_on_disk_file(self): + self.addCleanup(unlink, TESTFN) + with self.start_cli(TESTFN) as proc: + out, err = proc.communicate(input="create table t(t);", + timeout=self.TIMEOUT) + self.assertIn(TESTFN, err) + self.assertIn(self.PS1, out) + self.expect_success(proc) + with self.start_cli(TESTFN, "select count(t) from t") as proc: + out = proc.stdout.read() + err = proc.stderr.read() + self.assertIn("(0,)", out) + self.expect_success(proc) + + +if __name__ == "__main__": + unittest.main() diff --git a/Misc/NEWS.d/next/Library/2022-07-20-00-23-58.gh-issue-77617.XGaqSQ.rst b/Misc/NEWS.d/next/Library/2022-07-20-00-23-58.gh-issue-77617.XGaqSQ.rst new file mode 100644 index 0000000..1cbaa7d --- /dev/null +++ b/Misc/NEWS.d/next/Library/2022-07-20-00-23-58.gh-issue-77617.XGaqSQ.rst @@ -0,0 +1,2 @@ +Add :mod:`sqlite3` :ref:`command-line interface `. +Patch by Erlend Aasland. -- cgit v0.12