summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorPablo Galindo Salgado <Pablogsal@gmail.com>2024-05-05 19:32:23 (GMT)
committerGitHub <noreply@github.com>2024-05-05 19:32:23 (GMT)
commitf27f8c790af1233d499b795af1c0d1b36aaecaf5 (patch)
tree22c502c6382512fafbb63e3020c8462e5400d4df
parent40cc809902304f60c6e1c933191dd4d64e570e28 (diff)
downloadcpython-f27f8c790af1233d499b795af1c0d1b36aaecaf5.zip
cpython-f27f8c790af1233d499b795af1c0d1b36aaecaf5.tar.gz
cpython-f27f8c790af1233d499b795af1c0d1b36aaecaf5.tar.bz2
gh-111201: A new Python REPL (GH-111567)
Co-authored-by: Łukasz Langa <lukasz@langa.pl> Co-authored-by: Marta Gómez Macías <mgmacias@google.com> Co-authored-by: Lysandros Nikolaou <lisandrosnik@gmail.com> Co-authored-by: Hugo van Kemenade <1324225+hugovk@users.noreply.github.com> Co-authored-by: Jelle Zijlstra <jelle.zijlstra@gmail.com>
-rw-r--r--.github/workflows/mypy.yml4
-rw-r--r--.github/workflows/reusable-macos.yml1
-rw-r--r--.github/workflows/reusable-ubuntu.yml1
-rw-r--r--Doc/glossary.rst14
-rw-r--r--Doc/tutorial/appendix.rst22
-rw-r--r--Doc/using/cmdline.rst10
-rw-r--r--Doc/whatsnew/3.13.rst28
-rw-r--r--Lib/_pyrepl/__init__.py19
-rw-r--r--Lib/_pyrepl/__main__.py43
-rw-r--r--Lib/_pyrepl/_minimal_curses.py68
-rw-r--r--Lib/_pyrepl/commands.py464
-rw-r--r--Lib/_pyrepl/completing_reader.py287
-rw-r--r--Lib/_pyrepl/console.py112
-rw-r--r--Lib/_pyrepl/curses.py33
-rw-r--r--Lib/_pyrepl/fancy_termios.py74
-rw-r--r--Lib/_pyrepl/historical_reader.py345
-rw-r--r--Lib/_pyrepl/input.py114
-rw-r--r--Lib/_pyrepl/keymap.py215
-rw-r--r--Lib/_pyrepl/mypy.ini27
-rw-r--r--Lib/_pyrepl/pager.py169
-rw-r--r--Lib/_pyrepl/reader.py660
-rw-r--r--Lib/_pyrepl/readline.py501
-rw-r--r--Lib/_pyrepl/simple_interact.py157
-rw-r--r--Lib/_pyrepl/trace.py21
-rw-r--r--Lib/_pyrepl/types.py8
-rw-r--r--Lib/_pyrepl/unix_console.py743
-rw-r--r--Lib/_pyrepl/unix_eventqueue.py152
-rw-r--r--Lib/_pyrepl/utils.py18
-rw-r--r--Lib/code.py13
-rwxr-xr-xLib/pydoc.py158
-rw-r--r--Lib/site.py14
-rw-r--r--Lib/test/test_pyrepl.py929
-rw-r--r--Lib/test/test_traceback.py4
-rw-r--r--Lib/traceback.py5
-rw-r--r--Makefile.pre.in1
-rw-r--r--Misc/NEWS.d/next/Core and Builtins/2024-04-28-00-41-17.gh-issue-111201.cQsh5U.rst4
-rw-r--r--Modules/main.c21
-rw-r--r--Python/clinic/sysmodule.c.h20
-rw-r--r--Python/pythonrun.c2
-rw-r--r--Python/stdlib_module_names.h1
-rw-r--r--Python/sysmodule.c16
41 files changed, 5328 insertions, 170 deletions
diff --git a/.github/workflows/mypy.yml b/.github/workflows/mypy.yml
index b766785..35996f2 100644
--- a/.github/workflows/mypy.yml
+++ b/.github/workflows/mypy.yml
@@ -8,6 +8,7 @@ on:
pull_request:
paths:
- ".github/workflows/mypy.yml"
+ - "Lib/_pyrepl/**"
- "Lib/test/libregrtest/**"
- "Tools/build/generate_sbom.py"
- "Tools/cases_generator/**"
@@ -35,8 +36,9 @@ jobs:
strategy:
matrix:
target: [
+ "Lib/_pyrepl",
"Lib/test/libregrtest",
- "Tools/build/",
+ "Tools/build",
"Tools/cases_generator",
"Tools/clinic",
"Tools/jit",
diff --git a/.github/workflows/reusable-macos.yml b/.github/workflows/reusable-macos.yml
index dabeca8..e3319f1 100644
--- a/.github/workflows/reusable-macos.yml
+++ b/.github/workflows/reusable-macos.yml
@@ -22,6 +22,7 @@ jobs:
HOMEBREW_NO_INSTALL_CLEANUP: 1
HOMEBREW_NO_INSTALLED_DEPENDENTS_CHECK: 1
PYTHONSTRICTEXTENSIONBUILD: 1
+ TERM: linux
strategy:
fail-fast: false
matrix:
diff --git a/.github/workflows/reusable-ubuntu.yml b/.github/workflows/reusable-ubuntu.yml
index e6fbaaf..0218591 100644
--- a/.github/workflows/reusable-ubuntu.yml
+++ b/.github/workflows/reusable-ubuntu.yml
@@ -17,6 +17,7 @@ jobs:
FORCE_COLOR: 1
OPENSSL_VER: 3.0.13
PYTHONSTRICTEXTENSIONBUILD: 1
+ TERM: linux
steps:
- uses: actions/checkout@v4
- name: Register gcc problem matcher
diff --git a/Doc/glossary.rst b/Doc/glossary.rst
index 05ac3ed..2846f77 100644
--- a/Doc/glossary.rst
+++ b/Doc/glossary.rst
@@ -9,13 +9,14 @@ Glossary
.. glossary::
``>>>``
- The default Python prompt of the interactive shell. Often seen for code
- examples which can be executed interactively in the interpreter.
+ The default Python prompt of the :term:`interactive` shell. Often
+ seen for code examples which can be executed interactively in the
+ interpreter.
``...``
Can refer to:
- * The default Python prompt of the interactive shell when entering the
+ * The default Python prompt of the :term:`interactive` shell when entering the
code for an indented code block, when within a pair of matching left and
right delimiters (parentheses, square brackets, curly braces or triple
quotes), or after specifying a decorator.
@@ -620,7 +621,8 @@ Glossary
execute them and see their results. Just launch ``python`` with no
arguments (possibly by selecting it from your computer's main
menu). It is a very powerful way to test out new ideas or inspect
- modules and packages (remember ``help(x)``).
+ modules and packages (remember ``help(x)``). For more on interactive
+ mode, see :ref:`tut-interac`.
interpreted
Python is an interpreted language, as opposed to a compiled one,
@@ -1084,6 +1086,10 @@ Glossary
See also :term:`namespace package`.
+ REPL
+ An acronym for the "read–eval–print loop", another name for the
+ :term:`interactive` interpreter shell.
+
__slots__
A declaration inside a class that saves memory by pre-declaring space for
instance attributes and eliminating instance dictionaries. Though
diff --git a/Doc/tutorial/appendix.rst b/Doc/tutorial/appendix.rst
index 4bea0d8..10eb143 100644
--- a/Doc/tutorial/appendix.rst
+++ b/Doc/tutorial/appendix.rst
@@ -10,6 +10,28 @@ Appendix
Interactive Mode
================
+There are two variants of the interactive :term:`REPL`. The classic
+basic interpreter is supported on all platforms with minimal line
+control capabilities.
+
+On Unix-like systems (e.g. Linux or macOS) with :mod:`curses` and
+:mod:`readline` support, a new interactive shell is used by default.
+This one supports color, multiline editing, history browsing, and
+paste mode. To disable color, see :ref:`using-on-controlling-color` for
+details. Function keys provide some additional functionality.
+:kbd:`F1` enters the interactive help browser :mod:`pydoc`.
+:kbd:`F2` allows for browsing command-line history without output nor the
+:term:`>>>` and :term:`...` prompts. :kbd:`F3` enters "paste mode", which
+makes pasting larger blocks of code easier. Press :kbd:`F3` to return to
+the regular prompt.
+
+When using the new interactive shell, exit the shell by typing :kbd:`exit`
+or :kbd:`quit`. Adding call parentheses after those commands is not
+required.
+
+If the new interactive shell is not desired, it can be disabled via
+the :envvar:`PYTHON_BASIC_REPL` environment variable.
+
.. _tut-error:
Error Handling
diff --git a/Doc/using/cmdline.rst b/Doc/using/cmdline.rst
index 051dbf9..522e6e5 100644
--- a/Doc/using/cmdline.rst
+++ b/Doc/using/cmdline.rst
@@ -42,6 +42,7 @@ additional methods of invocation:
* When called with standard input connected to a tty device, it prompts for
commands and executes them until an EOF (an end-of-file character, you can
produce that with :kbd:`Ctrl-D` on UNIX or :kbd:`Ctrl-Z, Enter` on Windows) is read.
+ For more on interactive mode, see :ref:`tut-interac`.
* When called with a file name argument or with a file as standard input, it
reads and executes a script from that file.
* When called with a directory name argument, it reads and executes an
@@ -1182,6 +1183,15 @@ conflict.
.. versionadded:: 3.13
+.. envvar:: PYTHON_BASIC_REPL
+
+ If this variable is set to ``1``, the interpreter will not attempt to
+ load the Python-based :term:`REPL` that requires :mod:`curses` and
+ :mod:`readline`, and will instead use the traditional parser-based
+ :term:`REPL`.
+
+ .. versionadded:: 3.13
+
.. envvar:: PYTHON_HISTORY
This environment variable can be used to set the location of a
diff --git a/Doc/whatsnew/3.13.rst b/Doc/whatsnew/3.13.rst
index 152c870..11c3f93 100644
--- a/Doc/whatsnew/3.13.rst
+++ b/Doc/whatsnew/3.13.rst
@@ -102,6 +102,34 @@ New typing features:
New Features
============
+A Better Interactive Interpreter
+--------------------------------
+
+On Unix-like systems like Linux or macOS, Python now uses a new
+:term:`interactive` shell. When the user starts the :term:`REPL`
+from a tty, and both :mod:`curses` and :mod:`readline` are available,
+the interactive shell now supports the following new features:
+
+* colorized prompts;
+* multiline editing with history preservation;
+* interactive help browsing using :kbd:`F1` with a separate command
+ history;
+* history browsing using :kbd:`F2` that skips output as well as the
+ :term:`>>>` and :term:`...` prompts;
+* "paste mode" with :kbd:`F3` that makes pasting larger blocks of code
+ easier (press :kbd:`F3` again to return to the regular prompt);
+* ability to issue REPL-specific commands like :kbd:`help`, :kbd:`exit`,
+ and :kbd:`quit` without the need to use call parentheses after the
+ command name.
+
+If the new interactive shell is not desired, it can be disabled via
+the :envvar:`PYTHON_BASIC_REPL` environment variable.
+
+For more on interactive mode, see :ref:`tut-interac`.
+
+(Contributed by Pablo Galindo Salgado, Łukasz Langa, and
+Lysandros Nikolaou in :gh:`111201` based on code from the PyPy project.)
+
Improved Error Messages
-----------------------
diff --git a/Lib/_pyrepl/__init__.py b/Lib/_pyrepl/__init__.py
new file mode 100644
index 0000000..1693cbd
--- /dev/null
+++ b/Lib/_pyrepl/__init__.py
@@ -0,0 +1,19 @@
+# Copyright 2000-2008 Michael Hudson-Doyle <micahel@gmail.com>
+# Armin Rigo
+#
+# All Rights Reserved
+#
+#
+# Permission to use, copy, modify, and distribute this software and
+# its documentation for any purpose is hereby granted without fee,
+# provided that the above copyright notice appear in all copies and
+# that both that copyright notice and this permission notice appear in
+# supporting documentation.
+#
+# THE AUTHOR MICHAEL HUDSON DISCLAIMS ALL WARRANTIES WITH REGARD TO
+# THIS SOFTWARE, INCLUDING ALL IMPLIED WARRANTIES OF MERCHANTABILITY
+# AND FITNESS, IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY SPECIAL,
+# INDIRECT OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER
+# RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN ACTION OF
+# CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF OR IN
+# CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
diff --git a/Lib/_pyrepl/__main__.py b/Lib/_pyrepl/__main__.py
new file mode 100644
index 0000000..417ee17
--- /dev/null
+++ b/Lib/_pyrepl/__main__.py
@@ -0,0 +1,43 @@
+import os
+import sys
+
+CAN_USE_PYREPL = True
+
+def interactive_console(mainmodule=None, quiet=False, pythonstartup=False):
+ global CAN_USE_PYREPL
+ if not CAN_USE_PYREPL:
+ return sys._baserepl()
+
+ startup_path = os.getenv("PYTHONSTARTUP")
+ if pythonstartup and startup_path:
+ import tokenize
+ with tokenize.open(startup_path) as f:
+ startup_code = compile(f.read(), startup_path, "exec")
+ exec(startup_code)
+
+ # set sys.{ps1,ps2} just before invoking the interactive interpreter. This
+ # mimics what CPython does in pythonrun.c
+ if not hasattr(sys, "ps1"):
+ sys.ps1 = ">>> "
+ if not hasattr(sys, "ps2"):
+ sys.ps2 = "... "
+ #
+ run_interactive = None
+ try:
+ import errno
+ if not os.isatty(sys.stdin.fileno()):
+ raise OSError(errno.ENOTTY, "tty required", "stdin")
+ from .simple_interact import check
+ if err := check():
+ raise RuntimeError(err)
+ from .simple_interact import run_multiline_interactive_console
+ run_interactive = run_multiline_interactive_console
+ except Exception as e:
+ print(f"warning: can't use pyrepl: {e}", file=sys.stderr)
+ CAN_USE_PYREPL = False
+ if run_interactive is None:
+ return sys._baserepl()
+ return run_interactive(mainmodule)
+
+if __name__ == "__main__":
+ interactive_console()
diff --git a/Lib/_pyrepl/_minimal_curses.py b/Lib/_pyrepl/_minimal_curses.py
new file mode 100644
index 0000000..0757fb2
--- /dev/null
+++ b/Lib/_pyrepl/_minimal_curses.py
@@ -0,0 +1,68 @@
+"""Minimal '_curses' module, the low-level interface for curses module
+which is not meant to be used directly.
+
+Based on ctypes. It's too incomplete to be really called '_curses', so
+to use it, you have to import it and stick it in sys.modules['_curses']
+manually.
+
+Note that there is also a built-in module _minimal_curses which will
+hide this one if compiled in.
+"""
+
+import ctypes
+import ctypes.util
+
+
+class error(Exception):
+ pass
+
+
+def _find_clib():
+ trylibs = ["ncursesw", "ncurses", "curses"]
+
+ for lib in trylibs:
+ path = ctypes.util.find_library(lib)
+ if path:
+ return path
+ raise ModuleNotFoundError("curses library not found", name="_pyrepl._minimal_curses")
+
+
+_clibpath = _find_clib()
+clib = ctypes.cdll.LoadLibrary(_clibpath)
+
+clib.setupterm.argtypes = [ctypes.c_char_p, ctypes.c_int, ctypes.POINTER(ctypes.c_int)]
+clib.setupterm.restype = ctypes.c_int
+
+clib.tigetstr.argtypes = [ctypes.c_char_p]
+clib.tigetstr.restype = ctypes.POINTER(ctypes.c_char)
+
+clib.tparm.argtypes = [ctypes.c_char_p] + 9 * [ctypes.c_int] # type: ignore[operator]
+clib.tparm.restype = ctypes.c_char_p
+
+OK = 0
+ERR = -1
+
+# ____________________________________________________________
+
+
+def setupterm(termstr, fd):
+ err = ctypes.c_int(0)
+ result = clib.setupterm(termstr, fd, ctypes.byref(err))
+ if result == ERR:
+ raise error("setupterm() failed (err=%d)" % err.value)
+
+
+def tigetstr(cap):
+ if not isinstance(cap, bytes):
+ cap = cap.encode("ascii")
+ result = clib.tigetstr(cap)
+ if ctypes.cast(result, ctypes.c_void_p).value == ERR:
+ return None
+ return ctypes.cast(result, ctypes.c_char_p).value
+
+
+def tparm(str, i1=0, i2=0, i3=0, i4=0, i5=0, i6=0, i7=0, i8=0, i9=0):
+ result = clib.tparm(str, i1, i2, i3, i4, i5, i6, i7, i8, i9)
+ if result is None:
+ raise error("tparm() returned NULL")
+ return result
diff --git a/Lib/_pyrepl/commands.py b/Lib/_pyrepl/commands.py
new file mode 100644
index 0000000..60ceb30
--- /dev/null
+++ b/Lib/_pyrepl/commands.py
@@ -0,0 +1,464 @@
+# Copyright 2000-2010 Michael Hudson-Doyle <micahel@gmail.com>
+# Antonio Cuni
+# Armin Rigo
+#
+# All Rights Reserved
+#
+#
+# Permission to use, copy, modify, and distribute this software and
+# its documentation for any purpose is hereby granted without fee,
+# provided that the above copyright notice appear in all copies and
+# that both that copyright notice and this permission notice appear in
+# supporting documentation.
+#
+# THE AUTHOR MICHAEL HUDSON DISCLAIMS ALL WARRANTIES WITH REGARD TO
+# THIS SOFTWARE, INCLUDING ALL IMPLIED WARRANTIES OF MERCHANTABILITY
+# AND FITNESS, IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY SPECIAL,
+# INDIRECT OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER
+# RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN ACTION OF
+# CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF OR IN
+# CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
+
+from __future__ import annotations
+import os
+
+# Categories of actions:
+# killing
+# yanking
+# motion
+# editing
+# history
+# finishing
+# [completion]
+
+
+# types
+if False:
+ from .reader import Reader
+ from .historical_reader import HistoricalReader
+ from .console import Event
+
+
+class Command:
+ finish: bool = False
+ kills_digit_arg: bool = True
+
+ def __init__(
+ self, reader: HistoricalReader, event_name: str, event: list[str]
+ ) -> None:
+ # Reader should really be "any reader" but there's too much usage of
+ # HistoricalReader methods and fields in the code below for us to
+ # refactor at the moment.
+
+ self.reader = reader
+ self.event = event
+ self.event_name = event_name
+
+ def do(self) -> None:
+ pass
+
+
+class KillCommand(Command):
+ def kill_range(self, start: int, end: int) -> None:
+ if start == end:
+ return
+ r = self.reader
+ b = r.buffer
+ text = b[start:end]
+ del b[start:end]
+ if is_kill(r.last_command):
+ if start < r.pos:
+ r.kill_ring[-1] = text + r.kill_ring[-1]
+ else:
+ r.kill_ring[-1] = r.kill_ring[-1] + text
+ else:
+ r.kill_ring.append(text)
+ r.pos = start
+ r.dirty = True
+
+
+class YankCommand(Command):
+ pass
+
+
+class MotionCommand(Command):
+ pass
+
+
+class EditCommand(Command):
+ pass
+
+
+class FinishCommand(Command):
+ finish = True
+ pass
+
+
+def is_kill(command: type[Command] | None) -> bool:
+ return command is not None and issubclass(command, KillCommand)
+
+
+def is_yank(command: type[Command] | None) -> bool:
+ return command is not None and issubclass(command, YankCommand)
+
+
+# etc
+
+
+class digit_arg(Command):
+ kills_digit_arg = False
+
+ def do(self) -> None:
+ r = self.reader
+ c = self.event[-1]
+ if c == "-":
+ if r.arg is not None:
+ r.arg = -r.arg
+ else:
+ r.arg = -1
+ else:
+ d = int(c)
+ if r.arg is None:
+ r.arg = d
+ else:
+ if r.arg < 0:
+ r.arg = 10 * r.arg - d
+ else:
+ r.arg = 10 * r.arg + d
+ r.dirty = True
+
+
+class clear_screen(Command):
+ def do(self) -> None:
+ r = self.reader
+ r.console.clear()
+ r.dirty = True
+
+
+class refresh(Command):
+ def do(self) -> None:
+ self.reader.dirty = True
+
+
+class repaint(Command):
+ def do(self) -> None:
+ self.reader.dirty = True
+ self.reader.console.repaint()
+
+
+class kill_line(KillCommand):
+ def do(self) -> None:
+ r = self.reader
+ b = r.buffer
+ eol = r.eol()
+ for c in b[r.pos : eol]:
+ if not c.isspace():
+ self.kill_range(r.pos, eol)
+ return
+ else:
+ self.kill_range(r.pos, eol + 1)
+
+
+class unix_line_discard(KillCommand):
+ def do(self) -> None:
+ r = self.reader
+ self.kill_range(r.bol(), r.pos)
+
+
+class unix_word_rubout(KillCommand):
+ def do(self) -> None:
+ r = self.reader
+ for i in range(r.get_arg()):
+ self.kill_range(r.bow(), r.pos)
+
+
+class kill_word(KillCommand):
+ def do(self) -> None:
+ r = self.reader
+ for i in range(r.get_arg()):
+ self.kill_range(r.pos, r.eow())
+
+
+class backward_kill_word(KillCommand):
+ def do(self) -> None:
+ r = self.reader
+ for i in range(r.get_arg()):
+ self.kill_range(r.bow(), r.pos)
+
+
+class yank(YankCommand):
+ def do(self) -> None:
+ r = self.reader
+ if not r.kill_ring:
+ r.error("nothing to yank")
+ return
+ r.insert(r.kill_ring[-1])
+
+
+class yank_pop(YankCommand):
+ def do(self) -> None:
+ r = self.reader
+ b = r.buffer
+ if not r.kill_ring:
+ r.error("nothing to yank")
+ return
+ if not is_yank(r.last_command):
+ r.error("previous command was not a yank")
+ return
+ repl = len(r.kill_ring[-1])
+ r.kill_ring.insert(0, r.kill_ring.pop())
+ t = r.kill_ring[-1]
+ b[r.pos - repl : r.pos] = t
+ r.pos = r.pos - repl + len(t)
+ r.dirty = True
+
+
+class interrupt(FinishCommand):
+ def do(self) -> None:
+ import signal
+
+ self.reader.console.finish()
+ os.kill(os.getpid(), signal.SIGINT)
+
+
+class suspend(Command):
+ def do(self) -> None:
+ import signal
+
+ r = self.reader
+ p = r.pos
+ r.console.finish()
+ os.kill(os.getpid(), signal.SIGSTOP)
+ ## this should probably be done
+ ## in a handler for SIGCONT?
+ r.console.prepare()
+ r.pos = p
+ # r.posxy = 0, 0 # XXX this is invalid
+ r.dirty = True
+ r.console.screen = []
+
+
+class up(MotionCommand):
+ def do(self) -> None:
+ r = self.reader
+ for _ in range(r.get_arg()):
+ x, y = r.pos2xy()
+ new_y = y - 1
+
+ if new_y < 0:
+ if r.historyi > 0:
+ r.select_item(r.historyi - 1)
+ return
+ r.pos = 0
+ r.error("start of buffer")
+ return
+
+ if (
+ x
+ > (
+ new_x := r.max_column(new_y)
+ ) # we're past the end of the previous line
+ or x == r.max_column(y)
+ and any(
+ not i.isspace() for i in r.buffer[r.bol() :]
+ ) # move between eols
+ ):
+ x = new_x
+
+ r.setpos_from_xy(x, new_y)
+
+
+class down(MotionCommand):
+ def do(self) -> None:
+ r = self.reader
+ b = r.buffer
+ for _ in range(r.get_arg()):
+ x, y = r.pos2xy()
+ new_y = y + 1
+
+ if new_y > r.max_row():
+ if r.historyi < len(r.history):
+ r.select_item(r.historyi + 1)
+ r.pos = r.eol(0)
+ return
+ r.pos = len(b)
+ r.error("end of buffer")
+ return
+
+ if (
+ x
+ > (
+ new_x := r.max_column(new_y)
+ ) # we're past the end of the previous line
+ or x == r.max_column(y)
+ and any(
+ not i.isspace() for i in r.buffer[r.bol() :]
+ ) # move between eols
+ ):
+ x = new_x
+
+ r.setpos_from_xy(x, new_y)
+
+
+class left(MotionCommand):
+ def do(self) -> None:
+ r = self.reader
+ for i in range(r.get_arg()):
+ p = r.pos - 1
+ if p >= 0:
+ r.pos = p
+ else:
+ self.reader.error("start of buffer")
+
+
+class right(MotionCommand):
+ def do(self) -> None:
+ r = self.reader
+ b = r.buffer
+ for i in range(r.get_arg()):
+ p = r.pos + 1
+ if p <= len(b):
+ r.pos = p
+ else:
+ self.reader.error("end of buffer")
+
+
+class beginning_of_line(MotionCommand):
+ def do(self) -> None:
+ self.reader.pos = self.reader.bol()
+
+
+class end_of_line(MotionCommand):
+ def do(self) -> None:
+ self.reader.pos = self.reader.eol()
+
+
+class home(MotionCommand):
+ def do(self) -> None:
+ self.reader.pos = 0
+
+
+class end(MotionCommand):
+ def do(self) -> None:
+ self.reader.pos = len(self.reader.buffer)
+
+
+class forward_word(MotionCommand):
+ def do(self) -> None:
+ r = self.reader
+ for i in range(r.get_arg()):
+ r.pos = r.eow()
+
+
+class backward_word(MotionCommand):
+ def do(self) -> None:
+ r = self.reader
+ for i in range(r.get_arg()):
+ r.pos = r.bow()
+
+
+class self_insert(EditCommand):
+ def do(self) -> None:
+ r = self.reader
+ r.insert(self.event * r.get_arg())
+
+
+class insert_nl(EditCommand):
+ def do(self) -> None:
+ r = self.reader
+ r.insert("\n" * r.get_arg())
+
+
+class transpose_characters(EditCommand):
+ def do(self) -> None:
+ r = self.reader
+ b = r.buffer
+ s = r.pos - 1
+ if s < 0:
+ r.error("cannot transpose at start of buffer")
+ else:
+ if s == len(b):
+ s -= 1
+ t = min(s + r.get_arg(), len(b) - 1)
+ c = b[s]
+ del b[s]
+ b.insert(t, c)
+ r.pos = t
+ r.dirty = True
+
+
+class backspace(EditCommand):
+ def do(self) -> None:
+ r = self.reader
+ b = r.buffer
+ for i in range(r.get_arg()):
+ if r.pos > 0:
+ r.pos -= 1
+ del b[r.pos]
+ r.dirty = True
+ else:
+ self.reader.error("can't backspace at start")
+
+
+class delete(EditCommand):
+ def do(self) -> None:
+ r = self.reader
+ b = r.buffer
+ if (
+ r.pos == 0
+ and len(b) == 0 # this is something of a hack
+ and self.event[-1] == "\004"
+ ):
+ r.update_screen()
+ r.console.finish()
+ raise EOFError
+ for i in range(r.get_arg()):
+ if r.pos != len(b):
+ del b[r.pos]
+ r.dirty = True
+ else:
+ self.reader.error("end of buffer")
+
+
+class accept(FinishCommand):
+ def do(self) -> None:
+ pass
+
+
+class help(Command):
+ def do(self) -> None:
+ import _sitebuiltins
+
+ with self.reader.suspend():
+ self.reader.msg = _sitebuiltins._Helper()() # type: ignore[assignment, call-arg]
+
+
+class invalid_key(Command):
+ def do(self) -> None:
+ pending = self.reader.console.getpending()
+ s = "".join(self.event) + pending.data
+ self.reader.error("`%r' not bound" % s)
+
+
+class invalid_command(Command):
+ def do(self) -> None:
+ s = self.event_name
+ self.reader.error("command `%s' not known" % s)
+
+
+class show_history(Command):
+ def do(self) -> None:
+ from .pager import get_pager
+ from site import gethistoryfile # type: ignore[attr-defined]
+
+ history = os.linesep.join(self.reader.history[:])
+ with self.reader.suspend():
+ pager = get_pager()
+ pager(history, gethistoryfile())
+
+
+class paste_mode(Command):
+
+ def do(self) -> None:
+ self.reader.paste_mode = not self.reader.paste_mode
+ self.reader.dirty = True
diff --git a/Lib/_pyrepl/completing_reader.py b/Lib/_pyrepl/completing_reader.py
new file mode 100644
index 0000000..19fc06f
--- /dev/null
+++ b/Lib/_pyrepl/completing_reader.py
@@ -0,0 +1,287 @@
+# Copyright 2000-2010 Michael Hudson-Doyle <micahel@gmail.com>
+# Antonio Cuni
+#
+# All Rights Reserved
+#
+#
+# Permission to use, copy, modify, and distribute this software and
+# its documentation for any purpose is hereby granted without fee,
+# provided that the above copyright notice appear in all copies and
+# that both that copyright notice and this permission notice appear in
+# supporting documentation.
+#
+# THE AUTHOR MICHAEL HUDSON DISCLAIMS ALL WARRANTIES WITH REGARD TO
+# THIS SOFTWARE, INCLUDING ALL IMPLIED WARRANTIES OF MERCHANTABILITY
+# AND FITNESS, IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY SPECIAL,
+# INDIRECT OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER
+# RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN ACTION OF
+# CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF OR IN
+# CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
+
+from __future__ import annotations
+
+from dataclasses import dataclass, field
+
+import re
+from . import commands, console, reader
+from .reader import Reader
+
+
+# types
+Command = commands.Command
+if False:
+ from .types import Callback, SimpleContextManager, KeySpec, CommandName
+
+
+def prefix(wordlist: list[str], j: int = 0) -> str:
+ d = {}
+ i = j
+ try:
+ while 1:
+ for word in wordlist:
+ d[word[i]] = 1
+ if len(d) > 1:
+ return wordlist[0][j:i]
+ i += 1
+ d = {}
+ except IndexError:
+ return wordlist[0][j:i]
+ return ""
+
+
+STRIPCOLOR_REGEX = re.compile(r"\x1B\[([0-9]{1,3}(;[0-9]{1,2})?)?[m|K]")
+
+def stripcolor(s: str) -> str:
+ return STRIPCOLOR_REGEX.sub('', s)
+
+
+def real_len(s: str) -> int:
+ return len(stripcolor(s))
+
+
+def left_align(s: str, maxlen: int) -> str:
+ stripped = stripcolor(s)
+ if len(stripped) > maxlen:
+ # too bad, we remove the color
+ return stripped[:maxlen]
+ padding = maxlen - len(stripped)
+ return s + ' '*padding
+
+
+def build_menu(
+ cons: console.Console,
+ wordlist: list[str],
+ start: int,
+ use_brackets: bool,
+ sort_in_column: bool,
+) -> tuple[list[str], int]:
+ if use_brackets:
+ item = "[ %s ]"
+ padding = 4
+ else:
+ item = "%s "
+ padding = 2
+ maxlen = min(max(map(real_len, wordlist)), cons.width - padding)
+ cols = int(cons.width / (maxlen + padding))
+ rows = int((len(wordlist) - 1)/cols + 1)
+
+ if sort_in_column:
+ # sort_in_column=False (default) sort_in_column=True
+ # A B C A D G
+ # D E F B E
+ # G C F
+ #
+ # "fill" the table with empty words, so we always have the same amout
+ # of rows for each column
+ missing = cols*rows - len(wordlist)
+ wordlist = wordlist + ['']*missing
+ indexes = [(i % cols) * rows + i // cols for i in range(len(wordlist))]
+ wordlist = [wordlist[i] for i in indexes]
+ menu = []
+ i = start
+ for r in range(rows):
+ row = []
+ for col in range(cols):
+ row.append(item % left_align(wordlist[i], maxlen))
+ i += 1
+ if i >= len(wordlist):
+ break
+ menu.append(''.join(row))
+ if i >= len(wordlist):
+ i = 0
+ break
+ if r + 5 > cons.height:
+ menu.append(" %d more... " % (len(wordlist) - i))
+ break
+ return menu, i
+
+# this gets somewhat user interface-y, and as a result the logic gets
+# very convoluted.
+#
+# To summarise the summary of the summary:- people are a problem.
+# -- The Hitch-Hikers Guide to the Galaxy, Episode 12
+
+#### Desired behaviour of the completions commands.
+# the considerations are:
+# (1) how many completions are possible
+# (2) whether the last command was a completion
+# (3) if we can assume that the completer is going to return the same set of
+# completions: this is controlled by the ``assume_immutable_completions``
+# variable on the reader, which is True by default to match the historical
+# behaviour of pyrepl, but e.g. False in the ReadlineAlikeReader to match
+# more closely readline's semantics (this is needed e.g. by
+# fancycompleter)
+#
+# if there's no possible completion, beep at the user and point this out.
+# this is easy.
+#
+# if there's only one possible completion, stick it in. if the last thing
+# user did was a completion, point out that he isn't getting anywhere, but
+# only if the ``assume_immutable_completions`` is True.
+#
+# now it gets complicated.
+#
+# for the first press of a completion key:
+# if there's a common prefix, stick it in.
+
+# irrespective of whether anything got stuck in, if the word is now
+# complete, show the "complete but not unique" message
+
+# if there's no common prefix and if the word is not now complete,
+# beep.
+
+# common prefix -> yes no
+# word complete \/
+# yes "cbnu" "cbnu"
+# no - beep
+
+# for the second bang on the completion key
+# there will necessarily be no common prefix
+# show a menu of the choices.
+
+# for subsequent bangs, rotate the menu around (if there are sufficient
+# choices).
+
+
+class complete(commands.Command):
+ def do(self) -> None:
+ r: CompletingReader
+ r = self.reader # type: ignore[assignment]
+ last_is_completer = r.last_command_is(self.__class__)
+ immutable_completions = r.assume_immutable_completions
+ completions_unchangable = last_is_completer and immutable_completions
+ stem = r.get_stem()
+ if not completions_unchangable:
+ r.cmpltn_menu_choices = r.get_completions(stem)
+
+ completions = r.cmpltn_menu_choices
+ if not completions:
+ r.error("no matches")
+ elif len(completions) == 1:
+ if completions_unchangable and len(completions[0]) == len(stem):
+ r.msg = "[ sole completion ]"
+ r.dirty = True
+ r.insert(completions[0][len(stem):])
+ else:
+ p = prefix(completions, len(stem))
+ if p:
+ r.insert(p)
+ if last_is_completer:
+ if not r.cmpltn_menu_vis:
+ r.cmpltn_menu_vis = 1
+ r.cmpltn_menu, r.cmpltn_menu_end = build_menu(
+ r.console, completions, r.cmpltn_menu_end,
+ r.use_brackets, r.sort_in_column)
+ r.dirty = True
+ elif stem + p in completions:
+ r.msg = "[ complete but not unique ]"
+ r.dirty = True
+ else:
+ r.msg = "[ not unique ]"
+ r.dirty = True
+
+
+class self_insert(commands.self_insert):
+ def do(self) -> None:
+ r: CompletingReader
+ r = self.reader # type: ignore[assignment]
+
+ commands.self_insert.do(self)
+
+ if r.cmpltn_menu_vis:
+ stem = r.get_stem()
+ if len(stem) < 1:
+ r.cmpltn_reset()
+ else:
+ completions = [w for w in r.cmpltn_menu_choices
+ if w.startswith(stem)]
+ if completions:
+ r.cmpltn_menu, r.cmpltn_menu_end = build_menu(
+ r.console, completions, 0,
+ r.use_brackets, r.sort_in_column)
+ else:
+ r.cmpltn_reset()
+
+
+@dataclass
+class CompletingReader(Reader):
+ """Adds completion support"""
+
+ ### Class variables
+ # see the comment for the complete command
+ assume_immutable_completions = True
+ use_brackets = True # display completions inside []
+ sort_in_column = False
+
+ ### Instance variables
+ cmpltn_menu: list[str] = field(init=False)
+ cmpltn_menu_vis: int = field(init=False)
+ cmpltn_menu_end: int = field(init=False)
+ cmpltn_menu_choices: list[str] = field(init=False)
+
+ def __post_init__(self) -> None:
+ super().__post_init__()
+ self.cmpltn_reset()
+ for c in (complete, self_insert):
+ self.commands[c.__name__] = c
+ self.commands[c.__name__.replace('_', '-')] = c
+
+ def collect_keymap(self) -> tuple[tuple[KeySpec, CommandName], ...]:
+ return super().collect_keymap() + (
+ (r'\t', 'complete'),)
+
+ def after_command(self, cmd: Command) -> None:
+ super().after_command(cmd)
+ if not isinstance(cmd, (complete, self_insert)):
+ self.cmpltn_reset()
+
+ def calc_screen(self) -> list[str]:
+ screen = super().calc_screen()
+ if self.cmpltn_menu_vis:
+ ly = self.lxy[1]
+ screen[ly:ly] = self.cmpltn_menu
+ self.screeninfo[ly:ly] = [(0, [])]*len(self.cmpltn_menu)
+ self.cxy = self.cxy[0], self.cxy[1] + len(self.cmpltn_menu)
+ return screen
+
+ def finish(self) -> None:
+ super().finish()
+ self.cmpltn_reset()
+
+ def cmpltn_reset(self) -> None:
+ self.cmpltn_menu = []
+ self.cmpltn_menu_vis = 0
+ self.cmpltn_menu_end = 0
+ self.cmpltn_menu_choices = []
+
+ def get_stem(self) -> str:
+ st = self.syntax_table
+ SW = reader.SYNTAX_WORD
+ b = self.buffer
+ p = self.pos - 1
+ while p >= 0 and st.get(b[p], SW) == SW:
+ p -= 1
+ return ''.join(b[p+1:self.pos])
+
+ def get_completions(self, stem: str) -> list[str]:
+ return []
diff --git a/Lib/_pyrepl/console.py b/Lib/_pyrepl/console.py
new file mode 100644
index 0000000..d7e86e7
--- /dev/null
+++ b/Lib/_pyrepl/console.py
@@ -0,0 +1,112 @@
+# Copyright 2000-2004 Michael Hudson-Doyle <micahel@gmail.com>
+#
+# All Rights Reserved
+#
+#
+# Permission to use, copy, modify, and distribute this software and
+# its documentation for any purpose is hereby granted without fee,
+# provided that the above copyright notice appear in all copies and
+# that both that copyright notice and this permission notice appear in
+# supporting documentation.
+#
+# THE AUTHOR MICHAEL HUDSON DISCLAIMS ALL WARRANTIES WITH REGARD TO
+# THIS SOFTWARE, INCLUDING ALL IMPLIED WARRANTIES OF MERCHANTABILITY
+# AND FITNESS, IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY SPECIAL,
+# INDIRECT OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER
+# RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN ACTION OF
+# CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF OR IN
+# CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
+
+from __future__ import annotations
+
+from abc import ABC, abstractmethod
+from dataclasses import dataclass, field
+
+
+@dataclass
+class Event:
+ evt: str
+ data: str
+ raw: bytes = b""
+
+
+@dataclass
+class Console(ABC):
+ screen: list[str] = field(default_factory=list)
+ height: int = 25
+ width: int = 80
+
+ @abstractmethod
+ def refresh(self, screen: list[str], xy: tuple[int, int]) -> None: ...
+
+ @abstractmethod
+ def prepare(self) -> None: ...
+
+ @abstractmethod
+ def restore(self) -> None: ...
+
+ @abstractmethod
+ def move_cursor(self, x: int, y: int) -> None: ...
+
+ @abstractmethod
+ def set_cursor_vis(self, visible: bool) -> None: ...
+
+ @abstractmethod
+ def getheightwidth(self) -> tuple[int, int]:
+ """Return (height, width) where height and width are the height
+ and width of the terminal window in characters."""
+ ...
+
+ @abstractmethod
+ def get_event(self, block: bool = True) -> Event | None:
+ """Return an Event instance. Returns None if |block| is false
+ and there is no event pending, otherwise waits for the
+ completion of an event."""
+ ...
+
+ @abstractmethod
+ def push_char(self, char: int | bytes) -> None:
+ """
+ Push a character to the console event queue.
+ """
+ ...
+
+ @abstractmethod
+ def beep(self) -> None: ...
+
+ @abstractmethod
+ def clear(self) -> None:
+ """Wipe the screen"""
+ ...
+
+ @abstractmethod
+ def finish(self) -> None:
+ """Move the cursor to the end of the display and otherwise get
+ ready for end. XXX could be merged with restore? Hmm."""
+ ...
+
+ @abstractmethod
+ def flushoutput(self) -> None:
+ """Flush all output to the screen (assuming there's some
+ buffering going on somewhere)."""
+ ...
+
+ @abstractmethod
+ def forgetinput(self) -> None:
+ """Forget all pending, but not yet processed input."""
+ ...
+
+ @abstractmethod
+ def getpending(self) -> Event:
+ """Return the characters that have been typed but not yet
+ processed."""
+ ...
+
+ @abstractmethod
+ def wait(self) -> None:
+ """Wait for an event."""
+ ...
+
+ @abstractmethod
+ def repaint(self) -> None:
+ ...
diff --git a/Lib/_pyrepl/curses.py b/Lib/_pyrepl/curses.py
new file mode 100644
index 0000000..3a624d9
--- /dev/null
+++ b/Lib/_pyrepl/curses.py
@@ -0,0 +1,33 @@
+# Copyright 2000-2010 Michael Hudson-Doyle <micahel@gmail.com>
+# Armin Rigo
+#
+# All Rights Reserved
+#
+#
+# Permission to use, copy, modify, and distribute this software and
+# its documentation for any purpose is hereby granted without fee,
+# provided that the above copyright notice appear in all copies and
+# that both that copyright notice and this permission notice appear in
+# supporting documentation.
+#
+# THE AUTHOR MICHAEL HUDSON DISCLAIMS ALL WARRANTIES WITH REGARD TO
+# THIS SOFTWARE, INCLUDING ALL IMPLIED WARRANTIES OF MERCHANTABILITY
+# AND FITNESS, IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY SPECIAL,
+# INDIRECT OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER
+# RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN ACTION OF
+# CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF OR IN
+# CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
+
+
+try:
+ import _curses
+except ImportError:
+ try:
+ import curses as _curses # type: ignore[no-redef]
+ except ImportError:
+ from . import _minimal_curses as _curses # type: ignore[no-redef]
+
+setupterm = _curses.setupterm
+tigetstr = _curses.tigetstr
+tparm = _curses.tparm
+error = _curses.error
diff --git a/Lib/_pyrepl/fancy_termios.py b/Lib/_pyrepl/fancy_termios.py
new file mode 100644
index 0000000..5b85cb0
--- /dev/null
+++ b/Lib/_pyrepl/fancy_termios.py
@@ -0,0 +1,74 @@
+# Copyright 2000-2004 Michael Hudson-Doyle <micahel@gmail.com>
+#
+# All Rights Reserved
+#
+#
+# Permission to use, copy, modify, and distribute this software and
+# its documentation for any purpose is hereby granted without fee,
+# provided that the above copyright notice appear in all copies and
+# that both that copyright notice and this permission notice appear in
+# supporting documentation.
+#
+# THE AUTHOR MICHAEL HUDSON DISCLAIMS ALL WARRANTIES WITH REGARD TO
+# THIS SOFTWARE, INCLUDING ALL IMPLIED WARRANTIES OF MERCHANTABILITY
+# AND FITNESS, IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY SPECIAL,
+# INDIRECT OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER
+# RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN ACTION OF
+# CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF OR IN
+# CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
+
+import termios
+
+
+class TermState:
+ def __init__(self, tuples):
+ (
+ self.iflag,
+ self.oflag,
+ self.cflag,
+ self.lflag,
+ self.ispeed,
+ self.ospeed,
+ self.cc,
+ ) = tuples
+
+ def as_list(self):
+ return [
+ self.iflag,
+ self.oflag,
+ self.cflag,
+ self.lflag,
+ self.ispeed,
+ self.ospeed,
+ self.cc,
+ ]
+
+ def copy(self):
+ return self.__class__(self.as_list())
+
+
+def tcgetattr(fd):
+ return TermState(termios.tcgetattr(fd))
+
+
+def tcsetattr(fd, when, attrs):
+ termios.tcsetattr(fd, when, attrs.as_list())
+
+
+class Term(TermState):
+ TS__init__ = TermState.__init__
+
+ def __init__(self, fd=0):
+ self.TS__init__(termios.tcgetattr(fd))
+ self.fd = fd
+ self.stack = []
+
+ def save(self):
+ self.stack.append(self.as_list())
+
+ def set(self, when=termios.TCSANOW):
+ termios.tcsetattr(self.fd, when, self.as_list())
+
+ def restore(self):
+ self.TS__init__(self.stack.pop())
+ self.set()
diff --git a/Lib/_pyrepl/historical_reader.py b/Lib/_pyrepl/historical_reader.py
new file mode 100644
index 0000000..eef7d90
--- /dev/null
+++ b/Lib/_pyrepl/historical_reader.py
@@ -0,0 +1,345 @@
+# Copyright 2000-2004 Michael Hudson-Doyle <micahel@gmail.com>
+#
+# All Rights Reserved
+#
+#
+# Permission to use, copy, modify, and distribute this software and
+# its documentation for any purpose is hereby granted without fee,
+# provided that the above copyright notice appear in all copies and
+# that both that copyright notice and this permission notice appear in
+# supporting documentation.
+#
+# THE AUTHOR MICHAEL HUDSON DISCLAIMS ALL WARRANTIES WITH REGARD TO
+# THIS SOFTWARE, INCLUDING ALL IMPLIED WARRANTIES OF MERCHANTABILITY
+# AND FITNESS, IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY SPECIAL,
+# INDIRECT OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER
+# RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN ACTION OF
+# CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF OR IN
+# CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
+
+from __future__ import annotations
+
+from contextlib import contextmanager
+from dataclasses import dataclass, field
+
+from . import commands, input
+from .reader import Reader
+
+
+if False:
+ from .types import Callback, SimpleContextManager, KeySpec, CommandName
+
+
+isearch_keymap: tuple[tuple[KeySpec, CommandName], ...] = tuple(
+ [("\\%03o" % c, "isearch-end") for c in range(256) if chr(c) != "\\"]
+ + [(c, "isearch-add-character") for c in map(chr, range(32, 127)) if c != "\\"]
+ + [
+ ("\\%03o" % c, "isearch-add-character")
+ for c in range(256)
+ if chr(c).isalpha() and chr(c) != "\\"
+ ]
+ + [
+ ("\\\\", "self-insert"),
+ (r"\C-r", "isearch-backwards"),
+ (r"\C-s", "isearch-forwards"),
+ (r"\C-c", "isearch-cancel"),
+ (r"\C-g", "isearch-cancel"),
+ (r"\<backspace>", "isearch-backspace"),
+ ]
+)
+
+ISEARCH_DIRECTION_NONE = ""
+ISEARCH_DIRECTION_BACKWARDS = "r"
+ISEARCH_DIRECTION_FORWARDS = "f"
+
+
+class next_history(commands.Command):
+ def do(self) -> None:
+ r = self.reader
+ if r.historyi == len(r.history):
+ r.error("end of history list")
+ return
+ r.select_item(r.historyi + 1)
+
+
+class previous_history(commands.Command):
+ def do(self) -> None:
+ r = self.reader
+ if r.historyi == 0:
+ r.error("start of history list")
+ return
+ r.select_item(r.historyi - 1)
+
+
+class restore_history(commands.Command):
+ def do(self) -> None:
+ r = self.reader
+ if r.historyi != len(r.history):
+ if r.get_unicode() != r.history[r.historyi]:
+ r.buffer = list(r.history[r.historyi])
+ r.pos = len(r.buffer)
+ r.dirty = True
+
+
+class first_history(commands.Command):
+ def do(self) -> None:
+ self.reader.select_item(0)
+
+
+class last_history(commands.Command):
+ def do(self) -> None:
+ self.reader.select_item(len(self.reader.history))
+
+
+class operate_and_get_next(commands.FinishCommand):
+ def do(self) -> None:
+ self.reader.next_history = self.reader.historyi + 1
+
+
+class yank_arg(commands.Command):
+ def do(self) -> None:
+ r = self.reader
+ if r.last_command is self.__class__:
+ r.yank_arg_i += 1
+ else:
+ r.yank_arg_i = 0
+ if r.historyi < r.yank_arg_i:
+ r.error("beginning of history list")
+ return
+ a = r.get_arg(-1)
+ # XXX how to split?
+ words = r.get_item(r.historyi - r.yank_arg_i - 1).split()
+ if a < -len(words) or a >= len(words):
+ r.error("no such arg")
+ return
+ w = words[a]
+ b = r.buffer
+ if r.yank_arg_i > 0:
+ o = len(r.yank_arg_yanked)
+ else:
+ o = 0
+ b[r.pos - o : r.pos] = list(w)
+ r.yank_arg_yanked = w
+ r.pos += len(w) - o
+ r.dirty = True
+
+
+class forward_history_isearch(commands.Command):
+ def do(self) -> None:
+ r = self.reader
+ r.isearch_direction = ISEARCH_DIRECTION_FORWARDS
+ r.isearch_start = r.historyi, r.pos
+ r.isearch_term = ""
+ r.dirty = True
+ r.push_input_trans(r.isearch_trans)
+
+
+class reverse_history_isearch(commands.Command):
+ def do(self) -> None:
+ r = self.reader
+ r.isearch_direction = ISEARCH_DIRECTION_BACKWARDS
+ r.dirty = True
+ r.isearch_term = ""
+ r.push_input_trans(r.isearch_trans)
+ r.isearch_start = r.historyi, r.pos
+
+
+class isearch_cancel(commands.Command):
+ def do(self) -> None:
+ r = self.reader
+ r.isearch_direction = ISEARCH_DIRECTION_NONE
+ r.pop_input_trans()
+ r.select_item(r.isearch_start[0])
+ r.pos = r.isearch_start[1]
+ r.dirty = True
+
+
+class isearch_add_character(commands.Command):
+ def do(self) -> None:
+ r = self.reader
+ b = r.buffer
+ r.isearch_term += self.event[-1]
+ r.dirty = True
+ p = r.pos + len(r.isearch_term) - 1
+ if b[p : p + 1] != [r.isearch_term[-1]]:
+ r.isearch_next()
+
+
+class isearch_backspace(commands.Command):
+ def do(self) -> None:
+ r = self.reader
+ if len(r.isearch_term) > 0:
+ r.isearch_term = r.isearch_term[:-1]
+ r.dirty = True
+ else:
+ r.error("nothing to rubout")
+
+
+class isearch_forwards(commands.Command):
+ def do(self) -> None:
+ r = self.reader
+ r.isearch_direction = ISEARCH_DIRECTION_FORWARDS
+ r.isearch_next()
+
+
+class isearch_backwards(commands.Command):
+ def do(self) -> None:
+ r = self.reader
+ r.isearch_direction = ISEARCH_DIRECTION_BACKWARDS
+ r.isearch_next()
+
+
+class isearch_end(commands.Command):
+ def do(self) -> None:
+ r = self.reader
+ r.isearch_direction = ISEARCH_DIRECTION_NONE
+ r.console.forgetinput()
+ r.pop_input_trans()
+ r.dirty = True
+
+
+@dataclass
+class HistoricalReader(Reader):
+ """Adds history support (with incremental history searching) to the
+ Reader class.
+ """
+
+ history: list[str] = field(default_factory=list)
+ historyi: int = 0
+ next_history: int | None = None
+ transient_history: dict[int, str] = field(default_factory=dict)
+ isearch_term: str = ""
+ isearch_direction: str = ISEARCH_DIRECTION_NONE
+ isearch_start: tuple[int, int] = field(init=False)
+ isearch_trans: input.KeymapTranslator = field(init=False)
+ yank_arg_i: int = 0
+ yank_arg_yanked: str = ""
+
+ def __post_init__(self) -> None:
+ super().__post_init__()
+ for c in [
+ next_history,
+ previous_history,
+ restore_history,
+ first_history,
+ last_history,
+ yank_arg,
+ forward_history_isearch,
+ reverse_history_isearch,
+ isearch_end,
+ isearch_add_character,
+ isearch_cancel,
+ isearch_add_character,
+ isearch_backspace,
+ isearch_forwards,
+ isearch_backwards,
+ operate_and_get_next,
+ ]:
+ self.commands[c.__name__] = c
+ self.commands[c.__name__.replace("_", "-")] = c
+ self.isearch_start = self.historyi, self.pos
+ self.isearch_trans = input.KeymapTranslator(
+ isearch_keymap, invalid_cls=isearch_end, character_cls=isearch_add_character
+ )
+
+ def collect_keymap(self) -> tuple[tuple[KeySpec, CommandName], ...]:
+ return super().collect_keymap() + (
+ (r"\C-n", "next-history"),
+ (r"\C-p", "previous-history"),
+ (r"\C-o", "operate-and-get-next"),
+ (r"\C-r", "reverse-history-isearch"),
+ (r"\C-s", "forward-history-isearch"),
+ (r"\M-r", "restore-history"),
+ (r"\M-.", "yank-arg"),
+ (r"\<page down>", "last-history"),
+ (r"\<page up>", "first-history"),
+ )
+
+ def select_item(self, i: int) -> None:
+ self.transient_history[self.historyi] = self.get_unicode()
+ buf = self.transient_history.get(i)
+ if buf is None:
+ buf = self.history[i]
+ self.buffer = list(buf)
+ self.historyi = i
+ self.pos = len(self.buffer)
+ self.dirty = True
+
+ def get_item(self, i: int) -> str:
+ if i != len(self.history):
+ return self.transient_history.get(i, self.history[i])
+ else:
+ return self.transient_history.get(i, self.get_unicode())
+
+ @contextmanager
+ def suspend(self) -> SimpleContextManager:
+ with super().suspend():
+ try:
+ old_history = self.history[:]
+ del self.history[:]
+ yield
+ finally:
+ self.history[:] = old_history
+
+ def prepare(self) -> None:
+ super().prepare()
+ try:
+ self.transient_history = {}
+ if self.next_history is not None and self.next_history < len(self.history):
+ self.historyi = self.next_history
+ self.buffer[:] = list(self.history[self.next_history])
+ self.pos = len(self.buffer)
+ self.transient_history[len(self.history)] = ""
+ else:
+ self.historyi = len(self.history)
+ self.next_history = None
+ except:
+ self.restore()
+ raise
+
+ def get_prompt(self, lineno: int, cursor_on_line: bool) -> str:
+ if cursor_on_line and self.isearch_direction != ISEARCH_DIRECTION_NONE:
+ d = "rf"[self.isearch_direction == ISEARCH_DIRECTION_FORWARDS]
+ return "(%s-search `%s') " % (d, self.isearch_term)
+ else:
+ return super().get_prompt(lineno, cursor_on_line)
+
+ def isearch_next(self) -> None:
+ st = self.isearch_term
+ p = self.pos
+ i = self.historyi
+ s = self.get_unicode()
+ forwards = self.isearch_direction == ISEARCH_DIRECTION_FORWARDS
+ while 1:
+ if forwards:
+ p = s.find(st, p + 1)
+ else:
+ p = s.rfind(st, 0, p + len(st) - 1)
+ if p != -1:
+ self.select_item(i)
+ self.pos = p
+ return
+ elif (forwards and i >= len(self.history) - 1) or (not forwards and i == 0):
+ self.error("not found")
+ return
+ else:
+ if forwards:
+ i += 1
+ s = self.get_item(i)
+ p = -1
+ else:
+ i -= 1
+ s = self.get_item(i)
+ p = len(s)
+
+ def finish(self) -> None:
+ super().finish()
+ ret = self.get_unicode()
+ for i, t in self.transient_history.items():
+ if i < len(self.history) and i != self.historyi:
+ self.history[i] = t
+ if ret and should_auto_add_history:
+ self.history.append(ret)
+
+
+should_auto_add_history = True
diff --git a/Lib/_pyrepl/input.py b/Lib/_pyrepl/input.py
new file mode 100644
index 0000000..300e16d
--- /dev/null
+++ b/Lib/_pyrepl/input.py
@@ -0,0 +1,114 @@
+# Copyright 2000-2004 Michael Hudson-Doyle <micahel@gmail.com>
+#
+# All Rights Reserved
+#
+#
+# Permission to use, copy, modify, and distribute this software and
+# its documentation for any purpose is hereby granted without fee,
+# provided that the above copyright notice appear in all copies and
+# that both that copyright notice and this permission notice appear in
+# supporting documentation.
+#
+# THE AUTHOR MICHAEL HUDSON DISCLAIMS ALL WARRANTIES WITH REGARD TO
+# THIS SOFTWARE, INCLUDING ALL IMPLIED WARRANTIES OF MERCHANTABILITY
+# AND FITNESS, IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY SPECIAL,
+# INDIRECT OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER
+# RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN ACTION OF
+# CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF OR IN
+# CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
+
+# (naming modules after builtin functions is not such a hot idea...)
+
+# an KeyTrans instance translates Event objects into Command objects
+
+# hmm, at what level do we want [C-i] and [tab] to be equivalent?
+# [meta-a] and [esc a]? obviously, these are going to be equivalent
+# for the UnixConsole, but should they be for PygameConsole?
+
+# it would in any situation seem to be a bad idea to bind, say, [tab]
+# and [C-i] to *different* things... but should binding one bind the
+# other?
+
+# executive, temporary decision: [tab] and [C-i] are distinct, but
+# [meta-key] is identified with [esc key]. We demand that any console
+# class does quite a lot towards emulating a unix terminal.
+
+from __future__ import annotations
+
+from abc import ABC, abstractmethod
+import unicodedata
+from collections import deque
+
+
+# types
+if False:
+ from .types import EventTuple
+
+
+class InputTranslator(ABC):
+ @abstractmethod
+ def push(self, evt: EventTuple) -> None:
+ pass
+
+ @abstractmethod
+ def get(self) -> EventTuple | None:
+ return None
+
+ @abstractmethod
+ def empty(self) -> bool:
+ return True
+
+
+class KeymapTranslator(InputTranslator):
+ def __init__(self, keymap, verbose=0, invalid_cls=None, character_cls=None):
+ self.verbose = verbose
+ from .keymap import compile_keymap, parse_keys
+
+ self.keymap = keymap
+ self.invalid_cls = invalid_cls
+ self.character_cls = character_cls
+ d = {}
+ for keyspec, command in keymap:
+ keyseq = tuple(parse_keys(keyspec))
+ d[keyseq] = command
+ if self.verbose:
+ print(d)
+ self.k = self.ck = compile_keymap(d, ())
+ self.results = deque()
+ self.stack = []
+
+ def push(self, evt):
+ if self.verbose:
+ print("pushed", evt.data, end="")
+ key = evt.data
+ d = self.k.get(key)
+ if isinstance(d, dict):
+ if self.verbose:
+ print("transition")
+ self.stack.append(key)
+ self.k = d
+ else:
+ if d is None:
+ if self.verbose:
+ print("invalid")
+ if self.stack or len(key) > 1 or unicodedata.category(key) == "C":
+ self.results.append((self.invalid_cls, self.stack + [key]))
+ else:
+ # small optimization:
+ self.k[key] = self.character_cls
+ self.results.append((self.character_cls, [key]))
+ else:
+ if self.verbose:
+ print("matched", d)
+ self.results.append((d, self.stack + [key]))
+ self.stack = []
+ self.k = self.ck
+
+ def get(self):
+ if self.results:
+ return self.results.popleft()
+ else:
+ return None
+
+ def empty(self):
+ return not self.results
diff --git a/Lib/_pyrepl/keymap.py b/Lib/_pyrepl/keymap.py
new file mode 100644
index 0000000..31a0264
--- /dev/null
+++ b/Lib/_pyrepl/keymap.py
@@ -0,0 +1,215 @@
+# Copyright 2000-2008 Michael Hudson-Doyle <micahel@gmail.com>
+# Armin Rigo
+#
+# All Rights Reserved
+#
+#
+# Permission to use, copy, modify, and distribute this software and
+# its documentation for any purpose is hereby granted without fee,
+# provided that the above copyright notice appear in all copies and
+# that both that copyright notice and this permission notice appear in
+# supporting documentation.
+#
+# THE AUTHOR MICHAEL HUDSON DISCLAIMS ALL WARRANTIES WITH REGARD TO
+# THIS SOFTWARE, INCLUDING ALL IMPLIED WARRANTIES OF MERCHANTABILITY
+# AND FITNESS, IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY SPECIAL,
+# INDIRECT OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER
+# RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN ACTION OF
+# CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF OR IN
+# CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
+
+"""
+functions for parsing keyspecs
+
+Support for turning keyspecs into appropriate sequences.
+
+pyrepl uses it's own bastardized keyspec format, which is meant to be
+a strict superset of readline's \"KEYSEQ\" format (which is to say
+that if you can come up with a spec readline accepts that this
+doesn't, you've found a bug and should tell me about it).
+
+Note that this is the `\\C-o' style of readline keyspec, not the
+`Control-o' sort.
+
+A keyspec is a string representing a sequence of keypresses that can
+be bound to a command.
+
+All characters other than the backslash represent themselves. In the
+traditional manner, a backslash introduces a escape sequence.
+
+The extension to readline is that the sequence \\<KEY> denotes the
+sequence of charaters produced by hitting KEY.
+
+Examples:
+
+`a' - what you get when you hit the `a' key
+`\\EOA' - Escape - O - A (up, on my terminal)
+`\\<UP>' - the up arrow key
+`\\<up>' - ditto (keynames are case insensitive)
+`\\C-o', `\\c-o' - control-o
+`\\M-.' - meta-period
+`\\E.' - ditto (that's how meta works for pyrepl)
+`\\<tab>', `\\<TAB>', `\\t', `\\011', '\\x09', '\\X09', '\\C-i', '\\C-I'
+ - all of these are the tab character. Can you think of any more?
+"""
+
+_escapes = {
+ "\\": "\\",
+ "'": "'",
+ '"': '"',
+ "a": "\a",
+ "b": "\b",
+ "e": "\033",
+ "f": "\f",
+ "n": "\n",
+ "r": "\r",
+ "t": "\t",
+ "v": "\v",
+}
+
+_keynames = {
+ "backspace": "backspace",
+ "delete": "delete",
+ "down": "down",
+ "end": "end",
+ "enter": "\r",
+ "escape": "\033",
+ "f1": "f1",
+ "f2": "f2",
+ "f3": "f3",
+ "f4": "f4",
+ "f5": "f5",
+ "f6": "f6",
+ "f7": "f7",
+ "f8": "f8",
+ "f9": "f9",
+ "f10": "f10",
+ "f11": "f11",
+ "f12": "f12",
+ "f13": "f13",
+ "f14": "f14",
+ "f15": "f15",
+ "f16": "f16",
+ "f17": "f17",
+ "f18": "f18",
+ "f19": "f19",
+ "f20": "f20",
+ "home": "home",
+ "insert": "insert",
+ "left": "left",
+ "page down": "page down",
+ "page up": "page up",
+ "return": "\r",
+ "right": "right",
+ "space": " ",
+ "tab": "\t",
+ "up": "up",
+}
+
+
+class KeySpecError(Exception):
+ pass
+
+
+def _parse_key1(key, s):
+ ctrl = 0
+ meta = 0
+ ret = ""
+ while not ret and s < len(key):
+ if key[s] == "\\":
+ c = key[s + 1].lower()
+ if c in _escapes:
+ ret = _escapes[c]
+ s += 2
+ elif c == "c":
+ if key[s + 2] != "-":
+ raise KeySpecError(
+ "\\C must be followed by `-' (char %d of %s)"
+ % (s + 2, repr(key))
+ )
+ if ctrl:
+ raise KeySpecError(
+ "doubled \\C- (char %d of %s)" % (s + 1, repr(key))
+ )
+ ctrl = 1
+ s += 3
+ elif c == "m":
+ if key[s + 2] != "-":
+ raise KeySpecError(
+ "\\M must be followed by `-' (char %d of %s)"
+ % (s + 2, repr(key))
+ )
+ if meta:
+ raise KeySpecError(
+ "doubled \\M- (char %d of %s)" % (s + 1, repr(key))
+ )
+ meta = 1
+ s += 3
+ elif c.isdigit():
+ n = key[s + 1 : s + 4]
+ ret = chr(int(n, 8))
+ s += 4
+ elif c == "x":
+ n = key[s + 2 : s + 4]
+ ret = chr(int(n, 16))
+ s += 4
+ elif c == "<":
+ t = key.find(">", s)
+ if t == -1:
+ raise KeySpecError(
+ "unterminated \\< starting at char %d of %s"
+ % (s + 1, repr(key))
+ )
+ ret = key[s + 2 : t].lower()
+ if ret not in _keynames:
+ raise KeySpecError(
+ "unrecognised keyname `%s' at char %d of %s"
+ % (ret, s + 2, repr(key))
+ )
+ ret = _keynames[ret]
+ s = t + 1
+ else:
+ raise KeySpecError(
+ "unknown backslash escape %s at char %d of %s"
+ % (repr(c), s + 2, repr(key))
+ )
+ else:
+ ret = key[s]
+ s += 1
+ if ctrl:
+ if len(ret) > 1:
+ raise KeySpecError("\\C- must be followed by a character")
+ ret = chr(ord(ret) & 0x1F) # curses.ascii.ctrl()
+ if meta:
+ ret = ["\033", ret]
+ else:
+ ret = [ret]
+ return ret, s
+
+
+def parse_keys(key):
+ s = 0
+ r = []
+ while s < len(key):
+ k, s = _parse_key1(key, s)
+ r.extend(k)
+ return r
+
+
+def compile_keymap(keymap, empty=b""):
+ r = {}
+ for key, value in keymap.items():
+ if isinstance(key, bytes):
+ first = key[:1]
+ else:
+ first = key[0]
+ r.setdefault(first, {})[key[1:]] = value
+ for key, value in r.items():
+ if empty in value:
+ if len(value) != 1:
+ raise KeySpecError("key definitions for %s clash" % (value.values(),))
+ else:
+ r[key] = value[empty]
+ else:
+ r[key] = compile_keymap(value, empty)
+ return r
diff --git a/Lib/_pyrepl/mypy.ini b/Lib/_pyrepl/mypy.ini
new file mode 100644
index 0000000..ecd0309
--- /dev/null
+++ b/Lib/_pyrepl/mypy.ini
@@ -0,0 +1,27 @@
+# Config file for running mypy on _pyrepl.
+# Run mypy by invoking `mypy --config-file Lib/_pyrepl/mypy.ini`
+# on the command-line from the repo root
+
+[mypy]
+files = Lib/_pyrepl
+explicit_package_bases = True
+python_version = 3.12
+platform = linux
+pretty = True
+
+# Enable most stricter settings
+enable_error_code = ignore-without-code
+strict = True
+
+# Various stricter settings that we can't yet enable
+# Try to enable these in the following order:
+disallow_any_generics = False
+disallow_untyped_calls = False
+disallow_untyped_defs = False
+check_untyped_defs = False
+
+disable_error_code = return
+
+# Various internal modules that typeshed deliberately doesn't have stubs for:
+[mypy-_abc.*,_opcode.*,_overlapped.*,_testcapi.*,_testinternalcapi.*,test.*]
+ignore_missing_imports = True
diff --git a/Lib/_pyrepl/pager.py b/Lib/_pyrepl/pager.py
new file mode 100644
index 0000000..ecf5ddc
--- /dev/null
+++ b/Lib/_pyrepl/pager.py
@@ -0,0 +1,169 @@
+from __future__ import annotations
+
+import io
+import os
+import re
+import sys
+
+
+# types
+if False:
+ from typing import Protocol, Any
+ class Pager(Protocol):
+ def __call__(self, text: str, title: str = "") -> None:
+ ...
+
+
+def get_pager() -> Pager:
+ """Decide what method to use for paging through text."""
+ if not hasattr(sys.stdin, "isatty"):
+ return plain_pager
+ if not hasattr(sys.stdout, "isatty"):
+ return plain_pager
+ if not sys.stdin.isatty() or not sys.stdout.isatty():
+ return plain_pager
+ if sys.platform == "emscripten":
+ return plainpager
+ use_pager = os.environ.get('MANPAGER') or os.environ.get('PAGER')
+ if use_pager:
+ if sys.platform == 'win32': # pipes completely broken in Windows
+ return lambda text, title='': tempfile_pager(plain(text), use_pager)
+ elif os.environ.get('TERM') in ('dumb', 'emacs'):
+ return lambda text, title='': pipe_pager(plain(text), use_pager, title)
+ else:
+ return lambda text, title='': pipe_pager(text, use_pager, title)
+ if os.environ.get('TERM') in ('dumb', 'emacs'):
+ return plain_pager
+ if sys.platform == 'win32':
+ return lambda text, title='': tempfilepager(plain(text), 'more <')
+ if hasattr(os, 'system') and os.system('(less) 2>/dev/null') == 0:
+ return lambda text, title='': pipe_pager(text, 'less', title)
+
+ import tempfile
+ (fd, filename) = tempfile.mkstemp()
+ os.close(fd)
+ try:
+ if hasattr(os, 'system') and os.system('more "%s"' % filename) == 0:
+ return lambda text, title='': pipe_pager(text, 'more', title)
+ else:
+ return tty_pager
+ finally:
+ os.unlink(filename)
+
+
+def escape_stdout(text: str) -> str:
+ # Escape non-encodable characters to avoid encoding errors later
+ encoding = getattr(sys.stdout, 'encoding', None) or 'utf-8'
+ return text.encode(encoding, 'backslashreplace').decode(encoding)
+
+
+def escape_less(s: str) -> str:
+ return re.sub(r'([?:.%\\])', r'\\\1', s)
+
+
+def plain(text: str) -> str:
+ """Remove boldface formatting from text."""
+ return re.sub('.\b', '', text)
+
+
+def tty_pager(text: str, title: str = '') -> None:
+ """Page through text on a text terminal."""
+ lines = plain(escape_stdout(text)).split('\n')
+ has_tty = False
+ try:
+ import tty
+ import termios
+ fd = sys.stdin.fileno()
+ old = termios.tcgetattr(fd)
+ tty.setcbreak(fd)
+ getchar = lambda: sys.stdin.read(1)
+ has_tty = True
+ except (ImportError, AttributeError, io.UnsupportedOperation):
+ getchar = lambda: sys.stdin.readline()[:-1][:1]
+
+ try:
+ try:
+ h = int(os.environ.get('LINES', 0))
+ except ValueError:
+ h = 0
+ if h <= 1:
+ h = 25
+ r = inc = h - 1
+ sys.stdout.write('\n'.join(lines[:inc]) + '\n')
+ while lines[r:]:
+ sys.stdout.write('-- more --')
+ sys.stdout.flush()
+ c = getchar()
+
+ if c in ('q', 'Q'):
+ sys.stdout.write('\r \r')
+ break
+ elif c in ('\r', '\n'):
+ sys.stdout.write('\r \r' + lines[r] + '\n')
+ r = r + 1
+ continue
+ if c in ('b', 'B', '\x1b'):
+ r = r - inc - inc
+ if r < 0: r = 0
+ sys.stdout.write('\n' + '\n'.join(lines[r:r+inc]) + '\n')
+ r = r + inc
+
+ finally:
+ if has_tty:
+ termios.tcsetattr(fd, termios.TCSAFLUSH, old)
+
+
+def plain_pager(text: str, title: str = '') -> None:
+ """Simply print unformatted text. This is the ultimate fallback."""
+ sys.stdout.write(plain(escape_stdout(text)))
+
+
+def pipe_pager(text: str, cmd: str, title: str = '') -> None:
+ """Page through text by feeding it to another program."""
+ import subprocess
+ env = os.environ.copy()
+ if title:
+ title += ' '
+ esc_title = escape_less(title)
+ prompt_string = (
+ f' {esc_title}' +
+ '?ltline %lt?L/%L.'
+ ':byte %bB?s/%s.'
+ '.'
+ '?e (END):?pB %pB\\%..'
+ ' (press h for help or q to quit)')
+ env['LESS'] = '-RmPm{0}$PM{0}$'.format(prompt_string)
+ proc = subprocess.Popen(cmd, shell=True, stdin=subprocess.PIPE,
+ errors='backslashreplace', env=env)
+ assert proc.stdin is not None
+ try:
+ with proc.stdin as pipe:
+ try:
+ pipe.write(text)
+ except KeyboardInterrupt:
+ # We've hereby abandoned whatever text hasn't been written,
+ # but the pager is still in control of the terminal.
+ pass
+ except OSError:
+ pass # Ignore broken pipes caused by quitting the pager program.
+ while True:
+ try:
+ proc.wait()
+ break
+ except KeyboardInterrupt:
+ # Ignore ctl-c like the pager itself does. Otherwise the pager is
+ # left running and the terminal is in raw mode and unusable.
+ pass
+
+
+def tempfile_pager(text: str, cmd: str, title: str = '') -> None:
+ """Page through text by invoking a program on a temporary file."""
+ import tempfile
+ with tempfile.TemporaryDirectory() as tempdir:
+ filename = os.path.join(tempdir, 'pydoc.out')
+ with open(filename, 'w', errors='backslashreplace',
+ encoding=os.device_encoding(0) if
+ sys.platform == 'win32' else None
+ ) as file:
+ file.write(text)
+ os.system(cmd + ' "' + filename + '"')
diff --git a/Lib/_pyrepl/reader.py b/Lib/_pyrepl/reader.py
new file mode 100644
index 0000000..a7ef988
--- /dev/null
+++ b/Lib/_pyrepl/reader.py
@@ -0,0 +1,660 @@
+# Copyright 2000-2010 Michael Hudson-Doyle <micahel@gmail.com>
+# Antonio Cuni
+# Armin Rigo
+#
+# All Rights Reserved
+#
+#
+# Permission to use, copy, modify, and distribute this software and
+# its documentation for any purpose is hereby granted without fee,
+# provided that the above copyright notice appear in all copies and
+# that both that copyright notice and this permission notice appear in
+# supporting documentation.
+#
+# THE AUTHOR MICHAEL HUDSON DISCLAIMS ALL WARRANTIES WITH REGARD TO
+# THIS SOFTWARE, INCLUDING ALL IMPLIED WARRANTIES OF MERCHANTABILITY
+# AND FITNESS, IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY SPECIAL,
+# INDIRECT OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER
+# RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN ACTION OF
+# CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF OR IN
+# CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
+
+from __future__ import annotations
+
+from contextlib import contextmanager
+from dataclasses import dataclass, field, fields
+import re
+import unicodedata
+from _colorize import can_colorize, ANSIColors # type: ignore[import-not-found]
+
+
+from . import commands, console, input
+from .utils import ANSI_ESCAPE_SEQUENCE, wlen
+from .trace import trace
+
+
+# types
+Command = commands.Command
+if False:
+ from .types import Callback, SimpleContextManager, KeySpec, CommandName
+
+
+def disp_str(buffer: str) -> tuple[str, list[int]]:
+ """disp_str(buffer:string) -> (string, [int])
+
+ Return the string that should be the printed represenation of
+ |buffer| and a list detailing where the characters of |buffer|
+ get used up. E.g.:
+
+ >>> disp_str(chr(3))
+ ('^C', [1, 0])
+
+ """
+ b: list[int] = []
+ s: list[str] = []
+ for c in buffer:
+ if unicodedata.category(c).startswith("C"):
+ c = r"\u%04x" % ord(c)
+ s.append(c)
+ b.append(wlen(c))
+ b.extend([0] * (len(c) - 1))
+ return "".join(s), b
+
+
+# syntax classes:
+
+SYNTAX_WHITESPACE, SYNTAX_WORD, SYNTAX_SYMBOL = range(3)
+
+
+def make_default_syntax_table() -> dict[str, int]:
+ # XXX perhaps should use some unicodedata here?
+ st: dict[str, int] = {}
+ for c in map(chr, range(256)):
+ st[c] = SYNTAX_SYMBOL
+ for c in [a for a in map(chr, range(256)) if a.isalnum()]:
+ st[c] = SYNTAX_WORD
+ st["\n"] = st[" "] = SYNTAX_WHITESPACE
+ return st
+
+
+def make_default_commands() -> dict[CommandName, type[Command]]:
+ result: dict[CommandName, type[Command]] = {}
+ for v in vars(commands).values():
+ if isinstance(v, type) and issubclass(v, Command) and v.__name__[0].islower():
+ result[v.__name__] = v
+ result[v.__name__.replace("_", "-")] = v
+ return result
+
+
+default_keymap: tuple[tuple[KeySpec, CommandName], ...] = tuple(
+ [
+ (r"\C-a", "beginning-of-line"),
+ (r"\C-b", "left"),
+ (r"\C-c", "interrupt"),
+ (r"\C-d", "delete"),
+ (r"\C-e", "end-of-line"),
+ (r"\C-f", "right"),
+ (r"\C-g", "cancel"),
+ (r"\C-h", "backspace"),
+ (r"\C-j", "accept"),
+ (r"\<return>", "accept"),
+ (r"\C-k", "kill-line"),
+ (r"\C-l", "clear-screen"),
+ (r"\C-m", "accept"),
+ (r"\C-t", "transpose-characters"),
+ (r"\C-u", "unix-line-discard"),
+ (r"\C-w", "unix-word-rubout"),
+ (r"\C-x\C-u", "upcase-region"),
+ (r"\C-y", "yank"),
+ (r"\C-z", "suspend"),
+ (r"\M-b", "backward-word"),
+ (r"\M-c", "capitalize-word"),
+ (r"\M-d", "kill-word"),
+ (r"\M-f", "forward-word"),
+ (r"\M-l", "downcase-word"),
+ (r"\M-t", "transpose-words"),
+ (r"\M-u", "upcase-word"),
+ (r"\M-y", "yank-pop"),
+ (r"\M--", "digit-arg"),
+ (r"\M-0", "digit-arg"),
+ (r"\M-1", "digit-arg"),
+ (r"\M-2", "digit-arg"),
+ (r"\M-3", "digit-arg"),
+ (r"\M-4", "digit-arg"),
+ (r"\M-5", "digit-arg"),
+ (r"\M-6", "digit-arg"),
+ (r"\M-7", "digit-arg"),
+ (r"\M-8", "digit-arg"),
+ (r"\M-9", "digit-arg"),
+ # (r'\M-\n', 'insert-nl'),
+ ("\\\\", "self-insert"),
+ ]
+ + [(c, "self-insert") for c in map(chr, range(32, 127)) if c != "\\"]
+ + [(c, "self-insert") for c in map(chr, range(128, 256)) if c.isalpha()]
+ + [
+ (r"\<up>", "up"),
+ (r"\<down>", "down"),
+ (r"\<left>", "left"),
+ (r"\<right>", "right"),
+ (r"\<delete>", "delete"),
+ (r"\<backspace>", "backspace"),
+ (r"\M-\<backspace>", "backward-kill-word"),
+ (r"\<end>", "end-of-line"), # was 'end'
+ (r"\<home>", "beginning-of-line"), # was 'home'
+ (r"\<f1>", "help"),
+ (r"\<f2>", "show-history"),
+ (r"\<f3>", "paste-mode"),
+ (r"\EOF", "end"), # the entries in the terminfo database for xterms
+ (r"\EOH", "home"), # seem to be wrong. this is a less than ideal
+ # workaround
+ ]
+)
+
+
+@dataclass(slots=True)
+class Reader:
+ """The Reader class implements the bare bones of a command reader,
+ handling such details as editing and cursor motion. What it does
+ not support are such things as completion or history support -
+ these are implemented elsewhere.
+
+ Instance variables of note include:
+
+ * buffer:
+ A *list* (*not* a string at the moment :-) containing all the
+ characters that have been entered.
+ * console:
+ Hopefully encapsulates the OS dependent stuff.
+ * pos:
+ A 0-based index into `buffer' for where the insertion point
+ is.
+ * screeninfo:
+ Ahem. This list contains some info needed to move the
+ insertion point around reasonably efficiently.
+ * cxy, lxy:
+ the position of the insertion point in screen ...
+ * syntax_table:
+ Dictionary mapping characters to `syntax class'; read the
+ emacs docs to see what this means :-)
+ * commands:
+ Dictionary mapping command names to command classes.
+ * arg:
+ The emacs-style prefix argument. It will be None if no such
+ argument has been provided.
+ * dirty:
+ True if we need to refresh the display.
+ * kill_ring:
+ The emacs-style kill-ring; manipulated with yank & yank-pop
+ * ps1, ps2, ps3, ps4:
+ prompts. ps1 is the prompt for a one-line input; for a
+ multiline input it looks like:
+ ps2> first line of input goes here
+ ps3> second and further
+ ps3> lines get ps3
+ ...
+ ps4> and the last one gets ps4
+ As with the usual top-level, you can set these to instances if
+ you like; str() will be called on them (once) at the beginning
+ of each command. Don't put really long or newline containing
+ strings here, please!
+ This is just the default policy; you can change it freely by
+ overriding get_prompt() (and indeed some standard subclasses
+ do).
+ * finished:
+ handle1 will set this to a true value if a command signals
+ that we're done.
+ """
+
+ console: console.Console
+
+ ## state
+ buffer: list[str] = field(default_factory=list)
+ pos: int = 0
+ ps1: str = "->> "
+ ps2: str = "/>> "
+ ps3: str = "|.. "
+ ps4: str = R"\__ "
+ kill_ring: list[list[str]] = field(default_factory=list)
+ msg: str = ""
+ arg: int | None = None
+ dirty: bool = False
+ finished: bool = False
+ paste_mode: bool = False
+ commands: dict[str, type[Command]] = field(default_factory=make_default_commands)
+ last_command: type[Command] | None = None
+ syntax_table: dict[str, int] = field(default_factory=make_default_syntax_table)
+ msg_at_bottom: bool = True
+ keymap: tuple[tuple[str, str], ...] = ()
+ input_trans: input.KeymapTranslator = field(init=False)
+ input_trans_stack: list[input.KeymapTranslator] = field(default_factory=list)
+ screeninfo: list[tuple[int, list[int]]] = field(init=False)
+ cxy: tuple[int, int] = field(init=False)
+ lxy: tuple[int, int] = field(init=False)
+
+ def __post_init__(self) -> None:
+ # Enable the use of `insert` without a `prepare` call - necessary to
+ # facilitate the tab completion hack implemented for
+ # <https://bugs.python.org/issue25660>.
+ self.keymap = self.collect_keymap()
+ self.input_trans = input.KeymapTranslator(
+ self.keymap, invalid_cls="invalid-key", character_cls="self-insert"
+ )
+ self.screeninfo = [(0, [0])]
+ self.cxy = self.pos2xy()
+ self.lxy = (self.pos, 0)
+
+ def collect_keymap(self) -> tuple[tuple[KeySpec, CommandName], ...]:
+ return default_keymap
+
+ def calc_screen(self) -> list[str]:
+ """The purpose of this method is to translate changes in
+ self.buffer into changes in self.screen. Currently it rips
+ everything down and starts from scratch, which whilst not
+ especially efficient is certainly simple(r).
+ """
+ lines = self.get_unicode().split("\n")
+ screen: list[str] = []
+ screeninfo: list[tuple[int, list[int]]] = []
+ pos = self.pos
+ for ln, line in enumerate(lines):
+ ll = len(line)
+ if 0 <= pos <= ll:
+ if self.msg and not self.msg_at_bottom:
+ for mline in self.msg.split("\n"):
+ screen.append(mline)
+ screeninfo.append((0, []))
+ self.lxy = pos, ln
+ prompt = self.get_prompt(ln, ll >= pos >= 0)
+ while "\n" in prompt:
+ pre_prompt, _, prompt = prompt.partition("\n")
+ screen.append(pre_prompt)
+ screeninfo.append((0, []))
+ pos -= ll + 1
+ prompt, lp = self.process_prompt(prompt)
+ l, l2 = disp_str(line)
+ wrapcount = (wlen(l) + lp) // self.console.width
+ if wrapcount == 0:
+ screen.append(prompt + l)
+ screeninfo.append((lp, l2))
+ else:
+ for i in range(wrapcount + 1):
+ prelen = lp if i == 0 else 0
+ index_to_wrap_before = 0
+ column = 0
+ for character_width in l2:
+ if column + character_width >= self.console.width - prelen:
+ break
+ index_to_wrap_before += 1
+ column += character_width
+ pre = prompt if i == 0 else ""
+ post = "\\" if i != wrapcount else ""
+ after = [1] if i != wrapcount else []
+ screen.append(pre + l[:index_to_wrap_before] + post)
+ screeninfo.append((prelen, l2[:index_to_wrap_before] + after))
+ l = l[index_to_wrap_before:]
+ l2 = l2[index_to_wrap_before:]
+ self.screeninfo = screeninfo
+ self.cxy = self.pos2xy()
+ if self.msg and self.msg_at_bottom:
+ for mline in self.msg.split("\n"):
+ screen.append(mline)
+ screeninfo.append((0, []))
+ return screen
+
+ def process_prompt(self, prompt: str) -> tuple[str, int]:
+ """Process the prompt.
+
+ This means calculate the length of the prompt. The character \x01
+ and \x02 are used to bracket ANSI control sequences and need to be
+ excluded from the length calculation. So also a copy of the prompt
+ is returned with these control characters removed."""
+
+ # The logic below also ignores the length of common escape
+ # sequences if they were not explicitly within \x01...\x02.
+ # They are CSI (or ANSI) sequences ( ESC [ ... LETTER )
+
+ out_prompt = ""
+ l = wlen(prompt)
+ pos = 0
+ while True:
+ s = prompt.find("\x01", pos)
+ if s == -1:
+ break
+ e = prompt.find("\x02", s)
+ if e == -1:
+ break
+ # Found start and end brackets, subtract from string length
+ l = l - (e - s + 1)
+ keep = prompt[pos:s]
+ l -= sum(map(wlen, ANSI_ESCAPE_SEQUENCE.findall(keep)))
+ out_prompt += keep + prompt[s + 1 : e]
+ pos = e + 1
+ keep = prompt[pos:]
+ l -= sum(map(wlen, ANSI_ESCAPE_SEQUENCE.findall(keep)))
+ out_prompt += keep
+ return out_prompt, l
+
+ def bow(self, p: int | None = None) -> int:
+ """Return the 0-based index of the word break preceding p most
+ immediately.
+
+ p defaults to self.pos; word boundaries are determined using
+ self.syntax_table."""
+ if p is None:
+ p = self.pos
+ st = self.syntax_table
+ b = self.buffer
+ p -= 1
+ while p >= 0 and st.get(b[p], SYNTAX_WORD) != SYNTAX_WORD:
+ p -= 1
+ while p >= 0 and st.get(b[p], SYNTAX_WORD) == SYNTAX_WORD:
+ p -= 1
+ return p + 1
+
+ def eow(self, p: int | None = None) -> int:
+ """Return the 0-based index of the word break following p most
+ immediately.
+
+ p defaults to self.pos; word boundaries are determined using
+ self.syntax_table."""
+ if p is None:
+ p = self.pos
+ st = self.syntax_table
+ b = self.buffer
+ while p < len(b) and st.get(b[p], SYNTAX_WORD) != SYNTAX_WORD:
+ p += 1
+ while p < len(b) and st.get(b[p], SYNTAX_WORD) == SYNTAX_WORD:
+ p += 1
+ return p
+
+ def bol(self, p: int | None = None) -> int:
+ """Return the 0-based index of the line break preceding p most
+ immediately.
+
+ p defaults to self.pos."""
+ if p is None:
+ p = self.pos
+ b = self.buffer
+ p -= 1
+ while p >= 0 and b[p] != "\n":
+ p -= 1
+ return p + 1
+
+ def eol(self, p: int | None = None) -> int:
+ """Return the 0-based index of the line break following p most
+ immediately.
+
+ p defaults to self.pos."""
+ if p is None:
+ p = self.pos
+ b = self.buffer
+ while p < len(b) and b[p] != "\n":
+ p += 1
+ return p
+
+ def max_column(self, y: int) -> int:
+ """Return the last x-offset for line y"""
+ return self.screeninfo[y][0] + sum(self.screeninfo[y][1])
+
+ def max_row(self) -> int:
+ return len(self.screeninfo) - 1
+
+ def get_arg(self, default: int = 1) -> int:
+ """Return any prefix argument that the user has supplied,
+ returning `default' if there is None. Defaults to 1.
+ """
+ if self.arg is None:
+ return default
+ else:
+ return self.arg
+
+ def get_prompt(self, lineno: int, cursor_on_line: bool) -> str:
+ """Return what should be in the left-hand margin for line
+ `lineno'."""
+ if self.arg is not None and cursor_on_line:
+ prompt = "(arg: %s) " % self.arg
+ elif self.paste_mode:
+ prompt = "(paste) "
+ elif "\n" in self.buffer:
+ if lineno == 0:
+ prompt = self.ps2
+ elif lineno == self.buffer.count("\n"):
+ prompt = self.ps4
+ else:
+ prompt = self.ps3
+ else:
+ prompt = self.ps1
+
+ if can_colorize():
+ prompt = f"{ANSIColors.BOLD_MAGENTA}{prompt}{ANSIColors.RESET}"
+ return prompt
+
+ def push_input_trans(self, itrans: input.KeymapTranslator) -> None:
+ self.input_trans_stack.append(self.input_trans)
+ self.input_trans = itrans
+
+ def pop_input_trans(self) -> None:
+ self.input_trans = self.input_trans_stack.pop()
+
+ def setpos_from_xy(self, x: int, y: int) -> None:
+ """Set pos according to coordinates x, y"""
+ pos = 0
+ i = 0
+ while i < y:
+ prompt_len, character_widths = self.screeninfo[i]
+ offset = len(character_widths) - character_widths.count(0)
+ in_wrapped_line = prompt_len + sum(character_widths) >= self.console.width
+ if in_wrapped_line:
+ pos += offset - 1 # -1 cause backslash is not in buffer
+ else:
+ pos += offset + 1 # +1 cause newline is in buffer
+ i += 1
+
+ j = 0
+ cur_x = self.screeninfo[i][0]
+ while cur_x < x:
+ if self.screeninfo[i][1][j] == 0:
+ continue
+ cur_x += self.screeninfo[i][1][j]
+ j += 1
+ pos += 1
+
+ self.pos = pos
+
+ def pos2xy(self) -> tuple[int, int]:
+ """Return the x, y coordinates of position 'pos'."""
+ # this *is* incomprehensible, yes.
+ y = 0
+ pos = self.pos
+ assert 0 <= pos <= len(self.buffer)
+ if pos == len(self.buffer):
+ y = len(self.screeninfo) - 1
+ p, l2 = self.screeninfo[y]
+ return p + sum(l2) + l2.count(0), y
+
+ for p, l2 in self.screeninfo:
+ l = len(l2) - l2.count(0)
+ in_wrapped_line = p + sum(l2) >= self.console.width
+ offset = l - 1 if in_wrapped_line else l # need to remove backslash
+ if offset >= pos:
+ break
+ else:
+ if p + sum(l2) >= self.console.width:
+ pos -= l - 1 # -1 cause backslash is not in buffer
+ else:
+ pos -= l + 1 # +1 cause newline is in buffer
+ y += 1
+ return p + sum(l2[:pos]), y
+
+ def insert(self, text: str | list[str]) -> None:
+ """Insert 'text' at the insertion point."""
+ self.buffer[self.pos : self.pos] = list(text)
+ self.pos += len(text)
+ self.dirty = True
+
+ def update_cursor(self) -> None:
+ """Move the cursor to reflect changes in self.pos"""
+ self.cxy = self.pos2xy()
+ self.console.move_cursor(*self.cxy)
+
+ def after_command(self, cmd: Command) -> None:
+ """This function is called to allow post command cleanup."""
+ if getattr(cmd, "kills_digit_arg", True):
+ if self.arg is not None:
+ self.dirty = True
+ self.arg = None
+
+ def prepare(self) -> None:
+ """Get ready to run. Call restore when finished. You must not
+ write to the console in between the calls to prepare and
+ restore."""
+ try:
+ self.console.prepare()
+ self.arg = None
+ self.finished = False
+ del self.buffer[:]
+ self.pos = 0
+ self.dirty = True
+ self.last_command = None
+ self.calc_screen()
+ except BaseException:
+ self.restore()
+ raise
+
+ def last_command_is(self, cls: type) -> bool:
+ if not self.last_command:
+ return False
+ return issubclass(cls, self.last_command)
+
+ def restore(self) -> None:
+ """Clean up after a run."""
+ self.console.restore()
+
+ @contextmanager
+ def suspend(self) -> SimpleContextManager:
+ """A context manager to delegate to another reader."""
+ prev_state = {f.name: getattr(self, f.name) for f in fields(self)}
+ try:
+ self.restore()
+ yield
+ finally:
+ for arg in ("msg", "ps1", "ps2", "ps3", "ps4", "paste_mode"):
+ setattr(self, arg, prev_state[arg])
+ self.prepare()
+ pass
+
+ def finish(self) -> None:
+ """Called when a command signals that we're finished."""
+ pass
+
+ def error(self, msg: str = "none") -> None:
+ self.msg = "! " + msg + " "
+ self.dirty = True
+ self.console.beep()
+
+ def update_screen(self) -> None:
+ if self.dirty:
+ self.refresh()
+
+ def refresh(self) -> None:
+ """Recalculate and refresh the screen."""
+ # this call sets up self.cxy, so call it first.
+ screen = self.calc_screen()
+ self.console.refresh(screen, self.cxy)
+ self.dirty = False
+
+ def do_cmd(self, cmd: tuple[str, list[str]]) -> None:
+ """`cmd` is a tuple of "event_name" and "event", which in the current
+ implementation is always just the "buffer" which happens to be a list
+ of single-character strings."""
+ assert isinstance(cmd[0], str)
+
+ trace("received command {cmd}", cmd=cmd)
+ command_type = self.commands.get(cmd[0], commands.invalid_command)
+ command = command_type(self, *cmd) # type: ignore[arg-type]
+
+ command.do()
+
+ self.after_command(command)
+
+ if self.dirty:
+ self.refresh()
+ else:
+ self.update_cursor()
+
+ if not isinstance(cmd, commands.digit_arg):
+ self.last_command = command_type
+
+ self.finished = bool(command.finish)
+ if self.finished:
+ self.console.finish()
+ self.finish()
+
+ def handle1(self, block: bool = True) -> bool:
+ """Handle a single event. Wait as long as it takes if block
+ is true (the default), otherwise return False if no event is
+ pending."""
+
+ if self.msg:
+ self.msg = ""
+ self.dirty = True
+
+ while True:
+ event = self.console.get_event(block)
+ if not event: # can only happen if we're not blocking
+ return False
+
+ translate = True
+
+ if event.evt == "key":
+ self.input_trans.push(event)
+ elif event.evt == "scroll":
+ self.refresh()
+ elif event.evt == "resize":
+ self.refresh()
+ else:
+ translate = False
+
+ if translate:
+ cmd = self.input_trans.get()
+ else:
+ cmd = [event.evt, event.data]
+
+ if cmd is None:
+ if block:
+ continue
+ else:
+ return False
+
+ self.do_cmd(cmd)
+ return True
+
+ def push_char(self, char: int | bytes) -> None:
+ self.console.push_char(char)
+ self.handle1(block=False)
+
+ def readline(self, startup_hook: Callback | None = None) -> str:
+ """Read a line. The implementation of this method also shows
+ how to drive Reader if you want more control over the event
+ loop."""
+ self.prepare()
+ try:
+ if startup_hook is not None:
+ startup_hook()
+ self.refresh()
+ while not self.finished:
+ self.handle1()
+ return self.get_unicode()
+
+ finally:
+ self.restore()
+
+ def bind(self, spec: KeySpec, command: CommandName) -> None:
+ self.keymap = self.keymap + ((spec, command),)
+ self.input_trans = input.KeymapTranslator(
+ self.keymap, invalid_cls="invalid-key", character_cls="self-insert"
+ )
+
+ def get_unicode(self) -> str:
+ """Return the current buffer as a unicode string."""
+ return "".join(self.buffer)
diff --git a/Lib/_pyrepl/readline.py b/Lib/_pyrepl/readline.py
new file mode 100644
index 0000000..37ba98d
--- /dev/null
+++ b/Lib/_pyrepl/readline.py
@@ -0,0 +1,501 @@
+# Copyright 2000-2010 Michael Hudson-Doyle <micahel@gmail.com>
+# Alex Gaynor
+# Antonio Cuni
+# Armin Rigo
+# Holger Krekel
+#
+# All Rights Reserved
+#
+#
+# Permission to use, copy, modify, and distribute this software and
+# its documentation for any purpose is hereby granted without fee,
+# provided that the above copyright notice appear in all copies and
+# that both that copyright notice and this permission notice appear in
+# supporting documentation.
+#
+# THE AUTHOR MICHAEL HUDSON DISCLAIMS ALL WARRANTIES WITH REGARD TO
+# THIS SOFTWARE, INCLUDING ALL IMPLIED WARRANTIES OF MERCHANTABILITY
+# AND FITNESS, IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY SPECIAL,
+# INDIRECT OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER
+# RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN ACTION OF
+# CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF OR IN
+# CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
+
+"""A compatibility wrapper reimplementing the 'readline' standard module
+on top of pyrepl. Not all functionalities are supported. Contains
+extensions for multiline input.
+"""
+
+from __future__ import annotations
+
+from dataclasses import dataclass, field
+
+import os
+import readline
+from site import gethistoryfile # type: ignore[attr-defined]
+import sys
+
+from . import commands, historical_reader
+from .completing_reader import CompletingReader
+from .unix_console import UnixConsole, _error
+
+ENCODING = sys.getdefaultencoding() or "latin1"
+
+
+# types
+Command = commands.Command
+from collections.abc import Callable, Collection
+from .types import Callback, Completer, KeySpec, CommandName
+
+
+__all__ = [
+ "add_history",
+ "clear_history",
+ "get_begidx",
+ "get_completer",
+ "get_completer_delims",
+ "get_current_history_length",
+ "get_endidx",
+ "get_history_item",
+ "get_history_length",
+ "get_line_buffer",
+ "insert_text",
+ "parse_and_bind",
+ "read_history_file",
+ # "read_init_file",
+ # "redisplay",
+ "remove_history_item",
+ "replace_history_item",
+ "set_auto_history",
+ "set_completer",
+ "set_completer_delims",
+ "set_history_length",
+ # "set_pre_input_hook",
+ "set_startup_hook",
+ "write_history_file",
+ # ---- multiline extensions ----
+ "multiline_input",
+]
+
+# ____________________________________________________________
+
+@dataclass
+class ReadlineConfig:
+ readline_completer: Completer | None = readline.get_completer()
+ completer_delims: frozenset[str] = frozenset(" \t\n`~!@#$%^&*()-=+[{]}\\|;:'\",<>/?")
+
+
+@dataclass(kw_only=True)
+class ReadlineAlikeReader(historical_reader.HistoricalReader, CompletingReader):
+ # Class fields
+ assume_immutable_completions = False
+ use_brackets = False
+ sort_in_column = True
+
+ # Instance fields
+ config: ReadlineConfig
+ more_lines: Callable[[str], bool] | None = None
+
+ def __post_init__(self) -> None:
+ super().__post_init__()
+ self.commands["maybe_accept"] = maybe_accept
+ self.commands["maybe-accept"] = maybe_accept
+ self.commands["backspace_dedent"] = backspace_dedent
+ self.commands["backspace-dedent"] = backspace_dedent
+
+ def error(self, msg: str = "none") -> None:
+ pass # don't show error messages by default
+
+ def get_stem(self) -> str:
+ b = self.buffer
+ p = self.pos - 1
+ completer_delims = self.config.completer_delims
+ while p >= 0 and b[p] not in completer_delims:
+ p -= 1
+ return "".join(b[p + 1 : self.pos])
+
+ def get_completions(self, stem: str) -> list[str]:
+ if len(stem) == 0 and self.more_lines is not None:
+ b = self.buffer
+ p = self.pos
+ while p > 0 and b[p - 1] != "\n":
+ p -= 1
+ num_spaces = 4 - ((self.pos - p) % 4)
+ return [" " * num_spaces]
+ result = []
+ function = self.config.readline_completer
+ if function is not None:
+ try:
+ stem = str(stem) # rlcompleter.py seems to not like unicode
+ except UnicodeEncodeError:
+ pass # but feed unicode anyway if we have no choice
+ state = 0
+ while True:
+ try:
+ next = function(stem, state)
+ except Exception:
+ break
+ if not isinstance(next, str):
+ break
+ result.append(next)
+ state += 1
+ # emulate the behavior of the standard readline that sorts
+ # the completions before displaying them.
+ result.sort()
+ return result
+
+ def get_trimmed_history(self, maxlength: int) -> list[str]:
+ if maxlength >= 0:
+ cut = len(self.history) - maxlength
+ if cut < 0:
+ cut = 0
+ else:
+ cut = 0
+ return self.history[cut:]
+
+ # --- simplified support for reading multiline Python statements ---
+
+ def collect_keymap(self) -> tuple[tuple[KeySpec, CommandName], ...]:
+ return super().collect_keymap() + (
+ (r"\n", "maybe-accept"),
+ (r"\<backspace>", "backspace-dedent"),
+ )
+
+ def after_command(self, cmd: Command) -> None:
+ super().after_command(cmd)
+ if self.more_lines is None:
+ # Force single-line input if we are in raw_input() mode.
+ # Although there is no direct way to add a \n in this mode,
+ # multiline buffers can still show up using various
+ # commands, e.g. navigating the history.
+ try:
+ index = self.buffer.index("\n")
+ except ValueError:
+ pass
+ else:
+ self.buffer = self.buffer[:index]
+ if self.pos > len(self.buffer):
+ self.pos = len(self.buffer)
+
+
+def set_auto_history(_should_auto_add_history: bool) -> None:
+ """Enable or disable automatic history"""
+ historical_reader.should_auto_add_history = bool(_should_auto_add_history)
+
+
+def _get_this_line_indent(buffer: list[str], pos: int) -> int:
+ indent = 0
+ while pos > 0 and buffer[pos - 1] in " \t":
+ indent += 1
+ pos -= 1
+ if pos > 0 and buffer[pos - 1] == "\n":
+ return indent
+ return 0
+
+
+def _get_previous_line_indent(buffer: list[str], pos: int) -> tuple[int, int | None]:
+ prevlinestart = pos
+ while prevlinestart > 0 and buffer[prevlinestart - 1] != "\n":
+ prevlinestart -= 1
+ prevlinetext = prevlinestart
+ while prevlinetext < pos and buffer[prevlinetext] in " \t":
+ prevlinetext += 1
+ if prevlinetext == pos:
+ indent = None
+ else:
+ indent = prevlinetext - prevlinestart
+ return prevlinestart, indent
+
+
+class maybe_accept(commands.Command):
+ def do(self) -> None:
+ r: ReadlineAlikeReader
+ r = self.reader # type: ignore[assignment]
+ r.dirty = True # this is needed to hide the completion menu, if visible
+ #
+ # if there are already several lines and the cursor
+ # is not on the last one, always insert a new \n.
+ text = r.get_unicode()
+ if "\n" in r.buffer[r.pos :] or (
+ r.more_lines is not None and r.more_lines(text)
+ ):
+ #
+ # auto-indent the next line like the previous line
+ prevlinestart, indent = _get_previous_line_indent(r.buffer, r.pos)
+ r.insert("\n")
+ if not self.reader.paste_mode and indent:
+ for i in range(prevlinestart, prevlinestart + indent):
+ r.insert(r.buffer[i])
+ elif not self.reader.paste_mode:
+ self.finish = True
+ else:
+ r.insert("\n")
+
+
+class backspace_dedent(commands.Command):
+ def do(self) -> None:
+ r = self.reader
+ b = r.buffer
+ if r.pos > 0:
+ repeat = 1
+ if b[r.pos - 1] != "\n":
+ indent = _get_this_line_indent(b, r.pos)
+ if indent > 0:
+ ls = r.pos - indent
+ while ls > 0:
+ ls, pi = _get_previous_line_indent(b, ls - 1)
+ if pi is not None and pi < indent:
+ repeat = indent - pi
+ break
+ r.pos -= repeat
+ del b[r.pos : r.pos + repeat]
+ r.dirty = True
+ else:
+ self.reader.error("can't backspace at start")
+
+
+# ____________________________________________________________
+
+
+@dataclass(slots=True)
+class _ReadlineWrapper:
+ f_in: int = -1
+ f_out: int = -1
+ reader: ReadlineAlikeReader | None = None
+ saved_history_length: int = -1
+ startup_hook: Callback | None = None
+ config: ReadlineConfig = field(default_factory=ReadlineConfig)
+
+ def __post_init__(self) -> None:
+ if self.f_in == -1:
+ self.f_in = os.dup(0)
+ if self.f_out == -1:
+ self.f_out = os.dup(1)
+
+ def get_reader(self) -> ReadlineAlikeReader:
+ if self.reader is None:
+ console = UnixConsole(self.f_in, self.f_out, encoding=ENCODING)
+ self.reader = ReadlineAlikeReader(console=console, config=self.config)
+ return self.reader
+
+ def input(self, prompt: object = "") -> str:
+ try:
+ reader = self.get_reader()
+ except _error:
+ assert raw_input is not None
+ return raw_input(prompt)
+ reader.ps1 = str(prompt)
+ return reader.readline(startup_hook=self.startup_hook)
+
+ def multiline_input(self, more_lines, ps1, ps2):
+ """Read an input on possibly multiple lines, asking for more
+ lines as long as 'more_lines(unicodetext)' returns an object whose
+ boolean value is true.
+ """
+ reader = self.get_reader()
+ saved = reader.more_lines
+ try:
+ reader.more_lines = more_lines
+ reader.ps1 = reader.ps2 = ps1
+ reader.ps3 = reader.ps4 = ps2
+ return reader.readline()
+ finally:
+ reader.more_lines = saved
+ reader.paste_mode = False
+
+ def parse_and_bind(self, string: str) -> None:
+ pass # XXX we don't support parsing GNU-readline-style init files
+
+ def set_completer(self, function: Completer | None = None) -> None:
+ self.config.readline_completer = function
+
+ def get_completer(self) -> Completer | None:
+ return self.config.readline_completer
+
+ def set_completer_delims(self, delimiters: Collection[str]) -> None:
+ self.config.completer_delims = frozenset(delimiters)
+
+ def get_completer_delims(self) -> str:
+ return "".join(sorted(self.config.completer_delims))
+
+ def _histline(self, line: str) -> str:
+ line = line.rstrip("\n")
+ return line
+
+ def get_history_length(self) -> int:
+ return self.saved_history_length
+
+ def set_history_length(self, length: int) -> None:
+ self.saved_history_length = length
+
+ def get_current_history_length(self) -> int:
+ return len(self.get_reader().history)
+
+ def read_history_file(self, filename: str = gethistoryfile()) -> None:
+ # multiline extension (really a hack) for the end of lines that
+ # are actually continuations inside a single multiline_input()
+ # history item: we use \r\n instead of just \n. If the history
+ # file is passed to GNU readline, the extra \r are just ignored.
+ history = self.get_reader().history
+
+ with open(os.path.expanduser(filename), 'rb') as f:
+ lines = [line.decode('utf-8', errors='replace') for line in f.read().split(b'\n')]
+ buffer = []
+ for line in lines:
+ # Ignore readline history file header
+ if line.startswith("_HiStOrY_V2_"):
+ continue
+ if line.endswith("\r"):
+ buffer.append(line+'\n')
+ else:
+ line = self._histline(line)
+ if buffer:
+ line = "".join(buffer).replace("\r", "") + line
+ del buffer[:]
+ if line:
+ history.append(line)
+
+ def write_history_file(self, filename: str = gethistoryfile()) -> None:
+ maxlength = self.saved_history_length
+ history = self.get_reader().get_trimmed_history(maxlength)
+ with open(os.path.expanduser(filename), "w", encoding="utf-8") as f:
+ for entry in history:
+ entry = entry.replace("\n", "\r\n") # multiline history support
+ f.write(entry + "\n")
+
+ def clear_history(self) -> None:
+ del self.get_reader().history[:]
+
+ def get_history_item(self, index: int) -> str | None:
+ history = self.get_reader().history
+ if 1 <= index <= len(history):
+ return history[index - 1]
+ else:
+ return None # like readline.c
+
+ def remove_history_item(self, index: int) -> None:
+ history = self.get_reader().history
+ if 0 <= index < len(history):
+ del history[index]
+ else:
+ raise ValueError("No history item at position %d" % index)
+ # like readline.c
+
+ def replace_history_item(self, index: int, line: str) -> None:
+ history = self.get_reader().history
+ if 0 <= index < len(history):
+ history[index] = self._histline(line)
+ else:
+ raise ValueError("No history item at position %d" % index)
+ # like readline.c
+
+ def add_history(self, line: str) -> None:
+ self.get_reader().history.append(self._histline(line))
+
+ def set_startup_hook(self, function: Callback | None = None) -> None:
+ self.startup_hook = function
+
+ def get_line_buffer(self) -> bytes:
+ buf_str = self.get_reader().get_unicode()
+ return buf_str.encode(ENCODING)
+
+ def _get_idxs(self) -> tuple[int, int]:
+ start = cursor = self.get_reader().pos
+ buf = self.get_line_buffer()
+ for i in range(cursor - 1, -1, -1):
+ if str(buf[i]) in self.get_completer_delims():
+ break
+ start = i
+ return start, cursor
+
+ def get_begidx(self) -> int:
+ return self._get_idxs()[0]
+
+ def get_endidx(self) -> int:
+ return self._get_idxs()[1]
+
+ def insert_text(self, text: str) -> None:
+ self.get_reader().insert(text)
+
+
+_wrapper = _ReadlineWrapper()
+
+# ____________________________________________________________
+# Public API
+
+parse_and_bind = _wrapper.parse_and_bind
+set_completer = _wrapper.set_completer
+get_completer = _wrapper.get_completer
+set_completer_delims = _wrapper.set_completer_delims
+get_completer_delims = _wrapper.get_completer_delims
+get_history_length = _wrapper.get_history_length
+set_history_length = _wrapper.set_history_length
+get_current_history_length = _wrapper.get_current_history_length
+read_history_file = _wrapper.read_history_file
+write_history_file = _wrapper.write_history_file
+clear_history = _wrapper.clear_history
+get_history_item = _wrapper.get_history_item
+remove_history_item = _wrapper.remove_history_item
+replace_history_item = _wrapper.replace_history_item
+add_history = _wrapper.add_history
+set_startup_hook = _wrapper.set_startup_hook
+get_line_buffer = _wrapper.get_line_buffer
+get_begidx = _wrapper.get_begidx
+get_endidx = _wrapper.get_endidx
+insert_text = _wrapper.insert_text
+
+# Extension
+multiline_input = _wrapper.multiline_input
+
+# Internal hook
+_get_reader = _wrapper.get_reader
+
+# ____________________________________________________________
+# Stubs
+
+
+def _make_stub(_name: str, _ret: object) -> None:
+ def stub(*args: object, **kwds: object) -> None:
+ import warnings
+
+ warnings.warn("readline.%s() not implemented" % _name, stacklevel=2)
+
+ stub.__name__ = _name
+ globals()[_name] = stub
+
+
+for _name, _ret in [
+ ("read_init_file", None),
+ ("redisplay", None),
+ ("set_pre_input_hook", None),
+]:
+ assert _name not in globals(), _name
+ _make_stub(_name, _ret)
+
+# ____________________________________________________________
+
+
+def _setup() -> None:
+ global raw_input
+ if raw_input is not None:
+ return # don't run _setup twice
+
+ try:
+ f_in = sys.stdin.fileno()
+ f_out = sys.stdout.fileno()
+ except (AttributeError, ValueError):
+ return
+ if not os.isatty(f_in) or not os.isatty(f_out):
+ return
+
+ _wrapper.f_in = f_in
+ _wrapper.f_out = f_out
+
+ # this is not really what readline.c does. Better than nothing I guess
+ import builtins
+
+ raw_input = builtins.input
+ builtins.input = _wrapper.input
+
+
+raw_input: Callable[[object], str] | None = None
diff --git a/Lib/_pyrepl/simple_interact.py b/Lib/_pyrepl/simple_interact.py
new file mode 100644
index 0000000..ce79d0d
--- /dev/null
+++ b/Lib/_pyrepl/simple_interact.py
@@ -0,0 +1,157 @@
+# Copyright 2000-2010 Michael Hudson-Doyle <micahel@gmail.com>
+# Armin Rigo
+#
+# All Rights Reserved
+#
+#
+# Permission to use, copy, modify, and distribute this software and
+# its documentation for any purpose is hereby granted without fee,
+# provided that the above copyright notice appear in all copies and
+# that both that copyright notice and this permission notice appear in
+# supporting documentation.
+#
+# THE AUTHOR MICHAEL HUDSON DISCLAIMS ALL WARRANTIES WITH REGARD TO
+# THIS SOFTWARE, INCLUDING ALL IMPLIED WARRANTIES OF MERCHANTABILITY
+# AND FITNESS, IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY SPECIAL,
+# INDIRECT OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER
+# RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN ACTION OF
+# CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF OR IN
+# CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
+
+"""This is an alternative to python_reader which tries to emulate
+the CPython prompt as closely as possible, with the exception of
+allowing multiline input and multiline history entries.
+"""
+
+from __future__ import annotations
+
+import _colorize # type: ignore[import-not-found]
+import _sitebuiltins
+import linecache
+import sys
+import code
+from types import ModuleType
+
+from .console import Event
+from .readline import _get_reader, multiline_input
+from .unix_console import _error
+
+
+def check() -> str:
+ """Returns the error message if there is a problem initializing the state."""
+ try:
+ _get_reader()
+ except _error as e:
+ return str(e) or repr(e) or "unknown error"
+ return ""
+
+
+def _strip_final_indent(text: str) -> str:
+ # kill spaces and tabs at the end, but only if they follow '\n'.
+ # meant to remove the auto-indentation only (although it would of
+ # course also remove explicitly-added indentation).
+ short = text.rstrip(" \t")
+ n = len(short)
+ if n > 0 and text[n - 1] == "\n":
+ return short
+ return text
+
+
+REPL_COMMANDS = {
+ "exit": _sitebuiltins.Quitter('exit', ''),
+ "quit": _sitebuiltins.Quitter('quit' ,''),
+ "copyright": _sitebuiltins._Printer('copyright', sys.copyright),
+ "help": "help",
+}
+
+class InteractiveColoredConsole(code.InteractiveConsole):
+ def __init__(
+ self,
+ locals: dict[str, object] | None = None,
+ filename: str = "<console>",
+ *,
+ local_exit: bool = False,
+ ) -> None:
+ super().__init__(locals=locals, filename=filename, local_exit=local_exit) # type: ignore[call-arg]
+ self.can_colorize = _colorize.can_colorize()
+
+ def showtraceback(self):
+ super().showtraceback(colorize=self.can_colorize)
+
+
+def run_multiline_interactive_console(
+ mainmodule: ModuleType | None= None, future_flags: int = 0
+) -> None:
+ import code
+ import __main__
+ from .readline import _setup
+ _setup()
+
+ mainmodule = mainmodule or __main__
+ console = InteractiveColoredConsole(mainmodule.__dict__, filename="<stdin>")
+ if future_flags:
+ console.compile.compiler.flags |= future_flags
+
+ input_n = 0
+
+ def maybe_run_command(statement: str) -> bool:
+ statement = statement.strip()
+ if statement in console.locals or statement not in REPL_COMMANDS:
+ return False
+
+ reader = _get_reader()
+ reader.history.pop() # skip internal commands in history
+ command = REPL_COMMANDS[statement]
+ if callable(command):
+ command()
+ return True
+
+ if isinstance(command, str):
+ # Internal readline commands require a prepared reader like
+ # inside multiline_input.
+ reader.prepare()
+ reader.refresh()
+ reader.do_cmd((command, [statement]))
+ reader.restore()
+ return True
+
+ return False
+
+ def more_lines(unicodetext: str) -> bool:
+ # ooh, look at the hack:
+ src = _strip_final_indent(unicodetext)
+ try:
+ code = console.compile(src, "<stdin>", "single")
+ except (OverflowError, SyntaxError, ValueError):
+ return False
+ else:
+ return code is None
+
+ while 1:
+ try:
+ try:
+ sys.stdout.flush()
+ except Exception:
+ pass
+
+ ps1 = getattr(sys, "ps1", ">>> ")
+ ps2 = getattr(sys, "ps2", "... ")
+ try:
+ statement = multiline_input(more_lines, ps1, ps2)
+ except EOFError:
+ break
+
+ if maybe_run_command(statement):
+ continue
+
+ input_name = f"<python-input-{input_n}>"
+ linecache._register_code(input_name, statement, "<stdin>") # type: ignore[attr-defined]
+ more = console.push(_strip_final_indent(statement), filename=input_name) # type: ignore[call-arg]
+ assert not more
+ input_n += 1
+ except KeyboardInterrupt:
+ console.write("\nKeyboardInterrupt\n")
+ console.resetbuffer()
+ except MemoryError:
+ console.write("\nMemoryError\n")
+ console.resetbuffer()
diff --git a/Lib/_pyrepl/trace.py b/Lib/_pyrepl/trace.py
new file mode 100644
index 0000000..a8eb243
--- /dev/null
+++ b/Lib/_pyrepl/trace.py
@@ -0,0 +1,21 @@
+from __future__ import annotations
+
+import os
+
+# types
+if False:
+ from typing import IO
+
+
+trace_file: IO[str] | None = None
+if trace_filename := os.environ.get("PYREPL_TRACE"):
+ trace_file = open(trace_filename, "a")
+
+
+def trace(line: str, *k: object, **kw: object) -> None:
+ if trace_file is None:
+ return
+ if k or kw:
+ line = line.format(*k, **kw)
+ trace_file.write(line + "\n")
+ trace_file.flush()
diff --git a/Lib/_pyrepl/types.py b/Lib/_pyrepl/types.py
new file mode 100644
index 0000000..f9d48b8
--- /dev/null
+++ b/Lib/_pyrepl/types.py
@@ -0,0 +1,8 @@
+from collections.abc import Callable, Iterator
+
+Callback = Callable[[], object]
+SimpleContextManager = Iterator[None]
+KeySpec = str # like r"\C-c"
+CommandName = str # like "interrupt"
+EventTuple = tuple[CommandName, str]
+Completer = Callable[[str, int], str | None]
diff --git a/Lib/_pyrepl/unix_console.py b/Lib/_pyrepl/unix_console.py
new file mode 100644
index 0000000..c22b1d5
--- /dev/null
+++ b/Lib/_pyrepl/unix_console.py
@@ -0,0 +1,743 @@
+# Copyright 2000-2010 Michael Hudson-Doyle <micahel@gmail.com>
+# Antonio Cuni
+# Armin Rigo
+#
+# All Rights Reserved
+#
+#
+# Permission to use, copy, modify, and distribute this software and
+# its documentation for any purpose is hereby granted without fee,
+# provided that the above copyright notice appear in all copies and
+# that both that copyright notice and this permission notice appear in
+# supporting documentation.
+#
+# THE AUTHOR MICHAEL HUDSON DISCLAIMS ALL WARRANTIES WITH REGARD TO
+# THIS SOFTWARE, INCLUDING ALL IMPLIED WARRANTIES OF MERCHANTABILITY
+# AND FITNESS, IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY SPECIAL,
+# INDIRECT OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER
+# RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN ACTION OF
+# CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF OR IN
+# CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
+
+from __future__ import annotations
+
+import errno
+import os
+import re
+import select
+import signal
+import struct
+import sys
+import termios
+import time
+from fcntl import ioctl
+
+from . import curses
+from .console import Console, Event
+from .fancy_termios import tcgetattr, tcsetattr
+from .trace import trace
+from .unix_eventqueue import EventQueue
+from .utils import wlen
+
+
+# types
+if False:
+ from typing import IO
+
+
+class InvalidTerminal(RuntimeError):
+ pass
+
+
+_error = (termios.error, curses.error, InvalidTerminal)
+
+SIGWINCH_EVENT = "repaint"
+
+FIONREAD = getattr(termios, "FIONREAD", None)
+TIOCGWINSZ = getattr(termios, "TIOCGWINSZ", None)
+
+# ------------ start of baudrate definitions ------------
+
+# Add (possibly) missing baudrates (check termios man page) to termios
+
+
+def add_baudrate_if_supported(dictionary: dict[int, int], rate: int) -> None:
+ baudrate_name = "B%d" % rate
+ if hasattr(termios, baudrate_name):
+ dictionary[getattr(termios, baudrate_name)] = rate
+
+
+# Check the termios man page (Line speed) to know where these
+# values come from.
+potential_baudrates = [
+ 0,
+ 110,
+ 115200,
+ 1200,
+ 134,
+ 150,
+ 1800,
+ 19200,
+ 200,
+ 230400,
+ 2400,
+ 300,
+ 38400,
+ 460800,
+ 4800,
+ 50,
+ 57600,
+ 600,
+ 75,
+ 9600,
+]
+
+ratedict: dict[int, int] = {}
+for rate in potential_baudrates:
+ add_baudrate_if_supported(ratedict, rate)
+
+# Clean up variables to avoid unintended usage
+del rate, add_baudrate_if_supported
+
+# ------------ end of baudrate definitions ------------
+
+delayprog = re.compile(b"\\$<([0-9]+)((?:/|\\*){0,2})>")
+
+try:
+ poll: type[select.poll] = select.poll
+except AttributeError:
+ # this is exactly the minumum necessary to support what we
+ # do with poll objects
+ class MinimalPoll:
+ def __init__(self):
+ pass
+
+ def register(self, fd, flag):
+ self.fd = fd
+
+ def poll(self): # note: a 'timeout' argument would be *milliseconds*
+ r, w, e = select.select([self.fd], [], [])
+ return r
+
+ poll = MinimalPoll # type: ignore[assignment]
+
+
+class UnixConsole(Console):
+ def __init__(
+ self,
+ f_in: IO[bytes] | int = 0,
+ f_out: IO[bytes] | int = 1,
+ term: str = "",
+ encoding: str = "",
+ ):
+ """
+ Initialize the UnixConsole.
+
+ Parameters:
+ - f_in (int or file-like object): Input file descriptor or object.
+ - f_out (int or file-like object): Output file descriptor or object.
+ - term (str): Terminal name.
+ - encoding (str): Encoding to use for I/O operations.
+ """
+
+ self.encoding = encoding or sys.getdefaultencoding()
+
+ if isinstance(f_in, int):
+ self.input_fd = f_in
+ else:
+ self.input_fd = f_in.fileno()
+
+ if isinstance(f_out, int):
+ self.output_fd = f_out
+ else:
+ self.output_fd = f_out.fileno()
+
+ self.pollob = poll()
+ self.pollob.register(self.input_fd, select.POLLIN)
+ curses.setupterm(term or None, self.output_fd)
+ self.term = term
+
+ def _my_getstr(cap, optional=0):
+ r = curses.tigetstr(cap)
+ if not optional and r is None:
+ raise InvalidTerminal(
+ f"terminal doesn't have the required {cap} capability"
+ )
+ return r
+
+ self._bel = _my_getstr("bel")
+ self._civis = _my_getstr("civis", optional=True)
+ self._clear = _my_getstr("clear")
+ self._cnorm = _my_getstr("cnorm", optional=True)
+ self._cub = _my_getstr("cub", optional=True)
+ self._cub1 = _my_getstr("cub1", optional=True)
+ self._cud = _my_getstr("cud", optional=True)
+ self._cud1 = _my_getstr("cud1", optional=True)
+ self._cuf = _my_getstr("cuf", optional=True)
+ self._cuf1 = _my_getstr("cuf1", optional=True)
+ self._cup = _my_getstr("cup")
+ self._cuu = _my_getstr("cuu", optional=True)
+ self._cuu1 = _my_getstr("cuu1", optional=True)
+ self._dch1 = _my_getstr("dch1", optional=True)
+ self._dch = _my_getstr("dch", optional=True)
+ self._el = _my_getstr("el")
+ self._hpa = _my_getstr("hpa", optional=True)
+ self._ich = _my_getstr("ich", optional=True)
+ self._ich1 = _my_getstr("ich1", optional=True)
+ self._ind = _my_getstr("ind", optional=True)
+ self._pad = _my_getstr("pad", optional=True)
+ self._ri = _my_getstr("ri", optional=True)
+ self._rmkx = _my_getstr("rmkx", optional=True)
+ self._smkx = _my_getstr("smkx", optional=True)
+
+ self.__setup_movement()
+
+ self.event_queue = EventQueue(self.input_fd, self.encoding)
+ self.cursor_visible = 1
+
+ def change_encoding(self, encoding: str) -> None:
+ """
+ Change the encoding used for I/O operations.
+
+ Parameters:
+ - encoding (str): New encoding to use.
+ """
+ self.encoding = encoding
+
+ def refresh(self, screen, c_xy):
+ """
+ Refresh the console screen.
+
+ Parameters:
+ - screen (list): List of strings representing the screen contents.
+ - c_xy (tuple): Cursor position (x, y) on the screen.
+ """
+ cx, cy = c_xy
+ if not self.__gone_tall:
+ while len(self.screen) < min(len(screen), self.height):
+ self.__hide_cursor()
+ self.__move(0, len(self.screen) - 1)
+ self.__write("\n")
+ self.__posxy = 0, len(self.screen)
+ self.screen.append("")
+ else:
+ while len(self.screen) < len(screen):
+ self.screen.append("")
+
+ if len(screen) > self.height:
+ self.__gone_tall = 1
+ self.__move = self.__move_tall
+
+ px, py = self.__posxy
+ old_offset = offset = self.__offset
+ height = self.height
+
+ # we make sure the cursor is on the screen, and that we're
+ # using all of the screen if we can
+ if cy < offset:
+ offset = cy
+ elif cy >= offset + height:
+ offset = cy - height + 1
+ elif offset > 0 and len(screen) < offset + height:
+ offset = max(len(screen) - height, 0)
+ screen.append("")
+
+ oldscr = self.screen[old_offset : old_offset + height]
+ newscr = screen[offset : offset + height]
+
+ # use hardware scrolling if we have it.
+ if old_offset > offset and self._ri:
+ self.__hide_cursor()
+ self.__write_code(self._cup, 0, 0)
+ self.__posxy = 0, old_offset
+ for i in range(old_offset - offset):
+ self.__write_code(self._ri)
+ oldscr.pop(-1)
+ oldscr.insert(0, "")
+ elif old_offset < offset and self._ind:
+ self.__hide_cursor()
+ self.__write_code(self._cup, self.height - 1, 0)
+ self.__posxy = 0, old_offset + self.height - 1
+ for i in range(offset - old_offset):
+ self.__write_code(self._ind)
+ oldscr.pop(0)
+ oldscr.append("")
+
+ self.__offset = offset
+
+ for (
+ y,
+ oldline,
+ newline,
+ ) in zip(range(offset, offset + height), oldscr, newscr):
+ if oldline != newline:
+ self.__write_changed_line(y, oldline, newline, px)
+
+ y = len(newscr)
+ while y < len(oldscr):
+ self.__hide_cursor()
+ self.__move(0, y)
+ self.__posxy = 0, y
+ self.__write_code(self._el)
+ y += 1
+
+ self.__show_cursor()
+
+ self.screen = screen
+ self.move_cursor(cx, cy)
+ self.flushoutput()
+
+ def move_cursor(self, x, y):
+ """
+ Move the cursor to the specified position on the screen.
+
+ Parameters:
+ - x (int): X coordinate.
+ - y (int): Y coordinate.
+ """
+ if y < self.__offset or y >= self.__offset + self.height:
+ self.event_queue.insert(Event("scroll", None))
+ else:
+ self.__move(x, y)
+ self.__posxy = x, y
+ self.flushoutput()
+
+ def prepare(self):
+ """
+ Prepare the console for input/output operations.
+ """
+ self.__svtermstate = tcgetattr(self.input_fd)
+ raw = self.__svtermstate.copy()
+ raw.iflag &= ~(termios.BRKINT | termios.INPCK | termios.ISTRIP | termios.IXON)
+ raw.oflag &= ~(termios.OPOST)
+ raw.cflag &= ~(termios.CSIZE | termios.PARENB)
+ raw.cflag |= termios.CS8
+ raw.lflag &= ~(
+ termios.ICANON | termios.ECHO | termios.IEXTEN | (termios.ISIG * 1)
+ )
+ raw.cc[termios.VMIN] = 1
+ raw.cc[termios.VTIME] = 0
+ tcsetattr(self.input_fd, termios.TCSADRAIN, raw)
+
+ self.screen = []
+ self.height, self.width = self.getheightwidth()
+
+ self.__buffer = []
+
+ self.__posxy = 0, 0
+ self.__gone_tall = 0
+ self.__move = self.__move_short
+ self.__offset = 0
+
+ self.__maybe_write_code(self._smkx)
+
+ try:
+ self.old_sigwinch = signal.signal(signal.SIGWINCH, self.__sigwinch)
+ except ValueError:
+ pass
+
+ def restore(self):
+ """
+ Restore the console to the default state
+ """
+ self.__maybe_write_code(self._rmkx)
+ self.flushoutput()
+ tcsetattr(self.input_fd, termios.TCSADRAIN, self.__svtermstate)
+
+ if hasattr(self, "old_sigwinch"):
+ signal.signal(signal.SIGWINCH, self.old_sigwinch)
+ del self.old_sigwinch
+
+ def push_char(self, char: int | bytes) -> None:
+ """
+ Push a character to the console event queue.
+ """
+ trace("push char {char!r}", char=char)
+ self.event_queue.push(char)
+
+ def get_event(self, block: bool = True) -> Event | None:
+ """
+ Get an event from the console event queue.
+
+ Parameters:
+ - block (bool): Whether to block until an event is available.
+
+ Returns:
+ - Event: Event object from the event queue.
+ """
+ while self.event_queue.empty():
+ while True:
+ try:
+ self.push_char(os.read(self.input_fd, 1))
+ except OSError as err:
+ if err.errno == errno.EINTR:
+ if not self.event_queue.empty():
+ return self.event_queue.get()
+ else:
+ continue
+ else:
+ raise
+ else:
+ break
+ if not block:
+ break
+ return self.event_queue.get()
+
+ def wait(self):
+ """
+ Wait for events on the console.
+ """
+ self.pollob.poll()
+
+ def set_cursor_vis(self, visible):
+ """
+ Set the visibility of the cursor.
+
+ Parameters:
+ - visible (bool): Visibility flag.
+ """
+ if visible:
+ self.__show_cursor()
+ else:
+ self.__hide_cursor()
+
+ if TIOCGWINSZ:
+
+ def getheightwidth(self):
+ """
+ Get the height and width of the console.
+
+ Returns:
+ - tuple: Height and width of the console.
+ """
+ try:
+ return int(os.environ["LINES"]), int(os.environ["COLUMNS"])
+ except KeyError:
+ height, width = struct.unpack(
+ "hhhh", ioctl(self.input_fd, TIOCGWINSZ, b"\000" * 8)
+ )[0:2]
+ if not height:
+ return 25, 80
+ return height, width
+
+ else:
+
+ def getheightwidth(self):
+ """
+ Get the height and width of the console.
+
+ Returns:
+ - tuple: Height and width of the console.
+ """
+ try:
+ return int(os.environ["LINES"]), int(os.environ["COLUMNS"])
+ except KeyError:
+ return 25, 80
+
+ def forgetinput(self):
+ """
+ Discard any pending input on the console.
+ """
+ termios.tcflush(self.input_fd, termios.TCIFLUSH)
+
+ def flushoutput(self):
+ """
+ Flush the output buffer.
+ """
+ for text, iscode in self.__buffer:
+ if iscode:
+ self.__tputs(text)
+ else:
+ os.write(self.output_fd, text.encode(self.encoding, "replace"))
+ del self.__buffer[:]
+
+ def finish(self):
+ """
+ Finish console operations and flush the output buffer.
+ """
+ y = len(self.screen) - 1
+ while y >= 0 and not self.screen[y]:
+ y -= 1
+ self.__move(0, min(y, self.height + self.__offset - 1))
+ self.__write("\n\r")
+ self.flushoutput()
+
+ def beep(self):
+ """
+ Emit a beep sound.
+ """
+ self.__maybe_write_code(self._bel)
+ self.flushoutput()
+
+ if FIONREAD:
+
+ def getpending(self):
+ """
+ Get pending events from the console event queue.
+
+ Returns:
+ - Event: Pending event from the event queue.
+ """
+ e = Event("key", "", b"")
+
+ while not self.event_queue.empty():
+ e2 = self.event_queue.get()
+ e.data += e2.data
+ e.raw += e.raw
+
+ amount = struct.unpack("i", ioctl(self.input_fd, FIONREAD, b"\0\0\0\0"))[0]
+ raw = os.read(self.input_fd, amount)
+ data = str(raw, self.encoding, "replace")
+ e.data += data
+ e.raw += raw
+ return e
+
+ else:
+
+ def getpending(self):
+ """
+ Get pending events from the console event queue.
+
+ Returns:
+ - Event: Pending event from the event queue.
+ """
+ e = Event("key", "", b"")
+
+ while not self.event_queue.empty():
+ e2 = self.event_queue.get()
+ e.data += e2.data
+ e.raw += e.raw
+
+ amount = 10000
+ raw = os.read(self.input_fd, amount)
+ data = str(raw, self.encoding, "replace")
+ e.data += data
+ e.raw += raw
+ return e
+
+ def clear(self):
+ """
+ Clear the console screen.
+ """
+ self.__write_code(self._clear)
+ self.__gone_tall = 1
+ self.__move = self.__move_tall
+ self.__posxy = 0, 0
+ self.screen = []
+
+ def __setup_movement(self):
+ """
+ Set up the movement functions based on the terminal capabilities.
+ """
+ if 0 and self._hpa: # hpa don't work in windows telnet :-(
+ self.__move_x = self.__move_x_hpa
+ elif self._cub and self._cuf:
+ self.__move_x = self.__move_x_cub_cuf
+ elif self._cub1 and self._cuf1:
+ self.__move_x = self.__move_x_cub1_cuf1
+ else:
+ raise RuntimeError("insufficient terminal (horizontal)")
+
+ if self._cuu and self._cud:
+ self.__move_y = self.__move_y_cuu_cud
+ elif self._cuu1 and self._cud1:
+ self.__move_y = self.__move_y_cuu1_cud1
+ else:
+ raise RuntimeError("insufficient terminal (vertical)")
+
+ if self._dch1:
+ self.dch1 = self._dch1
+ elif self._dch:
+ self.dch1 = curses.tparm(self._dch, 1)
+ else:
+ self.dch1 = None
+
+ if self._ich1:
+ self.ich1 = self._ich1
+ elif self._ich:
+ self.ich1 = curses.tparm(self._ich, 1)
+ else:
+ self.ich1 = None
+
+ self.__move = self.__move_short
+
+ def __write_changed_line(self, y, oldline, newline, px_coord):
+ # this is frustrating; there's no reason to test (say)
+ # self.dch1 inside the loop -- but alternative ways of
+ # structuring this function are equally painful (I'm trying to
+ # avoid writing code generators these days...)
+ minlen = min(wlen(oldline), wlen(newline))
+ x_pos = 0
+ x_coord = 0
+
+ px_pos = 0
+ j = 0
+ for c in oldline:
+ if j >= px_coord: break
+ j += wlen(c)
+ px_pos += 1
+
+ # reuse the oldline as much as possible, but stop as soon as we
+ # encounter an ESCAPE, because it might be the start of an escape
+ # sequene
+ while x_coord < minlen and oldline[x_pos] == newline[x_pos] and newline[x_pos] != "\x1b":
+ x_coord += wlen(newline[x_pos])
+ x_pos += 1
+
+ # if we need to insert a single character right after the first detected change
+ if oldline[x_pos:] == newline[x_pos + 1 :] and self.ich1:
+ if (
+ y == self.__posxy[1]
+ and x_coord > self.__posxy[0]
+ and oldline[px_pos:x_pos] == newline[px_pos + 1 : x_pos + 1]
+ ):
+ x_pos = px_pos
+ x_coord = px_coord
+ character_width = wlen(newline[x_pos])
+ self.__move(x_coord, y)
+ self.__write_code(self.ich1)
+ self.__write(newline[x_pos])
+ self.__posxy = x_coord + character_width, y
+
+ # if it's a single character change in the middle of the line
+ elif x_coord < minlen and oldline[x_pos + 1 :] == newline[x_pos + 1 :] and wlen(oldline[x_pos]) == wlen(newline[x_pos]):
+ character_width = wlen(newline[x_pos])
+ self.__move(x_coord, y)
+ self.__write(newline[x_pos])
+ self.__posxy = x_coord + character_width, y
+
+ # if this is the last character to fit in the line and we edit in the middle of the line
+ elif (
+ self.dch1
+ and self.ich1
+ and wlen(newline) == self.width
+ and x_coord < wlen(newline) - 2
+ and newline[x_pos + 1 : -1] == oldline[x_pos:-2]
+ ):
+ self.__hide_cursor()
+ self.__move(self.width - 2, y)
+ self.__posxy = self.width - 2, y
+ self.__write_code(self.dch1)
+
+ character_width = wlen(newline[x_pos])
+ self.__move(x_coord, y)
+ self.__write_code(self.ich1)
+ self.__write(newline[x_pos])
+ self.__posxy = character_width + 1, y
+
+ else:
+ self.__hide_cursor()
+ self.__move(x_coord, y)
+ if wlen(oldline) > wlen(newline):
+ self.__write_code(self._el)
+ self.__write(newline[x_pos:])
+ self.__posxy = wlen(newline), y
+
+ if "\x1b" in newline:
+ # ANSI escape characters are present, so we can't assume
+ # anything about the position of the cursor. Moving the cursor
+ # to the left margin should work to get to a known position.
+ self.move_cursor(0, y)
+
+ def __write(self, text):
+ self.__buffer.append((text, 0))
+
+ def __write_code(self, fmt, *args):
+ self.__buffer.append((curses.tparm(fmt, *args), 1))
+
+ def __maybe_write_code(self, fmt, *args):
+ if fmt:
+ self.__write_code(fmt, *args)
+
+ def __move_y_cuu1_cud1(self, y):
+ dy = y - self.__posxy[1]
+ if dy > 0:
+ self.__write_code(dy * self._cud1)
+ elif dy < 0:
+ self.__write_code((-dy) * self._cuu1)
+
+ def __move_y_cuu_cud(self, y):
+ dy = y - self.__posxy[1]
+ if dy > 0:
+ self.__write_code(self._cud, dy)
+ elif dy < 0:
+ self.__write_code(self._cuu, -dy)
+
+ def __move_x_hpa(self, x):
+ if x != self.__posxy[0]:
+ self.__write_code(self._hpa, x)
+
+ def __move_x_cub1_cuf1(self, x):
+ dx = x - self.__posxy[0]
+ if dx > 0:
+ self.__write_code(self._cuf1 * dx)
+ elif dx < 0:
+ self.__write_code(self._cub1 * (-dx))
+
+ def __move_x_cub_cuf(self, x):
+ dx = x - self.__posxy[0]
+ if dx > 0:
+ self.__write_code(self._cuf, dx)
+ elif dx < 0:
+ self.__write_code(self._cub, -dx)
+
+ def __move_short(self, x, y):
+ self.__move_x(x)
+ self.__move_y(y)
+
+ def __move_tall(self, x, y):
+ assert 0 <= y - self.__offset < self.height, y - self.__offset
+ self.__write_code(self._cup, y - self.__offset, x)
+
+ def __sigwinch(self, signum, frame):
+ self.height, self.width = self.getheightwidth()
+ self.event_queue.insert(Event("resize", None))
+
+ def __hide_cursor(self):
+ if self.cursor_visible:
+ self.__maybe_write_code(self._civis)
+ self.cursor_visible = 0
+
+ def __show_cursor(self):
+ if not self.cursor_visible:
+ self.__maybe_write_code(self._cnorm)
+ self.cursor_visible = 1
+
+ def repaint(self):
+ if not self.__gone_tall:
+ self.__posxy = 0, self.__posxy[1]
+ self.__write("\r")
+ ns = len(self.screen) * ["\000" * self.width]
+ self.screen = ns
+ else:
+ self.__posxy = 0, self.__offset
+ self.__move(0, self.__offset)
+ ns = self.height * ["\000" * self.width]
+ self.screen = ns
+
+ def __tputs(self, fmt, prog=delayprog):
+ """A Python implementation of the curses tputs function; the
+ curses one can't really be wrapped in a sane manner.
+
+ I have the strong suspicion that this is complexity that
+ will never do anyone any good."""
+ # using .get() means that things will blow up
+ # only if the bps is actually needed (which I'm
+ # betting is pretty unlkely)
+ bps = ratedict.get(self.__svtermstate.ospeed)
+ while 1:
+ m = prog.search(fmt)
+ if not m:
+ os.write(self.output_fd, fmt)
+ break
+ x, y = m.span()
+ os.write(self.output_fd, fmt[:x])
+ fmt = fmt[y:]
+ delay = int(m.group(1))
+ if b"*" in m.group(2):
+ delay *= self.height
+ if self._pad and bps is not None:
+ nchars = (bps * delay) / 1000
+ os.write(self.output_fd, self._pad * nchars)
+ else:
+ time.sleep(float(delay) / 1000.0)
diff --git a/Lib/_pyrepl/unix_eventqueue.py b/Lib/_pyrepl/unix_eventqueue.py
new file mode 100644
index 0000000..70cfade
--- /dev/null
+++ b/Lib/_pyrepl/unix_eventqueue.py
@@ -0,0 +1,152 @@
+# Copyright 2000-2008 Michael Hudson-Doyle <micahel@gmail.com>
+# Armin Rigo
+#
+# All Rights Reserved
+#
+#
+# Permission to use, copy, modify, and distribute this software and
+# its documentation for any purpose is hereby granted without fee,
+# provided that the above copyright notice appear in all copies and
+# that both that copyright notice and this permission notice appear in
+# supporting documentation.
+#
+# THE AUTHOR MICHAEL HUDSON DISCLAIMS ALL WARRANTIES WITH REGARD TO
+# THIS SOFTWARE, INCLUDING ALL IMPLIED WARRANTIES OF MERCHANTABILITY
+# AND FITNESS, IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY SPECIAL,
+# INDIRECT OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER
+# RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN ACTION OF
+# CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF OR IN
+# CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
+
+from collections import deque
+
+from . import keymap
+from .console import Event
+from . import curses
+from .trace import trace
+from termios import tcgetattr, VERASE
+import os
+
+
+# Mapping of human-readable key names to their terminal-specific codes
+TERMINAL_KEYNAMES = {
+ "delete": "kdch1",
+ "down": "kcud1",
+ "end": "kend",
+ "enter": "kent",
+ "home": "khome",
+ "insert": "kich1",
+ "left": "kcub1",
+ "page down": "knp",
+ "page up": "kpp",
+ "right": "kcuf1",
+ "up": "kcuu1",
+}
+
+
+# Function keys F1-F20 mapping
+TERMINAL_KEYNAMES.update(("f%d" % i, "kf%d" % i) for i in range(1, 21))
+
+# Known CTRL-arrow keycodes
+CTRL_ARROW_KEYCODES= {
+ # for xterm, gnome-terminal, xfce terminal, etc.
+ b'\033[1;5D': 'ctrl left',
+ b'\033[1;5C': 'ctrl right',
+ # for rxvt
+ b'\033Od': 'ctrl left',
+ b'\033Oc': 'ctrl right',
+}
+
+def get_terminal_keycodes() -> dict[bytes, str]:
+ """
+ Generates a dictionary mapping terminal keycodes to human-readable names.
+ """
+ keycodes = {}
+ for key, terminal_code in TERMINAL_KEYNAMES.items():
+ keycode = curses.tigetstr(terminal_code)
+ trace('key {key} tiname {terminal_code} keycode {keycode!r}', **locals())
+ if keycode:
+ keycodes[keycode] = key
+ keycodes.update(CTRL_ARROW_KEYCODES)
+ return keycodes
+
+class EventQueue:
+ def __init__(self, fd: int, encoding: str) -> None:
+ self.keycodes = get_terminal_keycodes()
+ if os.isatty(fd):
+ backspace = tcgetattr(fd)[6][VERASE]
+ self.keycodes[backspace] = "backspace"
+ self.compiled_keymap = keymap.compile_keymap(self.keycodes)
+ self.keymap = self.compiled_keymap
+ trace("keymap {k!r}", k=self.keymap)
+ self.encoding = encoding
+ self.events: deque[Event] = deque()
+ self.buf = bytearray()
+
+ def get(self) -> Event | None:
+ """
+ Retrieves the next event from the queue.
+ """
+ if self.events:
+ return self.events.popleft()
+ else:
+ return None
+
+ def empty(self) -> bool:
+ """
+ Checks if the queue is empty.
+ """
+ return not self.events
+
+ def flush_buf(self) -> bytearray:
+ """
+ Flushes the buffer and returns its contents.
+ """
+ old = self.buf
+ self.buf = bytearray()
+ return old
+
+ def insert(self, event: Event) -> None:
+ """
+ Inserts an event into the queue.
+ """
+ trace('added event {event}', event=event)
+ self.events.append(event)
+
+ def push(self, char: int | bytes) -> None:
+ """
+ Processes a character by updating the buffer and handling special key mappings.
+ """
+ ord_char = char if isinstance(char, int) else ord(char)
+ char = bytes(bytearray((ord_char,)))
+ self.buf.append(ord_char)
+ if char in self.keymap:
+ if self.keymap is self.compiled_keymap:
+ #sanity check, buffer is empty when a special key comes
+ assert len(self.buf) == 1
+ k = self.keymap[char]
+ trace('found map {k!r}', k=k)
+ if isinstance(k, dict):
+ self.keymap = k
+ else:
+ self.insert(Event('key', k, self.flush_buf()))
+ self.keymap = self.compiled_keymap
+
+ elif self.buf and self.buf[0] == 27: # escape
+ # escape sequence not recognized by our keymap: propagate it
+ # outside so that i can be recognized as an M-... key (see also
+ # the docstring in keymap.py
+ trace('unrecognized escape sequence, propagating...')
+ self.keymap = self.compiled_keymap
+ self.insert(Event('key', '\033', bytearray(b'\033')))
+ for _c in self.flush_buf()[1:]:
+ self.push(_c)
+
+ else:
+ try:
+ decoded = bytes(self.buf).decode(self.encoding)
+ except UnicodeError:
+ return
+ else:
+ self.insert(Event('key', decoded, self.flush_buf()))
+ self.keymap = self.compiled_keymap
diff --git a/Lib/_pyrepl/utils.py b/Lib/_pyrepl/utils.py
new file mode 100644
index 0000000..cd1df7c
--- /dev/null
+++ b/Lib/_pyrepl/utils.py
@@ -0,0 +1,18 @@
+import re
+import unicodedata
+
+ANSI_ESCAPE_SEQUENCE = re.compile(r"\x1b\[[ -@]*[A-~]")
+
+
+def str_width(c: str) -> int:
+ w = unicodedata.east_asian_width(c)
+ if w in ('N', 'Na', 'H', 'A'):
+ return 1
+ return 2
+
+
+def wlen(s: str) -> int:
+ length = sum(str_width(i) for i in s)
+
+ # remove lengths of any escape sequences
+ return length - sum(len(i) for i in ANSI_ESCAPE_SEQUENCE.findall(s))
diff --git a/Lib/code.py b/Lib/code.py
index f4aecdd..1ee1ad6 100644
--- a/Lib/code.py
+++ b/Lib/code.py
@@ -130,7 +130,7 @@ class InteractiveInterpreter:
# over self.write
sys.excepthook(type, value, tb)
- def showtraceback(self):
+ def showtraceback(self, **kwargs):
"""Display the exception that just occurred.
We remove the first stack item because it is our own code.
@@ -138,11 +138,12 @@ class InteractiveInterpreter:
The output is written by self.write(), below.
"""
+ colorize = kwargs.pop('colorize', False)
sys.last_type, sys.last_value, last_tb = ei = sys.exc_info()
sys.last_traceback = last_tb
sys.last_exc = ei[1]
try:
- lines = traceback.format_exception(ei[0], ei[1], last_tb.tb_next)
+ lines = traceback.format_exception(ei[0], ei[1], last_tb.tb_next, colorize=colorize)
if sys.excepthook is sys.__excepthook__:
self.write(''.join(lines))
else:
@@ -170,7 +171,7 @@ class InteractiveConsole(InteractiveInterpreter):
"""
- def __init__(self, locals=None, filename="<console>", local_exit=False):
+ def __init__(self, locals=None, filename="<console>", *, local_exit=False):
"""Constructor.
The optional locals argument will be passed to the
@@ -280,7 +281,7 @@ class InteractiveConsole(InteractiveInterpreter):
elif exitmsg != '':
self.write('%s\n' % exitmsg)
- def push(self, line):
+ def push(self, line, filename=None):
"""Push a line to the interpreter.
The line should not have a trailing newline; it may have
@@ -296,7 +297,9 @@ class InteractiveConsole(InteractiveInterpreter):
"""
self.buffer.append(line)
source = "\n".join(self.buffer)
- more = self.runsource(source, self.filename)
+ if filename is None:
+ filename = self.filename
+ more = self.runsource(source, filename)
if not more:
self.resetbuffer()
return more
diff --git a/Lib/pydoc.py b/Lib/pydoc.py
index 02af672..eaaf824 100755
--- a/Lib/pydoc.py
+++ b/Lib/pydoc.py
@@ -76,6 +76,18 @@ from collections import deque
from reprlib import Repr
from traceback import format_exception_only
+from _pyrepl.pager import (get_pager, plain, escape_less, pipe_pager,
+ plain_pager, tempfile_pager, tty_pager)
+
+
+# --------------------------------------------------------- old names
+
+getpager = get_pager
+pipepager = pipe_pager
+plainpager = plain_pager
+tempfilepager = tempfile_pager
+ttypager = tty_pager
+
# --------------------------------------------------------- common routines
@@ -1640,153 +1652,9 @@ class _PlainTextDoc(TextDoc):
def pager(text, title=''):
"""The first time this is called, determine what kind of pager to use."""
global pager
- pager = getpager()
+ pager = get_pager()
pager(text, title)
-def getpager():
- """Decide what method to use for paging through text."""
- if not hasattr(sys.stdin, "isatty"):
- return plainpager
- if not hasattr(sys.stdout, "isatty"):
- return plainpager
- if not sys.stdin.isatty() or not sys.stdout.isatty():
- return plainpager
- if sys.platform == "emscripten":
- return plainpager
- use_pager = os.environ.get('MANPAGER') or os.environ.get('PAGER')
- if use_pager:
- if sys.platform == 'win32': # pipes completely broken in Windows
- return lambda text, title='': tempfilepager(plain(text), use_pager)
- elif os.environ.get('TERM') in ('dumb', 'emacs'):
- return lambda text, title='': pipepager(plain(text), use_pager, title)
- else:
- return lambda text, title='': pipepager(text, use_pager, title)
- if os.environ.get('TERM') in ('dumb', 'emacs'):
- return plainpager
- if sys.platform == 'win32':
- return lambda text, title='': tempfilepager(plain(text), 'more <')
- if hasattr(os, 'system') and os.system('(less) 2>/dev/null') == 0:
- return lambda text, title='': pipepager(text, 'less', title)
-
- import tempfile
- (fd, filename) = tempfile.mkstemp()
- os.close(fd)
- try:
- if hasattr(os, 'system') and os.system('more "%s"' % filename) == 0:
- return lambda text, title='': pipepager(text, 'more', title)
- else:
- return ttypager
- finally:
- os.unlink(filename)
-
-def plain(text):
- """Remove boldface formatting from text."""
- return re.sub('.\b', '', text)
-
-def escape_less(s):
- return re.sub(r'([?:.%\\])', r'\\\1', s)
-
-def pipepager(text, cmd, title=''):
- """Page through text by feeding it to another program."""
- import subprocess
- env = os.environ.copy()
- if title:
- title += ' '
- esc_title = escape_less(title)
- prompt_string = (
- f' {esc_title}' +
- '?ltline %lt?L/%L.'
- ':byte %bB?s/%s.'
- '.'
- '?e (END):?pB %pB\\%..'
- ' (press h for help or q to quit)')
- env['LESS'] = '-RmPm{0}$PM{0}$'.format(prompt_string)
- proc = subprocess.Popen(cmd, shell=True, stdin=subprocess.PIPE,
- errors='backslashreplace', env=env)
- try:
- with proc.stdin as pipe:
- try:
- pipe.write(text)
- except KeyboardInterrupt:
- # We've hereby abandoned whatever text hasn't been written,
- # but the pager is still in control of the terminal.
- pass
- except OSError:
- pass # Ignore broken pipes caused by quitting the pager program.
- while True:
- try:
- proc.wait()
- break
- except KeyboardInterrupt:
- # Ignore ctl-c like the pager itself does. Otherwise the pager is
- # left running and the terminal is in raw mode and unusable.
- pass
-
-def tempfilepager(text, cmd, title=''):
- """Page through text by invoking a program on a temporary file."""
- import tempfile
- with tempfile.TemporaryDirectory() as tempdir:
- filename = os.path.join(tempdir, 'pydoc.out')
- with open(filename, 'w', errors='backslashreplace',
- encoding=os.device_encoding(0) if
- sys.platform == 'win32' else None
- ) as file:
- file.write(text)
- os.system(cmd + ' "' + filename + '"')
-
-def _escape_stdout(text):
- # Escape non-encodable characters to avoid encoding errors later
- encoding = getattr(sys.stdout, 'encoding', None) or 'utf-8'
- return text.encode(encoding, 'backslashreplace').decode(encoding)
-
-def ttypager(text, title=''):
- """Page through text on a text terminal."""
- lines = plain(_escape_stdout(text)).split('\n')
- try:
- import tty
- fd = sys.stdin.fileno()
- old = tty.tcgetattr(fd)
- tty.setcbreak(fd)
- getchar = lambda: sys.stdin.read(1)
- except (ImportError, AttributeError, io.UnsupportedOperation):
- tty = None
- getchar = lambda: sys.stdin.readline()[:-1][:1]
-
- try:
- try:
- h = int(os.environ.get('LINES', 0))
- except ValueError:
- h = 0
- if h <= 1:
- h = 25
- r = inc = h - 1
- sys.stdout.write('\n'.join(lines[:inc]) + '\n')
- while lines[r:]:
- sys.stdout.write('-- more --')
- sys.stdout.flush()
- c = getchar()
-
- if c in ('q', 'Q'):
- sys.stdout.write('\r \r')
- break
- elif c in ('\r', '\n'):
- sys.stdout.write('\r \r' + lines[r] + '\n')
- r = r + 1
- continue
- if c in ('b', 'B', '\x1b'):
- r = r - inc - inc
- if r < 0: r = 0
- sys.stdout.write('\n' + '\n'.join(lines[r:r+inc]) + '\n')
- r = r + inc
-
- finally:
- if tty:
- tty.tcsetattr(fd, tty.TCSAFLUSH, old)
-
-def plainpager(text, title=''):
- """Simply print unformatted text. This is the ultimate fallback."""
- sys.stdout.write(plain(_escape_stdout(text)))
-
def describe(thing):
"""Produce a short description of the given thing."""
if inspect.ismodule(thing):
diff --git a/Lib/site.py b/Lib/site.py
index 93af9c4..b63447d 100644
--- a/Lib/site.py
+++ b/Lib/site.py
@@ -485,6 +485,8 @@ def register_readline():
try:
import readline
import rlcompleter
+ import _pyrepl.readline
+ import _pyrepl.unix_console
except ImportError:
return
@@ -513,13 +515,19 @@ def register_readline():
# http://bugs.python.org/issue5845#msg198636
history = gethistoryfile()
try:
- readline.read_history_file(history)
- except OSError:
+ if os.getenv("PYTHON_BASIC_REPL"):
+ readline.read_history_file(history)
+ else:
+ _pyrepl.readline.read_history_file(history)
+ except (OSError,* _pyrepl.unix_console._error):
pass
def write_history():
try:
- readline.write_history_file(history)
+ if os.getenv("PYTHON_BASIC_REPL"):
+ readline.write_history_file(history)
+ else:
+ _pyrepl.readline.write_history_file(history)
except (FileNotFoundError, PermissionError):
# home directory does not exist or is not writable
# https://bugs.python.org/issue19891
diff --git a/Lib/test/test_pyrepl.py b/Lib/test/test_pyrepl.py
new file mode 100644
index 0000000..c53bdef
--- /dev/null
+++ b/Lib/test/test_pyrepl.py
@@ -0,0 +1,929 @@
+import itertools
+import os
+import rlcompleter
+import sys
+import unittest
+from code import InteractiveConsole
+from functools import partial
+from unittest import TestCase
+from unittest.mock import MagicMock, patch
+
+from test.support import requires
+from test.support.import_helper import import_module
+
+# Optionally test pyrepl. This currently requires that the
+# 'curses' resource be given on the regrtest command line using the -u
+# option. Additionally, we need to attempt to import curses and readline.
+requires('curses')
+curses = import_module('curses')
+readline = import_module('readline')
+
+from _pyrepl.console import Console, Event
+from _pyrepl.readline import ReadlineAlikeReader, ReadlineConfig
+from _pyrepl.simple_interact import _strip_final_indent
+from _pyrepl.unix_eventqueue import EventQueue
+
+
+def more_lines(unicodetext, namespace=None):
+ if namespace is None:
+ namespace = {}
+ src = _strip_final_indent(unicodetext)
+ console = InteractiveConsole(namespace, filename="<stdin>")
+ try:
+ code = console.compile(src, "<stdin>", "single")
+ except (OverflowError, SyntaxError, ValueError):
+ return False
+ else:
+ return code is None
+
+
+def multiline_input(reader, namespace=None):
+ saved = reader.more_lines
+ try:
+ reader.more_lines = partial(more_lines, namespace=namespace)
+ reader.ps1 = reader.ps2 = ">>>"
+ reader.ps3 = reader.ps4 = "..."
+ return reader.readline()
+ finally:
+ reader.more_lines = saved
+ reader.paste_mode = False
+
+
+def code_to_events(code):
+ for c in code:
+ yield Event(evt="key", data=c, raw=bytearray(c.encode("utf-8")))
+
+
+def prepare_mock_console(events, **kwargs):
+ console = MagicMock()
+ console.get_event.side_effect = events
+ console.height = 100
+ console.width = 80
+ for key, val in kwargs.items():
+ setattr(console, key, val)
+ return console
+
+
+def prepare_fake_console(**kwargs):
+ console = FakeConsole()
+ for key, val in kwargs.items():
+ setattr(console, key, val)
+ return console
+
+
+def prepare_reader(console, **kwargs):
+ config = ReadlineConfig(readline_completer=None)
+ reader = ReadlineAlikeReader(console=console, config=config)
+ reader.more_lines = partial(more_lines, namespace=None)
+ reader.paste_mode = True # Avoid extra indents
+
+ def get_prompt(lineno, cursor_on_line) -> str:
+ return ""
+
+ reader.get_prompt = get_prompt # Remove prompt for easier calculations of (x, y)
+
+ for key, val in kwargs.items():
+ setattr(reader, key, val)
+
+ return reader
+
+
+def handle_all_events(
+ events, prepare_console=prepare_mock_console, prepare_reader=prepare_reader
+):
+ console = prepare_console(events)
+ reader = prepare_reader(console)
+ try:
+ while True:
+ reader.handle1()
+ except StopIteration:
+ pass
+ return reader, console
+
+
+handle_events_narrow_console = partial(
+ handle_all_events, prepare_console=partial(prepare_mock_console, width=10)
+)
+
+
+class FakeConsole(Console):
+ def __init__(self, events, encoding="utf-8"):
+ self.events = iter(events)
+ self.encoding = encoding
+ self.screen = []
+ self.height = 100
+ self.width = 80
+
+ def get_event(self, block: bool = True) -> Event | None:
+ return next(self.events)
+
+ def getpending(self) -> Event:
+ return self.get_event(block=False)
+
+ def getheightwidth(self) -> tuple[int, int]:
+ return self.height, self.width
+
+ def refresh(self, screen: list[str], xy: tuple[int, int]) -> None:
+ pass
+
+ def prepare(self) -> None:
+ pass
+
+ def restore(self) -> None:
+ pass
+
+ def move_cursor(self, x: int, y: int) -> None:
+ pass
+
+ def set_cursor_vis(self, visible: bool) -> None:
+ pass
+
+ def push_char(self, char: int | bytes) -> None:
+ pass
+
+ def beep(self) -> None:
+ pass
+
+ def clear(self) -> None:
+ pass
+
+ def finish(self) -> None:
+ pass
+
+ def flushoutput(self) -> None:
+ pass
+
+ def forgetinput(self) -> None:
+ pass
+
+ def wait(self) -> None:
+ pass
+
+ def repaint(self) -> None:
+ pass
+
+
+class TestCursorPosition(TestCase):
+ def test_up_arrow_simple(self):
+ # fmt: off
+ code = (
+ 'def f():\n'
+ ' ...\n'
+ )
+ # fmt: on
+ events = itertools.chain(
+ code_to_events(code),
+ [
+ Event(evt="key", data="up", raw=bytearray(b"\x1bOA")),
+ ],
+ )
+
+ reader, console = handle_all_events(events)
+ self.assertEqual(reader.cxy, (0, 1))
+ console.move_cursor.assert_called_once_with(0, 1)
+
+ def test_down_arrow_end_of_input(self):
+ # fmt: off
+ code = (
+ 'def f():\n'
+ ' ...\n'
+ )
+ # fmt: on
+ events = itertools.chain(
+ code_to_events(code),
+ [
+ Event(evt="key", data="down", raw=bytearray(b"\x1bOB")),
+ ],
+ )
+
+ reader, console = handle_all_events(events)
+ self.assertEqual(reader.cxy, (0, 2))
+ console.move_cursor.assert_called_once_with(0, 2)
+
+ def test_left_arrow_simple(self):
+ events = itertools.chain(
+ code_to_events("11+11"),
+ [
+ Event(evt="key", data="left", raw=bytearray(b"\x1bOD")),
+ ],
+ )
+
+ reader, console = handle_all_events(events)
+ self.assertEqual(reader.cxy, (4, 0))
+ console.move_cursor.assert_called_once_with(4, 0)
+
+ def test_right_arrow_end_of_line(self):
+ events = itertools.chain(
+ code_to_events("11+11"),
+ [
+ Event(evt="key", data="right", raw=bytearray(b"\x1bOC")),
+ ],
+ )
+
+ reader, console = handle_all_events(events)
+ self.assertEqual(reader.cxy, (5, 0))
+ console.move_cursor.assert_called_once_with(5, 0)
+
+ def test_cursor_position_simple_character(self):
+ events = itertools.chain(code_to_events("k"))
+
+ reader, _ = handle_all_events(events)
+ self.assertEqual(reader.pos, 1)
+
+ # 1 for simple character
+ self.assertEqual(reader.cxy, (1, 0))
+
+ def test_cursor_position_double_width_character(self):
+ events = itertools.chain(code_to_events("樂"))
+
+ reader, _ = handle_all_events(events)
+ self.assertEqual(reader.pos, 1)
+
+ # 2 for wide character
+ self.assertEqual(reader.cxy, (2, 0))
+
+ def test_cursor_position_double_width_character_move_left(self):
+ events = itertools.chain(
+ code_to_events("樂"),
+ [
+ Event(evt="key", data="left", raw=bytearray(b"\x1bOD")),
+ ],
+ )
+
+ reader, _ = handle_all_events(events)
+ self.assertEqual(reader.pos, 0)
+ self.assertEqual(reader.cxy, (0, 0))
+
+ def test_cursor_position_double_width_character_move_left_right(self):
+ events = itertools.chain(
+ code_to_events("樂"),
+ [
+ Event(evt="key", data="left", raw=bytearray(b"\x1bOD")),
+ Event(evt="key", data="right", raw=bytearray(b"\x1bOC")),
+ ],
+ )
+
+ reader, _ = handle_all_events(events)
+ self.assertEqual(reader.pos, 1)
+
+ # 2 for wide character
+ self.assertEqual(reader.cxy, (2, 0))
+
+ def test_cursor_position_double_width_characters_move_up(self):
+ for_loop = "for _ in _:"
+
+ # fmt: off
+ code = (
+ f"{for_loop}\n"
+ " ' 可口可乐; 可口可樂'"
+ )
+ # fmt: on
+
+ events = itertools.chain(
+ code_to_events(code),
+ [
+ Event(evt="key", data="up", raw=bytearray(b"\x1bOA")),
+ ],
+ )
+
+ reader, _ = handle_all_events(events)
+
+ # cursor at end of first line
+ self.assertEqual(reader.pos, len(for_loop))
+ self.assertEqual(reader.cxy, (len(for_loop), 0))
+
+ def test_cursor_position_double_width_characters_move_up_down(self):
+ for_loop = "for _ in _:"
+
+ # fmt: off
+ code = (
+ f"{for_loop}\n"
+ " ' 可口可乐; 可口可樂'"
+ )
+ # fmt: on
+
+ events = itertools.chain(
+ code_to_events(code),
+ [
+ Event(evt="key", data="up", raw=bytearray(b"\x1bOA")),
+ Event(evt="key", data="left", raw=bytearray(b"\x1bOD")),
+ Event(evt="key", data="down", raw=bytearray(b"\x1bOB")),
+ ],
+ )
+
+ reader, _ = handle_all_events(events)
+
+ # cursor here (showing 2nd line only):
+ # < ' 可口可乐; 可口可樂'>
+ # ^
+ self.assertEqual(reader.pos, 19)
+ self.assertEqual(reader.cxy, (10, 1))
+
+ def test_cursor_position_multiple_double_width_characters_move_left(self):
+ events = itertools.chain(
+ code_to_events("' 可口可乐; 可口可樂'"),
+ [
+ Event(evt="key", data="left", raw=bytearray(b"\x1bOD")),
+ Event(evt="key", data="left", raw=bytearray(b"\x1bOD")),
+ Event(evt="key", data="left", raw=bytearray(b"\x1bOD")),
+ ],
+ )
+
+ reader, _ = handle_all_events(events)
+ self.assertEqual(reader.pos, 10)
+
+ # 1 for quote, 1 for space, 2 per wide character,
+ # 1 for semicolon, 1 for space, 2 per wide character
+ self.assertEqual(reader.cxy, (16, 0))
+
+ def test_cursor_position_move_up_to_eol(self):
+ first_line = "for _ in _:"
+ second_line = " hello"
+
+ # fmt: off
+ code = (
+ f"{first_line}\n"
+ f"{second_line}\n"
+ " h\n"
+ " hel"
+ )
+ # fmt: on
+
+ events = itertools.chain(
+ code_to_events(code),
+ [
+ Event(evt="key", data="up", raw=bytearray(b"\x1bOA")),
+ Event(evt="key", data="up", raw=bytearray(b"\x1bOA")),
+ ],
+ )
+
+ reader, _ = handle_all_events(events)
+
+ # Cursor should be at end of line 1, even though line 2 is shorter
+ # for _ in _:
+ # hello
+ # h
+ # hel
+ self.assertEqual(
+ reader.pos, len(first_line) + len(second_line) + 1
+ ) # +1 for newline
+ self.assertEqual(reader.cxy, (len(second_line), 1))
+
+ def test_cursor_position_move_down_to_eol(self):
+ last_line = " hel"
+
+ # fmt: off
+ code = (
+ "for _ in _:\n"
+ " hello\n"
+ " h\n"
+ f"{last_line}"
+ )
+ # fmt: on
+
+ events = itertools.chain(
+ code_to_events(code),
+ [
+ Event(evt="key", data="up", raw=bytearray(b"\x1bOA")),
+ Event(evt="key", data="up", raw=bytearray(b"\x1bOA")),
+ Event(evt="key", data="down", raw=bytearray(b"\x1bOB")),
+ Event(evt="key", data="down", raw=bytearray(b"\x1bOB")),
+ ],
+ )
+
+ reader, _ = handle_all_events(events)
+
+ # Cursor should be at end of line 3, even though line 2 is shorter
+ # for _ in _:
+ # hello
+ # h
+ # hel
+ self.assertEqual(reader.pos, len(code))
+ self.assertEqual(reader.cxy, (len(last_line), 3))
+
+ def test_cursor_position_multiple_mixed_lines_move_up(self):
+ # fmt: off
+ code = (
+ "def foo():\n"
+ " x = '可口可乐; 可口可樂'\n"
+ " y = 'abckdfjskldfjslkdjf'"
+ )
+ # fmt: on
+
+ events = itertools.chain(
+ code_to_events(code),
+ 13 * [Event(evt="key", data="left", raw=bytearray(b"\x1bOD"))],
+ [Event(evt="key", data="up", raw=bytearray(b"\x1bOA"))],
+ )
+
+ reader, _ = handle_all_events(events)
+
+ # By moving left, we're before the s:
+ # y = 'abckdfjskldfjslkdjf'
+ # ^
+ # And we should move before the semi-colon despite the different offset
+ # x = '可口可乐; 可口可樂'
+ # ^
+ self.assertEqual(reader.pos, 22)
+ self.assertEqual(reader.cxy, (15, 1))
+
+ def test_cursor_position_after_wrap_and_move_up(self):
+ # fmt: off
+ code = (
+ "def foo():\n"
+ " hello"
+ )
+ # fmt: on
+
+ events = itertools.chain(
+ code_to_events(code),
+ [
+ Event(evt="key", data="up", raw=bytearray(b"\x1bOA")),
+ ],
+ )
+ reader, _ = handle_events_narrow_console(events)
+
+ # The code looks like this:
+ # def foo()\
+ # :
+ # hello
+ # After moving up we should be after the colon in line 2
+ self.assertEqual(reader.pos, 10)
+ self.assertEqual(reader.cxy, (1, 1))
+
+
+class TestPyReplOutput(TestCase):
+ def prepare_reader(self, events):
+ console = FakeConsole(events)
+ config = ReadlineConfig(readline_completer=None)
+ reader = ReadlineAlikeReader(console=console, config=config)
+ return reader
+
+ def test_basic(self):
+ reader = self.prepare_reader(code_to_events("1+1\n"))
+
+ output = multiline_input(reader)
+ self.assertEqual(output, "1+1")
+
+ def test_multiline_edit(self):
+ events = itertools.chain(
+ code_to_events("def f():\n ...\n\n"),
+ [
+ Event(evt="key", data="up", raw=bytearray(b"\x1bOA")),
+ Event(evt="key", data="up", raw=bytearray(b"\x1bOA")),
+ Event(evt="key", data="up", raw=bytearray(b"\x1bOA")),
+ Event(evt="key", data="right", raw=bytearray(b"\x1bOC")),
+ Event(evt="key", data="right", raw=bytearray(b"\x1bOC")),
+ Event(evt="key", data="right", raw=bytearray(b"\x1bOC")),
+ Event(evt="key", data="backspace", raw=bytearray(b"\x7f")),
+ Event(evt="key", data="g", raw=bytearray(b"g")),
+ Event(evt="key", data="down", raw=bytearray(b"\x1bOB")),
+ Event(evt="key", data="down", raw=bytearray(b"\x1bOB")),
+ Event(evt="key", data="\n", raw=bytearray(b"\n")),
+ ],
+ )
+ reader = self.prepare_reader(events)
+
+ output = multiline_input(reader)
+ self.assertEqual(output, "def f():\n ...\n ")
+ output = multiline_input(reader)
+ self.assertEqual(output, "def g():\n ...\n ")
+
+ def test_history_navigation_with_up_arrow(self):
+ events = itertools.chain(
+ code_to_events("1+1\n2+2\n"),
+ [
+ Event(evt="key", data="up", raw=bytearray(b"\x1bOA")),
+ Event(evt="key", data="\n", raw=bytearray(b"\n")),
+ Event(evt="key", data="up", raw=bytearray(b"\x1bOA")),
+ Event(evt="key", data="up", raw=bytearray(b"\x1bOA")),
+ Event(evt="key", data="up", raw=bytearray(b"\x1bOA")),
+ Event(evt="key", data="\n", raw=bytearray(b"\n")),
+ ],
+ )
+
+ reader = self.prepare_reader(events)
+
+ output = multiline_input(reader)
+ self.assertEqual(output, "1+1")
+ output = multiline_input(reader)
+ self.assertEqual(output, "2+2")
+ output = multiline_input(reader)
+ self.assertEqual(output, "2+2")
+ output = multiline_input(reader)
+ self.assertEqual(output, "1+1")
+
+ def test_history_navigation_with_down_arrow(self):
+ events = itertools.chain(
+ code_to_events("1+1\n2+2\n"),
+ [
+ Event(evt="key", data="up", raw=bytearray(b"\x1bOA")),
+ Event(evt="key", data="up", raw=bytearray(b"\x1bOA")),
+ Event(evt="key", data="\n", raw=bytearray(b"\n")),
+ Event(evt="key", data="down", raw=bytearray(b"\x1bOB")),
+ Event(evt="key", data="down", raw=bytearray(b"\x1bOB")),
+ ],
+ )
+
+ reader = self.prepare_reader(events)
+
+ output = multiline_input(reader)
+ self.assertEqual(output, "1+1")
+
+ def test_history_search(self):
+ events = itertools.chain(
+ code_to_events("1+1\n2+2\n3+3\n"),
+ [
+ Event(evt="key", data="\x12", raw=bytearray(b"\x12")),
+ Event(evt="key", data="1", raw=bytearray(b"1")),
+ Event(evt="key", data="\n", raw=bytearray(b"\n")),
+ Event(evt="key", data="\n", raw=bytearray(b"\n")),
+ ],
+ )
+
+ reader = self.prepare_reader(events)
+
+ output = multiline_input(reader)
+ self.assertEqual(output, "1+1")
+ output = multiline_input(reader)
+ self.assertEqual(output, "2+2")
+ output = multiline_input(reader)
+ self.assertEqual(output, "3+3")
+ output = multiline_input(reader)
+ self.assertEqual(output, "1+1")
+
+ def test_control_character(self):
+ events = code_to_events("c\x1d\n")
+ reader = self.prepare_reader(events)
+ output = multiline_input(reader)
+ self.assertEqual(output, "c\x1d")
+
+
+class TestPyReplCompleter(TestCase):
+ def prepare_reader(self, events, namespace):
+ console = FakeConsole(events)
+ config = ReadlineConfig()
+ config.readline_completer = rlcompleter.Completer(namespace).complete
+ reader = ReadlineAlikeReader(console=console, config=config)
+ return reader
+
+ def test_simple_completion(self):
+ events = code_to_events("os.geten\t\n")
+
+ namespace = {"os": os}
+ reader = self.prepare_reader(events, namespace)
+
+ output = multiline_input(reader, namespace)
+ self.assertEqual(output, "os.getenv")
+
+ def test_completion_with_many_options(self):
+ events = code_to_events("os.\t\tO_AS\t\n")
+
+ namespace = {"os": os}
+ reader = self.prepare_reader(events, namespace)
+
+ output = multiline_input(reader, namespace)
+ self.assertEqual(output, "os.O_ASYNC")
+
+ def test_empty_namespace_completion(self):
+ events = code_to_events("os.geten\t\n")
+ namespace = {}
+ reader = self.prepare_reader(events, namespace)
+
+ output = multiline_input(reader, namespace)
+ self.assertEqual(output, "os.geten")
+
+ def test_global_namespace_completion(self):
+ events = code_to_events("py\t\n")
+ namespace = {"python": None}
+ reader = self.prepare_reader(events, namespace)
+ output = multiline_input(reader, namespace)
+ self.assertEqual(output, "python")
+
+
+@patch("_pyrepl.curses.tigetstr", lambda x: b"")
+class TestUnivEventQueue(TestCase):
+ def test_get(self):
+ eq = EventQueue(sys.stdout.fileno(), "utf-8")
+ event = Event("key", "a", b"a")
+ eq.insert(event)
+ self.assertEqual(eq.get(), event)
+
+ def test_empty(self):
+ eq = EventQueue(sys.stdout.fileno(), "utf-8")
+ self.assertTrue(eq.empty())
+ eq.insert(Event("key", "a", b"a"))
+ self.assertFalse(eq.empty())
+
+ def test_flush_buf(self):
+ eq = EventQueue(sys.stdout.fileno(), "utf-8")
+ eq.buf.extend(b"test")
+ self.assertEqual(eq.flush_buf(), b"test")
+ self.assertEqual(eq.buf, bytearray())
+
+ def test_insert(self):
+ eq = EventQueue(sys.stdout.fileno(), "utf-8")
+ event = Event("key", "a", b"a")
+ eq.insert(event)
+ self.assertEqual(eq.events[0], event)
+
+ @patch("_pyrepl.unix_eventqueue.keymap")
+ def test_push_with_key_in_keymap(self, mock_keymap):
+ mock_keymap.compile_keymap.return_value = {"a": "b"}
+ eq = EventQueue(sys.stdout.fileno(), "utf-8")
+ eq.keymap = {b"a": "b"}
+ eq.push("a")
+ self.assertTrue(mock_keymap.compile_keymap.called)
+ self.assertEqual(eq.events[0].evt, "key")
+ self.assertEqual(eq.events[0].data, "b")
+
+ @patch("_pyrepl.unix_eventqueue.keymap")
+ def test_push_without_key_in_keymap(self, mock_keymap):
+ mock_keymap.compile_keymap.return_value = {"a": "b"}
+ eq = EventQueue(sys.stdout.fileno(), "utf-8")
+ eq.keymap = {b"c": "d"}
+ eq.push("a")
+ self.assertTrue(mock_keymap.compile_keymap.called)
+ self.assertEqual(eq.events[0].evt, "key")
+ self.assertEqual(eq.events[0].data, "a")
+
+ @patch("_pyrepl.unix_eventqueue.keymap")
+ def test_push_with_keymap_in_keymap(self, mock_keymap):
+ mock_keymap.compile_keymap.return_value = {"a": "b"}
+ eq = EventQueue(sys.stdout.fileno(), "utf-8")
+ eq.keymap = {b"a": {b"b": "c"}}
+ eq.push("a")
+ self.assertTrue(mock_keymap.compile_keymap.called)
+ self.assertTrue(eq.empty())
+ eq.push("b")
+ self.assertEqual(eq.events[0].evt, "key")
+ self.assertEqual(eq.events[0].data, "c")
+ eq.push("d")
+ self.assertEqual(eq.events[1].evt, "key")
+ self.assertEqual(eq.events[1].data, "d")
+
+ @patch("_pyrepl.unix_eventqueue.keymap")
+ def test_push_with_keymap_in_keymap_and_escape(self, mock_keymap):
+ mock_keymap.compile_keymap.return_value = {"a": "b"}
+ eq = EventQueue(sys.stdout.fileno(), "utf-8")
+ eq.keymap = {b"a": {b"b": "c"}}
+ eq.push("a")
+ self.assertTrue(mock_keymap.compile_keymap.called)
+ self.assertTrue(eq.empty())
+ eq.flush_buf()
+ eq.push("\033")
+ self.assertEqual(eq.events[0].evt, "key")
+ self.assertEqual(eq.events[0].data, "\033")
+ eq.push("b")
+ self.assertEqual(eq.events[1].evt, "key")
+ self.assertEqual(eq.events[1].data, "b")
+
+ def test_push_special_key(self):
+ eq = EventQueue(sys.stdout.fileno(), "utf-8")
+ eq.keymap = {}
+ eq.push("\x1b")
+ eq.push("[")
+ eq.push("A")
+ self.assertEqual(eq.events[0].evt, "key")
+ self.assertEqual(eq.events[0].data, "\x1b")
+
+ def test_push_unrecognized_escape_sequence(self):
+ eq = EventQueue(sys.stdout.fileno(), "utf-8")
+ eq.keymap = {}
+ eq.push("\x1b")
+ eq.push("[")
+ eq.push("Z")
+ self.assertEqual(len(eq.events), 3)
+ self.assertEqual(eq.events[0].evt, "key")
+ self.assertEqual(eq.events[0].data, "\x1b")
+ self.assertEqual(eq.events[1].evt, "key")
+ self.assertEqual(eq.events[1].data, "[")
+ self.assertEqual(eq.events[2].evt, "key")
+ self.assertEqual(eq.events[2].data, "Z")
+
+
+class TestPasteEvent(TestCase):
+ def prepare_reader(self, events):
+ console = FakeConsole(events)
+ config = ReadlineConfig(readline_completer=None)
+ reader = ReadlineAlikeReader(console=console, config=config)
+ return reader
+
+ def test_paste(self):
+ # fmt: off
+ code = (
+ 'def a():\n'
+ ' for x in range(10):\n'
+ ' if x%2:\n'
+ ' print(x)\n'
+ ' else:\n'
+ ' pass\n'
+ )
+ # fmt: on
+
+ events = itertools.chain(
+ [
+ Event(evt="key", data="f3", raw=bytearray(b"\x1bOR")),
+ ],
+ code_to_events(code),
+ [
+ Event(evt="key", data="f3", raw=bytearray(b"\x1bOR")),
+ ],
+ code_to_events("\n"),
+ )
+ reader = self.prepare_reader(events)
+ output = multiline_input(reader)
+ self.assertEqual(output, code)
+
+ def test_paste_mid_newlines(self):
+ # fmt: off
+ code = (
+ 'def f():\n'
+ ' x = y\n'
+ ' \n'
+ ' y = z\n'
+ )
+ # fmt: on
+
+ events = itertools.chain(
+ [
+ Event(evt="key", data="f3", raw=bytearray(b"\x1bOR")),
+ ],
+ code_to_events(code),
+ [
+ Event(evt="key", data="f3", raw=bytearray(b"\x1bOR")),
+ ],
+ code_to_events("\n"),
+ )
+ reader = self.prepare_reader(events)
+ output = multiline_input(reader)
+ self.assertEqual(output, code)
+
+ def test_paste_mid_newlines_not_in_paste_mode(self):
+ # fmt: off
+ code = (
+ 'def f():\n'
+ ' x = y\n'
+ ' \n'
+ ' y = z\n\n'
+ )
+
+ expected = (
+ 'def f():\n'
+ ' x = y\n'
+ ' '
+ )
+ # fmt: on
+
+ events = code_to_events(code)
+ reader = self.prepare_reader(events)
+ output = multiline_input(reader)
+ self.assertEqual(output, expected)
+
+ def test_paste_not_in_paste_mode(self):
+ # fmt: off
+ input_code = (
+ 'def a():\n'
+ ' for x in range(10):\n'
+ ' if x%2:\n'
+ ' print(x)\n'
+ ' else:\n'
+ ' pass\n\n'
+ )
+
+ output_code = (
+ 'def a():\n'
+ ' for x in range(10):\n'
+ ' if x%2:\n'
+ ' print(x)\n'
+ ' else:'
+ )
+ # fmt: on
+
+ events = code_to_events(input_code)
+ reader = self.prepare_reader(events)
+ output = multiline_input(reader)
+ self.assertEqual(output, output_code)
+
+
+class TestReader(TestCase):
+ def assert_screen_equals(self, reader, expected):
+ actual = reader.calc_screen()
+ expected = expected.split("\n")
+ self.assertListEqual(actual, expected)
+
+ def test_calc_screen_wrap_simple(self):
+ events = code_to_events(10 * "a")
+ reader, _ = handle_events_narrow_console(events)
+ self.assert_screen_equals(reader, f"{9*"a"}\\\na")
+
+ def test_calc_screen_wrap_wide_characters(self):
+ events = code_to_events(8 * "a" + "樂")
+ reader, _ = handle_events_narrow_console(events)
+ self.assert_screen_equals(reader, f"{8*"a"}\\\n樂")
+
+ def test_calc_screen_wrap_three_lines(self):
+ events = code_to_events(20 * "a")
+ reader, _ = handle_events_narrow_console(events)
+ self.assert_screen_equals(reader, f"{9*"a"}\\\n{9*"a"}\\\naa")
+
+ def test_calc_screen_wrap_three_lines_mixed_character(self):
+ # fmt: off
+ code = (
+ "def f():\n"
+ f" {8*"a"}\n"
+ f" {5*"樂"}"
+ )
+ # fmt: on
+
+ events = code_to_events(code)
+ reader, _ = handle_events_narrow_console(events)
+
+ # fmt: off
+ self.assert_screen_equals(reader, (
+ "def f():\n"
+ f" {7*"a"}\\\n"
+ "a\n"
+ f" {3*"樂"}\\\n"
+ "樂樂"
+ ))
+ # fmt: on
+
+ def test_calc_screen_backspace(self):
+ events = itertools.chain(
+ code_to_events("aaa"),
+ [
+ Event(evt="key", data="backspace", raw=bytearray(b"\x7f")),
+ ],
+ )
+ reader, _ = handle_all_events(events)
+ self.assert_screen_equals(reader, "aa")
+
+ def test_calc_screen_wrap_removes_after_backspace(self):
+ events = itertools.chain(
+ code_to_events(10 * "a"),
+ [
+ Event(evt="key", data="backspace", raw=bytearray(b"\x7f")),
+ ],
+ )
+ reader, _ = handle_events_narrow_console(events)
+ self.assert_screen_equals(reader, 9 * "a")
+
+ def test_calc_screen_backspace_in_second_line_after_wrap(self):
+ events = itertools.chain(
+ code_to_events(11 * "a"),
+ [
+ Event(evt="key", data="backspace", raw=bytearray(b"\x7f")),
+ ],
+ )
+ reader, _ = handle_events_narrow_console(events)
+ self.assert_screen_equals(reader, f"{9*"a"}\\\na")
+
+ def test_setpos_for_xy_simple(self):
+ events = code_to_events("11+11")
+ reader, _ = handle_all_events(events)
+ reader.setpos_from_xy(0, 0)
+ self.assertEqual(reader.pos, 0)
+
+ def test_setpos_from_xy_multiple_lines(self):
+ # fmt: off
+ code = (
+ "def foo():\n"
+ " return 1"
+ )
+ # fmt: on
+
+ events = code_to_events(code)
+ reader, _ = handle_all_events(events)
+ reader.setpos_from_xy(2, 1)
+ self.assertEqual(reader.pos, 13)
+
+ def test_setpos_from_xy_after_wrap(self):
+ # fmt: off
+ code = (
+ "def foo():\n"
+ " hello"
+ )
+ # fmt: on
+
+ events = code_to_events(code)
+ reader, _ = handle_events_narrow_console(events)
+ reader.setpos_from_xy(2, 2)
+ self.assertEqual(reader.pos, 13)
+
+ def test_setpos_fromxy_in_wrapped_line(self):
+ # fmt: off
+ code = (
+ "def foo():\n"
+ " hello"
+ )
+ # fmt: on
+
+ events = code_to_events(code)
+ reader, _ = handle_events_narrow_console(events)
+ reader.setpos_from_xy(0, 1)
+ self.assertEqual(reader.pos, 9)
+
+
+if __name__ == "__main__":
+ unittest.main()
diff --git a/Lib/test/test_traceback.py b/Lib/test/test_traceback.py
index 8969e01..7e48510 100644
--- a/Lib/test/test_traceback.py
+++ b/Lib/test/test_traceback.py
@@ -500,7 +500,7 @@ class TracebackCases(unittest.TestCase):
traceback.format_exception(e.__class__, e)
with self.assertRaisesRegex(ValueError, 'Both or neither'):
traceback.format_exception(e.__class__, tb=e.__traceback__)
- with self.assertRaisesRegex(TypeError, 'positional-only'):
+ with self.assertRaisesRegex(TypeError, 'required positional argument'):
traceback.format_exception(exc=e)
def test_format_exception_only_exc(self):
@@ -539,7 +539,7 @@ class TracebackCases(unittest.TestCase):
self.assertEqual(
str(inspect.signature(traceback.format_exception)),
('(exc, /, value=<implicit>, tb=<implicit>, limit=None, '
- 'chain=True)'))
+ 'chain=True, **kwargs)'))
self.assertEqual(
str(inspect.signature(traceback.format_exception_only)),
diff --git a/Lib/traceback.py b/Lib/traceback.py
index 8403173..1878779 100644
--- a/Lib/traceback.py
+++ b/Lib/traceback.py
@@ -140,7 +140,7 @@ def _print_exception_bltin(exc, /):
def format_exception(exc, /, value=_sentinel, tb=_sentinel, limit=None, \
- chain=True):
+ chain=True, **kwargs):
"""Format a stack trace and the exception information.
The arguments have the same meaning as the corresponding arguments
@@ -149,9 +149,10 @@ def format_exception(exc, /, value=_sentinel, tb=_sentinel, limit=None, \
these lines are concatenated and printed, exactly the same text is
printed as does print_exception().
"""
+ colorize = kwargs.get("colorize", False)
value, tb = _parse_value_tb(exc, value, tb)
te = TracebackException(type(value), value, tb, limit=limit, compact=True)
- return list(te.format(chain=chain))
+ return list(te.format(chain=chain, colorize=colorize))
def format_exception_only(exc, /, value=_sentinel, *, show_group=False):
diff --git a/Makefile.pre.in b/Makefile.pre.in
index 924ed1f..9032277 100644
--- a/Makefile.pre.in
+++ b/Makefile.pre.in
@@ -2339,6 +2339,7 @@ LIBSUBDIRS= asyncio \
xmlrpc \
zipfile zipfile/_path \
zoneinfo \
+ _pyrepl \
__phello__
TESTSUBDIRS= idlelib/idle_test \
test \
diff --git a/Misc/NEWS.d/next/Core and Builtins/2024-04-28-00-41-17.gh-issue-111201.cQsh5U.rst b/Misc/NEWS.d/next/Core and Builtins/2024-04-28-00-41-17.gh-issue-111201.cQsh5U.rst
new file mode 100644
index 0000000..9176da1
--- /dev/null
+++ b/Misc/NEWS.d/next/Core and Builtins/2024-04-28-00-41-17.gh-issue-111201.cQsh5U.rst
@@ -0,0 +1,4 @@
+The :term:`interactive` interpreter is now implemented in Python, which
+allows for a number of new features like colors, multiline input, history
+viewing, and paste mode. Contributed by Pablo Galindo and Łukasz Langa based
+on code from the PyPy project.
diff --git a/Modules/main.c b/Modules/main.c
index df2ce55..8eded26 100644
--- a/Modules/main.c
+++ b/Modules/main.c
@@ -513,8 +513,13 @@ pymain_run_stdin(PyConfig *config)
return pymain_exit_err_print();
}
- PyCompilerFlags cf = _PyCompilerFlags_INIT;
- int run = PyRun_AnyFileExFlags(stdin, "<stdin>", 0, &cf);
+ if (!isatty(fileno(stdin))
+ || _Py_GetEnv(config->use_environment, "PYTHON_BASIC_REPL")) {
+ PyCompilerFlags cf = _PyCompilerFlags_INIT;
+ int run = PyRun_AnyFileExFlags(stdin, "<stdin>", 0, &cf);
+ return (run != 0);
+ }
+ int run = pymain_run_module(L"_pyrepl", 0);
return (run != 0);
}
@@ -537,9 +542,15 @@ pymain_repl(PyConfig *config, int *exitcode)
return;
}
- PyCompilerFlags cf = _PyCompilerFlags_INIT;
- int res = PyRun_AnyFileFlags(stdin, "<stdin>", &cf);
- *exitcode = (res != 0);
+ if (!isatty(fileno(stdin))) {
+ PyCompilerFlags cf = _PyCompilerFlags_INIT;
+ int run = PyRun_AnyFileExFlags(stdin, "<stdin>", 0, &cf);
+ *exitcode = (run != 0);
+ return;
+ }
+ int run = pymain_run_module(L"_pyrepl", 0);
+ *exitcode = (run != 0);
+ return;
}
diff --git a/Python/clinic/sysmodule.c.h b/Python/clinic/sysmodule.c.h
index 0a8704c..56a831e 100644
--- a/Python/clinic/sysmodule.c.h
+++ b/Python/clinic/sysmodule.c.h
@@ -1485,6 +1485,24 @@ exit:
return return_value;
}
+PyDoc_STRVAR(sys__baserepl__doc__,
+"_baserepl($module, /)\n"
+"--\n"
+"\n"
+"Private function for getting the base REPL");
+
+#define SYS__BASEREPL_METHODDEF \
+ {"_baserepl", (PyCFunction)sys__baserepl, METH_NOARGS, sys__baserepl__doc__},
+
+static PyObject *
+sys__baserepl_impl(PyObject *module);
+
+static PyObject *
+sys__baserepl(PyObject *module, PyObject *Py_UNUSED(ignored))
+{
+ return sys__baserepl_impl(module);
+}
+
PyDoc_STRVAR(sys__is_gil_enabled__doc__,
"_is_gil_enabled($module, /)\n"
"--\n"
@@ -1556,4 +1574,4 @@ exit:
#ifndef SYS_GETANDROIDAPILEVEL_METHODDEF
#define SYS_GETANDROIDAPILEVEL_METHODDEF
#endif /* !defined(SYS_GETANDROIDAPILEVEL_METHODDEF) */
-/*[clinic end generated code: output=352ac7a0085e8a1f input=a9049054013a1b77]*/
+/*[clinic end generated code: output=ef7c35945443d300 input=a9049054013a1b77]*/
diff --git a/Python/pythonrun.c b/Python/pythonrun.c
index 31213ae..ce7f194 100644
--- a/Python/pythonrun.c
+++ b/Python/pythonrun.c
@@ -83,8 +83,6 @@ _PyRun_AnyFileObject(FILE *fp, PyObject *filename, int closeit,
return res;
}
-
-/* Parse input from a file and execute it */
int
PyRun_AnyFileExFlags(FILE *fp, const char *filename, int closeit,
PyCompilerFlags *flags)
diff --git a/Python/stdlib_module_names.h b/Python/stdlib_module_names.h
index ba32084..9686d10 100644
--- a/Python/stdlib_module_names.h
+++ b/Python/stdlib_module_names.h
@@ -65,6 +65,7 @@ static const char* _Py_stdlib_module_names[] = {
"_pydecimal",
"_pyio",
"_pylong",
+"_pyrepl",
"_queue",
"_random",
"_scproxy",
diff --git a/Python/sysmodule.c b/Python/sysmodule.c
index 17c4a5f..69bee32 100644
--- a/Python/sysmodule.c
+++ b/Python/sysmodule.c
@@ -2396,6 +2396,21 @@ sys__get_cpu_count_config_impl(PyObject *module)
}
/*[clinic input]
+sys._baserepl
+
+Private function for getting the base REPL
+[clinic start generated code]*/
+
+static PyObject *
+sys__baserepl_impl(PyObject *module)
+/*[clinic end generated code: output=f19a36375ebe0a45 input=ade0ebb9fab56f3c]*/
+{
+ PyCompilerFlags cf = _PyCompilerFlags_INIT;
+ PyRun_AnyFileExFlags(stdin, "<stdin>", 0, &cf);
+ Py_RETURN_NONE;
+}
+
+/*[clinic input]
sys._is_gil_enabled -> bool
Return True if the GIL is currently enabled and False otherwise.
@@ -2579,6 +2594,7 @@ static PyMethodDef sys_methods[] = {
SYS_UNRAISABLEHOOK_METHODDEF
SYS_GET_INT_MAX_STR_DIGITS_METHODDEF
SYS_SET_INT_MAX_STR_DIGITS_METHODDEF
+ SYS__BASEREPL_METHODDEF
#ifdef Py_STATS
SYS__STATS_ON_METHODDEF
SYS__STATS_OFF_METHODDEF