summaryrefslogtreecommitdiffstats
path: root/Lib/test/test_interpreters
diff options
context:
space:
mode:
authorEric Snow <ericsnowcurrently@gmail.com>2024-02-28 23:08:08 (GMT)
committerGitHub <noreply@github.com>2024-02-28 23:08:08 (GMT)
commite80abd57a82ea1beae0a82423d45c6eb8c5c5c74 (patch)
tree3fd81507ec511caa0a98f2141ce8af6953359fb0 /Lib/test/test_interpreters
parent67c19e57b5c928278ebd191a545979ce786f06b3 (diff)
downloadcpython-e80abd57a82ea1beae0a82423d45c6eb8c5c5c74.zip
cpython-e80abd57a82ea1beae0a82423d45c6eb8c5c5c74.tar.gz
cpython-e80abd57a82ea1beae0a82423d45c6eb8c5c5c74.tar.bz2
gh-76785: Update test.support.interpreters to Align With PEP 734 (gh-115566)
This brings the code under test.support.interpreters, and the corresponding extension modules, in line with recent updates to PEP 734. (Note: PEP 734 has not been accepted at this time. However, we are using an internal copy of the implementation in the test suite to exercise the existing subinterpreters feature.)
Diffstat (limited to 'Lib/test/test_interpreters')
-rw-r--r--Lib/test/test_interpreters/test_api.py291
-rw-r--r--Lib/test/test_interpreters/test_channels.py4
-rw-r--r--Lib/test/test_interpreters/test_lifecycle.py2
-rw-r--r--Lib/test/test_interpreters/test_queues.py151
-rw-r--r--Lib/test/test_interpreters/utils.py19
5 files changed, 371 insertions, 96 deletions
diff --git a/Lib/test/test_interpreters/test_api.py b/Lib/test/test_interpreters/test_api.py
index aefd326..363143f 100644
--- a/Lib/test/test_interpreters/test_api.py
+++ b/Lib/test/test_interpreters/test_api.py
@@ -280,7 +280,7 @@ class TestInterpreterIsRunning(TestBase):
def test_finished(self):
r, w = self.pipe()
interp = interpreters.create()
- interp.exec_sync(f"""if True:
+ interp.exec(f"""if True:
import os
os.write({w}, b'x')
""")
@@ -312,7 +312,7 @@ class TestInterpreterIsRunning(TestBase):
FINISHED = b'F'
interp = interpreters.create()
- interp.exec_sync(f"""if True:
+ interp.exec(f"""if True:
import os
import threading
@@ -326,7 +326,7 @@ class TestInterpreterIsRunning(TestBase):
self.assertFalse(interp.is_running())
os.write(w_thread, DONE)
- interp.exec_sync('t.join()')
+ interp.exec('t.join()')
self.assertEqual(os.read(r_interp, 1), FINISHED)
@@ -393,7 +393,7 @@ class TestInterpreterClose(TestBase):
interp2 = interpreters.create()
self.assertEqual(set(interpreters.list_all()),
{main, interp1, interp2})
- interp1.exec_sync(dedent(f"""
+ interp1.exec(dedent(f"""
from test.support import interpreters
interp2 = interpreters.Interpreter({interp2.id})
interp2.close()
@@ -427,7 +427,7 @@ class TestInterpreterClose(TestBase):
FINISHED = b'F'
interp = interpreters.create()
- interp.exec_sync(f"""if True:
+ interp.exec(f"""if True:
import os
import threading
import time
@@ -503,27 +503,27 @@ class TestInterpreterPrepareMain(TestBase):
interp.prepare_main(spam={'spam': 'eggs', 'foo': 'bar'})
# Make sure neither was actually bound.
- with self.assertRaises(interpreters.ExecFailure):
- interp.exec_sync('print(foo)')
- with self.assertRaises(interpreters.ExecFailure):
- interp.exec_sync('print(spam)')
+ with self.assertRaises(interpreters.ExecutionFailed):
+ interp.exec('print(foo)')
+ with self.assertRaises(interpreters.ExecutionFailed):
+ interp.exec('print(spam)')
-class TestInterpreterExecSync(TestBase):
+class TestInterpreterExec(TestBase):
def test_success(self):
interp = interpreters.create()
script, file = _captured_script('print("it worked!", end="")')
with file:
- interp.exec_sync(script)
+ interp.exec(script)
out = file.read()
self.assertEqual(out, 'it worked!')
def test_failure(self):
interp = interpreters.create()
- with self.assertRaises(interpreters.ExecFailure):
- interp.exec_sync('raise Exception')
+ with self.assertRaises(interpreters.ExecutionFailed):
+ interp.exec('raise Exception')
def test_display_preserved_exception(self):
tempdir = self.temp_dir()
@@ -542,21 +542,21 @@ class TestInterpreterExecSync(TestBase):
spam.eggs()
interp = interpreters.create()
- interp.exec_sync(script)
+ interp.exec(script)
""")
stdout, stderr = self.assert_python_failure(scriptfile)
self.maxDiff = None
- interpmod_line, = (l for l in stderr.splitlines() if ' exec_sync' in l)
- # File "{interpreters.__file__}", line 179, in exec_sync
+ interpmod_line, = (l for l in stderr.splitlines() if ' exec' in l)
+ # File "{interpreters.__file__}", line 179, in exec
self.assertEqual(stderr, dedent(f"""\
Traceback (most recent call last):
File "{scriptfile}", line 9, in <module>
- interp.exec_sync(script)
- ~~~~~~~~~~~~~~~~^^^^^^^^
+ interp.exec(script)
+ ~~~~~~~~~~~^^^^^^^^
{interpmod_line.strip()}
- raise ExecFailure(excinfo)
- test.support.interpreters.ExecFailure: RuntimeError: uh-oh!
+ raise ExecutionFailed(excinfo)
+ test.support.interpreters.ExecutionFailed: RuntimeError: uh-oh!
Uncaught in the interpreter:
@@ -578,7 +578,7 @@ class TestInterpreterExecSync(TestBase):
script, file = _captured_script('print("it worked!", end="")')
with file:
def f():
- interp.exec_sync(script)
+ interp.exec(script)
t = threading.Thread(target=f)
t.start()
@@ -604,7 +604,7 @@ class TestInterpreterExecSync(TestBase):
with open('{file.name}', 'w', encoding='utf-8') as out:
out.write('{expected}')
""")
- interp.exec_sync(script)
+ interp.exec(script)
file.seek(0)
content = file.read()
@@ -615,17 +615,17 @@ class TestInterpreterExecSync(TestBase):
interp = interpreters.create()
with _running(interp):
with self.assertRaises(RuntimeError):
- interp.exec_sync('print("spam")')
+ interp.exec('print("spam")')
def test_bad_script(self):
interp = interpreters.create()
with self.assertRaises(TypeError):
- interp.exec_sync(10)
+ interp.exec(10)
def test_bytes_for_script(self):
interp = interpreters.create()
with self.assertRaises(TypeError):
- interp.exec_sync(b'print("spam")')
+ interp.exec(b'print("spam")')
def test_with_background_threads_still_running(self):
r_interp, w_interp = self.pipe()
@@ -636,7 +636,7 @@ class TestInterpreterExecSync(TestBase):
FINISHED = b'F'
interp = interpreters.create()
- interp.exec_sync(f"""if True:
+ interp.exec(f"""if True:
import os
import threading
@@ -648,46 +648,229 @@ class TestInterpreterExecSync(TestBase):
t.start()
os.write({w_interp}, {RAN!r})
""")
- interp.exec_sync(f"""if True:
+ interp.exec(f"""if True:
os.write({w_interp}, {RAN!r})
""")
os.write(w_thread, DONE)
- interp.exec_sync('t.join()')
+ interp.exec('t.join()')
self.assertEqual(os.read(r_interp, 1), RAN)
self.assertEqual(os.read(r_interp, 1), RAN)
self.assertEqual(os.read(r_interp, 1), FINISHED)
# test_xxsubinterpreters covers the remaining
- # Interpreter.exec_sync() behavior.
+ # Interpreter.exec() behavior.
-class TestInterpreterRun(TestBase):
-
- def test_success(self):
- interp = interpreters.create()
- script, file = _captured_script('print("it worked!", end="")')
- with file:
- t = interp.run(script)
+def call_func_noop():
+ pass
+
+
+def call_func_return_shareable():
+ return (1, None)
+
+
+def call_func_return_not_shareable():
+ return [1, 2, 3]
+
+
+def call_func_failure():
+ raise Exception('spam!')
+
+
+def call_func_ident(value):
+ return value
+
+
+def get_call_func_closure(value):
+ def call_func_closure():
+ return value
+ return call_func_closure
+
+
+class Spam:
+
+ @staticmethod
+ def noop():
+ pass
+
+ @classmethod
+ def from_values(cls, *values):
+ return cls(values)
+
+ def __init__(self, value):
+ self.value = value
+
+ def __call__(self, *args, **kwargs):
+ return (self.value, args, kwargs)
+
+ def __eq__(self, other):
+ if not isinstance(other, Spam):
+ return NotImplemented
+ return self.value == other.value
+
+ def run(self, *args, **kwargs):
+ return (self.value, args, kwargs)
+
+
+def call_func_complex(op, /, value=None, *args, exc=None, **kwargs):
+ if exc is not None:
+ raise exc
+ if op == '':
+ raise ValueError('missing op')
+ elif op == 'ident':
+ if args or kwargs:
+ raise Exception((args, kwargs))
+ return value
+ elif op == 'full-ident':
+ return (value, args, kwargs)
+ elif op == 'globals':
+ if value is not None or args or kwargs:
+ raise Exception((value, args, kwargs))
+ return __name__
+ elif op == 'interpid':
+ if value is not None or args or kwargs:
+ raise Exception((value, args, kwargs))
+ return interpreters.get_current().id
+ elif op == 'closure':
+ if args or kwargs:
+ raise Exception((args, kwargs))
+ return get_call_func_closure(value)
+ elif op == 'custom':
+ if args or kwargs:
+ raise Exception((args, kwargs))
+ return Spam(value)
+ elif op == 'custom-inner':
+ if args or kwargs:
+ raise Exception((args, kwargs))
+ class Eggs(Spam):
+ pass
+ return Eggs(value)
+ elif not isinstance(op, str):
+ raise TypeError(op)
+ else:
+ raise NotImplementedError(op)
+
+
+class TestInterpreterCall(TestBase):
+
+ # signature
+ # - blank
+ # - args
+ # - kwargs
+ # - args, kwargs
+ # return
+ # - nothing (None)
+ # - simple
+ # - closure
+ # - custom
+ # ops:
+ # - do nothing
+ # - fail
+ # - echo
+ # - do complex, relative to interpreter
+ # scope
+ # - global func
+ # - local closure
+ # - returned closure
+ # - callable type instance
+ # - type
+ # - classmethod
+ # - staticmethod
+ # - instance method
+ # exception
+ # - builtin
+ # - custom
+ # - preserves info (e.g. SyntaxError)
+ # - matching error display
+
+ def test_call(self):
+ interp = interpreters.create()
+
+ for i, (callable, args, kwargs) in enumerate([
+ (call_func_noop, (), {}),
+ (call_func_return_shareable, (), {}),
+ (call_func_return_not_shareable, (), {}),
+ (Spam.noop, (), {}),
+ ]):
+ with self.subTest(f'success case #{i+1}'):
+ res = interp.call(callable)
+ self.assertIs(res, None)
+
+ for i, (callable, args, kwargs) in enumerate([
+ (call_func_ident, ('spamspamspam',), {}),
+ (get_call_func_closure, (42,), {}),
+ (get_call_func_closure(42), (), {}),
+ (Spam.from_values, (), {}),
+ (Spam.from_values, (1, 2, 3), {}),
+ (Spam, ('???'), {}),
+ (Spam(101), (), {}),
+ (Spam(10101).run, (), {}),
+ (call_func_complex, ('ident', 'spam'), {}),
+ (call_func_complex, ('full-ident', 'spam'), {}),
+ (call_func_complex, ('full-ident', 'spam', 'ham'), {'eggs': '!!!'}),
+ (call_func_complex, ('globals',), {}),
+ (call_func_complex, ('interpid',), {}),
+ (call_func_complex, ('closure',), {'value': '~~~'}),
+ (call_func_complex, ('custom', 'spam!'), {}),
+ (call_func_complex, ('custom-inner', 'eggs!'), {}),
+ (call_func_complex, ('???',), {'exc': ValueError('spam')}),
+ ]):
+ with self.subTest(f'invalid case #{i+1}'):
+ with self.assertRaises(Exception):
+ if args or kwargs:
+ raise Exception((args, kwargs))
+ interp.call(callable)
+
+ with self.assertRaises(interpreters.ExecutionFailed):
+ interp.call(call_func_failure)
+
+ def test_call_in_thread(self):
+ interp = interpreters.create()
+
+ for i, (callable, args, kwargs) in enumerate([
+ (call_func_noop, (), {}),
+ (call_func_return_shareable, (), {}),
+ (call_func_return_not_shareable, (), {}),
+ (Spam.noop, (), {}),
+ ]):
+ with self.subTest(f'success case #{i+1}'):
+ with self.captured_thread_exception() as ctx:
+ t = interp.call_in_thread(callable)
+ t.join()
+ self.assertIsNone(ctx.caught)
+
+ for i, (callable, args, kwargs) in enumerate([
+ (call_func_ident, ('spamspamspam',), {}),
+ (get_call_func_closure, (42,), {}),
+ (get_call_func_closure(42), (), {}),
+ (Spam.from_values, (), {}),
+ (Spam.from_values, (1, 2, 3), {}),
+ (Spam, ('???'), {}),
+ (Spam(101), (), {}),
+ (Spam(10101).run, (), {}),
+ (call_func_complex, ('ident', 'spam'), {}),
+ (call_func_complex, ('full-ident', 'spam'), {}),
+ (call_func_complex, ('full-ident', 'spam', 'ham'), {'eggs': '!!!'}),
+ (call_func_complex, ('globals',), {}),
+ (call_func_complex, ('interpid',), {}),
+ (call_func_complex, ('closure',), {'value': '~~~'}),
+ (call_func_complex, ('custom', 'spam!'), {}),
+ (call_func_complex, ('custom-inner', 'eggs!'), {}),
+ (call_func_complex, ('???',), {'exc': ValueError('spam')}),
+ ]):
+ with self.subTest(f'invalid case #{i+1}'):
+ if args or kwargs:
+ continue
+ with self.captured_thread_exception() as ctx:
+ t = interp.call_in_thread(callable)
+ t.join()
+ self.assertIsNotNone(ctx.caught)
+
+ with self.captured_thread_exception() as ctx:
+ t = interp.call_in_thread(call_func_failure)
t.join()
- out = file.read()
-
- self.assertEqual(out, 'it worked!')
-
- def test_failure(self):
- caught = False
- def excepthook(args):
- nonlocal caught
- caught = True
- threading.excepthook = excepthook
- try:
- interp = interpreters.create()
- t = interp.run('raise Exception')
- t.join()
-
- self.assertTrue(caught)
- except BaseException:
- threading.excepthook = threading.__excepthook__
+ self.assertIsNotNone(ctx.caught)
class TestIsShareable(TestBase):
diff --git a/Lib/test/test_interpreters/test_channels.py b/Lib/test/test_interpreters/test_channels.py
index 3c3e188..07e5038 100644
--- a/Lib/test/test_interpreters/test_channels.py
+++ b/Lib/test/test_interpreters/test_channels.py
@@ -120,7 +120,7 @@ class TestSendRecv(TestBase):
def test_send_recv_same_interpreter(self):
interp = interpreters.create()
- interp.exec_sync(dedent("""
+ interp.exec(dedent("""
from test.support.interpreters import channels
r, s = channels.create()
orig = b'spam'
@@ -193,7 +193,7 @@ class TestSendRecv(TestBase):
def test_send_recv_nowait_same_interpreter(self):
interp = interpreters.create()
- interp.exec_sync(dedent("""
+ interp.exec(dedent("""
from test.support.interpreters import channels
r, s = channels.create()
orig = b'spam'
diff --git a/Lib/test/test_interpreters/test_lifecycle.py b/Lib/test/test_interpreters/test_lifecycle.py
index c2917d8..67b6f43 100644
--- a/Lib/test/test_interpreters/test_lifecycle.py
+++ b/Lib/test/test_interpreters/test_lifecycle.py
@@ -124,7 +124,7 @@ class StartupTests(TestBase):
orig = sys.path[0]
interp = interpreters.create()
- interp.exec_sync(f"""if True:
+ interp.exec(f"""if True:
import json
import sys
print(json.dumps({{
diff --git a/Lib/test/test_interpreters/test_queues.py b/Lib/test/test_interpreters/test_queues.py
index 2a8ca99..65b5435 100644
--- a/Lib/test/test_interpreters/test_queues.py
+++ b/Lib/test/test_interpreters/test_queues.py
@@ -51,20 +51,20 @@ class QueueTests(TestBase):
queue1 = queues.create()
interp = interpreters.create()
- interp.exec_sync(dedent(f"""
+ interp.exec(dedent(f"""
from test.support.interpreters import queues
queue1 = queues.Queue({queue1.id})
"""));
with self.subTest('same interpreter'):
queue2 = queues.create()
- queue1.put(queue2)
+ queue1.put(queue2, syncobj=True)
queue3 = queue1.get()
self.assertIs(queue3, queue2)
with self.subTest('from current interpreter'):
queue4 = queues.create()
- queue1.put(queue4)
+ queue1.put(queue4, syncobj=True)
out = _run_output(interp, dedent("""
queue4 = queue1.get()
print(queue4.id)
@@ -75,7 +75,7 @@ class QueueTests(TestBase):
with self.subTest('from subinterpreter'):
out = _run_output(interp, dedent("""
queue5 = queues.create()
- queue1.put(queue5)
+ queue1.put(queue5, syncobj=True)
print(queue5.id)
"""))
qid = int(out)
@@ -118,7 +118,7 @@ class TestQueueOps(TestBase):
def test_empty(self):
queue = queues.create()
before = queue.empty()
- queue.put(None)
+ queue.put(None, syncobj=True)
during = queue.empty()
queue.get()
after = queue.empty()
@@ -133,7 +133,7 @@ class TestQueueOps(TestBase):
queue = queues.create(3)
for _ in range(3):
actual.append(queue.full())
- queue.put(None)
+ queue.put(None, syncobj=True)
actual.append(queue.full())
for _ in range(3):
queue.get()
@@ -147,16 +147,16 @@ class TestQueueOps(TestBase):
queue = queues.create()
for _ in range(3):
actual.append(queue.qsize())
- queue.put(None)
+ queue.put(None, syncobj=True)
actual.append(queue.qsize())
queue.get()
actual.append(queue.qsize())
- queue.put(None)
+ queue.put(None, syncobj=True)
actual.append(queue.qsize())
for _ in range(3):
queue.get()
actual.append(queue.qsize())
- queue.put(None)
+ queue.put(None, syncobj=True)
actual.append(queue.qsize())
queue.get()
actual.append(queue.qsize())
@@ -165,30 +165,81 @@ class TestQueueOps(TestBase):
def test_put_get_main(self):
expected = list(range(20))
- queue = queues.create()
- for i in range(20):
- queue.put(i)
- actual = [queue.get() for _ in range(20)]
+ for syncobj in (True, False):
+ kwds = dict(syncobj=syncobj)
+ with self.subTest(f'syncobj={syncobj}'):
+ queue = queues.create()
+ for i in range(20):
+ queue.put(i, **kwds)
+ actual = [queue.get() for _ in range(20)]
- self.assertEqual(actual, expected)
+ self.assertEqual(actual, expected)
def test_put_timeout(self):
- queue = queues.create(2)
- queue.put(None)
- queue.put(None)
- with self.assertRaises(queues.QueueFull):
- queue.put(None, timeout=0.1)
- queue.get()
- queue.put(None)
+ for syncobj in (True, False):
+ kwds = dict(syncobj=syncobj)
+ with self.subTest(f'syncobj={syncobj}'):
+ queue = queues.create(2)
+ queue.put(None, **kwds)
+ queue.put(None, **kwds)
+ with self.assertRaises(queues.QueueFull):
+ queue.put(None, timeout=0.1, **kwds)
+ queue.get()
+ queue.put(None, **kwds)
def test_put_nowait(self):
- queue = queues.create(2)
- queue.put_nowait(None)
- queue.put_nowait(None)
- with self.assertRaises(queues.QueueFull):
- queue.put_nowait(None)
- queue.get()
- queue.put_nowait(None)
+ for syncobj in (True, False):
+ kwds = dict(syncobj=syncobj)
+ with self.subTest(f'syncobj={syncobj}'):
+ queue = queues.create(2)
+ queue.put_nowait(None, **kwds)
+ queue.put_nowait(None, **kwds)
+ with self.assertRaises(queues.QueueFull):
+ queue.put_nowait(None, **kwds)
+ queue.get()
+ queue.put_nowait(None, **kwds)
+
+ def test_put_syncobj(self):
+ for obj in [
+ None,
+ True,
+ 10,
+ 'spam',
+ b'spam',
+ (0, 'a'),
+ ]:
+ with self.subTest(repr(obj)):
+ queue = queues.create()
+ queue.put(obj, syncobj=True)
+ obj2 = queue.get()
+ self.assertEqual(obj2, obj)
+
+ for obj in [
+ [1, 2, 3],
+ {'a': 13, 'b': 17},
+ ]:
+ with self.subTest(repr(obj)):
+ queue = queues.create()
+ with self.assertRaises(interpreters.NotShareableError):
+ queue.put(obj, syncobj=True)
+
+ def test_put_not_syncobj(self):
+ for obj in [
+ None,
+ True,
+ 10,
+ 'spam',
+ b'spam',
+ (0, 'a'),
+ # not shareable
+ [1, 2, 3],
+ {'a': 13, 'b': 17},
+ ]:
+ with self.subTest(repr(obj)):
+ queue = queues.create()
+ queue.put(obj, syncobj=False)
+ obj2 = queue.get()
+ self.assertEqual(obj2, obj)
def test_get_timeout(self):
queue = queues.create()
@@ -200,13 +251,41 @@ class TestQueueOps(TestBase):
with self.assertRaises(queues.QueueEmpty):
queue.get_nowait()
+ def test_put_get_default_syncobj(self):
+ expected = list(range(20))
+ queue = queues.create(syncobj=True)
+ for i in range(20):
+ queue.put(i)
+ actual = [queue.get() for _ in range(20)]
+
+ self.assertEqual(actual, expected)
+
+ obj = [1, 2, 3] # lists are not shareable
+ with self.assertRaises(interpreters.NotShareableError):
+ queue.put(obj)
+
+ def test_put_get_default_not_syncobj(self):
+ expected = list(range(20))
+ queue = queues.create(syncobj=False)
+ for i in range(20):
+ queue.put(i)
+ actual = [queue.get() for _ in range(20)]
+
+ self.assertEqual(actual, expected)
+
+ obj = [1, 2, 3] # lists are not shareable
+ queue.put(obj)
+ obj2 = queue.get()
+ self.assertEqual(obj, obj2)
+ self.assertIsNot(obj, obj2)
+
def test_put_get_same_interpreter(self):
interp = interpreters.create()
- interp.exec_sync(dedent("""
+ interp.exec(dedent("""
from test.support.interpreters import queues
queue = queues.create()
orig = b'spam'
- queue.put(orig)
+ queue.put(orig, syncobj=True)
obj = queue.get()
assert obj == orig, 'expected: obj == orig'
assert obj is not orig, 'expected: obj is not orig'
@@ -219,7 +298,7 @@ class TestQueueOps(TestBase):
self.assertEqual(len(queues.list_all()), 2)
obj1 = b'spam'
- queue1.put(obj1)
+ queue1.put(obj1, syncobj=True)
out = _run_output(
interp,
@@ -236,7 +315,7 @@ class TestQueueOps(TestBase):
obj2 = b'eggs'
print(id(obj2))
assert queue2.qsize() == 0, 'expected: queue2.qsize() == 0'
- queue2.put(obj2)
+ queue2.put(obj2, syncobj=True)
assert queue2.qsize() == 1, 'expected: queue2.qsize() == 1'
"""))
self.assertEqual(len(queues.list_all()), 2)
@@ -258,8 +337,8 @@ class TestQueueOps(TestBase):
queue = queues.Queue({queue.id})
obj1 = b'spam'
obj2 = b'eggs'
- queue.put(obj1)
- queue.put(obj2)
+ queue.put(obj1, syncobj=True)
+ queue.put(obj2, syncobj=True)
"""))
self.assertEqual(queue.qsize(), 2)
@@ -281,12 +360,12 @@ class TestQueueOps(TestBase):
break
except queues.QueueEmpty:
continue
- queue2.put(obj)
+ queue2.put(obj, syncobj=True)
t = threading.Thread(target=f)
t.start()
orig = b'spam'
- queue1.put(orig)
+ queue1.put(orig, syncobj=True)
obj = queue2.get()
t.join()
diff --git a/Lib/test/test_interpreters/utils.py b/Lib/test/test_interpreters/utils.py
index 3a37ed0..973d05d 100644
--- a/Lib/test/test_interpreters/utils.py
+++ b/Lib/test/test_interpreters/utils.py
@@ -4,8 +4,9 @@ import os.path
import subprocess
import sys
import tempfile
-import threading
from textwrap import dedent
+import threading
+import types
import unittest
from test import support
@@ -41,7 +42,7 @@ def _run_output(interp, request, init=None):
with rpipe:
if init:
interp.prepare_main(init)
- interp.exec_sync(script)
+ interp.exec(script)
return rpipe.read()
@@ -49,7 +50,7 @@ def _run_output(interp, request, init=None):
def _running(interp):
r, w = os.pipe()
def run():
- interp.exec_sync(dedent(f"""
+ interp.exec(dedent(f"""
# wait for "signal"
with open({r}) as rpipe:
rpipe.read()
@@ -84,6 +85,18 @@ class TestBase(unittest.TestCase):
self.addCleanup(lambda: os_helper.rmtree(tempdir))
return tempdir
+ @contextlib.contextmanager
+ def captured_thread_exception(self):
+ ctx = types.SimpleNamespace(caught=None)
+ def excepthook(args):
+ ctx.caught = args
+ orig_excepthook = threading.excepthook
+ threading.excepthook = excepthook
+ try:
+ yield ctx
+ finally:
+ threading.excepthook = orig_excepthook
+
def make_script(self, filename, dirname=None, text=None):
if text:
text = dedent(text)