summaryrefslogtreecommitdiffstats
path: root/Lib
diff options
context:
space:
mode:
Diffstat (limited to 'Lib')
-rw-r--r--Lib/_pyrepl/commands.py3
-rw-r--r--Lib/_pyrepl/completing_reader.py8
-rw-r--r--Lib/_pyrepl/reader.py154
-rw-r--r--Lib/_pyrepl/readline.py4
-rw-r--r--Lib/_pyrepl/unix_console.py20
-rw-r--r--Lib/_pyrepl/utils.py2
6 files changed, 134 insertions, 57 deletions
diff --git a/Lib/_pyrepl/commands.py b/Lib/_pyrepl/commands.py
index 6bffed1..c3fce91 100644
--- a/Lib/_pyrepl/commands.py
+++ b/Lib/_pyrepl/commands.py
@@ -368,8 +368,6 @@ class self_insert(EditCommand):
r = self.reader
text = self.event * r.get_arg()
r.insert(text)
- if len(text) == 1 and r.pos == len(r.buffer):
- r.calc_screen = r.append_to_screen
class insert_nl(EditCommand):
@@ -483,4 +481,3 @@ class disable_bracketed_paste(Command):
self.reader.paste_mode = False
self.reader.in_bracketed_paste = False
self.reader.dirty = True
- self.reader.calc_screen = self.reader.calc_complete_screen
diff --git a/Lib/_pyrepl/completing_reader.py b/Lib/_pyrepl/completing_reader.py
index 8df35cc..05770aa 100644
--- a/Lib/_pyrepl/completing_reader.py
+++ b/Lib/_pyrepl/completing_reader.py
@@ -209,10 +209,6 @@ class self_insert(commands.self_insert):
r = self.reader # type: ignore[assignment]
commands.self_insert.do(self)
-
- if r.cmpltn_menu_visible or r.cmpltn_message_visible:
- r.calc_screen = r.calc_complete_screen
-
if r.cmpltn_menu_visible:
stem = r.get_stem()
if len(stem) < 1:
@@ -261,8 +257,8 @@ class CompletingReader(Reader):
if not isinstance(cmd, (complete, self_insert)):
self.cmpltn_reset()
- def calc_complete_screen(self) -> list[str]:
- screen = super().calc_complete_screen()
+ def calc_screen(self) -> list[str]:
+ screen = super().calc_screen()
if self.cmpltn_menu_visible:
ly = self.lxy[1]
screen[ly:ly] = self.cmpltn_menu
diff --git a/Lib/_pyrepl/reader.py b/Lib/_pyrepl/reader.py
index 255967e..20eff91 100644
--- a/Lib/_pyrepl/reader.py
+++ b/Lib/_pyrepl/reader.py
@@ -35,15 +35,13 @@ from .trace import trace
# types
Command = commands.Command
if False:
- from typing import Callable
from .types import Callback, SimpleContextManager, KeySpec, CommandName
- CalcScreen = Callable[[], list[str]]
def disp_str(buffer: str) -> tuple[str, list[int]]:
"""disp_str(buffer:string) -> (string, [int])
- Return the string that should be the printed represenation of
+ Return the string that should be the printed representation of
|buffer| and a list detailing where the characters of |buffer|
get used up. E.g.:
@@ -54,11 +52,17 @@ def disp_str(buffer: str) -> tuple[str, list[int]]:
b: list[int] = []
s: list[str] = []
for c in buffer:
- if ord(c) > 128 and unicodedata.category(c).startswith("C"):
+ if ord(c) < 128:
+ s.append(c)
+ b.append(1)
+ elif unicodedata.category(c).startswith("C"):
c = r"\u%04x" % ord(c)
- s.append(c)
- b.append(wlen(c))
- b.extend([0] * (len(c) - 1))
+ s.append(c)
+ b.append(str_width(c))
+ b.extend([0] * (len(c) - 1))
+ else:
+ s.append(c)
+ b.append(str_width(c))
return "".join(s), b
@@ -230,7 +234,6 @@ class Reader:
commands: dict[str, type[Command]] = field(default_factory=make_default_commands)
last_command: type[Command] | None = None
syntax_table: dict[str, int] = field(default_factory=make_default_syntax_table)
- msg_at_bottom: bool = True
keymap: tuple[tuple[str, str], ...] = ()
input_trans: input.KeymapTranslator = field(init=False)
input_trans_stack: list[input.KeymapTranslator] = field(default_factory=list)
@@ -238,8 +241,52 @@ class Reader:
screeninfo: list[tuple[int, list[int]]] = field(init=False)
cxy: tuple[int, int] = field(init=False)
lxy: tuple[int, int] = field(init=False)
- calc_screen: CalcScreen = field(init=False)
scheduled_commands: list[str] = field(default_factory=list)
+ can_colorize: bool = False
+
+ ## cached metadata to speed up screen refreshes
+ @dataclass
+ class RefreshCache:
+ in_bracketed_paste: bool = False
+ screen: list[str] = field(default_factory=list)
+ screeninfo: list[tuple[int, list[int]]] = field(init=False)
+ line_end_offsets: list[int] = field(default_factory=list)
+ pos: int = field(init=False)
+ cxy: tuple[int, int] = field(init=False)
+ dimensions: tuple[int, int] = field(init=False)
+
+ def update_cache(self,
+ reader: Reader,
+ screen: list[str],
+ screeninfo: list[tuple[int, list[int]]],
+ ) -> None:
+ self.in_bracketed_paste = reader.in_bracketed_paste
+ self.screen = screen.copy()
+ self.screeninfo = screeninfo.copy()
+ self.pos = reader.pos
+ self.cxy = reader.cxy
+ self.dimensions = reader.console.width, reader.console.height
+
+ def valid(self, reader: Reader) -> bool:
+ dimensions = reader.console.width, reader.console.height
+ dimensions_changed = dimensions != self.dimensions
+ paste_changed = reader.in_bracketed_paste != self.in_bracketed_paste
+ return not (dimensions_changed or paste_changed)
+
+ def get_cached_location(self, reader: Reader) -> tuple[int, int]:
+ offset = 0
+ earliest_common_pos = min(reader.pos, self.pos)
+ num_common_lines = len(self.line_end_offsets)
+ while num_common_lines > 0:
+ offset = self.line_end_offsets[num_common_lines - 1]
+ if earliest_common_pos > offset:
+ break
+ num_common_lines -= 1
+ else:
+ offset = 0
+ return offset, num_common_lines
+
+ last_refresh_cache: RefreshCache = field(default_factory=RefreshCache)
def __post_init__(self) -> None:
# Enable the use of `insert` without a `prepare` call - necessary to
@@ -252,53 +299,60 @@ class Reader:
self.screeninfo = [(0, [])]
self.cxy = self.pos2xy()
self.lxy = (self.pos, 0)
- self.calc_screen = self.calc_complete_screen
+ self.can_colorize = can_colorize()
+
+ self.last_refresh_cache.screeninfo = self.screeninfo
+ self.last_refresh_cache.pos = self.pos
+ self.last_refresh_cache.cxy = self.cxy
+ self.last_refresh_cache.dimensions = (0, 0)
def collect_keymap(self) -> tuple[tuple[KeySpec, CommandName], ...]:
return default_keymap
- def append_to_screen(self) -> list[str]:
- new_screen = self.screen.copy() or ['']
+ def calc_screen(self) -> list[str]:
+ """Translate changes in self.buffer into changes in self.console.screen."""
+ # Since the last call to calc_screen:
+ # screen and screeninfo may differ due to a completion menu being shown
+ # pos and cxy may differ due to edits, cursor movements, or completion menus
- new_character = self.buffer[-1]
- new_character_len = wlen(new_character)
+ # Lines that are above both the old and new cursor position can't have changed,
+ # unless the terminal has been resized (which might cause reflowing) or we've
+ # entered or left paste mode (which changes prompts, causing reflowing).
+ num_common_lines = 0
+ offset = 0
+ if self.last_refresh_cache.valid(self):
+ offset, num_common_lines = self.last_refresh_cache.get_cached_location(self)
- last_line_len = wlen(new_screen[-1])
- if last_line_len + new_character_len >= self.console.width: # We need to wrap here
- new_screen[-1] += '\\'
- self.screeninfo[-1][1].append(1)
- new_screen.append(self.buffer[-1])
- self.screeninfo.append((0, [new_character_len]))
- else:
- new_screen[-1] += self.buffer[-1]
- self.screeninfo[-1][1].append(new_character_len)
- self.cxy = self.pos2xy()
+ screen = self.last_refresh_cache.screen
+ del screen[num_common_lines:]
- # Reset the function that is used for completing the screen
- self.calc_screen = self.calc_complete_screen
- return new_screen
+ screeninfo = self.last_refresh_cache.screeninfo
+ del screeninfo[num_common_lines:]
+
+ last_refresh_line_end_offsets = self.last_refresh_cache.line_end_offsets
+ del last_refresh_line_end_offsets[num_common_lines:]
- def calc_complete_screen(self) -> list[str]:
- """The purpose of this method is to translate changes in
- self.buffer into changes in self.screen. Currently it rips
- everything down and starts from scratch, which whilst not
- especially efficient is certainly simple(r).
- """
- lines = self.get_unicode().split("\n")
- screen: list[str] = []
- screeninfo: list[tuple[int, list[int]]] = []
pos = self.pos
- for ln, line in enumerate(lines):
+ pos -= offset
+
+ lines = "".join(self.buffer[offset:]).split("\n")
+ cursor_found = False
+ lines_beyond_cursor = 0
+ for ln, line in enumerate(lines, num_common_lines):
ll = len(line)
if 0 <= pos <= ll:
- if self.msg and not self.msg_at_bottom:
- for mline in self.msg.split("\n"):
- screen.append(mline)
- screeninfo.append((0, []))
self.lxy = pos, ln
+ cursor_found = True
+ elif cursor_found:
+ lines_beyond_cursor += 1
+ if lines_beyond_cursor > self.console.height:
+ # No need to keep formatting lines.
+ # The console can't show them.
+ break
prompt = self.get_prompt(ln, ll >= pos >= 0)
while "\n" in prompt:
pre_prompt, _, prompt = prompt.partition("\n")
+ last_refresh_line_end_offsets.append(offset)
screen.append(pre_prompt)
screeninfo.append((0, []))
pos -= ll + 1
@@ -306,6 +360,8 @@ class Reader:
l, l2 = disp_str(line)
wrapcount = (wlen(l) + lp) // self.console.width
if wrapcount == 0:
+ offset += ll + 1 # Takes all of the line plus the newline
+ last_refresh_line_end_offsets.append(offset)
screen.append(prompt + l)
screeninfo.append((lp, l2))
else:
@@ -321,11 +377,14 @@ class Reader:
column += character_width
pre = prompt if i == 0 else ""
if len(l) > index_to_wrap_before:
+ offset += index_to_wrap_before
post = "\\"
after = [1]
else:
+ offset += index_to_wrap_before + 1 # Takes the newline
post = ""
after = []
+ last_refresh_line_end_offsets.append(offset)
screen.append(pre + l[:index_to_wrap_before] + post)
screeninfo.append((prelen, l2[:index_to_wrap_before] + after))
l = l[index_to_wrap_before:]
@@ -333,10 +392,12 @@ class Reader:
i += 1
self.screeninfo = screeninfo
self.cxy = self.pos2xy()
- if self.msg and self.msg_at_bottom:
+ if self.msg:
for mline in self.msg.split("\n"):
screen.append(mline)
screeninfo.append((0, []))
+
+ self.last_refresh_cache.update_cache(self, screen, screeninfo)
return screen
@staticmethod
@@ -456,7 +517,7 @@ class Reader:
`lineno'."""
if self.arg is not None and cursor_on_line:
prompt = f"(arg: {self.arg}) "
- elif self.paste_mode:
+ elif self.paste_mode and not self.in_bracketed_paste:
prompt = "(paste) "
elif "\n" in self.buffer:
if lineno == 0:
@@ -468,7 +529,7 @@ class Reader:
else:
prompt = self.ps1
- if can_colorize():
+ if self.can_colorize:
prompt = f"{ANSIColors.BOLD_MAGENTA}{prompt}{ANSIColors.RESET}"
return prompt
@@ -604,6 +665,9 @@ class Reader:
def refresh(self) -> None:
"""Recalculate and refresh the screen."""
+ if self.in_bracketed_paste and self.buffer and not self.buffer[-1] == "\n":
+ return
+
# this call sets up self.cxy, so call it first.
self.screen = self.calc_screen()
self.console.refresh(self.screen, self.cxy)
@@ -627,7 +691,7 @@ class Reader:
self.after_command(command)
- if self.dirty and not self.in_bracketed_paste:
+ if self.dirty:
self.refresh()
else:
self.update_cursor()
diff --git a/Lib/_pyrepl/readline.py b/Lib/_pyrepl/readline.py
index 7d811bf..b10d0c6 100644
--- a/Lib/_pyrepl/readline.py
+++ b/Lib/_pyrepl/readline.py
@@ -263,6 +263,10 @@ class maybe_accept(commands.Command):
r = self.reader # type: ignore[assignment]
r.dirty = True # this is needed to hide the completion menu, if visible
+ if self.reader.in_bracketed_paste:
+ r.insert("\n")
+ return
+
# if there are already several lines and the cursor
# is not on the last one, always insert a new \n.
text = r.get_unicode()
diff --git a/Lib/_pyrepl/unix_console.py b/Lib/_pyrepl/unix_console.py
index 2f73a59..f1a6b84 100644
--- a/Lib/_pyrepl/unix_console.py
+++ b/Lib/_pyrepl/unix_console.py
@@ -150,6 +150,8 @@ class UnixConsole(Console):
self.pollob = poll()
self.pollob.register(self.input_fd, select.POLLIN)
+ self.input_buffer = b""
+ self.input_buffer_pos = 0
curses.setupterm(term or None, self.output_fd)
self.term = term
@@ -197,6 +199,18 @@ class UnixConsole(Console):
self.event_queue = EventQueue(self.input_fd, self.encoding)
self.cursor_visible = 1
+ def __read(self, n: int) -> bytes:
+ if not self.input_buffer or self.input_buffer_pos >= len(self.input_buffer):
+ self.input_buffer = os.read(self.input_fd, 10000)
+
+ ret = self.input_buffer[self.input_buffer_pos : self.input_buffer_pos + n]
+ self.input_buffer_pos += len(ret)
+ if self.input_buffer_pos >= len(self.input_buffer):
+ self.input_buffer = b""
+ self.input_buffer_pos = 0
+ return ret
+
+
def change_encoding(self, encoding: str) -> None:
"""
Change the encoding used for I/O operations.
@@ -373,7 +387,7 @@ class UnixConsole(Console):
while self.event_queue.empty():
while True:
try:
- self.push_char(os.read(self.input_fd, 1))
+ self.push_char(self.__read(1))
except OSError as err:
if err.errno == errno.EINTR:
if not self.event_queue.empty():
@@ -491,7 +505,7 @@ class UnixConsole(Console):
e.raw += e.raw
amount = struct.unpack("i", ioctl(self.input_fd, FIONREAD, b"\0\0\0\0"))[0]
- raw = os.read(self.input_fd, amount)
+ raw = self.__read(amount)
data = str(raw, self.encoding, "replace")
e.data += data
e.raw += raw
@@ -514,7 +528,7 @@ class UnixConsole(Console):
e.raw += e.raw
amount = 10000
- raw = os.read(self.input_fd, amount)
+ raw = self.__read(amount)
data = str(raw, self.encoding, "replace")
e.data += data
e.raw += raw
diff --git a/Lib/_pyrepl/utils.py b/Lib/_pyrepl/utils.py
index 96e917e..20dbb1f 100644
--- a/Lib/_pyrepl/utils.py
+++ b/Lib/_pyrepl/utils.py
@@ -16,6 +16,8 @@ def str_width(c: str) -> int:
def wlen(s: str) -> int:
+ if len(s) == 1:
+ return str_width(s)
length = sum(str_width(i) for i in s)
# remove lengths of any escape sequences
sequence = ANSI_ESCAPE_SEQUENCE.findall(s)