diff options
Diffstat (limited to 'Lib')
-rw-r--r-- | Lib/test/support/interpreters/__init__.py | 51 | ||||
-rw-r--r-- | Lib/test/support/interpreters/queues.py | 74 | ||||
-rw-r--r-- | Lib/test/test_interpreters/test_api.py | 291 | ||||
-rw-r--r-- | Lib/test/test_interpreters/test_channels.py | 4 | ||||
-rw-r--r-- | Lib/test/test_interpreters/test_lifecycle.py | 2 | ||||
-rw-r--r-- | Lib/test/test_interpreters/test_queues.py | 151 | ||||
-rw-r--r-- | Lib/test/test_interpreters/utils.py | 19 | ||||
-rw-r--r-- | Lib/test/test_sys.py | 4 | ||||
-rw-r--r-- | Lib/test/test_threading.py | 2 |
9 files changed, 477 insertions, 121 deletions
diff --git a/Lib/test/support/interpreters/__init__.py b/Lib/test/support/interpreters/__init__.py index 15a908e..d02ffba 100644 --- a/Lib/test/support/interpreters/__init__.py +++ b/Lib/test/support/interpreters/__init__.py @@ -6,7 +6,7 @@ import _xxsubinterpreters as _interpreters # aliases: from _xxsubinterpreters import ( - InterpreterError, InterpreterNotFoundError, + InterpreterError, InterpreterNotFoundError, NotShareableError, is_shareable, ) @@ -14,7 +14,8 @@ from _xxsubinterpreters import ( __all__ = [ 'get_current', 'get_main', 'create', 'list_all', 'is_shareable', 'Interpreter', - 'InterpreterError', 'InterpreterNotFoundError', 'ExecFailure', + 'InterpreterError', 'InterpreterNotFoundError', 'ExecutionFailed', + 'NotShareableError', 'create_queue', 'Queue', 'QueueEmpty', 'QueueFull', ] @@ -42,7 +43,11 @@ Uncaught in the interpreter: {formatted} """.strip() -class ExecFailure(RuntimeError): +class ExecutionFailed(RuntimeError): + """An unhandled exception happened during execution. + + This is raised from Interpreter.exec() and Interpreter.call(). + """ def __init__(self, excinfo): msg = excinfo.formatted @@ -157,7 +162,7 @@ class Interpreter: ns = dict(ns, **kwargs) if ns is not None else kwargs _interpreters.set___main___attrs(self._id, ns) - def exec_sync(self, code, /): + def exec(self, code, /): """Run the given source code in the interpreter. This is essentially the same as calling the builtin "exec" @@ -166,10 +171,10 @@ class Interpreter: There is no return value. - If the code raises an unhandled exception then an ExecFailure - is raised, which summarizes the unhandled exception. The actual - exception is discarded because objects cannot be shared between - interpreters. + If the code raises an unhandled exception then an ExecutionFailed + exception is raised, which summarizes the unhandled exception. + The actual exception is discarded because objects cannot be + shared between interpreters. This blocks the current Python thread until done. During that time, the previous interpreter is allowed to run @@ -177,11 +182,35 @@ class Interpreter: """ excinfo = _interpreters.exec(self._id, code) if excinfo is not None: - raise ExecFailure(excinfo) + raise ExecutionFailed(excinfo) + + def call(self, callable, /): + """Call the object in the interpreter with given args/kwargs. + + Only functions that take no arguments and have no closure + are supported. - def run(self, code, /): + The return value is discarded. + + If the callable raises an exception then the error display + (including full traceback) is send back between the interpreters + and an ExecutionFailed exception is raised, much like what + happens with Interpreter.exec(). + """ + # XXX Support args and kwargs. + # XXX Support arbitrary callables. + # XXX Support returning the return value (e.g. via pickle). + excinfo = _interpreters.call(self._id, callable) + if excinfo is not None: + raise ExecutionFailed(excinfo) + + def call_in_thread(self, callable, /): + """Return a new thread that calls the object in the interpreter. + + The return value and any raised exception are discarded. + """ def task(): - self.exec_sync(code) + self.call(callable) t = threading.Thread(target=task) t.start() return t diff --git a/Lib/test/support/interpreters/queues.py b/Lib/test/support/interpreters/queues.py index aead0c4..2cc616b 100644 --- a/Lib/test/support/interpreters/queues.py +++ b/Lib/test/support/interpreters/queues.py @@ -1,5 +1,6 @@ """Cross-interpreter Queues High Level Module.""" +import pickle import queue import time import weakref @@ -31,20 +32,26 @@ class QueueFull(_queues.QueueFull, queue.Full): """ -def create(maxsize=0): +_SHARED_ONLY = 0 +_PICKLED = 1 + +def create(maxsize=0, *, syncobj=False): """Return a new cross-interpreter queue. The queue may be used to pass data safely between interpreters. + + "syncobj" sets the default for Queue.put() + and Queue.put_nowait(). """ - qid = _queues.create(maxsize) - return Queue(qid) + fmt = _SHARED_ONLY if syncobj else _PICKLED + qid = _queues.create(maxsize, fmt) + return Queue(qid, _fmt=fmt) def list_all(): """Return a list of all open queues.""" - return [Queue(qid) - for qid in _queues.list_all()] - + return [Queue(qid, _fmt=fmt) + for qid, fmt in _queues.list_all()] _known_queues = weakref.WeakValueDictionary() @@ -52,17 +59,20 @@ _known_queues = weakref.WeakValueDictionary() class Queue: """A cross-interpreter queue.""" - def __new__(cls, id, /): + def __new__(cls, id, /, *, _fmt=None): # There is only one instance for any given ID. if isinstance(id, int): id = int(id) else: raise TypeError(f'id must be an int, got {id!r}') + if _fmt is None: + _fmt = _queues.get_default_fmt(id) try: self = _known_queues[id] except KeyError: self = super().__new__(cls) self._id = id + self._fmt = _fmt _known_queues[id] = self _queues.bind(id) return self @@ -105,20 +115,50 @@ class Queue: return _queues.get_count(self._id) def put(self, obj, timeout=None, *, + syncobj=None, _delay=10 / 1000, # 10 milliseconds ): """Add the object to the queue. This blocks while the queue is full. + + If "syncobj" is None (the default) then it uses the + queue's default, set with create_queue().. + + If "syncobj" is false then all objects are supported, + at the expense of worse performance. + + If "syncobj" is true then the object must be "shareable". + Examples of "shareable" objects include the builtin singletons, + str, and memoryview. One benefit is that such objects are + passed through the queue efficiently. + + The key difference, though, is conceptual: the corresponding + object returned from Queue.get() will be strictly equivalent + to the given obj. In other words, the two objects will be + effectively indistinguishable from each other, even if the + object is mutable. The received object may actually be the + same object, or a copy (immutable values only), or a proxy. + Regardless, the received object should be treated as though + the original has been shared directly, whether or not it + actually is. That's a slightly different and stronger promise + than just (initial) equality, which is all "syncobj=False" + can promise. """ + if syncobj is None: + fmt = self._fmt + else: + fmt = _SHARED_ONLY if syncobj else _PICKLED if timeout is not None: timeout = int(timeout) if timeout < 0: raise ValueError(f'timeout value must be non-negative') end = time.time() + timeout + if fmt is _PICKLED: + obj = pickle.dumps(obj) while True: try: - _queues.put(self._id, obj) + _queues.put(self._id, obj, fmt) except _queues.QueueFull as exc: if timeout is not None and time.time() >= end: exc.__class__ = QueueFull @@ -127,9 +167,15 @@ class Queue: else: break - def put_nowait(self, obj): + def put_nowait(self, obj, *, syncobj=None): + if syncobj is None: + fmt = self._fmt + else: + fmt = _SHARED_ONLY if syncobj else _PICKLED + if fmt is _PICKLED: + obj = pickle.dumps(obj) try: - return _queues.put(self._id, obj) + _queues.put(self._id, obj, fmt) except _queues.QueueFull as exc: exc.__class__ = QueueFull raise # re-raise @@ -148,12 +194,18 @@ class Queue: end = time.time() + timeout while True: try: - return _queues.get(self._id) + obj, fmt = _queues.get(self._id) except _queues.QueueEmpty as exc: if timeout is not None and time.time() >= end: exc.__class__ = QueueEmpty raise # re-raise time.sleep(_delay) + else: + break + if fmt == _PICKLED: + obj = pickle.loads(obj) + else: + assert fmt == _SHARED_ONLY return obj def get_nowait(self): diff --git a/Lib/test/test_interpreters/test_api.py b/Lib/test/test_interpreters/test_api.py index aefd326..363143f 100644 --- a/Lib/test/test_interpreters/test_api.py +++ b/Lib/test/test_interpreters/test_api.py @@ -280,7 +280,7 @@ class TestInterpreterIsRunning(TestBase): def test_finished(self): r, w = self.pipe() interp = interpreters.create() - interp.exec_sync(f"""if True: + interp.exec(f"""if True: import os os.write({w}, b'x') """) @@ -312,7 +312,7 @@ class TestInterpreterIsRunning(TestBase): FINISHED = b'F' interp = interpreters.create() - interp.exec_sync(f"""if True: + interp.exec(f"""if True: import os import threading @@ -326,7 +326,7 @@ class TestInterpreterIsRunning(TestBase): self.assertFalse(interp.is_running()) os.write(w_thread, DONE) - interp.exec_sync('t.join()') + interp.exec('t.join()') self.assertEqual(os.read(r_interp, 1), FINISHED) @@ -393,7 +393,7 @@ class TestInterpreterClose(TestBase): interp2 = interpreters.create() self.assertEqual(set(interpreters.list_all()), {main, interp1, interp2}) - interp1.exec_sync(dedent(f""" + interp1.exec(dedent(f""" from test.support import interpreters interp2 = interpreters.Interpreter({interp2.id}) interp2.close() @@ -427,7 +427,7 @@ class TestInterpreterClose(TestBase): FINISHED = b'F' interp = interpreters.create() - interp.exec_sync(f"""if True: + interp.exec(f"""if True: import os import threading import time @@ -503,27 +503,27 @@ class TestInterpreterPrepareMain(TestBase): interp.prepare_main(spam={'spam': 'eggs', 'foo': 'bar'}) # Make sure neither was actually bound. - with self.assertRaises(interpreters.ExecFailure): - interp.exec_sync('print(foo)') - with self.assertRaises(interpreters.ExecFailure): - interp.exec_sync('print(spam)') + with self.assertRaises(interpreters.ExecutionFailed): + interp.exec('print(foo)') + with self.assertRaises(interpreters.ExecutionFailed): + interp.exec('print(spam)') -class TestInterpreterExecSync(TestBase): +class TestInterpreterExec(TestBase): def test_success(self): interp = interpreters.create() script, file = _captured_script('print("it worked!", end="")') with file: - interp.exec_sync(script) + interp.exec(script) out = file.read() self.assertEqual(out, 'it worked!') def test_failure(self): interp = interpreters.create() - with self.assertRaises(interpreters.ExecFailure): - interp.exec_sync('raise Exception') + with self.assertRaises(interpreters.ExecutionFailed): + interp.exec('raise Exception') def test_display_preserved_exception(self): tempdir = self.temp_dir() @@ -542,21 +542,21 @@ class TestInterpreterExecSync(TestBase): spam.eggs() interp = interpreters.create() - interp.exec_sync(script) + interp.exec(script) """) stdout, stderr = self.assert_python_failure(scriptfile) self.maxDiff = None - interpmod_line, = (l for l in stderr.splitlines() if ' exec_sync' in l) - # File "{interpreters.__file__}", line 179, in exec_sync + interpmod_line, = (l for l in stderr.splitlines() if ' exec' in l) + # File "{interpreters.__file__}", line 179, in exec self.assertEqual(stderr, dedent(f"""\ Traceback (most recent call last): File "{scriptfile}", line 9, in <module> - interp.exec_sync(script) - ~~~~~~~~~~~~~~~~^^^^^^^^ + interp.exec(script) + ~~~~~~~~~~~^^^^^^^^ {interpmod_line.strip()} - raise ExecFailure(excinfo) - test.support.interpreters.ExecFailure: RuntimeError: uh-oh! + raise ExecutionFailed(excinfo) + test.support.interpreters.ExecutionFailed: RuntimeError: uh-oh! Uncaught in the interpreter: @@ -578,7 +578,7 @@ class TestInterpreterExecSync(TestBase): script, file = _captured_script('print("it worked!", end="")') with file: def f(): - interp.exec_sync(script) + interp.exec(script) t = threading.Thread(target=f) t.start() @@ -604,7 +604,7 @@ class TestInterpreterExecSync(TestBase): with open('{file.name}', 'w', encoding='utf-8') as out: out.write('{expected}') """) - interp.exec_sync(script) + interp.exec(script) file.seek(0) content = file.read() @@ -615,17 +615,17 @@ class TestInterpreterExecSync(TestBase): interp = interpreters.create() with _running(interp): with self.assertRaises(RuntimeError): - interp.exec_sync('print("spam")') + interp.exec('print("spam")') def test_bad_script(self): interp = interpreters.create() with self.assertRaises(TypeError): - interp.exec_sync(10) + interp.exec(10) def test_bytes_for_script(self): interp = interpreters.create() with self.assertRaises(TypeError): - interp.exec_sync(b'print("spam")') + interp.exec(b'print("spam")') def test_with_background_threads_still_running(self): r_interp, w_interp = self.pipe() @@ -636,7 +636,7 @@ class TestInterpreterExecSync(TestBase): FINISHED = b'F' interp = interpreters.create() - interp.exec_sync(f"""if True: + interp.exec(f"""if True: import os import threading @@ -648,46 +648,229 @@ class TestInterpreterExecSync(TestBase): t.start() os.write({w_interp}, {RAN!r}) """) - interp.exec_sync(f"""if True: + interp.exec(f"""if True: os.write({w_interp}, {RAN!r}) """) os.write(w_thread, DONE) - interp.exec_sync('t.join()') + interp.exec('t.join()') self.assertEqual(os.read(r_interp, 1), RAN) self.assertEqual(os.read(r_interp, 1), RAN) self.assertEqual(os.read(r_interp, 1), FINISHED) # test_xxsubinterpreters covers the remaining - # Interpreter.exec_sync() behavior. + # Interpreter.exec() behavior. -class TestInterpreterRun(TestBase): - - def test_success(self): - interp = interpreters.create() - script, file = _captured_script('print("it worked!", end="")') - with file: - t = interp.run(script) +def call_func_noop(): + pass + + +def call_func_return_shareable(): + return (1, None) + + +def call_func_return_not_shareable(): + return [1, 2, 3] + + +def call_func_failure(): + raise Exception('spam!') + + +def call_func_ident(value): + return value + + +def get_call_func_closure(value): + def call_func_closure(): + return value + return call_func_closure + + +class Spam: + + @staticmethod + def noop(): + pass + + @classmethod + def from_values(cls, *values): + return cls(values) + + def __init__(self, value): + self.value = value + + def __call__(self, *args, **kwargs): + return (self.value, args, kwargs) + + def __eq__(self, other): + if not isinstance(other, Spam): + return NotImplemented + return self.value == other.value + + def run(self, *args, **kwargs): + return (self.value, args, kwargs) + + +def call_func_complex(op, /, value=None, *args, exc=None, **kwargs): + if exc is not None: + raise exc + if op == '': + raise ValueError('missing op') + elif op == 'ident': + if args or kwargs: + raise Exception((args, kwargs)) + return value + elif op == 'full-ident': + return (value, args, kwargs) + elif op == 'globals': + if value is not None or args or kwargs: + raise Exception((value, args, kwargs)) + return __name__ + elif op == 'interpid': + if value is not None or args or kwargs: + raise Exception((value, args, kwargs)) + return interpreters.get_current().id + elif op == 'closure': + if args or kwargs: + raise Exception((args, kwargs)) + return get_call_func_closure(value) + elif op == 'custom': + if args or kwargs: + raise Exception((args, kwargs)) + return Spam(value) + elif op == 'custom-inner': + if args or kwargs: + raise Exception((args, kwargs)) + class Eggs(Spam): + pass + return Eggs(value) + elif not isinstance(op, str): + raise TypeError(op) + else: + raise NotImplementedError(op) + + +class TestInterpreterCall(TestBase): + + # signature + # - blank + # - args + # - kwargs + # - args, kwargs + # return + # - nothing (None) + # - simple + # - closure + # - custom + # ops: + # - do nothing + # - fail + # - echo + # - do complex, relative to interpreter + # scope + # - global func + # - local closure + # - returned closure + # - callable type instance + # - type + # - classmethod + # - staticmethod + # - instance method + # exception + # - builtin + # - custom + # - preserves info (e.g. SyntaxError) + # - matching error display + + def test_call(self): + interp = interpreters.create() + + for i, (callable, args, kwargs) in enumerate([ + (call_func_noop, (), {}), + (call_func_return_shareable, (), {}), + (call_func_return_not_shareable, (), {}), + (Spam.noop, (), {}), + ]): + with self.subTest(f'success case #{i+1}'): + res = interp.call(callable) + self.assertIs(res, None) + + for i, (callable, args, kwargs) in enumerate([ + (call_func_ident, ('spamspamspam',), {}), + (get_call_func_closure, (42,), {}), + (get_call_func_closure(42), (), {}), + (Spam.from_values, (), {}), + (Spam.from_values, (1, 2, 3), {}), + (Spam, ('???'), {}), + (Spam(101), (), {}), + (Spam(10101).run, (), {}), + (call_func_complex, ('ident', 'spam'), {}), + (call_func_complex, ('full-ident', 'spam'), {}), + (call_func_complex, ('full-ident', 'spam', 'ham'), {'eggs': '!!!'}), + (call_func_complex, ('globals',), {}), + (call_func_complex, ('interpid',), {}), + (call_func_complex, ('closure',), {'value': '~~~'}), + (call_func_complex, ('custom', 'spam!'), {}), + (call_func_complex, ('custom-inner', 'eggs!'), {}), + (call_func_complex, ('???',), {'exc': ValueError('spam')}), + ]): + with self.subTest(f'invalid case #{i+1}'): + with self.assertRaises(Exception): + if args or kwargs: + raise Exception((args, kwargs)) + interp.call(callable) + + with self.assertRaises(interpreters.ExecutionFailed): + interp.call(call_func_failure) + + def test_call_in_thread(self): + interp = interpreters.create() + + for i, (callable, args, kwargs) in enumerate([ + (call_func_noop, (), {}), + (call_func_return_shareable, (), {}), + (call_func_return_not_shareable, (), {}), + (Spam.noop, (), {}), + ]): + with self.subTest(f'success case #{i+1}'): + with self.captured_thread_exception() as ctx: + t = interp.call_in_thread(callable) + t.join() + self.assertIsNone(ctx.caught) + + for i, (callable, args, kwargs) in enumerate([ + (call_func_ident, ('spamspamspam',), {}), + (get_call_func_closure, (42,), {}), + (get_call_func_closure(42), (), {}), + (Spam.from_values, (), {}), + (Spam.from_values, (1, 2, 3), {}), + (Spam, ('???'), {}), + (Spam(101), (), {}), + (Spam(10101).run, (), {}), + (call_func_complex, ('ident', 'spam'), {}), + (call_func_complex, ('full-ident', 'spam'), {}), + (call_func_complex, ('full-ident', 'spam', 'ham'), {'eggs': '!!!'}), + (call_func_complex, ('globals',), {}), + (call_func_complex, ('interpid',), {}), + (call_func_complex, ('closure',), {'value': '~~~'}), + (call_func_complex, ('custom', 'spam!'), {}), + (call_func_complex, ('custom-inner', 'eggs!'), {}), + (call_func_complex, ('???',), {'exc': ValueError('spam')}), + ]): + with self.subTest(f'invalid case #{i+1}'): + if args or kwargs: + continue + with self.captured_thread_exception() as ctx: + t = interp.call_in_thread(callable) + t.join() + self.assertIsNotNone(ctx.caught) + + with self.captured_thread_exception() as ctx: + t = interp.call_in_thread(call_func_failure) t.join() - out = file.read() - - self.assertEqual(out, 'it worked!') - - def test_failure(self): - caught = False - def excepthook(args): - nonlocal caught - caught = True - threading.excepthook = excepthook - try: - interp = interpreters.create() - t = interp.run('raise Exception') - t.join() - - self.assertTrue(caught) - except BaseException: - threading.excepthook = threading.__excepthook__ + self.assertIsNotNone(ctx.caught) class TestIsShareable(TestBase): diff --git a/Lib/test/test_interpreters/test_channels.py b/Lib/test/test_interpreters/test_channels.py index 3c3e188..07e5038 100644 --- a/Lib/test/test_interpreters/test_channels.py +++ b/Lib/test/test_interpreters/test_channels.py @@ -120,7 +120,7 @@ class TestSendRecv(TestBase): def test_send_recv_same_interpreter(self): interp = interpreters.create() - interp.exec_sync(dedent(""" + interp.exec(dedent(""" from test.support.interpreters import channels r, s = channels.create() orig = b'spam' @@ -193,7 +193,7 @@ class TestSendRecv(TestBase): def test_send_recv_nowait_same_interpreter(self): interp = interpreters.create() - interp.exec_sync(dedent(""" + interp.exec(dedent(""" from test.support.interpreters import channels r, s = channels.create() orig = b'spam' diff --git a/Lib/test/test_interpreters/test_lifecycle.py b/Lib/test/test_interpreters/test_lifecycle.py index c2917d8..67b6f43 100644 --- a/Lib/test/test_interpreters/test_lifecycle.py +++ b/Lib/test/test_interpreters/test_lifecycle.py @@ -124,7 +124,7 @@ class StartupTests(TestBase): orig = sys.path[0] interp = interpreters.create() - interp.exec_sync(f"""if True: + interp.exec(f"""if True: import json import sys print(json.dumps({{ diff --git a/Lib/test/test_interpreters/test_queues.py b/Lib/test/test_interpreters/test_queues.py index 2a8ca99..65b5435 100644 --- a/Lib/test/test_interpreters/test_queues.py +++ b/Lib/test/test_interpreters/test_queues.py @@ -51,20 +51,20 @@ class QueueTests(TestBase): queue1 = queues.create() interp = interpreters.create() - interp.exec_sync(dedent(f""" + interp.exec(dedent(f""" from test.support.interpreters import queues queue1 = queues.Queue({queue1.id}) """)); with self.subTest('same interpreter'): queue2 = queues.create() - queue1.put(queue2) + queue1.put(queue2, syncobj=True) queue3 = queue1.get() self.assertIs(queue3, queue2) with self.subTest('from current interpreter'): queue4 = queues.create() - queue1.put(queue4) + queue1.put(queue4, syncobj=True) out = _run_output(interp, dedent(""" queue4 = queue1.get() print(queue4.id) @@ -75,7 +75,7 @@ class QueueTests(TestBase): with self.subTest('from subinterpreter'): out = _run_output(interp, dedent(""" queue5 = queues.create() - queue1.put(queue5) + queue1.put(queue5, syncobj=True) print(queue5.id) """)) qid = int(out) @@ -118,7 +118,7 @@ class TestQueueOps(TestBase): def test_empty(self): queue = queues.create() before = queue.empty() - queue.put(None) + queue.put(None, syncobj=True) during = queue.empty() queue.get() after = queue.empty() @@ -133,7 +133,7 @@ class TestQueueOps(TestBase): queue = queues.create(3) for _ in range(3): actual.append(queue.full()) - queue.put(None) + queue.put(None, syncobj=True) actual.append(queue.full()) for _ in range(3): queue.get() @@ -147,16 +147,16 @@ class TestQueueOps(TestBase): queue = queues.create() for _ in range(3): actual.append(queue.qsize()) - queue.put(None) + queue.put(None, syncobj=True) actual.append(queue.qsize()) queue.get() actual.append(queue.qsize()) - queue.put(None) + queue.put(None, syncobj=True) actual.append(queue.qsize()) for _ in range(3): queue.get() actual.append(queue.qsize()) - queue.put(None) + queue.put(None, syncobj=True) actual.append(queue.qsize()) queue.get() actual.append(queue.qsize()) @@ -165,30 +165,81 @@ class TestQueueOps(TestBase): def test_put_get_main(self): expected = list(range(20)) - queue = queues.create() - for i in range(20): - queue.put(i) - actual = [queue.get() for _ in range(20)] + for syncobj in (True, False): + kwds = dict(syncobj=syncobj) + with self.subTest(f'syncobj={syncobj}'): + queue = queues.create() + for i in range(20): + queue.put(i, **kwds) + actual = [queue.get() for _ in range(20)] - self.assertEqual(actual, expected) + self.assertEqual(actual, expected) def test_put_timeout(self): - queue = queues.create(2) - queue.put(None) - queue.put(None) - with self.assertRaises(queues.QueueFull): - queue.put(None, timeout=0.1) - queue.get() - queue.put(None) + for syncobj in (True, False): + kwds = dict(syncobj=syncobj) + with self.subTest(f'syncobj={syncobj}'): + queue = queues.create(2) + queue.put(None, **kwds) + queue.put(None, **kwds) + with self.assertRaises(queues.QueueFull): + queue.put(None, timeout=0.1, **kwds) + queue.get() + queue.put(None, **kwds) def test_put_nowait(self): - queue = queues.create(2) - queue.put_nowait(None) - queue.put_nowait(None) - with self.assertRaises(queues.QueueFull): - queue.put_nowait(None) - queue.get() - queue.put_nowait(None) + for syncobj in (True, False): + kwds = dict(syncobj=syncobj) + with self.subTest(f'syncobj={syncobj}'): + queue = queues.create(2) + queue.put_nowait(None, **kwds) + queue.put_nowait(None, **kwds) + with self.assertRaises(queues.QueueFull): + queue.put_nowait(None, **kwds) + queue.get() + queue.put_nowait(None, **kwds) + + def test_put_syncobj(self): + for obj in [ + None, + True, + 10, + 'spam', + b'spam', + (0, 'a'), + ]: + with self.subTest(repr(obj)): + queue = queues.create() + queue.put(obj, syncobj=True) + obj2 = queue.get() + self.assertEqual(obj2, obj) + + for obj in [ + [1, 2, 3], + {'a': 13, 'b': 17}, + ]: + with self.subTest(repr(obj)): + queue = queues.create() + with self.assertRaises(interpreters.NotShareableError): + queue.put(obj, syncobj=True) + + def test_put_not_syncobj(self): + for obj in [ + None, + True, + 10, + 'spam', + b'spam', + (0, 'a'), + # not shareable + [1, 2, 3], + {'a': 13, 'b': 17}, + ]: + with self.subTest(repr(obj)): + queue = queues.create() + queue.put(obj, syncobj=False) + obj2 = queue.get() + self.assertEqual(obj2, obj) def test_get_timeout(self): queue = queues.create() @@ -200,13 +251,41 @@ class TestQueueOps(TestBase): with self.assertRaises(queues.QueueEmpty): queue.get_nowait() + def test_put_get_default_syncobj(self): + expected = list(range(20)) + queue = queues.create(syncobj=True) + for i in range(20): + queue.put(i) + actual = [queue.get() for _ in range(20)] + + self.assertEqual(actual, expected) + + obj = [1, 2, 3] # lists are not shareable + with self.assertRaises(interpreters.NotShareableError): + queue.put(obj) + + def test_put_get_default_not_syncobj(self): + expected = list(range(20)) + queue = queues.create(syncobj=False) + for i in range(20): + queue.put(i) + actual = [queue.get() for _ in range(20)] + + self.assertEqual(actual, expected) + + obj = [1, 2, 3] # lists are not shareable + queue.put(obj) + obj2 = queue.get() + self.assertEqual(obj, obj2) + self.assertIsNot(obj, obj2) + def test_put_get_same_interpreter(self): interp = interpreters.create() - interp.exec_sync(dedent(""" + interp.exec(dedent(""" from test.support.interpreters import queues queue = queues.create() orig = b'spam' - queue.put(orig) + queue.put(orig, syncobj=True) obj = queue.get() assert obj == orig, 'expected: obj == orig' assert obj is not orig, 'expected: obj is not orig' @@ -219,7 +298,7 @@ class TestQueueOps(TestBase): self.assertEqual(len(queues.list_all()), 2) obj1 = b'spam' - queue1.put(obj1) + queue1.put(obj1, syncobj=True) out = _run_output( interp, @@ -236,7 +315,7 @@ class TestQueueOps(TestBase): obj2 = b'eggs' print(id(obj2)) assert queue2.qsize() == 0, 'expected: queue2.qsize() == 0' - queue2.put(obj2) + queue2.put(obj2, syncobj=True) assert queue2.qsize() == 1, 'expected: queue2.qsize() == 1' """)) self.assertEqual(len(queues.list_all()), 2) @@ -258,8 +337,8 @@ class TestQueueOps(TestBase): queue = queues.Queue({queue.id}) obj1 = b'spam' obj2 = b'eggs' - queue.put(obj1) - queue.put(obj2) + queue.put(obj1, syncobj=True) + queue.put(obj2, syncobj=True) """)) self.assertEqual(queue.qsize(), 2) @@ -281,12 +360,12 @@ class TestQueueOps(TestBase): break except queues.QueueEmpty: continue - queue2.put(obj) + queue2.put(obj, syncobj=True) t = threading.Thread(target=f) t.start() orig = b'spam' - queue1.put(orig) + queue1.put(orig, syncobj=True) obj = queue2.get() t.join() diff --git a/Lib/test/test_interpreters/utils.py b/Lib/test/test_interpreters/utils.py index 3a37ed0..973d05d 100644 --- a/Lib/test/test_interpreters/utils.py +++ b/Lib/test/test_interpreters/utils.py @@ -4,8 +4,9 @@ import os.path import subprocess import sys import tempfile -import threading from textwrap import dedent +import threading +import types import unittest from test import support @@ -41,7 +42,7 @@ def _run_output(interp, request, init=None): with rpipe: if init: interp.prepare_main(init) - interp.exec_sync(script) + interp.exec(script) return rpipe.read() @@ -49,7 +50,7 @@ def _run_output(interp, request, init=None): def _running(interp): r, w = os.pipe() def run(): - interp.exec_sync(dedent(f""" + interp.exec(dedent(f""" # wait for "signal" with open({r}) as rpipe: rpipe.read() @@ -84,6 +85,18 @@ class TestBase(unittest.TestCase): self.addCleanup(lambda: os_helper.rmtree(tempdir)) return tempdir + @contextlib.contextmanager + def captured_thread_exception(self): + ctx = types.SimpleNamespace(caught=None) + def excepthook(args): + ctx.caught = args + orig_excepthook = threading.excepthook + threading.excepthook = excepthook + try: + yield ctx + finally: + threading.excepthook = orig_excepthook + def make_script(self, filename, dirname=None, text=None): if text: text = dedent(text) diff --git a/Lib/test/test_sys.py b/Lib/test/test_sys.py index 71671a5..38dcabd 100644 --- a/Lib/test/test_sys.py +++ b/Lib/test/test_sys.py @@ -729,7 +729,7 @@ class SysModuleTest(unittest.TestCase): self.assertIs(t, s) interp = interpreters.create() - interp.exec_sync(textwrap.dedent(f''' + interp.exec(textwrap.dedent(f''' import sys t = sys.intern({s!r}) assert id(t) != {id(s)}, (id(t), {id(s)}) @@ -744,7 +744,7 @@ class SysModuleTest(unittest.TestCase): t = sys.intern(s) interp = interpreters.create() - interp.exec_sync(textwrap.dedent(f''' + interp.exec(textwrap.dedent(f''' import sys t = sys.intern({s!r}) assert id(t) == {id(t)}, (id(t), {id(t)}) diff --git a/Lib/test/test_threading.py b/Lib/test/test_threading.py index 1ab223b..3b5c37c 100644 --- a/Lib/test/test_threading.py +++ b/Lib/test/test_threading.py @@ -1478,7 +1478,7 @@ class SubinterpThreadingTests(BaseTestCase): DONE = b'D' interp = interpreters.create() - interp.exec_sync(f"""if True: + interp.exec(f"""if True: import os import threading import time |