summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorMatt Wozniski <mwozniski@bloomberg.net>2025-05-06 09:44:49 (GMT)
committerGitHub <noreply@github.com>2025-05-06 09:44:49 (GMT)
commitfd37f1a8ad8e244ddccfc76608b02b6e29e30e16 (patch)
tree9aa78d00ccfe027913430ce9b9b053c86e38080a
parent120c9d42f278297d373096fbae035767192006d7 (diff)
downloadcpython-fd37f1a8ad8e244ddccfc76608b02b6e29e30e16.zip
cpython-fd37f1a8ad8e244ddccfc76608b02b6e29e30e16.tar.gz
cpython-fd37f1a8ad8e244ddccfc76608b02b6e29e30e16.tar.bz2
gh-133490: Fix syntax highlighting for remote PDB (#133494)
-rw-r--r--Lib/_pyrepl/utils.py11
-rw-r--r--Lib/pdb.py2
-rw-r--r--Lib/test/test_remote_pdb.py152
3 files changed, 158 insertions, 7 deletions
diff --git a/Lib/_pyrepl/utils.py b/Lib/_pyrepl/utils.py
index dd327d6..38cf6b5 100644
--- a/Lib/_pyrepl/utils.py
+++ b/Lib/_pyrepl/utils.py
@@ -23,9 +23,9 @@ IDENTIFIERS_AFTER = {"def", "class"}
BUILTINS = {str(name) for name in dir(builtins) if not name.startswith('_')}
-def THEME():
+def THEME(**kwargs):
# Not cached: the user can modify the theme inside the interactive session.
- return _colorize.get_theme().syntax
+ return _colorize.get_theme(**kwargs).syntax
class Span(NamedTuple):
@@ -254,7 +254,10 @@ def is_soft_keyword_used(*tokens: TI | None) -> bool:
def disp_str(
- buffer: str, colors: list[ColorSpan] | None = None, start_index: int = 0
+ buffer: str,
+ colors: list[ColorSpan] | None = None,
+ start_index: int = 0,
+ force_color: bool = False,
) -> tuple[CharBuffer, CharWidths]:
r"""Decompose the input buffer into a printable variant with applied colors.
@@ -295,7 +298,7 @@ def disp_str(
# move past irrelevant spans
colors.pop(0)
- theme = THEME()
+ theme = THEME(force_color=force_color)
pre_color = ""
post_color = ""
if colors and colors[0].span.start < start_index:
diff --git a/Lib/pdb.py b/Lib/pdb.py
index 4efda17..f89d104 100644
--- a/Lib/pdb.py
+++ b/Lib/pdb.py
@@ -1069,7 +1069,7 @@ class Pdb(bdb.Bdb, cmd.Cmd):
def _colorize_code(self, code):
if self.colorize:
colors = list(_pyrepl.utils.gen_colors(code))
- chars, _ = _pyrepl.utils.disp_str(code, colors=colors)
+ chars, _ = _pyrepl.utils.disp_str(code, colors=colors, force_color=True)
code = "".join(chars)
return code
diff --git a/Lib/test/test_remote_pdb.py b/Lib/test/test_remote_pdb.py
index 7c77bed..aef8a6b 100644
--- a/Lib/test/test_remote_pdb.py
+++ b/Lib/test/test_remote_pdb.py
@@ -3,6 +3,7 @@ import time
import itertools
import json
import os
+import re
import signal
import socket
import subprocess
@@ -12,9 +13,9 @@ import textwrap
import threading
import unittest
import unittest.mock
-from contextlib import closing, contextmanager, redirect_stdout, ExitStack
+from contextlib import closing, contextmanager, redirect_stdout, redirect_stderr, ExitStack
from pathlib import Path
-from test.support import is_wasi, os_helper, requires_subprocess, SHORT_TIMEOUT
+from test.support import is_wasi, cpython_only, force_color, requires_subprocess, SHORT_TIMEOUT
from test.support.os_helper import temp_dir, TESTFN, unlink
from typing import Dict, List, Optional, Tuple, Union, Any
@@ -1431,5 +1432,152 @@ class PdbConnectTestCase(unittest.TestCase):
self.assertIn("Function returned: 42", stdout)
self.assertEqual(process.returncode, 0)
+
+def _supports_remote_attaching():
+ from contextlib import suppress
+ PROCESS_VM_READV_SUPPORTED = False
+
+ try:
+ from _remote_debugging import PROCESS_VM_READV_SUPPORTED
+ except ImportError:
+ pass
+
+ return PROCESS_VM_READV_SUPPORTED
+
+
+@unittest.skipIf(not sys.is_remote_debug_enabled(), "Remote debugging is not enabled")
+@unittest.skipIf(sys.platform != "darwin" and sys.platform != "linux" and sys.platform != "win32",
+ "Test only runs on Linux, Windows and MacOS")
+@unittest.skipIf(sys.platform == "linux" and not _supports_remote_attaching(),
+ "Testing on Linux requires process_vm_readv support")
+@cpython_only
+@requires_subprocess()
+class PdbAttachTestCase(unittest.TestCase):
+ def setUp(self):
+ # Create a server socket that will wait for the debugger to connect
+ self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+ self.sock.bind(('127.0.0.1', 0)) # Let OS assign port
+ self.sock.listen(1)
+ self.port = self.sock.getsockname()[1]
+ self._create_script()
+
+ def _create_script(self, script=None):
+ # Create a file for subprocess script
+ script = textwrap.dedent(
+ f"""
+ import socket
+ import time
+
+ def foo():
+ return bar()
+
+ def bar():
+ return baz()
+
+ def baz():
+ x = 1
+ # Trigger attach
+ sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+ sock.connect(('127.0.0.1', {self.port}))
+ sock.close()
+ count = 0
+ while x == 1 and count < 100:
+ count += 1
+ time.sleep(0.1)
+ return x
+
+ result = foo()
+ print(f"Function returned: {{result}}")
+ """
+ )
+
+ self.script_path = TESTFN + "_connect_test.py"
+ with open(self.script_path, 'w') as f:
+ f.write(script)
+
+ def tearDown(self):
+ self.sock.close()
+ try:
+ unlink(self.script_path)
+ except OSError:
+ pass
+
+ def do_integration_test(self, client_stdin):
+ process = subprocess.Popen(
+ [sys.executable, self.script_path],
+ stdout=subprocess.PIPE,
+ stderr=subprocess.PIPE,
+ text=True
+ )
+ self.addCleanup(process.stdout.close)
+ self.addCleanup(process.stderr.close)
+
+ # Wait for the process to reach our attachment point
+ self.sock.settimeout(10)
+ conn, _ = self.sock.accept()
+ conn.close()
+
+ client_stdin = io.StringIO(client_stdin)
+ client_stdout = io.StringIO()
+ client_stderr = io.StringIO()
+
+ self.addCleanup(client_stdin.close)
+ self.addCleanup(client_stdout.close)
+ self.addCleanup(client_stderr.close)
+ self.addCleanup(process.wait)
+
+ with (
+ unittest.mock.patch("sys.stdin", client_stdin),
+ redirect_stdout(client_stdout),
+ redirect_stderr(client_stderr),
+ unittest.mock.patch("sys.argv", ["pdb", "-p", str(process.pid)]),
+ ):
+ try:
+ pdb.main()
+ except PermissionError:
+ self.skipTest("Insufficient permissions for remote execution")
+
+ process.wait()
+ server_stdout = process.stdout.read()
+ server_stderr = process.stderr.read()
+
+ if process.returncode != 0:
+ print("server failed")
+ print(f"server stdout:\n{server_stdout}")
+ print(f"server stderr:\n{server_stderr}")
+
+ self.assertEqual(process.returncode, 0)
+ return {
+ "client": {
+ "stdout": client_stdout.getvalue(),
+ "stderr": client_stderr.getvalue(),
+ },
+ "server": {
+ "stdout": server_stdout,
+ "stderr": server_stderr,
+ },
+ }
+
+ def test_attach_to_process_without_colors(self):
+ with force_color(False):
+ output = self.do_integration_test("ll\nx=42\n")
+ self.assertEqual(output["client"]["stderr"], "")
+ self.assertEqual(output["server"]["stderr"], "")
+
+ self.assertEqual(output["server"]["stdout"], "Function returned: 42\n")
+ self.assertIn("while x == 1", output["client"]["stdout"])
+ self.assertNotIn("\x1b", output["client"]["stdout"])
+
+ def test_attach_to_process_with_colors(self):
+ with force_color(True):
+ output = self.do_integration_test("ll\nx=42\n")
+ self.assertEqual(output["client"]["stderr"], "")
+ self.assertEqual(output["server"]["stderr"], "")
+
+ self.assertEqual(output["server"]["stdout"], "Function returned: 42\n")
+ self.assertIn("\x1b", output["client"]["stdout"])
+ self.assertNotIn("while x == 1", output["client"]["stdout"])
+ self.assertIn("while x == 1", re.sub("\x1b[^m]*m", "", output["client"]["stdout"]))
+
if __name__ == "__main__":
unittest.main()