summaryrefslogtreecommitdiffstats
path: root/Lib/test/test_pyrepl/test_pyrepl.py
diff options
context:
space:
mode:
authorEmily Morehouse <emily@cuttlesoft.com>2024-09-25 18:22:03 (GMT)
committerGitHub <noreply@github.com>2024-09-25 18:22:03 (GMT)
commitc1600c78e4565b6bb558ade451abe2648ba4dd0a (patch)
treebd17e1a3aa256803506e27259766493d5e24290c /Lib/test/test_pyrepl/test_pyrepl.py
parent28efeefab7d577ea4fb6e3f6e82f903f2aee271d (diff)
downloadcpython-c1600c78e4565b6bb558ade451abe2648ba4dd0a.zip
cpython-c1600c78e4565b6bb558ade451abe2648ba4dd0a.tar.gz
cpython-c1600c78e4565b6bb558ade451abe2648ba4dd0a.tar.bz2
gh-123856: Fix PyREPL failure when a keyboard interrupt is triggered after using a history search (#124396)
Co-authored-by: Ɓukasz Langa <lukasz@langa.pl>
Diffstat (limited to 'Lib/test/test_pyrepl/test_pyrepl.py')
-rw-r--r--Lib/test/test_pyrepl/test_pyrepl.py192
1 files changed, 107 insertions, 85 deletions
diff --git a/Lib/test/test_pyrepl/test_pyrepl.py b/Lib/test/test_pyrepl/test_pyrepl.py
index e816de3..0f3e999 100644
--- a/Lib/test/test_pyrepl/test_pyrepl.py
+++ b/Lib/test/test_pyrepl/test_pyrepl.py
@@ -8,7 +8,7 @@ import select
import subprocess
import sys
import tempfile
-from unittest import TestCase, skipUnless
+from unittest import TestCase, skipUnless, skipIf
from unittest.mock import patch
from test.support import force_not_colorized
from test.support import SHORT_TIMEOUT
@@ -35,6 +35,94 @@ try:
except ImportError:
pty = None
+
+class ReplTestCase(TestCase):
+ def run_repl(
+ self,
+ repl_input: str | list[str],
+ env: dict | None = None,
+ *,
+ cmdline_args: list[str] | None = None,
+ cwd: str | None = None,
+ ) -> tuple[str, int]:
+ temp_dir = None
+ if cwd is None:
+ temp_dir = tempfile.TemporaryDirectory(ignore_cleanup_errors=True)
+ cwd = temp_dir.name
+ try:
+ return self._run_repl(
+ repl_input, env=env, cmdline_args=cmdline_args, cwd=cwd
+ )
+ finally:
+ if temp_dir is not None:
+ temp_dir.cleanup()
+
+ def _run_repl(
+ self,
+ repl_input: str | list[str],
+ *,
+ env: dict | None,
+ cmdline_args: list[str] | None,
+ cwd: str,
+ ) -> tuple[str, int]:
+ assert pty
+ master_fd, slave_fd = pty.openpty()
+ cmd = [sys.executable, "-i", "-u"]
+ if env is None:
+ cmd.append("-I")
+ elif "PYTHON_HISTORY" not in env:
+ env["PYTHON_HISTORY"] = os.path.join(cwd, ".regrtest_history")
+ if cmdline_args is not None:
+ cmd.extend(cmdline_args)
+
+ try:
+ import termios
+ except ModuleNotFoundError:
+ pass
+ else:
+ term_attr = termios.tcgetattr(slave_fd)
+ term_attr[6][termios.VREPRINT] = 0 # pass through CTRL-R
+ term_attr[6][termios.VINTR] = 0 # pass through CTRL-C
+ termios.tcsetattr(slave_fd, termios.TCSANOW, term_attr)
+
+ process = subprocess.Popen(
+ cmd,
+ stdin=slave_fd,
+ stdout=slave_fd,
+ stderr=slave_fd,
+ cwd=cwd,
+ text=True,
+ close_fds=True,
+ env=env if env else os.environ,
+ )
+ os.close(slave_fd)
+ if isinstance(repl_input, list):
+ repl_input = "\n".join(repl_input) + "\n"
+ os.write(master_fd, repl_input.encode("utf-8"))
+
+ output = []
+ while select.select([master_fd], [], [], SHORT_TIMEOUT)[0]:
+ try:
+ data = os.read(master_fd, 1024).decode("utf-8")
+ if not data:
+ break
+ except OSError:
+ break
+ output.append(data)
+ else:
+ os.close(master_fd)
+ process.kill()
+ self.fail(f"Timeout while waiting for output, got: {''.join(output)}")
+
+ os.close(master_fd)
+ try:
+ exit_code = process.wait(timeout=SHORT_TIMEOUT)
+ except subprocess.TimeoutExpired:
+ process.kill()
+ exit_code = process.wait()
+ return "".join(output), exit_code
+
+
class TestCursorPosition(TestCase):
def prepare_reader(self, events):
console = FakeConsole(events)
@@ -968,7 +1056,20 @@ class TestPasteEvent(TestCase):
@skipUnless(pty, "requires pty")
-class TestMain(TestCase):
+class TestDumbTerminal(ReplTestCase):
+ def test_dumb_terminal_exits_cleanly(self):
+ env = os.environ.copy()
+ env.update({"TERM": "dumb"})
+ output, exit_code = self.run_repl("exit()\n", env=env)
+ self.assertEqual(exit_code, 0)
+ self.assertIn("warning: can't use pyrepl", output)
+ self.assertNotIn("Exception", output)
+ self.assertNotIn("Traceback", output)
+
+
+@skipUnless(pty, "requires pty")
+@skipIf((os.environ.get("TERM") or "dumb") == "dumb", "can't use pyrepl in dumb terminal")
+class TestMain(ReplTestCase):
def setUp(self):
# Cleanup from PYTHON* variables to isolate from local
# user settings, see #121359. Such variables should be
@@ -1078,15 +1179,6 @@ class TestMain(TestCase):
}
self._run_repl_globals_test(expectations, as_module=True)
- def test_dumb_terminal_exits_cleanly(self):
- env = os.environ.copy()
- env.update({"TERM": "dumb"})
- output, exit_code = self.run_repl("exit()\n", env=env)
- self.assertEqual(exit_code, 0)
- self.assertIn("warning: can't use pyrepl", output)
- self.assertNotIn("Exception", output)
- self.assertNotIn("Traceback", output)
-
@force_not_colorized
def test_python_basic_repl(self):
env = os.environ.copy()
@@ -1209,80 +1301,6 @@ class TestMain(TestCase):
self.assertIn("in x3", output)
self.assertIn("in <module>", output)
- def run_repl(
- self,
- repl_input: str | list[str],
- env: dict | None = None,
- *,
- cmdline_args: list[str] | None = None,
- cwd: str | None = None,
- ) -> tuple[str, int]:
- temp_dir = None
- if cwd is None:
- temp_dir = tempfile.TemporaryDirectory(ignore_cleanup_errors=True)
- cwd = temp_dir.name
- try:
- return self._run_repl(
- repl_input, env=env, cmdline_args=cmdline_args, cwd=cwd
- )
- finally:
- if temp_dir is not None:
- temp_dir.cleanup()
-
- def _run_repl(
- self,
- repl_input: str | list[str],
- *,
- env: dict | None,
- cmdline_args: list[str] | None,
- cwd: str,
- ) -> tuple[str, int]:
- assert pty
- master_fd, slave_fd = pty.openpty()
- cmd = [sys.executable, "-i", "-u"]
- if env is None:
- cmd.append("-I")
- elif "PYTHON_HISTORY" not in env:
- env["PYTHON_HISTORY"] = os.path.join(cwd, ".regrtest_history")
- if cmdline_args is not None:
- cmd.extend(cmdline_args)
- process = subprocess.Popen(
- cmd,
- stdin=slave_fd,
- stdout=slave_fd,
- stderr=slave_fd,
- cwd=cwd,
- text=True,
- close_fds=True,
- env=env if env else os.environ,
- )
- os.close(slave_fd)
- if isinstance(repl_input, list):
- repl_input = "\n".join(repl_input) + "\n"
- os.write(master_fd, repl_input.encode("utf-8"))
-
- output = []
- while select.select([master_fd], [], [], SHORT_TIMEOUT)[0]:
- try:
- data = os.read(master_fd, 1024).decode("utf-8")
- if not data:
- break
- except OSError:
- break
- output.append(data)
- else:
- os.close(master_fd)
- process.kill()
- self.fail(f"Timeout while waiting for output, got: {''.join(output)}")
-
- os.close(master_fd)
- try:
- exit_code = process.wait(timeout=SHORT_TIMEOUT)
- except subprocess.TimeoutExpired:
- process.kill()
- exit_code = process.wait()
- return "".join(output), exit_code
-
def test_readline_history_file(self):
# skip, if readline module is not available
readline = import_module('readline')
@@ -1305,3 +1323,7 @@ class TestMain(TestCase):
output, exit_code = self.run_repl("exit\n", env=env)
self.assertEqual(exit_code, 0)
self.assertNotIn("\\040", pathlib.Path(hfile.name).read_text())
+
+ def test_keyboard_interrupt_after_isearch(self):
+ output, exit_code = self.run_repl(["\x12", "\x03", "exit"])
+ self.assertEqual(exit_code, 0)