summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--Doc/whatsnew/3.13.rst13
-rw-r--r--Lib/_pyrepl/__main__.py6
-rw-r--r--Lib/_pyrepl/console.py30
-rw-r--r--Lib/_pyrepl/reader.py16
-rw-r--r--Lib/_pyrepl/readline.py11
-rw-r--r--Lib/_pyrepl/simple_interact.py6
-rw-r--r--Lib/_pyrepl/unix_console.py28
-rw-r--r--Lib/_pyrepl/windows_console.py587
-rw-r--r--Lib/test/test_pyrepl/__init__.py12
-rw-r--r--Lib/test/test_pyrepl/support.py2
-rw-r--r--Lib/test/test_pyrepl/test_pyrepl.py5
-rw-r--r--Lib/test/test_pyrepl/test_unix_console.py11
-rw-r--r--Lib/test/test_pyrepl/test_unix_eventqueue.py10
-rw-r--r--Lib/test/test_pyrepl/test_windows_console.py330
-rw-r--r--Misc/NEWS.d/next/Windows/2024-05-25-18-43-10.gh-issue-111201.SLPJIx.rst1
15 files changed, 1019 insertions, 49 deletions
diff --git a/Doc/whatsnew/3.13.rst b/Doc/whatsnew/3.13.rst
index 8799cf2..d09d7ce 100644
--- a/Doc/whatsnew/3.13.rst
+++ b/Doc/whatsnew/3.13.rst
@@ -154,10 +154,10 @@ New Features
A Better Interactive Interpreter
--------------------------------
-On Unix-like systems like Linux or macOS, Python now uses a new
-:term:`interactive` shell. When the user starts the :term:`REPL` from an
-interactive terminal, and both :mod:`curses` and :mod:`readline` are
-available, the interactive shell now supports the following new features:
+On Unix-like systems like Linux or macOS as well as Windows, Python now
+uses a new :term:`interactive` shell. When the user starts the
+:term:`REPL` from an interactive terminal the interactive shell now
+supports the following new features:
* Colorized prompts.
* Multiline editing with history preservation.
@@ -174,10 +174,13 @@ available, the interactive shell now supports the following new features:
If the new interactive shell is not desired, it can be disabled via
the :envvar:`PYTHON_BASIC_REPL` environment variable.
+The new shell requires :mod:`curses` on Unix-like systems.
+
For more on interactive mode, see :ref:`tut-interac`.
(Contributed by Pablo Galindo Salgado, Ɓukasz Langa, and
-Lysandros Nikolaou in :gh:`111201` based on code from the PyPy project.)
+Lysandros Nikolaou in :gh:`111201` based on code from the PyPy project.
+Windows support contributed by Dino Viehland and Anthony Shaw.)
.. _whatsnew313-improved-error-messages:
diff --git a/Lib/_pyrepl/__main__.py b/Lib/_pyrepl/__main__.py
index c598019..dae4ba6 100644
--- a/Lib/_pyrepl/__main__.py
+++ b/Lib/_pyrepl/__main__.py
@@ -1,7 +1,11 @@
import os
import sys
-CAN_USE_PYREPL = sys.platform != "win32"
+CAN_USE_PYREPL: bool
+if sys.platform != "win32":
+ CAN_USE_PYREPL = True
+else:
+ CAN_USE_PYREPL = sys.getwindowsversion().build >= 10586 # Windows 10 TH2
def interactive_console(mainmodule=None, quiet=False, pythonstartup=False):
diff --git a/Lib/_pyrepl/console.py b/Lib/_pyrepl/console.py
index d7e86e7..fcabf78 100644
--- a/Lib/_pyrepl/console.py
+++ b/Lib/_pyrepl/console.py
@@ -19,10 +19,18 @@
from __future__ import annotations
+import sys
+
from abc import ABC, abstractmethod
from dataclasses import dataclass, field
+TYPE_CHECKING = False
+
+if TYPE_CHECKING:
+ from typing import IO
+
+
@dataclass
class Event:
evt: str
@@ -36,6 +44,25 @@ class Console(ABC):
height: int = 25
width: int = 80
+ def __init__(
+ self,
+ f_in: IO[bytes] | int = 0,
+ f_out: IO[bytes] | int = 1,
+ term: str = "",
+ encoding: str = "",
+ ):
+ self.encoding = encoding or sys.getdefaultencoding()
+
+ if isinstance(f_in, int):
+ self.input_fd = f_in
+ else:
+ self.input_fd = f_in.fileno()
+
+ if isinstance(f_out, int):
+ self.output_fd = f_out
+ else:
+ self.output_fd = f_out.fileno()
+
@abstractmethod
def refresh(self, screen: list[str], xy: tuple[int, int]) -> None: ...
@@ -108,5 +135,4 @@ class Console(ABC):
...
@abstractmethod
- def repaint(self) -> None:
- ...
+ def repaint(self) -> None: ...
diff --git a/Lib/_pyrepl/reader.py b/Lib/_pyrepl/reader.py
index 1c816d5..8d9a22c 100644
--- a/Lib/_pyrepl/reader.py
+++ b/Lib/_pyrepl/reader.py
@@ -442,14 +442,13 @@ class Reader:
"""
if self.arg is None:
return default
- else:
- return self.arg
+ return self.arg
def get_prompt(self, lineno: int, cursor_on_line: bool) -> str:
"""Return what should be in the left-hand margin for line
`lineno'."""
if self.arg is not None and cursor_on_line:
- prompt = "(arg: %s) " % self.arg
+ prompt = f"(arg: {self.arg}) "
elif self.paste_mode:
prompt = "(paste) "
elif "\n" in self.buffer:
@@ -515,12 +514,12 @@ class Reader:
offset = l - 1 if in_wrapped_line else l # need to remove backslash
if offset >= pos:
break
+
+ if p + sum(l2) >= self.console.width:
+ pos -= l - 1 # -1 cause backslash is not in buffer
else:
- if p + sum(l2) >= self.console.width:
- pos -= l - 1 # -1 cause backslash is not in buffer
- else:
- pos -= l + 1 # +1 cause newline is in buffer
- y += 1
+ pos -= l + 1 # +1 cause newline is in buffer
+ y += 1
return p + sum(l2[:pos]), y
def insert(self, text: str | list[str]) -> None:
@@ -582,7 +581,6 @@ class Reader:
for arg in ("msg", "ps1", "ps2", "ps3", "ps4", "paste_mode"):
setattr(self, arg, prev_state[arg])
self.prepare()
- pass
def finish(self) -> None:
"""Called when a command signals that we're finished."""
diff --git a/Lib/_pyrepl/readline.py b/Lib/_pyrepl/readline.py
index 01da926..7d811bf 100644
--- a/Lib/_pyrepl/readline.py
+++ b/Lib/_pyrepl/readline.py
@@ -38,7 +38,14 @@ from rlcompleter import Completer as RLCompleter
from . import commands, historical_reader
from .completing_reader import CompletingReader
-from .unix_console import UnixConsole, _error
+from .console import Console as ConsoleType
+
+Console: type[ConsoleType]
+_error: tuple[type[Exception], ...] | type[Exception]
+try:
+ from .unix_console import UnixConsole as Console, _error
+except ImportError:
+ from .windows_console import WindowsConsole as Console, _error
ENCODING = sys.getdefaultencoding() or "latin1"
@@ -339,7 +346,7 @@ class _ReadlineWrapper:
def get_reader(self) -> ReadlineAlikeReader:
if self.reader is None:
- console = UnixConsole(self.f_in, self.f_out, encoding=ENCODING)
+ console = Console(self.f_in, self.f_out, encoding=ENCODING)
self.reader = ReadlineAlikeReader(console=console, config=self.config)
return self.reader
diff --git a/Lib/_pyrepl/simple_interact.py b/Lib/_pyrepl/simple_interact.py
index 11e831c..c624f6e 100644
--- a/Lib/_pyrepl/simple_interact.py
+++ b/Lib/_pyrepl/simple_interact.py
@@ -34,8 +34,12 @@ import ast
from types import ModuleType
from .readline import _get_reader, multiline_input
-from .unix_console import _error
+_error: tuple[type[Exception], ...] | type[Exception]
+try:
+ from .unix_console import _error
+except ModuleNotFoundError:
+ from .windows_console import _error
def check() -> str:
"""Returns the error message if there is a problem initializing the state."""
diff --git a/Lib/_pyrepl/unix_console.py b/Lib/_pyrepl/unix_console.py
index ec7d063..4bdb022 100644
--- a/Lib/_pyrepl/unix_console.py
+++ b/Lib/_pyrepl/unix_console.py
@@ -143,18 +143,7 @@ class UnixConsole(Console):
- term (str): Terminal name.
- encoding (str): Encoding to use for I/O operations.
"""
-
- self.encoding = encoding or sys.getdefaultencoding()
-
- if isinstance(f_in, int):
- self.input_fd = f_in
- else:
- self.input_fd = f_in.fileno()
-
- if isinstance(f_out, int):
- self.output_fd = f_out
- else:
- self.output_fd = f_out.fileno()
+ super().__init__(f_in, f_out, term, encoding)
self.pollob = poll()
self.pollob.register(self.input_fd, select.POLLIN)
@@ -592,14 +581,19 @@ class UnixConsole(Console):
px_pos = 0
j = 0
for c in oldline:
- if j >= px_coord: break
+ if j >= px_coord:
+ break
j += wlen(c)
px_pos += 1
# reuse the oldline as much as possible, but stop as soon as we
# encounter an ESCAPE, because it might be the start of an escape
# sequene
- while x_coord < minlen and oldline[x_pos] == newline[x_pos] and newline[x_pos] != "\x1b":
+ while (
+ x_coord < minlen
+ and oldline[x_pos] == newline[x_pos]
+ and newline[x_pos] != "\x1b"
+ ):
x_coord += wlen(newline[x_pos])
x_pos += 1
@@ -619,7 +613,11 @@ class UnixConsole(Console):
self.__posxy = x_coord + character_width, y
# if it's a single character change in the middle of the line
- elif x_coord < minlen and oldline[x_pos + 1 :] == newline[x_pos + 1 :] and wlen(oldline[x_pos]) == wlen(newline[x_pos]):
+ elif (
+ x_coord < minlen
+ and oldline[x_pos + 1 :] == newline[x_pos + 1 :]
+ and wlen(oldline[x_pos]) == wlen(newline[x_pos])
+ ):
character_width = wlen(newline[x_pos])
self.__move(x_coord, y)
self.__write(newline[x_pos])
diff --git a/Lib/_pyrepl/windows_console.py b/Lib/_pyrepl/windows_console.py
new file mode 100644
index 0000000..2277865
--- /dev/null
+++ b/Lib/_pyrepl/windows_console.py
@@ -0,0 +1,587 @@
+# Copyright 2000-2004 Michael Hudson-Doyle <micahel@gmail.com>
+#
+# All Rights Reserved
+#
+#
+# Permission to use, copy, modify, and distribute this software and
+# its documentation for any purpose is hereby granted without fee,
+# provided that the above copyright notice appear in all copies and
+# that both that copyright notice and this permission notice appear in
+# supporting documentation.
+#
+# THE AUTHOR MICHAEL HUDSON DISCLAIMS ALL WARRANTIES WITH REGARD TO
+# THIS SOFTWARE, INCLUDING ALL IMPLIED WARRANTIES OF MERCHANTABILITY
+# AND FITNESS, IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY SPECIAL,
+# INDIRECT OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER
+# RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN ACTION OF
+# CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF OR IN
+# CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
+
+from __future__ import annotations
+
+import io
+from multiprocessing import Value
+import os
+import sys
+
+from abc import ABC, abstractmethod
+from collections import deque
+from dataclasses import dataclass, field
+import ctypes
+from ctypes.wintypes import (
+ _COORD,
+ WORD,
+ SMALL_RECT,
+ BOOL,
+ HANDLE,
+ CHAR,
+ DWORD,
+ WCHAR,
+ SHORT,
+)
+from ctypes import Structure, POINTER, Union
+from .console import Event, Console
+from .trace import trace
+from .utils import wlen
+
+try:
+ from ctypes import GetLastError, WinDLL, windll, WinError # type: ignore[attr-defined]
+except:
+ # Keep MyPy happy off Windows
+ from ctypes import CDLL as WinDLL, cdll as windll
+
+ def GetLastError() -> int:
+ return 42
+
+ class WinError(OSError): # type: ignore[no-redef]
+ def __init__(self, err: int | None, descr: str | None = None) -> None:
+ self.err = err
+ self.descr = descr
+
+
+TYPE_CHECKING = False
+
+if TYPE_CHECKING:
+ from typing import IO
+
+VK_MAP: dict[int, str] = {
+ 0x23: "end", # VK_END
+ 0x24: "home", # VK_HOME
+ 0x25: "left", # VK_LEFT
+ 0x26: "up", # VK_UP
+ 0x27: "right", # VK_RIGHT
+ 0x28: "down", # VK_DOWN
+ 0x2E: "delete", # VK_DELETE
+ 0x70: "f1", # VK_F1
+ 0x71: "f2", # VK_F2
+ 0x72: "f3", # VK_F3
+ 0x73: "f4", # VK_F4
+ 0x74: "f5", # VK_F5
+ 0x75: "f6", # VK_F6
+ 0x76: "f7", # VK_F7
+ 0x77: "f8", # VK_F8
+ 0x78: "f9", # VK_F9
+ 0x79: "f10", # VK_F10
+ 0x7A: "f11", # VK_F11
+ 0x7B: "f12", # VK_F12
+ 0x7C: "f13", # VK_F13
+ 0x7D: "f14", # VK_F14
+ 0x7E: "f15", # VK_F15
+ 0x7F: "f16", # VK_F16
+ 0x79: "f17", # VK_F17
+ 0x80: "f18", # VK_F18
+ 0x81: "f19", # VK_F19
+ 0x82: "f20", # VK_F20
+}
+
+# Console escape codes: https://learn.microsoft.com/en-us/windows/console/console-virtual-terminal-sequences
+ERASE_IN_LINE = "\x1b[K"
+MOVE_LEFT = "\x1b[{}D"
+MOVE_RIGHT = "\x1b[{}C"
+MOVE_UP = "\x1b[{}A"
+MOVE_DOWN = "\x1b[{}B"
+CLEAR = "\x1b[H\x1b[J"
+
+
+class _error(Exception):
+ pass
+
+
+class WindowsConsole(Console):
+ def __init__(
+ self,
+ f_in: IO[bytes] | int = 0,
+ f_out: IO[bytes] | int = 1,
+ term: str = "",
+ encoding: str = "",
+ ):
+ super().__init__(f_in, f_out, term, encoding)
+
+ SetConsoleMode(
+ OutHandle,
+ ENABLE_WRAP_AT_EOL_OUTPUT
+ | ENABLE_PROCESSED_OUTPUT
+ | ENABLE_VIRTUAL_TERMINAL_PROCESSING,
+ )
+ self.screen: list[str] = []
+ self.width = 80
+ self.height = 25
+ self.__offset = 0
+ self.event_queue: deque[Event] = deque()
+ try:
+ self.out = io._WindowsConsoleIO(self.output_fd, "w") # type: ignore[attr-defined]
+ except ValueError:
+ # Console I/O is redirected, fallback...
+ self.out = None
+
+ def refresh(self, screen: list[str], c_xy: tuple[int, int]) -> None:
+ """
+ Refresh the console screen.
+
+ Parameters:
+ - screen (list): List of strings representing the screen contents.
+ - c_xy (tuple): Cursor position (x, y) on the screen.
+ """
+ cx, cy = c_xy
+
+ while len(self.screen) < min(len(screen), self.height):
+ self._hide_cursor()
+ self._move_relative(0, len(self.screen) - 1)
+ self.__write("\n")
+ self.__posxy = 0, len(self.screen)
+ self.screen.append("")
+
+ px, py = self.__posxy
+ old_offset = offset = self.__offset
+ height = self.height
+
+ # we make sure the cursor is on the screen, and that we're
+ # using all of the screen if we can
+ if cy < offset:
+ offset = cy
+ elif cy >= offset + height:
+ offset = cy - height + 1
+ scroll_lines = offset - old_offset
+
+ # Scrolling the buffer as the current input is greater than the visible
+ # portion of the window. We need to scroll the visible portion and the
+ # entire history
+ self._scroll(scroll_lines, self._getscrollbacksize())
+ self.__posxy = self.__posxy[0], self.__posxy[1] + scroll_lines
+ self.__offset += scroll_lines
+
+ for i in range(scroll_lines):
+ self.screen.append("")
+ elif offset > 0 and len(screen) < offset + height:
+ offset = max(len(screen) - height, 0)
+ screen.append("")
+
+ oldscr = self.screen[old_offset : old_offset + height]
+ newscr = screen[offset : offset + height]
+
+ self.__offset = offset
+
+ self._hide_cursor()
+ for (
+ y,
+ oldline,
+ newline,
+ ) in zip(range(offset, offset + height), oldscr, newscr):
+ if oldline != newline:
+ self.__write_changed_line(y, oldline, newline, px)
+
+ y = len(newscr)
+ while y < len(oldscr):
+ self._move_relative(0, y)
+ self.__posxy = 0, y
+ self._erase_to_end()
+ y += 1
+
+ self._show_cursor()
+
+ self.screen = screen
+ self.move_cursor(cx, cy)
+
+ def __write_changed_line(
+ self, y: int, oldline: str, newline: str, px_coord: int
+ ) -> None:
+ # this is frustrating; there's no reason to test (say)
+ # self.dch1 inside the loop -- but alternative ways of
+ # structuring this function are equally painful (I'm trying to
+ # avoid writing code generators these days...)
+ minlen = min(wlen(oldline), wlen(newline))
+ x_pos = 0
+ x_coord = 0
+
+ px_pos = 0
+ j = 0
+ for c in oldline:
+ if j >= px_coord:
+ break
+ j += wlen(c)
+ px_pos += 1
+
+ # reuse the oldline as much as possible, but stop as soon as we
+ # encounter an ESCAPE, because it might be the start of an escape
+ # sequene
+ while (
+ x_coord < minlen
+ and oldline[x_pos] == newline[x_pos]
+ and newline[x_pos] != "\x1b"
+ ):
+ x_coord += wlen(newline[x_pos])
+ x_pos += 1
+
+ self._hide_cursor()
+ self._move_relative(x_coord, y)
+ if wlen(oldline) > wlen(newline):
+ self._erase_to_end()
+
+ self.__write(newline[x_pos:])
+ if wlen(newline) == self.width:
+ # If we wrapped we want to start at the next line
+ self._move_relative(0, y + 1)
+ self.__posxy = 0, y + 1
+ else:
+ self.__posxy = wlen(newline), y
+
+ if "\x1b" in newline or y != self.__posxy[1]:
+ # ANSI escape characters are present, so we can't assume
+ # anything about the position of the cursor. Moving the cursor
+ # to the left margin should work to get to a known position.
+ self.move_cursor(0, y)
+
+ def _scroll(
+ self, top: int, bottom: int, left: int | None = None, right: int | None = None
+ ) -> None:
+ scroll_rect = SMALL_RECT()
+ scroll_rect.Top = SHORT(top)
+ scroll_rect.Bottom = SHORT(bottom)
+ scroll_rect.Left = SHORT(0 if left is None else left)
+ scroll_rect.Right = SHORT(
+ self.getheightwidth()[1] - 1 if right is None else right
+ )
+ destination_origin = _COORD()
+ fill_info = CHAR_INFO()
+ fill_info.UnicodeChar = " "
+
+ if not ScrollConsoleScreenBuffer(
+ OutHandle, scroll_rect, None, destination_origin, fill_info
+ ):
+ raise WinError(GetLastError())
+
+ def _hide_cursor(self):
+ self.__write("\x1b[?25l")
+
+ def _show_cursor(self):
+ self.__write("\x1b[?25h")
+
+ def _enable_blinking(self):
+ self.__write("\x1b[?12h")
+
+ def _disable_blinking(self):
+ self.__write("\x1b[?12l")
+
+ def __write(self, text: str) -> None:
+ if self.out is not None:
+ self.out.write(text.encode(self.encoding, "replace"))
+ self.out.flush()
+ else:
+ os.write(self.output_fd, text.encode(self.encoding, "replace"))
+
+ @property
+ def screen_xy(self) -> tuple[int, int]:
+ info = CONSOLE_SCREEN_BUFFER_INFO()
+ if not GetConsoleScreenBufferInfo(OutHandle, info):
+ raise WinError(GetLastError())
+ return info.dwCursorPosition.X, info.dwCursorPosition.Y
+
+ def _erase_to_end(self) -> None:
+ self.__write(ERASE_IN_LINE)
+
+ def prepare(self) -> None:
+ trace("prepare")
+ self.screen = []
+ self.height, self.width = self.getheightwidth()
+
+ self.__posxy = 0, 0
+ self.__gone_tall = 0
+ self.__offset = 0
+
+ def restore(self) -> None:
+ pass
+
+ def _move_relative(self, x: int, y: int) -> None:
+ """Moves relative to the current __posxy"""
+ dx = x - self.__posxy[0]
+ dy = y - self.__posxy[1]
+ if dx < 0:
+ self.__write(MOVE_LEFT.format(-dx))
+ elif dx > 0:
+ self.__write(MOVE_RIGHT.format(dx))
+
+ if dy < 0:
+ self.__write(MOVE_UP.format(-dy))
+ elif dy > 0:
+ self.__write(MOVE_DOWN.format(dy))
+
+ def move_cursor(self, x: int, y: int) -> None:
+ if x < 0 or y < 0:
+ raise ValueError(f"Bad cursor position {x}, {y}")
+
+ if y < self.__offset or y >= self.__offset + self.height:
+ self.event_queue.insert(0, Event("scroll", ""))
+ else:
+ self._move_relative(x, y)
+ self.__posxy = x, y
+
+ def set_cursor_vis(self, visible: bool) -> None:
+ if visible:
+ self._show_cursor()
+ else:
+ self._hide_cursor()
+
+ def getheightwidth(self) -> tuple[int, int]:
+ """Return (height, width) where height and width are the height
+ and width of the terminal window in characters."""
+ info = CONSOLE_SCREEN_BUFFER_INFO()
+ if not GetConsoleScreenBufferInfo(OutHandle, info):
+ raise WinError(GetLastError())
+ return (
+ info.srWindow.Bottom - info.srWindow.Top + 1,
+ info.srWindow.Right - info.srWindow.Left + 1,
+ )
+
+ def _getscrollbacksize(self) -> int:
+ info = CONSOLE_SCREEN_BUFFER_INFO()
+ if not GetConsoleScreenBufferInfo(OutHandle, info):
+ raise WinError(GetLastError())
+
+ return info.srWindow.Bottom # type: ignore[no-any-return]
+
+ def _read_input(self) -> INPUT_RECORD | None:
+ rec = INPUT_RECORD()
+ read = DWORD()
+ if not ReadConsoleInput(InHandle, rec, 1, read):
+ raise WinError(GetLastError())
+
+ if read.value == 0:
+ return None
+
+ return rec
+
+ def get_event(self, block: bool = True) -> Event | None:
+ """Return an Event instance. Returns None if |block| is false
+ and there is no event pending, otherwise waits for the
+ completion of an event."""
+ if self.event_queue:
+ return self.event_queue.pop()
+
+ while True:
+ rec = self._read_input()
+ if rec is None:
+ if block:
+ continue
+ return None
+
+ if rec.EventType == WINDOW_BUFFER_SIZE_EVENT:
+ return Event("resize", "")
+
+ if rec.EventType != KEY_EVENT or not rec.Event.KeyEvent.bKeyDown:
+ # Only process keys and keydown events
+ if block:
+ continue
+ return None
+
+ key = rec.Event.KeyEvent.uChar.UnicodeChar
+
+ if rec.Event.KeyEvent.uChar.UnicodeChar == "\r":
+ # Make enter make unix-like
+ return Event(evt="key", data="\n", raw=b"\n")
+ elif rec.Event.KeyEvent.wVirtualKeyCode == 8:
+ # Turn backspace directly into the command
+ return Event(
+ evt="key",
+ data="backspace",
+ raw=rec.Event.KeyEvent.uChar.UnicodeChar,
+ )
+ elif rec.Event.KeyEvent.uChar.UnicodeChar == "\x00":
+ # Handle special keys like arrow keys and translate them into the appropriate command
+ code = VK_MAP.get(rec.Event.KeyEvent.wVirtualKeyCode)
+ if code:
+ return Event(
+ evt="key", data=code, raw=rec.Event.KeyEvent.uChar.UnicodeChar
+ )
+ if block:
+ continue
+
+ return None
+
+ return Event(evt="key", data=key, raw=rec.Event.KeyEvent.uChar.UnicodeChar)
+
+ def push_char(self, char: int | bytes) -> None:
+ """
+ Push a character to the console event queue.
+ """
+ raise NotImplementedError("push_char not supported on Windows")
+
+ def beep(self) -> None:
+ self.__write("\x07")
+
+ def clear(self) -> None:
+ """Wipe the screen"""
+ self.__write(CLEAR)
+ self.__posxy = 0, 0
+ self.screen = [""]
+
+ def finish(self) -> None:
+ """Move the cursor to the end of the display and otherwise get
+ ready for end. XXX could be merged with restore? Hmm."""
+ y = len(self.screen) - 1
+ while y >= 0 and not self.screen[y]:
+ y -= 1
+ self._move_relative(0, min(y, self.height + self.__offset - 1))
+ self.__write("\r\n")
+
+ def flushoutput(self) -> None:
+ """Flush all output to the screen (assuming there's some
+ buffering going on somewhere).
+
+ All output on Windows is unbuffered so this is a nop"""
+ pass
+
+ def forgetinput(self) -> None:
+ """Forget all pending, but not yet processed input."""
+ while self._read_input() is not None:
+ pass
+
+ def getpending(self) -> Event:
+ """Return the characters that have been typed but not yet
+ processed."""
+ return Event("key", "", b"")
+
+ def wait(self) -> None:
+ """Wait for an event."""
+ raise NotImplementedError("No wait support")
+
+ def repaint(self) -> None:
+ raise NotImplementedError("No repaint support")
+
+
+# Windows interop
+class CONSOLE_SCREEN_BUFFER_INFO(Structure):
+ _fields_ = [
+ ("dwSize", _COORD),
+ ("dwCursorPosition", _COORD),
+ ("wAttributes", WORD),
+ ("srWindow", SMALL_RECT),
+ ("dwMaximumWindowSize", _COORD),
+ ]
+
+
+class CONSOLE_CURSOR_INFO(Structure):
+ _fields_ = [
+ ("dwSize", DWORD),
+ ("bVisible", BOOL),
+ ]
+
+
+class CHAR_INFO(Structure):
+ _fields_ = [
+ ("UnicodeChar", WCHAR),
+ ("Attributes", WORD),
+ ]
+
+
+class Char(Union):
+ _fields_ = [
+ ("UnicodeChar", WCHAR),
+ ("Char", CHAR),
+ ]
+
+
+class KeyEvent(ctypes.Structure):
+ _fields_ = [
+ ("bKeyDown", BOOL),
+ ("wRepeatCount", WORD),
+ ("wVirtualKeyCode", WORD),
+ ("wVirtualScanCode", WORD),
+ ("uChar", Char),
+ ("dwControlKeyState", DWORD),
+ ]
+
+
+class WindowsBufferSizeEvent(ctypes.Structure):
+ _fields_ = [("dwSize", _COORD)]
+
+
+class ConsoleEvent(ctypes.Union):
+ _fields_ = [
+ ("KeyEvent", KeyEvent),
+ ("WindowsBufferSizeEvent", WindowsBufferSizeEvent),
+ ]
+
+
+class INPUT_RECORD(Structure):
+ _fields_ = [("EventType", WORD), ("Event", ConsoleEvent)]
+
+
+KEY_EVENT = 0x01
+FOCUS_EVENT = 0x10
+MENU_EVENT = 0x08
+MOUSE_EVENT = 0x02
+WINDOW_BUFFER_SIZE_EVENT = 0x04
+
+ENABLE_PROCESSED_OUTPUT = 0x01
+ENABLE_WRAP_AT_EOL_OUTPUT = 0x02
+ENABLE_VIRTUAL_TERMINAL_PROCESSING = 0x04
+
+STD_INPUT_HANDLE = -10
+STD_OUTPUT_HANDLE = -11
+
+if sys.platform == "win32":
+ _KERNEL32 = WinDLL("kernel32", use_last_error=True)
+
+ GetStdHandle = windll.kernel32.GetStdHandle
+ GetStdHandle.argtypes = [DWORD]
+ GetStdHandle.restype = HANDLE
+
+ GetConsoleScreenBufferInfo = _KERNEL32.GetConsoleScreenBufferInfo
+ GetConsoleScreenBufferInfo.argtypes = [
+ HANDLE,
+ ctypes.POINTER(CONSOLE_SCREEN_BUFFER_INFO),
+ ]
+ GetConsoleScreenBufferInfo.restype = BOOL
+
+ ScrollConsoleScreenBuffer = _KERNEL32.ScrollConsoleScreenBufferW
+ ScrollConsoleScreenBuffer.argtypes = [
+ HANDLE,
+ POINTER(SMALL_RECT),
+ POINTER(SMALL_RECT),
+ _COORD,
+ POINTER(CHAR_INFO),
+ ]
+ ScrollConsoleScreenBuffer.restype = BOOL
+
+ SetConsoleMode = _KERNEL32.SetConsoleMode
+ SetConsoleMode.argtypes = [HANDLE, DWORD]
+ SetConsoleMode.restype = BOOL
+
+ ReadConsoleInput = _KERNEL32.ReadConsoleInputW
+ ReadConsoleInput.argtypes = [HANDLE, POINTER(INPUT_RECORD), DWORD, POINTER(DWORD)]
+ ReadConsoleInput.restype = BOOL
+
+ OutHandle = GetStdHandle(STD_OUTPUT_HANDLE)
+ InHandle = GetStdHandle(STD_INPUT_HANDLE)
+else:
+
+ def _win_only(*args, **kwargs):
+ raise NotImplementedError("Windows only")
+
+ GetStdHandle = _win_only
+ GetConsoleScreenBufferInfo = _win_only
+ ScrollConsoleScreenBuffer = _win_only
+ SetConsoleMode = _win_only
+ ReadConsoleInput = _win_only
+ OutHandle = 0
+ InHandle = 0
diff --git a/Lib/test/test_pyrepl/__init__.py b/Lib/test/test_pyrepl/__init__.py
index fa38b86..8359d98 100644
--- a/Lib/test/test_pyrepl/__init__.py
+++ b/Lib/test/test_pyrepl/__init__.py
@@ -1,12 +1,14 @@
import os
+import sys
from test.support import requires, load_package_tests
from test.support.import_helper import import_module
-# Optionally test pyrepl. This currently requires that the
-# 'curses' resource be given on the regrtest command line using the -u
-# option. Additionally, we need to attempt to import curses and readline.
-requires("curses")
-curses = import_module("curses")
+if sys.platform != "win32":
+ # On non-Windows platforms, testing pyrepl currently requires that the
+ # 'curses' resource be given on the regrtest command line using the -u
+ # option. Additionally, we need to attempt to import curses and readline.
+ requires("curses")
+ curses = import_module("curses")
def load_tests(*args):
diff --git a/Lib/test/test_pyrepl/support.py b/Lib/test/test_pyrepl/support.py
index 7553904..d2f5429 100644
--- a/Lib/test/test_pyrepl/support.py
+++ b/Lib/test/test_pyrepl/support.py
@@ -55,7 +55,7 @@ def prepare_reader(console: Console, **kwargs):
return reader
-def prepare_console(events: Iterable[Event], **kwargs):
+def prepare_console(events: Iterable[Event], **kwargs) -> MagicMock | Console:
console = MagicMock()
console.get_event.side_effect = events
console.height = 100
diff --git a/Lib/test/test_pyrepl/test_pyrepl.py b/Lib/test/test_pyrepl/test_pyrepl.py
index 910e71d..45114e7 100644
--- a/Lib/test/test_pyrepl/test_pyrepl.py
+++ b/Lib/test/test_pyrepl/test_pyrepl.py
@@ -587,14 +587,15 @@ class TestPyReplCompleter(TestCase):
reader = ReadlineAlikeReader(console=console, config=config)
return reader
+ @patch("rlcompleter._readline_available", False)
def test_simple_completion(self):
- events = code_to_events("os.geten\t\n")
+ events = code_to_events("os.getpid\t\n")
namespace = {"os": os}
reader = self.prepare_reader(events, namespace)
output = multiline_input(reader, namespace)
- self.assertEqual(output, "os.getenv")
+ self.assertEqual(output, "os.getpid()")
def test_completion_with_many_options(self):
# Test with something that initially displays many options
diff --git a/Lib/test/test_pyrepl/test_unix_console.py b/Lib/test/test_pyrepl/test_unix_console.py
index e1faa00..d0b98f1 100644
--- a/Lib/test/test_pyrepl/test_unix_console.py
+++ b/Lib/test/test_pyrepl/test_unix_console.py
@@ -1,12 +1,16 @@
import itertools
+import sys
+import unittest
from functools import partial
from unittest import TestCase
from unittest.mock import MagicMock, call, patch, ANY
from .support import handle_all_events, code_to_events
-from _pyrepl.console import Event
-from _pyrepl.unix_console import UnixConsole
-
+try:
+ from _pyrepl.console import Event
+ from _pyrepl.unix_console import UnixConsole
+except ImportError:
+ pass
def unix_console(events, **kwargs):
console = UnixConsole()
@@ -67,6 +71,7 @@ TERM_CAPABILITIES = {
}
+@unittest.skipIf(sys.platform == "win32", "No Unix event queue on Windows")
@patch("_pyrepl.curses.tigetstr", lambda s: TERM_CAPABILITIES.get(s))
@patch(
"_pyrepl.curses.tparm",
diff --git a/Lib/test/test_pyrepl/test_unix_eventqueue.py b/Lib/test/test_pyrepl/test_unix_eventqueue.py
index c06536b..301f799 100644
--- a/Lib/test/test_pyrepl/test_unix_eventqueue.py
+++ b/Lib/test/test_pyrepl/test_unix_eventqueue.py
@@ -1,11 +1,15 @@
import tempfile
import unittest
+import sys
from unittest.mock import patch
-from _pyrepl.console import Event
-from _pyrepl.unix_eventqueue import EventQueue
-
+try:
+ from _pyrepl.console import Event
+ from _pyrepl.unix_eventqueue import EventQueue
+except ImportError:
+ pass
+@unittest.skipIf(sys.platform == "win32", "No Unix event queue on Windows")
@patch("_pyrepl.curses.tigetstr", lambda x: b"")
class TestUnixEventQueue(unittest.TestCase):
def setUp(self):
diff --git a/Lib/test/test_pyrepl/test_windows_console.py b/Lib/test/test_pyrepl/test_windows_console.py
new file mode 100644
index 0000000..0b90142
--- /dev/null
+++ b/Lib/test/test_pyrepl/test_windows_console.py
@@ -0,0 +1,330 @@
+import itertools
+import sys
+import unittest
+from functools import partial
+from typing import Iterable
+from unittest import TestCase
+from unittest.mock import MagicMock, call
+
+from .support import handle_all_events, code_to_events
+
+try:
+ from _pyrepl.console import Event, Console
+ from _pyrepl.windows_console import (
+ WindowsConsole,
+ MOVE_LEFT,
+ MOVE_RIGHT,
+ MOVE_UP,
+ MOVE_DOWN,
+ ERASE_IN_LINE,
+ )
+except ImportError:
+ pass
+
+
+@unittest.skipIf(sys.platform != "win32", "Test class specifically for Windows")
+class WindowsConsoleTests(TestCase):
+ def console(self, events, **kwargs) -> Console:
+ console = WindowsConsole()
+ console.get_event = MagicMock(side_effect=events)
+ console._scroll = MagicMock()
+ console._hide_cursor = MagicMock()
+ console._show_cursor = MagicMock()
+ console._getscrollbacksize = MagicMock(42)
+ console.out = MagicMock()
+
+ height = kwargs.get("height", 25)
+ width = kwargs.get("width", 80)
+ console.getheightwidth = MagicMock(side_effect=lambda: (height, width))
+
+ console.prepare()
+ for key, val in kwargs.items():
+ setattr(console, key, val)
+ return console
+
+ def handle_events(self, events: Iterable[Event], **kwargs):
+ return handle_all_events(events, partial(self.console, **kwargs))
+
+ def handle_events_narrow(self, events):
+ return self.handle_events(events, width=5)
+
+ def handle_events_short(self, events):
+ return self.handle_events(events, height=1)
+
+ def handle_events_height_3(self, events):
+ return self.handle_events(events, height=3)
+
+ def test_simple_addition(self):
+ code = "12+34"
+ events = code_to_events(code)
+ _, con = self.handle_events(events)
+ con.out.write.assert_any_call(b"1")
+ con.out.write.assert_any_call(b"2")
+ con.out.write.assert_any_call(b"+")
+ con.out.write.assert_any_call(b"3")
+ con.out.write.assert_any_call(b"4")
+ con.restore()
+
+ def test_wrap(self):
+ code = "12+34"
+ events = code_to_events(code)
+ _, con = self.handle_events_narrow(events)
+ con.out.write.assert_any_call(b"1")
+ con.out.write.assert_any_call(b"2")
+ con.out.write.assert_any_call(b"+")
+ con.out.write.assert_any_call(b"3")
+ con.out.write.assert_any_call(b"\\")
+ con.out.write.assert_any_call(b"\n")
+ con.out.write.assert_any_call(b"4")
+ con.restore()
+
+ def test_resize_wider(self):
+ code = "1234567890"
+ events = code_to_events(code)
+ reader, console = self.handle_events_narrow(events)
+
+ console.height = 20
+ console.width = 80
+ console.getheightwidth = MagicMock(lambda _: (20, 80))
+
+ def same_reader(_):
+ return reader
+
+ def same_console(events):
+ console.get_event = MagicMock(side_effect=events)
+ return console
+
+ _, con = handle_all_events(
+ [Event(evt="resize", data=None)],
+ prepare_reader=same_reader,
+ prepare_console=same_console,
+ )
+
+ con.out.write.assert_any_call(self.move_right(2))
+ con.out.write.assert_any_call(self.move_up(2))
+ con.out.write.assert_any_call(b"567890")
+
+ con.restore()
+
+ def test_resize_narrower(self):
+ code = "1234567890"
+ events = code_to_events(code)
+ reader, console = self.handle_events(events)
+
+ console.height = 20
+ console.width = 4
+ console.getheightwidth = MagicMock(lambda _: (20, 4))
+
+ def same_reader(_):
+ return reader
+
+ def same_console(events):
+ console.get_event = MagicMock(side_effect=events)
+ return console
+
+ _, con = handle_all_events(
+ [Event(evt="resize", data=None)],
+ prepare_reader=same_reader,
+ prepare_console=same_console,
+ )
+
+ con.out.write.assert_any_call(b"456\\")
+ con.out.write.assert_any_call(b"789\\")
+
+ con.restore()
+
+ def test_cursor_left(self):
+ code = "1"
+ events = itertools.chain(
+ code_to_events(code),
+ [Event(evt="key", data="left", raw=bytearray(b"\x1bOD"))],
+ )
+ _, con = self.handle_events(events)
+ con.out.write.assert_any_call(self.move_left())
+ con.restore()
+
+ def test_cursor_left_right(self):
+ code = "1"
+ events = itertools.chain(
+ code_to_events(code),
+ [
+ Event(evt="key", data="left", raw=bytearray(b"\x1bOD")),
+ Event(evt="key", data="right", raw=bytearray(b"\x1bOC")),
+ ],
+ )
+ _, con = self.handle_events(events)
+ con.out.write.assert_any_call(self.move_left())
+ con.out.write.assert_any_call(self.move_right())
+ con.restore()
+
+ def test_cursor_up(self):
+ code = "1\n2+3"
+ events = itertools.chain(
+ code_to_events(code),
+ [Event(evt="key", data="up", raw=bytearray(b"\x1bOA"))],
+ )
+ _, con = self.handle_events(events)
+ con.out.write.assert_any_call(self.move_up())
+ con.restore()
+
+ def test_cursor_up_down(self):
+ code = "1\n2+3"
+ events = itertools.chain(
+ code_to_events(code),
+ [
+ Event(evt="key", data="up", raw=bytearray(b"\x1bOA")),
+ Event(evt="key", data="down", raw=bytearray(b"\x1bOB")),
+ ],
+ )
+ _, con = self.handle_events(events)
+ con.out.write.assert_any_call(self.move_up())
+ con.out.write.assert_any_call(self.move_down())
+ con.restore()
+
+ def test_cursor_back_write(self):
+ events = itertools.chain(
+ code_to_events("1"),
+ [Event(evt="key", data="left", raw=bytearray(b"\x1bOD"))],
+ code_to_events("2"),
+ )
+ _, con = self.handle_events(events)
+ con.out.write.assert_any_call(b"1")
+ con.out.write.assert_any_call(self.move_left())
+ con.out.write.assert_any_call(b"21")
+ con.restore()
+
+ def test_multiline_function_move_up_short_terminal(self):
+ # fmt: off
+ code = (
+ "def f():\n"
+ " foo"
+ )
+ # fmt: on
+
+ events = itertools.chain(
+ code_to_events(code),
+ [
+ Event(evt="key", data="up", raw=bytearray(b"\x1bOA")),
+ Event(evt="scroll", data=None),
+ ],
+ )
+ _, con = self.handle_events_short(events)
+ con.out.write.assert_any_call(self.move_left(5))
+ con.out.write.assert_any_call(self.move_up())
+ con.restore()
+
+ def test_multiline_function_move_up_down_short_terminal(self):
+ # fmt: off
+ code = (
+ "def f():\n"
+ " foo"
+ )
+ # fmt: on
+
+ events = itertools.chain(
+ code_to_events(code),
+ [
+ Event(evt="key", data="up", raw=bytearray(b"\x1bOA")),
+ Event(evt="scroll", data=None),
+ Event(evt="key", data="down", raw=bytearray(b"\x1bOB")),
+ Event(evt="scroll", data=None),
+ ],
+ )
+ _, con = self.handle_events_short(events)
+ con.out.write.assert_any_call(self.move_left(8))
+ con.out.write.assert_any_call(self.erase_in_line())
+ con.restore()
+
+ def test_resize_bigger_on_multiline_function(self):
+ # fmt: off
+ code = (
+ "def f():\n"
+ " foo"
+ )
+ # fmt: on
+
+ events = itertools.chain(code_to_events(code))
+ reader, console = self.handle_events_short(events)
+
+ console.height = 2
+ console.getheightwidth = MagicMock(lambda _: (2, 80))
+
+ def same_reader(_):
+ return reader
+
+ def same_console(events):
+ console.get_event = MagicMock(side_effect=events)
+ return console
+
+ _, con = handle_all_events(
+ [Event(evt="resize", data=None)],
+ prepare_reader=same_reader,
+ prepare_console=same_console,
+ )
+ con.out.write.assert_has_calls(
+ [
+ call(self.move_left(5)),
+ call(self.move_up()),
+ call(b"def f():"),
+ call(self.move_left(3)),
+ call(self.move_down()),
+ ]
+ )
+ console.restore()
+ con.restore()
+
+ def test_resize_smaller_on_multiline_function(self):
+ # fmt: off
+ code = (
+ "def f():\n"
+ " foo"
+ )
+ # fmt: on
+
+ events = itertools.chain(code_to_events(code))
+ reader, console = self.handle_events_height_3(events)
+
+ console.height = 1
+ console.getheightwidth = MagicMock(lambda _: (1, 80))
+
+ def same_reader(_):
+ return reader
+
+ def same_console(events):
+ console.get_event = MagicMock(side_effect=events)
+ return console
+
+ _, con = handle_all_events(
+ [Event(evt="resize", data=None)],
+ prepare_reader=same_reader,
+ prepare_console=same_console,
+ )
+ con.out.write.assert_has_calls(
+ [
+ call(self.move_left(5)),
+ call(self.move_up()),
+ call(self.erase_in_line()),
+ call(b" foo"),
+ ]
+ )
+ console.restore()
+ con.restore()
+
+ def move_up(self, lines=1):
+ return MOVE_UP.format(lines).encode("utf8")
+
+ def move_down(self, lines=1):
+ return MOVE_DOWN.format(lines).encode("utf8")
+
+ def move_left(self, cols=1):
+ return MOVE_LEFT.format(cols).encode("utf8")
+
+ def move_right(self, cols=1):
+ return MOVE_RIGHT.format(cols).encode("utf8")
+
+ def erase_in_line(self):
+ return ERASE_IN_LINE.encode("utf8")
+
+
+if __name__ == "__main__":
+ unittest.main()
diff --git a/Misc/NEWS.d/next/Windows/2024-05-25-18-43-10.gh-issue-111201.SLPJIx.rst b/Misc/NEWS.d/next/Windows/2024-05-25-18-43-10.gh-issue-111201.SLPJIx.rst
new file mode 100644
index 0000000..f3918ed
--- /dev/null
+++ b/Misc/NEWS.d/next/Windows/2024-05-25-18-43-10.gh-issue-111201.SLPJIx.rst
@@ -0,0 +1 @@
+Add support for new pyrepl on Windows