summaryrefslogtreecommitdiffstats
path: root/Lib
diff options
context:
space:
mode:
Diffstat (limited to 'Lib')
-rw-r--r--Lib/test/support/interpreters/__init__.py51
-rw-r--r--Lib/test/support/interpreters/queues.py74
-rw-r--r--Lib/test/test_interpreters/test_api.py291
-rw-r--r--Lib/test/test_interpreters/test_channels.py4
-rw-r--r--Lib/test/test_interpreters/test_lifecycle.py2
-rw-r--r--Lib/test/test_interpreters/test_queues.py151
-rw-r--r--Lib/test/test_interpreters/utils.py19
-rw-r--r--Lib/test/test_sys.py4
-rw-r--r--Lib/test/test_threading.py2
9 files changed, 477 insertions, 121 deletions
diff --git a/Lib/test/support/interpreters/__init__.py b/Lib/test/support/interpreters/__init__.py
index 15a908e..d02ffba 100644
--- a/Lib/test/support/interpreters/__init__.py
+++ b/Lib/test/support/interpreters/__init__.py
@@ -6,7 +6,7 @@ import _xxsubinterpreters as _interpreters
# aliases:
from _xxsubinterpreters import (
- InterpreterError, InterpreterNotFoundError,
+ InterpreterError, InterpreterNotFoundError, NotShareableError,
is_shareable,
)
@@ -14,7 +14,8 @@ from _xxsubinterpreters import (
__all__ = [
'get_current', 'get_main', 'create', 'list_all', 'is_shareable',
'Interpreter',
- 'InterpreterError', 'InterpreterNotFoundError', 'ExecFailure',
+ 'InterpreterError', 'InterpreterNotFoundError', 'ExecutionFailed',
+ 'NotShareableError',
'create_queue', 'Queue', 'QueueEmpty', 'QueueFull',
]
@@ -42,7 +43,11 @@ Uncaught in the interpreter:
{formatted}
""".strip()
-class ExecFailure(RuntimeError):
+class ExecutionFailed(RuntimeError):
+ """An unhandled exception happened during execution.
+
+ This is raised from Interpreter.exec() and Interpreter.call().
+ """
def __init__(self, excinfo):
msg = excinfo.formatted
@@ -157,7 +162,7 @@ class Interpreter:
ns = dict(ns, **kwargs) if ns is not None else kwargs
_interpreters.set___main___attrs(self._id, ns)
- def exec_sync(self, code, /):
+ def exec(self, code, /):
"""Run the given source code in the interpreter.
This is essentially the same as calling the builtin "exec"
@@ -166,10 +171,10 @@ class Interpreter:
There is no return value.
- If the code raises an unhandled exception then an ExecFailure
- is raised, which summarizes the unhandled exception. The actual
- exception is discarded because objects cannot be shared between
- interpreters.
+ If the code raises an unhandled exception then an ExecutionFailed
+ exception is raised, which summarizes the unhandled exception.
+ The actual exception is discarded because objects cannot be
+ shared between interpreters.
This blocks the current Python thread until done. During
that time, the previous interpreter is allowed to run
@@ -177,11 +182,35 @@ class Interpreter:
"""
excinfo = _interpreters.exec(self._id, code)
if excinfo is not None:
- raise ExecFailure(excinfo)
+ raise ExecutionFailed(excinfo)
+
+ def call(self, callable, /):
+ """Call the object in the interpreter with given args/kwargs.
+
+ Only functions that take no arguments and have no closure
+ are supported.
- def run(self, code, /):
+ The return value is discarded.
+
+ If the callable raises an exception then the error display
+ (including full traceback) is send back between the interpreters
+ and an ExecutionFailed exception is raised, much like what
+ happens with Interpreter.exec().
+ """
+ # XXX Support args and kwargs.
+ # XXX Support arbitrary callables.
+ # XXX Support returning the return value (e.g. via pickle).
+ excinfo = _interpreters.call(self._id, callable)
+ if excinfo is not None:
+ raise ExecutionFailed(excinfo)
+
+ def call_in_thread(self, callable, /):
+ """Return a new thread that calls the object in the interpreter.
+
+ The return value and any raised exception are discarded.
+ """
def task():
- self.exec_sync(code)
+ self.call(callable)
t = threading.Thread(target=task)
t.start()
return t
diff --git a/Lib/test/support/interpreters/queues.py b/Lib/test/support/interpreters/queues.py
index aead0c4..2cc616b 100644
--- a/Lib/test/support/interpreters/queues.py
+++ b/Lib/test/support/interpreters/queues.py
@@ -1,5 +1,6 @@
"""Cross-interpreter Queues High Level Module."""
+import pickle
import queue
import time
import weakref
@@ -31,20 +32,26 @@ class QueueFull(_queues.QueueFull, queue.Full):
"""
-def create(maxsize=0):
+_SHARED_ONLY = 0
+_PICKLED = 1
+
+def create(maxsize=0, *, syncobj=False):
"""Return a new cross-interpreter queue.
The queue may be used to pass data safely between interpreters.
+
+ "syncobj" sets the default for Queue.put()
+ and Queue.put_nowait().
"""
- qid = _queues.create(maxsize)
- return Queue(qid)
+ fmt = _SHARED_ONLY if syncobj else _PICKLED
+ qid = _queues.create(maxsize, fmt)
+ return Queue(qid, _fmt=fmt)
def list_all():
"""Return a list of all open queues."""
- return [Queue(qid)
- for qid in _queues.list_all()]
-
+ return [Queue(qid, _fmt=fmt)
+ for qid, fmt in _queues.list_all()]
_known_queues = weakref.WeakValueDictionary()
@@ -52,17 +59,20 @@ _known_queues = weakref.WeakValueDictionary()
class Queue:
"""A cross-interpreter queue."""
- def __new__(cls, id, /):
+ def __new__(cls, id, /, *, _fmt=None):
# There is only one instance for any given ID.
if isinstance(id, int):
id = int(id)
else:
raise TypeError(f'id must be an int, got {id!r}')
+ if _fmt is None:
+ _fmt = _queues.get_default_fmt(id)
try:
self = _known_queues[id]
except KeyError:
self = super().__new__(cls)
self._id = id
+ self._fmt = _fmt
_known_queues[id] = self
_queues.bind(id)
return self
@@ -105,20 +115,50 @@ class Queue:
return _queues.get_count(self._id)
def put(self, obj, timeout=None, *,
+ syncobj=None,
_delay=10 / 1000, # 10 milliseconds
):
"""Add the object to the queue.
This blocks while the queue is full.
+
+ If "syncobj" is None (the default) then it uses the
+ queue's default, set with create_queue()..
+
+ If "syncobj" is false then all objects are supported,
+ at the expense of worse performance.
+
+ If "syncobj" is true then the object must be "shareable".
+ Examples of "shareable" objects include the builtin singletons,
+ str, and memoryview. One benefit is that such objects are
+ passed through the queue efficiently.
+
+ The key difference, though, is conceptual: the corresponding
+ object returned from Queue.get() will be strictly equivalent
+ to the given obj. In other words, the two objects will be
+ effectively indistinguishable from each other, even if the
+ object is mutable. The received object may actually be the
+ same object, or a copy (immutable values only), or a proxy.
+ Regardless, the received object should be treated as though
+ the original has been shared directly, whether or not it
+ actually is. That's a slightly different and stronger promise
+ than just (initial) equality, which is all "syncobj=False"
+ can promise.
"""
+ if syncobj is None:
+ fmt = self._fmt
+ else:
+ fmt = _SHARED_ONLY if syncobj else _PICKLED
if timeout is not None:
timeout = int(timeout)
if timeout < 0:
raise ValueError(f'timeout value must be non-negative')
end = time.time() + timeout
+ if fmt is _PICKLED:
+ obj = pickle.dumps(obj)
while True:
try:
- _queues.put(self._id, obj)
+ _queues.put(self._id, obj, fmt)
except _queues.QueueFull as exc:
if timeout is not None and time.time() >= end:
exc.__class__ = QueueFull
@@ -127,9 +167,15 @@ class Queue:
else:
break
- def put_nowait(self, obj):
+ def put_nowait(self, obj, *, syncobj=None):
+ if syncobj is None:
+ fmt = self._fmt
+ else:
+ fmt = _SHARED_ONLY if syncobj else _PICKLED
+ if fmt is _PICKLED:
+ obj = pickle.dumps(obj)
try:
- return _queues.put(self._id, obj)
+ _queues.put(self._id, obj, fmt)
except _queues.QueueFull as exc:
exc.__class__ = QueueFull
raise # re-raise
@@ -148,12 +194,18 @@ class Queue:
end = time.time() + timeout
while True:
try:
- return _queues.get(self._id)
+ obj, fmt = _queues.get(self._id)
except _queues.QueueEmpty as exc:
if timeout is not None and time.time() >= end:
exc.__class__ = QueueEmpty
raise # re-raise
time.sleep(_delay)
+ else:
+ break
+ if fmt == _PICKLED:
+ obj = pickle.loads(obj)
+ else:
+ assert fmt == _SHARED_ONLY
return obj
def get_nowait(self):
diff --git a/Lib/test/test_interpreters/test_api.py b/Lib/test/test_interpreters/test_api.py
index aefd326..363143f 100644
--- a/Lib/test/test_interpreters/test_api.py
+++ b/Lib/test/test_interpreters/test_api.py
@@ -280,7 +280,7 @@ class TestInterpreterIsRunning(TestBase):
def test_finished(self):
r, w = self.pipe()
interp = interpreters.create()
- interp.exec_sync(f"""if True:
+ interp.exec(f"""if True:
import os
os.write({w}, b'x')
""")
@@ -312,7 +312,7 @@ class TestInterpreterIsRunning(TestBase):
FINISHED = b'F'
interp = interpreters.create()
- interp.exec_sync(f"""if True:
+ interp.exec(f"""if True:
import os
import threading
@@ -326,7 +326,7 @@ class TestInterpreterIsRunning(TestBase):
self.assertFalse(interp.is_running())
os.write(w_thread, DONE)
- interp.exec_sync('t.join()')
+ interp.exec('t.join()')
self.assertEqual(os.read(r_interp, 1), FINISHED)
@@ -393,7 +393,7 @@ class TestInterpreterClose(TestBase):
interp2 = interpreters.create()
self.assertEqual(set(interpreters.list_all()),
{main, interp1, interp2})
- interp1.exec_sync(dedent(f"""
+ interp1.exec(dedent(f"""
from test.support import interpreters
interp2 = interpreters.Interpreter({interp2.id})
interp2.close()
@@ -427,7 +427,7 @@ class TestInterpreterClose(TestBase):
FINISHED = b'F'
interp = interpreters.create()
- interp.exec_sync(f"""if True:
+ interp.exec(f"""if True:
import os
import threading
import time
@@ -503,27 +503,27 @@ class TestInterpreterPrepareMain(TestBase):
interp.prepare_main(spam={'spam': 'eggs', 'foo': 'bar'})
# Make sure neither was actually bound.
- with self.assertRaises(interpreters.ExecFailure):
- interp.exec_sync('print(foo)')
- with self.assertRaises(interpreters.ExecFailure):
- interp.exec_sync('print(spam)')
+ with self.assertRaises(interpreters.ExecutionFailed):
+ interp.exec('print(foo)')
+ with self.assertRaises(interpreters.ExecutionFailed):
+ interp.exec('print(spam)')
-class TestInterpreterExecSync(TestBase):
+class TestInterpreterExec(TestBase):
def test_success(self):
interp = interpreters.create()
script, file = _captured_script('print("it worked!", end="")')
with file:
- interp.exec_sync(script)
+ interp.exec(script)
out = file.read()
self.assertEqual(out, 'it worked!')
def test_failure(self):
interp = interpreters.create()
- with self.assertRaises(interpreters.ExecFailure):
- interp.exec_sync('raise Exception')
+ with self.assertRaises(interpreters.ExecutionFailed):
+ interp.exec('raise Exception')
def test_display_preserved_exception(self):
tempdir = self.temp_dir()
@@ -542,21 +542,21 @@ class TestInterpreterExecSync(TestBase):
spam.eggs()
interp = interpreters.create()
- interp.exec_sync(script)
+ interp.exec(script)
""")
stdout, stderr = self.assert_python_failure(scriptfile)
self.maxDiff = None
- interpmod_line, = (l for l in stderr.splitlines() if ' exec_sync' in l)
- # File "{interpreters.__file__}", line 179, in exec_sync
+ interpmod_line, = (l for l in stderr.splitlines() if ' exec' in l)
+ # File "{interpreters.__file__}", line 179, in exec
self.assertEqual(stderr, dedent(f"""\
Traceback (most recent call last):
File "{scriptfile}", line 9, in <module>
- interp.exec_sync(script)
- ~~~~~~~~~~~~~~~~^^^^^^^^
+ interp.exec(script)
+ ~~~~~~~~~~~^^^^^^^^
{interpmod_line.strip()}
- raise ExecFailure(excinfo)
- test.support.interpreters.ExecFailure: RuntimeError: uh-oh!
+ raise ExecutionFailed(excinfo)
+ test.support.interpreters.ExecutionFailed: RuntimeError: uh-oh!
Uncaught in the interpreter:
@@ -578,7 +578,7 @@ class TestInterpreterExecSync(TestBase):
script, file = _captured_script('print("it worked!", end="")')
with file:
def f():
- interp.exec_sync(script)
+ interp.exec(script)
t = threading.Thread(target=f)
t.start()
@@ -604,7 +604,7 @@ class TestInterpreterExecSync(TestBase):
with open('{file.name}', 'w', encoding='utf-8') as out:
out.write('{expected}')
""")
- interp.exec_sync(script)
+ interp.exec(script)
file.seek(0)
content = file.read()
@@ -615,17 +615,17 @@ class TestInterpreterExecSync(TestBase):
interp = interpreters.create()
with _running(interp):
with self.assertRaises(RuntimeError):
- interp.exec_sync('print("spam")')
+ interp.exec('print("spam")')
def test_bad_script(self):
interp = interpreters.create()
with self.assertRaises(TypeError):
- interp.exec_sync(10)
+ interp.exec(10)
def test_bytes_for_script(self):
interp = interpreters.create()
with self.assertRaises(TypeError):
- interp.exec_sync(b'print("spam")')
+ interp.exec(b'print("spam")')
def test_with_background_threads_still_running(self):
r_interp, w_interp = self.pipe()
@@ -636,7 +636,7 @@ class TestInterpreterExecSync(TestBase):
FINISHED = b'F'
interp = interpreters.create()
- interp.exec_sync(f"""if True:
+ interp.exec(f"""if True:
import os
import threading
@@ -648,46 +648,229 @@ class TestInterpreterExecSync(TestBase):
t.start()
os.write({w_interp}, {RAN!r})
""")
- interp.exec_sync(f"""if True:
+ interp.exec(f"""if True:
os.write({w_interp}, {RAN!r})
""")
os.write(w_thread, DONE)
- interp.exec_sync('t.join()')
+ interp.exec('t.join()')
self.assertEqual(os.read(r_interp, 1), RAN)
self.assertEqual(os.read(r_interp, 1), RAN)
self.assertEqual(os.read(r_interp, 1), FINISHED)
# test_xxsubinterpreters covers the remaining
- # Interpreter.exec_sync() behavior.
+ # Interpreter.exec() behavior.
-class TestInterpreterRun(TestBase):
-
- def test_success(self):
- interp = interpreters.create()
- script, file = _captured_script('print("it worked!", end="")')
- with file:
- t = interp.run(script)
+def call_func_noop():
+ pass
+
+
+def call_func_return_shareable():
+ return (1, None)
+
+
+def call_func_return_not_shareable():
+ return [1, 2, 3]
+
+
+def call_func_failure():
+ raise Exception('spam!')
+
+
+def call_func_ident(value):
+ return value
+
+
+def get_call_func_closure(value):
+ def call_func_closure():
+ return value
+ return call_func_closure
+
+
+class Spam:
+
+ @staticmethod
+ def noop():
+ pass
+
+ @classmethod
+ def from_values(cls, *values):
+ return cls(values)
+
+ def __init__(self, value):
+ self.value = value
+
+ def __call__(self, *args, **kwargs):
+ return (self.value, args, kwargs)
+
+ def __eq__(self, other):
+ if not isinstance(other, Spam):
+ return NotImplemented
+ return self.value == other.value
+
+ def run(self, *args, **kwargs):
+ return (self.value, args, kwargs)
+
+
+def call_func_complex(op, /, value=None, *args, exc=None, **kwargs):
+ if exc is not None:
+ raise exc
+ if op == '':
+ raise ValueError('missing op')
+ elif op == 'ident':
+ if args or kwargs:
+ raise Exception((args, kwargs))
+ return value
+ elif op == 'full-ident':
+ return (value, args, kwargs)
+ elif op == 'globals':
+ if value is not None or args or kwargs:
+ raise Exception((value, args, kwargs))
+ return __name__
+ elif op == 'interpid':
+ if value is not None or args or kwargs:
+ raise Exception((value, args, kwargs))
+ return interpreters.get_current().id
+ elif op == 'closure':
+ if args or kwargs:
+ raise Exception((args, kwargs))
+ return get_call_func_closure(value)
+ elif op == 'custom':
+ if args or kwargs:
+ raise Exception((args, kwargs))
+ return Spam(value)
+ elif op == 'custom-inner':
+ if args or kwargs:
+ raise Exception((args, kwargs))
+ class Eggs(Spam):
+ pass
+ return Eggs(value)
+ elif not isinstance(op, str):
+ raise TypeError(op)
+ else:
+ raise NotImplementedError(op)
+
+
+class TestInterpreterCall(TestBase):
+
+ # signature
+ # - blank
+ # - args
+ # - kwargs
+ # - args, kwargs
+ # return
+ # - nothing (None)
+ # - simple
+ # - closure
+ # - custom
+ # ops:
+ # - do nothing
+ # - fail
+ # - echo
+ # - do complex, relative to interpreter
+ # scope
+ # - global func
+ # - local closure
+ # - returned closure
+ # - callable type instance
+ # - type
+ # - classmethod
+ # - staticmethod
+ # - instance method
+ # exception
+ # - builtin
+ # - custom
+ # - preserves info (e.g. SyntaxError)
+ # - matching error display
+
+ def test_call(self):
+ interp = interpreters.create()
+
+ for i, (callable, args, kwargs) in enumerate([
+ (call_func_noop, (), {}),
+ (call_func_return_shareable, (), {}),
+ (call_func_return_not_shareable, (), {}),
+ (Spam.noop, (), {}),
+ ]):
+ with self.subTest(f'success case #{i+1}'):
+ res = interp.call(callable)
+ self.assertIs(res, None)
+
+ for i, (callable, args, kwargs) in enumerate([
+ (call_func_ident, ('spamspamspam',), {}),
+ (get_call_func_closure, (42,), {}),
+ (get_call_func_closure(42), (), {}),
+ (Spam.from_values, (), {}),
+ (Spam.from_values, (1, 2, 3), {}),
+ (Spam, ('???'), {}),
+ (Spam(101), (), {}),
+ (Spam(10101).run, (), {}),
+ (call_func_complex, ('ident', 'spam'), {}),
+ (call_func_complex, ('full-ident', 'spam'), {}),
+ (call_func_complex, ('full-ident', 'spam', 'ham'), {'eggs': '!!!'}),
+ (call_func_complex, ('globals',), {}),
+ (call_func_complex, ('interpid',), {}),
+ (call_func_complex, ('closure',), {'value': '~~~'}),
+ (call_func_complex, ('custom', 'spam!'), {}),
+ (call_func_complex, ('custom-inner', 'eggs!'), {}),
+ (call_func_complex, ('???',), {'exc': ValueError('spam')}),
+ ]):
+ with self.subTest(f'invalid case #{i+1}'):
+ with self.assertRaises(Exception):
+ if args or kwargs:
+ raise Exception((args, kwargs))
+ interp.call(callable)
+
+ with self.assertRaises(interpreters.ExecutionFailed):
+ interp.call(call_func_failure)
+
+ def test_call_in_thread(self):
+ interp = interpreters.create()
+
+ for i, (callable, args, kwargs) in enumerate([
+ (call_func_noop, (), {}),
+ (call_func_return_shareable, (), {}),
+ (call_func_return_not_shareable, (), {}),
+ (Spam.noop, (), {}),
+ ]):
+ with self.subTest(f'success case #{i+1}'):
+ with self.captured_thread_exception() as ctx:
+ t = interp.call_in_thread(callable)
+ t.join()
+ self.assertIsNone(ctx.caught)
+
+ for i, (callable, args, kwargs) in enumerate([
+ (call_func_ident, ('spamspamspam',), {}),
+ (get_call_func_closure, (42,), {}),
+ (get_call_func_closure(42), (), {}),
+ (Spam.from_values, (), {}),
+ (Spam.from_values, (1, 2, 3), {}),
+ (Spam, ('???'), {}),
+ (Spam(101), (), {}),
+ (Spam(10101).run, (), {}),
+ (call_func_complex, ('ident', 'spam'), {}),
+ (call_func_complex, ('full-ident', 'spam'), {}),
+ (call_func_complex, ('full-ident', 'spam', 'ham'), {'eggs': '!!!'}),
+ (call_func_complex, ('globals',), {}),
+ (call_func_complex, ('interpid',), {}),
+ (call_func_complex, ('closure',), {'value': '~~~'}),
+ (call_func_complex, ('custom', 'spam!'), {}),
+ (call_func_complex, ('custom-inner', 'eggs!'), {}),
+ (call_func_complex, ('???',), {'exc': ValueError('spam')}),
+ ]):
+ with self.subTest(f'invalid case #{i+1}'):
+ if args or kwargs:
+ continue
+ with self.captured_thread_exception() as ctx:
+ t = interp.call_in_thread(callable)
+ t.join()
+ self.assertIsNotNone(ctx.caught)
+
+ with self.captured_thread_exception() as ctx:
+ t = interp.call_in_thread(call_func_failure)
t.join()
- out = file.read()
-
- self.assertEqual(out, 'it worked!')
-
- def test_failure(self):
- caught = False
- def excepthook(args):
- nonlocal caught
- caught = True
- threading.excepthook = excepthook
- try:
- interp = interpreters.create()
- t = interp.run('raise Exception')
- t.join()
-
- self.assertTrue(caught)
- except BaseException:
- threading.excepthook = threading.__excepthook__
+ self.assertIsNotNone(ctx.caught)
class TestIsShareable(TestBase):
diff --git a/Lib/test/test_interpreters/test_channels.py b/Lib/test/test_interpreters/test_channels.py
index 3c3e188..07e5038 100644
--- a/Lib/test/test_interpreters/test_channels.py
+++ b/Lib/test/test_interpreters/test_channels.py
@@ -120,7 +120,7 @@ class TestSendRecv(TestBase):
def test_send_recv_same_interpreter(self):
interp = interpreters.create()
- interp.exec_sync(dedent("""
+ interp.exec(dedent("""
from test.support.interpreters import channels
r, s = channels.create()
orig = b'spam'
@@ -193,7 +193,7 @@ class TestSendRecv(TestBase):
def test_send_recv_nowait_same_interpreter(self):
interp = interpreters.create()
- interp.exec_sync(dedent("""
+ interp.exec(dedent("""
from test.support.interpreters import channels
r, s = channels.create()
orig = b'spam'
diff --git a/Lib/test/test_interpreters/test_lifecycle.py b/Lib/test/test_interpreters/test_lifecycle.py
index c2917d8..67b6f43 100644
--- a/Lib/test/test_interpreters/test_lifecycle.py
+++ b/Lib/test/test_interpreters/test_lifecycle.py
@@ -124,7 +124,7 @@ class StartupTests(TestBase):
orig = sys.path[0]
interp = interpreters.create()
- interp.exec_sync(f"""if True:
+ interp.exec(f"""if True:
import json
import sys
print(json.dumps({{
diff --git a/Lib/test/test_interpreters/test_queues.py b/Lib/test/test_interpreters/test_queues.py
index 2a8ca99..65b5435 100644
--- a/Lib/test/test_interpreters/test_queues.py
+++ b/Lib/test/test_interpreters/test_queues.py
@@ -51,20 +51,20 @@ class QueueTests(TestBase):
queue1 = queues.create()
interp = interpreters.create()
- interp.exec_sync(dedent(f"""
+ interp.exec(dedent(f"""
from test.support.interpreters import queues
queue1 = queues.Queue({queue1.id})
"""));
with self.subTest('same interpreter'):
queue2 = queues.create()
- queue1.put(queue2)
+ queue1.put(queue2, syncobj=True)
queue3 = queue1.get()
self.assertIs(queue3, queue2)
with self.subTest('from current interpreter'):
queue4 = queues.create()
- queue1.put(queue4)
+ queue1.put(queue4, syncobj=True)
out = _run_output(interp, dedent("""
queue4 = queue1.get()
print(queue4.id)
@@ -75,7 +75,7 @@ class QueueTests(TestBase):
with self.subTest('from subinterpreter'):
out = _run_output(interp, dedent("""
queue5 = queues.create()
- queue1.put(queue5)
+ queue1.put(queue5, syncobj=True)
print(queue5.id)
"""))
qid = int(out)
@@ -118,7 +118,7 @@ class TestQueueOps(TestBase):
def test_empty(self):
queue = queues.create()
before = queue.empty()
- queue.put(None)
+ queue.put(None, syncobj=True)
during = queue.empty()
queue.get()
after = queue.empty()
@@ -133,7 +133,7 @@ class TestQueueOps(TestBase):
queue = queues.create(3)
for _ in range(3):
actual.append(queue.full())
- queue.put(None)
+ queue.put(None, syncobj=True)
actual.append(queue.full())
for _ in range(3):
queue.get()
@@ -147,16 +147,16 @@ class TestQueueOps(TestBase):
queue = queues.create()
for _ in range(3):
actual.append(queue.qsize())
- queue.put(None)
+ queue.put(None, syncobj=True)
actual.append(queue.qsize())
queue.get()
actual.append(queue.qsize())
- queue.put(None)
+ queue.put(None, syncobj=True)
actual.append(queue.qsize())
for _ in range(3):
queue.get()
actual.append(queue.qsize())
- queue.put(None)
+ queue.put(None, syncobj=True)
actual.append(queue.qsize())
queue.get()
actual.append(queue.qsize())
@@ -165,30 +165,81 @@ class TestQueueOps(TestBase):
def test_put_get_main(self):
expected = list(range(20))
- queue = queues.create()
- for i in range(20):
- queue.put(i)
- actual = [queue.get() for _ in range(20)]
+ for syncobj in (True, False):
+ kwds = dict(syncobj=syncobj)
+ with self.subTest(f'syncobj={syncobj}'):
+ queue = queues.create()
+ for i in range(20):
+ queue.put(i, **kwds)
+ actual = [queue.get() for _ in range(20)]
- self.assertEqual(actual, expected)
+ self.assertEqual(actual, expected)
def test_put_timeout(self):
- queue = queues.create(2)
- queue.put(None)
- queue.put(None)
- with self.assertRaises(queues.QueueFull):
- queue.put(None, timeout=0.1)
- queue.get()
- queue.put(None)
+ for syncobj in (True, False):
+ kwds = dict(syncobj=syncobj)
+ with self.subTest(f'syncobj={syncobj}'):
+ queue = queues.create(2)
+ queue.put(None, **kwds)
+ queue.put(None, **kwds)
+ with self.assertRaises(queues.QueueFull):
+ queue.put(None, timeout=0.1, **kwds)
+ queue.get()
+ queue.put(None, **kwds)
def test_put_nowait(self):
- queue = queues.create(2)
- queue.put_nowait(None)
- queue.put_nowait(None)
- with self.assertRaises(queues.QueueFull):
- queue.put_nowait(None)
- queue.get()
- queue.put_nowait(None)
+ for syncobj in (True, False):
+ kwds = dict(syncobj=syncobj)
+ with self.subTest(f'syncobj={syncobj}'):
+ queue = queues.create(2)
+ queue.put_nowait(None, **kwds)
+ queue.put_nowait(None, **kwds)
+ with self.assertRaises(queues.QueueFull):
+ queue.put_nowait(None, **kwds)
+ queue.get()
+ queue.put_nowait(None, **kwds)
+
+ def test_put_syncobj(self):
+ for obj in [
+ None,
+ True,
+ 10,
+ 'spam',
+ b'spam',
+ (0, 'a'),
+ ]:
+ with self.subTest(repr(obj)):
+ queue = queues.create()
+ queue.put(obj, syncobj=True)
+ obj2 = queue.get()
+ self.assertEqual(obj2, obj)
+
+ for obj in [
+ [1, 2, 3],
+ {'a': 13, 'b': 17},
+ ]:
+ with self.subTest(repr(obj)):
+ queue = queues.create()
+ with self.assertRaises(interpreters.NotShareableError):
+ queue.put(obj, syncobj=True)
+
+ def test_put_not_syncobj(self):
+ for obj in [
+ None,
+ True,
+ 10,
+ 'spam',
+ b'spam',
+ (0, 'a'),
+ # not shareable
+ [1, 2, 3],
+ {'a': 13, 'b': 17},
+ ]:
+ with self.subTest(repr(obj)):
+ queue = queues.create()
+ queue.put(obj, syncobj=False)
+ obj2 = queue.get()
+ self.assertEqual(obj2, obj)
def test_get_timeout(self):
queue = queues.create()
@@ -200,13 +251,41 @@ class TestQueueOps(TestBase):
with self.assertRaises(queues.QueueEmpty):
queue.get_nowait()
+ def test_put_get_default_syncobj(self):
+ expected = list(range(20))
+ queue = queues.create(syncobj=True)
+ for i in range(20):
+ queue.put(i)
+ actual = [queue.get() for _ in range(20)]
+
+ self.assertEqual(actual, expected)
+
+ obj = [1, 2, 3] # lists are not shareable
+ with self.assertRaises(interpreters.NotShareableError):
+ queue.put(obj)
+
+ def test_put_get_default_not_syncobj(self):
+ expected = list(range(20))
+ queue = queues.create(syncobj=False)
+ for i in range(20):
+ queue.put(i)
+ actual = [queue.get() for _ in range(20)]
+
+ self.assertEqual(actual, expected)
+
+ obj = [1, 2, 3] # lists are not shareable
+ queue.put(obj)
+ obj2 = queue.get()
+ self.assertEqual(obj, obj2)
+ self.assertIsNot(obj, obj2)
+
def test_put_get_same_interpreter(self):
interp = interpreters.create()
- interp.exec_sync(dedent("""
+ interp.exec(dedent("""
from test.support.interpreters import queues
queue = queues.create()
orig = b'spam'
- queue.put(orig)
+ queue.put(orig, syncobj=True)
obj = queue.get()
assert obj == orig, 'expected: obj == orig'
assert obj is not orig, 'expected: obj is not orig'
@@ -219,7 +298,7 @@ class TestQueueOps(TestBase):
self.assertEqual(len(queues.list_all()), 2)
obj1 = b'spam'
- queue1.put(obj1)
+ queue1.put(obj1, syncobj=True)
out = _run_output(
interp,
@@ -236,7 +315,7 @@ class TestQueueOps(TestBase):
obj2 = b'eggs'
print(id(obj2))
assert queue2.qsize() == 0, 'expected: queue2.qsize() == 0'
- queue2.put(obj2)
+ queue2.put(obj2, syncobj=True)
assert queue2.qsize() == 1, 'expected: queue2.qsize() == 1'
"""))
self.assertEqual(len(queues.list_all()), 2)
@@ -258,8 +337,8 @@ class TestQueueOps(TestBase):
queue = queues.Queue({queue.id})
obj1 = b'spam'
obj2 = b'eggs'
- queue.put(obj1)
- queue.put(obj2)
+ queue.put(obj1, syncobj=True)
+ queue.put(obj2, syncobj=True)
"""))
self.assertEqual(queue.qsize(), 2)
@@ -281,12 +360,12 @@ class TestQueueOps(TestBase):
break
except queues.QueueEmpty:
continue
- queue2.put(obj)
+ queue2.put(obj, syncobj=True)
t = threading.Thread(target=f)
t.start()
orig = b'spam'
- queue1.put(orig)
+ queue1.put(orig, syncobj=True)
obj = queue2.get()
t.join()
diff --git a/Lib/test/test_interpreters/utils.py b/Lib/test/test_interpreters/utils.py
index 3a37ed0..973d05d 100644
--- a/Lib/test/test_interpreters/utils.py
+++ b/Lib/test/test_interpreters/utils.py
@@ -4,8 +4,9 @@ import os.path
import subprocess
import sys
import tempfile
-import threading
from textwrap import dedent
+import threading
+import types
import unittest
from test import support
@@ -41,7 +42,7 @@ def _run_output(interp, request, init=None):
with rpipe:
if init:
interp.prepare_main(init)
- interp.exec_sync(script)
+ interp.exec(script)
return rpipe.read()
@@ -49,7 +50,7 @@ def _run_output(interp, request, init=None):
def _running(interp):
r, w = os.pipe()
def run():
- interp.exec_sync(dedent(f"""
+ interp.exec(dedent(f"""
# wait for "signal"
with open({r}) as rpipe:
rpipe.read()
@@ -84,6 +85,18 @@ class TestBase(unittest.TestCase):
self.addCleanup(lambda: os_helper.rmtree(tempdir))
return tempdir
+ @contextlib.contextmanager
+ def captured_thread_exception(self):
+ ctx = types.SimpleNamespace(caught=None)
+ def excepthook(args):
+ ctx.caught = args
+ orig_excepthook = threading.excepthook
+ threading.excepthook = excepthook
+ try:
+ yield ctx
+ finally:
+ threading.excepthook = orig_excepthook
+
def make_script(self, filename, dirname=None, text=None):
if text:
text = dedent(text)
diff --git a/Lib/test/test_sys.py b/Lib/test/test_sys.py
index 71671a5..38dcabd 100644
--- a/Lib/test/test_sys.py
+++ b/Lib/test/test_sys.py
@@ -729,7 +729,7 @@ class SysModuleTest(unittest.TestCase):
self.assertIs(t, s)
interp = interpreters.create()
- interp.exec_sync(textwrap.dedent(f'''
+ interp.exec(textwrap.dedent(f'''
import sys
t = sys.intern({s!r})
assert id(t) != {id(s)}, (id(t), {id(s)})
@@ -744,7 +744,7 @@ class SysModuleTest(unittest.TestCase):
t = sys.intern(s)
interp = interpreters.create()
- interp.exec_sync(textwrap.dedent(f'''
+ interp.exec(textwrap.dedent(f'''
import sys
t = sys.intern({s!r})
assert id(t) == {id(t)}, (id(t), {id(t)})
diff --git a/Lib/test/test_threading.py b/Lib/test/test_threading.py
index 1ab223b..3b5c37c 100644
--- a/Lib/test/test_threading.py
+++ b/Lib/test/test_threading.py
@@ -1478,7 +1478,7 @@ class SubinterpThreadingTests(BaseTestCase):
DONE = b'D'
interp = interpreters.create()
- interp.exec_sync(f"""if True:
+ interp.exec(f"""if True:
import os
import threading
import time