diff options
author | Eric Snow <ericsnowcurrently@gmail.com> | 2023-12-12 15:24:31 (GMT) |
---|---|---|
committer | GitHub <noreply@github.com> | 2023-12-12 15:24:31 (GMT) |
commit | 86a77f4e1a5ceaff1036b0072521e12752b5df47 (patch) | |
tree | cecc78dab93112a3a92ae66fc0156630408063b3 /Lib/test/test_interpreters.py | |
parent | f26bfe4b25f7e5a4f68fcac26207b7175abad208 (diff) | |
download | cpython-86a77f4e1a5ceaff1036b0072521e12752b5df47.zip cpython-86a77f4e1a5ceaff1036b0072521e12752b5df47.tar.gz cpython-86a77f4e1a5ceaff1036b0072521e12752b5df47.tar.bz2 |
gh-76785: Fixes for test.support.interpreters (gh-112982)
This involves a number of changes for PEP 734.
Diffstat (limited to 'Lib/test/test_interpreters.py')
-rw-r--r-- | Lib/test/test_interpreters.py | 1136 |
1 files changed, 0 insertions, 1136 deletions
diff --git a/Lib/test/test_interpreters.py b/Lib/test/test_interpreters.py deleted file mode 100644 index 5663706..0000000 --- a/Lib/test/test_interpreters.py +++ /dev/null @@ -1,1136 +0,0 @@ -import contextlib -import json -import os -import os.path -import sys -import threading -from textwrap import dedent -import unittest -import time - -from test import support -from test.support import import_helper -from test.support import threading_helper -from test.support import os_helper -_interpreters = import_helper.import_module('_xxsubinterpreters') -_channels = import_helper.import_module('_xxinterpchannels') -from test.support import interpreters - - -def _captured_script(script): - r, w = os.pipe() - indented = script.replace('\n', '\n ') - wrapped = dedent(f""" - import contextlib - with open({w}, 'w', encoding='utf-8') as spipe: - with contextlib.redirect_stdout(spipe): - {indented} - """) - return wrapped, open(r, encoding='utf-8') - - -def clean_up_interpreters(): - for interp in interpreters.list_all(): - if interp.id == 0: # main - continue - try: - interp.close() - except RuntimeError: - pass # already destroyed - - -def _run_output(interp, request, channels=None): - script, rpipe = _captured_script(request) - with rpipe: - interp.run(script, channels=channels) - return rpipe.read() - - -@contextlib.contextmanager -def _running(interp): - r, w = os.pipe() - def run(): - interp.run(dedent(f""" - # wait for "signal" - with open({r}) as rpipe: - rpipe.read() - """)) - - t = threading.Thread(target=run) - t.start() - - yield - - with open(w, 'w') as spipe: - spipe.write('done') - t.join() - - -class TestBase(unittest.TestCase): - - def pipe(self): - def ensure_closed(fd): - try: - os.close(fd) - except OSError: - pass - r, w = os.pipe() - self.addCleanup(lambda: ensure_closed(r)) - self.addCleanup(lambda: ensure_closed(w)) - return r, w - - def tearDown(self): - clean_up_interpreters() - - -class CreateTests(TestBase): - - def test_in_main(self): - interp = interpreters.create() - self.assertIsInstance(interp, interpreters.Interpreter) - self.assertIn(interp, interpreters.list_all()) - - def test_in_thread(self): - lock = threading.Lock() - interp = None - def f(): - nonlocal interp - interp = interpreters.create() - lock.acquire() - lock.release() - t = threading.Thread(target=f) - with lock: - t.start() - t.join() - self.assertIn(interp, interpreters.list_all()) - - def test_in_subinterpreter(self): - main, = interpreters.list_all() - interp = interpreters.create() - out = _run_output(interp, dedent(""" - from test.support import interpreters - interp = interpreters.create() - print(interp.id) - """)) - interp2 = interpreters.Interpreter(int(out)) - self.assertEqual(interpreters.list_all(), [main, interp, interp2]) - - def test_after_destroy_all(self): - before = set(interpreters.list_all()) - # Create 3 subinterpreters. - interp_lst = [] - for _ in range(3): - interps = interpreters.create() - interp_lst.append(interps) - # Now destroy them. - for interp in interp_lst: - interp.close() - # Finally, create another. - interp = interpreters.create() - self.assertEqual(set(interpreters.list_all()), before | {interp}) - - def test_after_destroy_some(self): - before = set(interpreters.list_all()) - # Create 3 subinterpreters. - interp1 = interpreters.create() - interp2 = interpreters.create() - interp3 = interpreters.create() - # Now destroy 2 of them. - interp1.close() - interp2.close() - # Finally, create another. - interp = interpreters.create() - self.assertEqual(set(interpreters.list_all()), before | {interp3, interp}) - - -class GetCurrentTests(TestBase): - - def test_main(self): - main = interpreters.get_main() - current = interpreters.get_current() - self.assertEqual(current, main) - - def test_subinterpreter(self): - main = _interpreters.get_main() - interp = interpreters.create() - out = _run_output(interp, dedent(""" - from test.support import interpreters - cur = interpreters.get_current() - print(cur.id) - """)) - current = interpreters.Interpreter(int(out)) - self.assertNotEqual(current, main) - - -class ListAllTests(TestBase): - - def test_initial(self): - interps = interpreters.list_all() - self.assertEqual(1, len(interps)) - - def test_after_creating(self): - main = interpreters.get_current() - first = interpreters.create() - second = interpreters.create() - - ids = [] - for interp in interpreters.list_all(): - ids.append(interp.id) - - self.assertEqual(ids, [main.id, first.id, second.id]) - - def test_after_destroying(self): - main = interpreters.get_current() - first = interpreters.create() - second = interpreters.create() - first.close() - - ids = [] - for interp in interpreters.list_all(): - ids.append(interp.id) - - self.assertEqual(ids, [main.id, second.id]) - - -class TestInterpreterAttrs(TestBase): - - def test_id_type(self): - main = interpreters.get_main() - current = interpreters.get_current() - interp = interpreters.create() - self.assertIsInstance(main.id, _interpreters.InterpreterID) - self.assertIsInstance(current.id, _interpreters.InterpreterID) - self.assertIsInstance(interp.id, _interpreters.InterpreterID) - - def test_main_id(self): - main = interpreters.get_main() - self.assertEqual(main.id, 0) - - def test_custom_id(self): - interp = interpreters.Interpreter(1) - self.assertEqual(interp.id, 1) - - with self.assertRaises(TypeError): - interpreters.Interpreter('1') - - def test_id_readonly(self): - interp = interpreters.Interpreter(1) - with self.assertRaises(AttributeError): - interp.id = 2 - - @unittest.skip('not ready yet (see bpo-32604)') - def test_main_isolated(self): - main = interpreters.get_main() - self.assertFalse(main.isolated) - - @unittest.skip('not ready yet (see bpo-32604)') - def test_subinterpreter_isolated_default(self): - interp = interpreters.create() - self.assertFalse(interp.isolated) - - def test_subinterpreter_isolated_explicit(self): - interp1 = interpreters.create(isolated=True) - interp2 = interpreters.create(isolated=False) - self.assertTrue(interp1.isolated) - self.assertFalse(interp2.isolated) - - @unittest.skip('not ready yet (see bpo-32604)') - def test_custom_isolated_default(self): - interp = interpreters.Interpreter(1) - self.assertFalse(interp.isolated) - - def test_custom_isolated_explicit(self): - interp1 = interpreters.Interpreter(1, isolated=True) - interp2 = interpreters.Interpreter(1, isolated=False) - self.assertTrue(interp1.isolated) - self.assertFalse(interp2.isolated) - - def test_isolated_readonly(self): - interp = interpreters.Interpreter(1) - with self.assertRaises(AttributeError): - interp.isolated = True - - def test_equality(self): - interp1 = interpreters.create() - interp2 = interpreters.create() - self.assertEqual(interp1, interp1) - self.assertNotEqual(interp1, interp2) - - -class TestInterpreterIsRunning(TestBase): - - def test_main(self): - main = interpreters.get_main() - self.assertTrue(main.is_running()) - - @unittest.skip('Fails on FreeBSD') - def test_subinterpreter(self): - interp = interpreters.create() - self.assertFalse(interp.is_running()) - - with _running(interp): - self.assertTrue(interp.is_running()) - self.assertFalse(interp.is_running()) - - def test_finished(self): - r, w = self.pipe() - interp = interpreters.create() - interp.run(f"""if True: - import os - os.write({w}, b'x') - """) - self.assertFalse(interp.is_running()) - self.assertEqual(os.read(r, 1), b'x') - - def test_from_subinterpreter(self): - interp = interpreters.create() - out = _run_output(interp, dedent(f""" - import _xxsubinterpreters as _interpreters - if _interpreters.is_running({interp.id}): - print(True) - else: - print(False) - """)) - self.assertEqual(out.strip(), 'True') - - def test_already_destroyed(self): - interp = interpreters.create() - interp.close() - with self.assertRaises(RuntimeError): - interp.is_running() - - def test_does_not_exist(self): - interp = interpreters.Interpreter(1_000_000) - with self.assertRaises(RuntimeError): - interp.is_running() - - def test_bad_id(self): - interp = interpreters.Interpreter(-1) - with self.assertRaises(ValueError): - interp.is_running() - - def test_with_only_background_threads(self): - r_interp, w_interp = self.pipe() - r_thread, w_thread = self.pipe() - - DONE = b'D' - FINISHED = b'F' - - interp = interpreters.create() - interp.run(f"""if True: - import os - import threading - - def task(): - v = os.read({r_thread}, 1) - assert v == {DONE!r} - os.write({w_interp}, {FINISHED!r}) - t = threading.Thread(target=task) - t.start() - """) - self.assertFalse(interp.is_running()) - - os.write(w_thread, DONE) - interp.run('t.join()') - self.assertEqual(os.read(r_interp, 1), FINISHED) - - -class TestInterpreterClose(TestBase): - - def test_basic(self): - main = interpreters.get_main() - interp1 = interpreters.create() - interp2 = interpreters.create() - interp3 = interpreters.create() - self.assertEqual(set(interpreters.list_all()), - {main, interp1, interp2, interp3}) - interp2.close() - self.assertEqual(set(interpreters.list_all()), - {main, interp1, interp3}) - - def test_all(self): - before = set(interpreters.list_all()) - interps = set() - for _ in range(3): - interp = interpreters.create() - interps.add(interp) - self.assertEqual(set(interpreters.list_all()), before | interps) - for interp in interps: - interp.close() - self.assertEqual(set(interpreters.list_all()), before) - - def test_main(self): - main, = interpreters.list_all() - with self.assertRaises(RuntimeError): - main.close() - - def f(): - with self.assertRaises(RuntimeError): - main.close() - - t = threading.Thread(target=f) - t.start() - t.join() - - def test_already_destroyed(self): - interp = interpreters.create() - interp.close() - with self.assertRaises(RuntimeError): - interp.close() - - def test_does_not_exist(self): - interp = interpreters.Interpreter(1_000_000) - with self.assertRaises(RuntimeError): - interp.close() - - def test_bad_id(self): - interp = interpreters.Interpreter(-1) - with self.assertRaises(ValueError): - interp.close() - - def test_from_current(self): - main, = interpreters.list_all() - interp = interpreters.create() - out = _run_output(interp, dedent(f""" - from test.support import interpreters - interp = interpreters.Interpreter({int(interp.id)}) - try: - interp.close() - except RuntimeError: - print('failed') - """)) - self.assertEqual(out.strip(), 'failed') - self.assertEqual(set(interpreters.list_all()), {main, interp}) - - def test_from_sibling(self): - main, = interpreters.list_all() - interp1 = interpreters.create() - interp2 = interpreters.create() - self.assertEqual(set(interpreters.list_all()), - {main, interp1, interp2}) - interp1.run(dedent(f""" - from test.support import interpreters - interp2 = interpreters.Interpreter(int({interp2.id})) - interp2.close() - interp3 = interpreters.create() - interp3.close() - """)) - self.assertEqual(set(interpreters.list_all()), {main, interp1}) - - def test_from_other_thread(self): - interp = interpreters.create() - def f(): - interp.close() - - t = threading.Thread(target=f) - t.start() - t.join() - - @unittest.skip('Fails on FreeBSD') - def test_still_running(self): - main, = interpreters.list_all() - interp = interpreters.create() - with _running(interp): - with self.assertRaises(RuntimeError): - interp.close() - self.assertTrue(interp.is_running()) - - def test_subthreads_still_running(self): - r_interp, w_interp = self.pipe() - r_thread, w_thread = self.pipe() - - FINISHED = b'F' - - interp = interpreters.create() - interp.run(f"""if True: - import os - import threading - import time - - done = False - - def notify_fini(): - global done - done = True - t.join() - threading._register_atexit(notify_fini) - - def task(): - while not done: - time.sleep(0.1) - os.write({w_interp}, {FINISHED!r}) - t = threading.Thread(target=task) - t.start() - """) - interp.close() - - self.assertEqual(os.read(r_interp, 1), FINISHED) - - -class TestInterpreterRun(TestBase): - - def test_success(self): - interp = interpreters.create() - script, file = _captured_script('print("it worked!", end="")') - with file: - interp.run(script) - out = file.read() - - self.assertEqual(out, 'it worked!') - - def test_failure(self): - interp = interpreters.create() - with self.assertRaises(interpreters.RunFailedError): - interp.run('raise Exception') - - def test_in_thread(self): - interp = interpreters.create() - script, file = _captured_script('print("it worked!", end="")') - with file: - def f(): - interp.run(script) - - t = threading.Thread(target=f) - t.start() - t.join() - out = file.read() - - self.assertEqual(out, 'it worked!') - - @support.requires_fork() - def test_fork(self): - interp = interpreters.create() - import tempfile - with tempfile.NamedTemporaryFile('w+', encoding='utf-8') as file: - file.write('') - file.flush() - - expected = 'spam spam spam spam spam' - script = dedent(f""" - import os - try: - os.fork() - except RuntimeError: - with open('{file.name}', 'w', encoding='utf-8') as out: - out.write('{expected}') - """) - interp.run(script) - - file.seek(0) - content = file.read() - self.assertEqual(content, expected) - - @unittest.skip('Fails on FreeBSD') - def test_already_running(self): - interp = interpreters.create() - with _running(interp): - with self.assertRaises(RuntimeError): - interp.run('print("spam")') - - def test_does_not_exist(self): - interp = interpreters.Interpreter(1_000_000) - with self.assertRaises(RuntimeError): - interp.run('print("spam")') - - def test_bad_id(self): - interp = interpreters.Interpreter(-1) - with self.assertRaises(ValueError): - interp.run('print("spam")') - - def test_bad_script(self): - interp = interpreters.create() - with self.assertRaises(TypeError): - interp.run(10) - - def test_bytes_for_script(self): - interp = interpreters.create() - with self.assertRaises(TypeError): - interp.run(b'print("spam")') - - def test_with_background_threads_still_running(self): - r_interp, w_interp = self.pipe() - r_thread, w_thread = self.pipe() - - RAN = b'R' - DONE = b'D' - FINISHED = b'F' - - interp = interpreters.create() - interp.run(f"""if True: - import os - import threading - - def task(): - v = os.read({r_thread}, 1) - assert v == {DONE!r} - os.write({w_interp}, {FINISHED!r}) - t = threading.Thread(target=task) - t.start() - os.write({w_interp}, {RAN!r}) - """) - interp.run(f"""if True: - os.write({w_interp}, {RAN!r}) - """) - - os.write(w_thread, DONE) - interp.run('t.join()') - self.assertEqual(os.read(r_interp, 1), RAN) - self.assertEqual(os.read(r_interp, 1), RAN) - self.assertEqual(os.read(r_interp, 1), FINISHED) - - # test_xxsubinterpreters covers the remaining Interpreter.run() behavior. - - -class StressTests(TestBase): - - # In these tests we generally want a lot of interpreters, - # but not so many that any test takes too long. - - @support.requires_resource('cpu') - def test_create_many_sequential(self): - alive = [] - for _ in range(100): - interp = interpreters.create() - alive.append(interp) - - @support.requires_resource('cpu') - def test_create_many_threaded(self): - alive = [] - def task(): - interp = interpreters.create() - alive.append(interp) - threads = (threading.Thread(target=task) for _ in range(200)) - with threading_helper.start_threads(threads): - pass - - -class StartupTests(TestBase): - - # We want to ensure the initial state of subinterpreters - # matches expectations. - - _subtest_count = 0 - - @contextlib.contextmanager - def subTest(self, *args): - with super().subTest(*args) as ctx: - self._subtest_count += 1 - try: - yield ctx - finally: - if self._debugged_in_subtest: - if self._subtest_count == 1: - # The first subtest adds a leading newline, so we - # compensate here by not printing a trailing newline. - print('### end subtest debug ###', end='') - else: - print('### end subtest debug ###') - self._debugged_in_subtest = False - - def debug(self, msg, *, header=None): - if header: - self._debug(f'--- {header} ---') - if msg: - if msg.endswith(os.linesep): - self._debug(msg[:-len(os.linesep)]) - else: - self._debug(msg) - self._debug('<no newline>') - self._debug('------') - else: - self._debug(msg) - - _debugged = False - _debugged_in_subtest = False - def _debug(self, msg): - if not self._debugged: - print() - self._debugged = True - if self._subtest is not None: - if True: - if not self._debugged_in_subtest: - self._debugged_in_subtest = True - print('### start subtest debug ###') - print(msg) - else: - print(msg) - - def create_temp_dir(self): - import tempfile - tmp = tempfile.mkdtemp(prefix='test_interpreters_') - tmp = os.path.realpath(tmp) - self.addCleanup(os_helper.rmtree, tmp) - return tmp - - def write_script(self, *path, text): - filename = os.path.join(*path) - dirname = os.path.dirname(filename) - if dirname: - os.makedirs(dirname, exist_ok=True) - with open(filename, 'w', encoding='utf-8') as outfile: - outfile.write(dedent(text)) - return filename - - @support.requires_subprocess() - def run_python(self, argv, *, cwd=None): - # This method is inspired by - # EmbeddingTestsMixin.run_embedded_interpreter() in test_embed.py. - import shlex - import subprocess - if isinstance(argv, str): - argv = shlex.split(argv) - argv = [sys.executable, *argv] - try: - proc = subprocess.run( - argv, - cwd=cwd, - capture_output=True, - text=True, - ) - except Exception as exc: - self.debug(f'# cmd: {shlex.join(argv)}') - if isinstance(exc, FileNotFoundError) and not exc.filename: - if os.path.exists(argv[0]): - exists = 'exists' - else: - exists = 'does not exist' - self.debug(f'{argv[0]} {exists}') - raise # re-raise - assert proc.stderr == '' or proc.returncode != 0, proc.stderr - if proc.returncode != 0 and support.verbose: - self.debug(f'# python3 {shlex.join(argv[1:])} failed:') - self.debug(proc.stdout, header='stdout') - self.debug(proc.stderr, header='stderr') - self.assertEqual(proc.returncode, 0) - self.assertEqual(proc.stderr, '') - return proc.stdout - - def test_sys_path_0(self): - # The main interpreter's sys.path[0] should be used by subinterpreters. - script = ''' - import sys - from test.support import interpreters - - orig = sys.path[0] - - interp = interpreters.create() - interp.run(f"""if True: - import json - import sys - print(json.dumps({{ - 'main': {orig!r}, - 'sub': sys.path[0], - }}, indent=4), flush=True) - """) - ''' - # <tmp>/ - # pkg/ - # __init__.py - # __main__.py - # script.py - # script.py - cwd = self.create_temp_dir() - self.write_script(cwd, 'pkg', '__init__.py', text='') - self.write_script(cwd, 'pkg', '__main__.py', text=script) - self.write_script(cwd, 'pkg', 'script.py', text=script) - self.write_script(cwd, 'script.py', text=script) - - cases = [ - ('script.py', cwd), - ('-m script', cwd), - ('-m pkg', cwd), - ('-m pkg.script', cwd), - ('-c "import script"', ''), - ] - for argv, expected in cases: - with self.subTest(f'python3 {argv}'): - out = self.run_python(argv, cwd=cwd) - data = json.loads(out) - sp0_main, sp0_sub = data['main'], data['sub'] - self.assertEqual(sp0_sub, sp0_main) - self.assertEqual(sp0_sub, expected) - # XXX Also check them all with the -P cmdline flag? - - -class FinalizationTests(TestBase): - - def test_gh_109793(self): - import subprocess - argv = [sys.executable, '-c', '''if True: - import _xxsubinterpreters as _interpreters - interpid = _interpreters.create() - raise Exception - '''] - proc = subprocess.run(argv, capture_output=True, text=True) - self.assertIn('Traceback', proc.stderr) - if proc.returncode == 0 and support.verbose: - print() - print("--- cmd unexpected succeeded ---") - print(f"stdout:\n{proc.stdout}") - print(f"stderr:\n{proc.stderr}") - print("------") - self.assertEqual(proc.returncode, 1) - - -class TestIsShareable(TestBase): - - def test_default_shareables(self): - shareables = [ - # singletons - None, - # builtin objects - b'spam', - 'spam', - 10, - -10, - True, - False, - 100.0, - (), - (1, ('spam', 'eggs'), True), - ] - for obj in shareables: - with self.subTest(obj): - shareable = interpreters.is_shareable(obj) - self.assertTrue(shareable) - - def test_not_shareable(self): - class Cheese: - def __init__(self, name): - self.name = name - def __str__(self): - return self.name - - class SubBytes(bytes): - """A subclass of a shareable type.""" - - not_shareables = [ - # singletons - NotImplemented, - ..., - # builtin types and objects - type, - object, - object(), - Exception(), - # user-defined types and objects - Cheese, - Cheese('Wensleydale'), - SubBytes(b'spam'), - ] - for obj in not_shareables: - with self.subTest(repr(obj)): - self.assertFalse( - interpreters.is_shareable(obj)) - - -class TestChannels(TestBase): - - def test_create(self): - r, s = interpreters.create_channel() - self.assertIsInstance(r, interpreters.RecvChannel) - self.assertIsInstance(s, interpreters.SendChannel) - - def test_list_all(self): - self.assertEqual(interpreters.list_all_channels(), []) - created = set() - for _ in range(3): - ch = interpreters.create_channel() - created.add(ch) - after = set(interpreters.list_all_channels()) - self.assertEqual(after, created) - - def test_shareable(self): - rch, sch = interpreters.create_channel() - - self.assertTrue( - interpreters.is_shareable(rch)) - self.assertTrue( - interpreters.is_shareable(sch)) - - sch.send_nowait(rch) - sch.send_nowait(sch) - rch2 = rch.recv() - sch2 = rch.recv() - - self.assertEqual(rch2, rch) - self.assertEqual(sch2, sch) - - def test_is_closed(self): - rch, sch = interpreters.create_channel() - rbefore = rch.is_closed - sbefore = sch.is_closed - rch.close() - rafter = rch.is_closed - safter = sch.is_closed - - self.assertFalse(rbefore) - self.assertFalse(sbefore) - self.assertTrue(rafter) - self.assertTrue(safter) - - -class TestRecvChannelAttrs(TestBase): - - def test_id_type(self): - rch, _ = interpreters.create_channel() - self.assertIsInstance(rch.id, _channels.ChannelID) - - def test_custom_id(self): - rch = interpreters.RecvChannel(1) - self.assertEqual(rch.id, 1) - - with self.assertRaises(TypeError): - interpreters.RecvChannel('1') - - def test_id_readonly(self): - rch = interpreters.RecvChannel(1) - with self.assertRaises(AttributeError): - rch.id = 2 - - def test_equality(self): - ch1, _ = interpreters.create_channel() - ch2, _ = interpreters.create_channel() - self.assertEqual(ch1, ch1) - self.assertNotEqual(ch1, ch2) - - -class TestSendChannelAttrs(TestBase): - - def test_id_type(self): - _, sch = interpreters.create_channel() - self.assertIsInstance(sch.id, _channels.ChannelID) - - def test_custom_id(self): - sch = interpreters.SendChannel(1) - self.assertEqual(sch.id, 1) - - with self.assertRaises(TypeError): - interpreters.SendChannel('1') - - def test_id_readonly(self): - sch = interpreters.SendChannel(1) - with self.assertRaises(AttributeError): - sch.id = 2 - - def test_equality(self): - _, ch1 = interpreters.create_channel() - _, ch2 = interpreters.create_channel() - self.assertEqual(ch1, ch1) - self.assertNotEqual(ch1, ch2) - - -class TestSendRecv(TestBase): - - def test_send_recv_main(self): - r, s = interpreters.create_channel() - orig = b'spam' - s.send_nowait(orig) - obj = r.recv() - - self.assertEqual(obj, orig) - self.assertIsNot(obj, orig) - - def test_send_recv_same_interpreter(self): - interp = interpreters.create() - interp.run(dedent(""" - from test.support import interpreters - r, s = interpreters.create_channel() - orig = b'spam' - s.send_nowait(orig) - obj = r.recv() - assert obj == orig, 'expected: obj == orig' - assert obj is not orig, 'expected: obj is not orig' - """)) - - @unittest.skip('broken (see BPO-...)') - def test_send_recv_different_interpreters(self): - r1, s1 = interpreters.create_channel() - r2, s2 = interpreters.create_channel() - orig1 = b'spam' - s1.send_nowait(orig1) - out = _run_output( - interpreters.create(), - dedent(f""" - obj1 = r.recv() - assert obj1 == b'spam', 'expected: obj1 == orig1' - # When going to another interpreter we get a copy. - assert id(obj1) != {id(orig1)}, 'expected: obj1 is not orig1' - orig2 = b'eggs' - print(id(orig2)) - s.send_nowait(orig2) - """), - channels=dict(r=r1, s=s2), - ) - obj2 = r2.recv() - - self.assertEqual(obj2, b'eggs') - self.assertNotEqual(id(obj2), int(out)) - - def test_send_recv_different_threads(self): - r, s = interpreters.create_channel() - - def f(): - while True: - try: - obj = r.recv() - break - except interpreters.ChannelEmptyError: - time.sleep(0.1) - s.send(obj) - t = threading.Thread(target=f) - t.start() - - orig = b'spam' - s.send(orig) - obj = r.recv() - t.join() - - self.assertEqual(obj, orig) - self.assertIsNot(obj, orig) - - def test_send_recv_nowait_main(self): - r, s = interpreters.create_channel() - orig = b'spam' - s.send_nowait(orig) - obj = r.recv_nowait() - - self.assertEqual(obj, orig) - self.assertIsNot(obj, orig) - - def test_send_recv_nowait_main_with_default(self): - r, _ = interpreters.create_channel() - obj = r.recv_nowait(None) - - self.assertIsNone(obj) - - def test_send_recv_nowait_same_interpreter(self): - interp = interpreters.create() - interp.run(dedent(""" - from test.support import interpreters - r, s = interpreters.create_channel() - orig = b'spam' - s.send_nowait(orig) - obj = r.recv_nowait() - assert obj == orig, 'expected: obj == orig' - # When going back to the same interpreter we get the same object. - assert obj is not orig, 'expected: obj is not orig' - """)) - - @unittest.skip('broken (see BPO-...)') - def test_send_recv_nowait_different_interpreters(self): - r1, s1 = interpreters.create_channel() - r2, s2 = interpreters.create_channel() - orig1 = b'spam' - s1.send_nowait(orig1) - out = _run_output( - interpreters.create(), - dedent(f""" - obj1 = r.recv_nowait() - assert obj1 == b'spam', 'expected: obj1 == orig1' - # When going to another interpreter we get a copy. - assert id(obj1) != {id(orig1)}, 'expected: obj1 is not orig1' - orig2 = b'eggs' - print(id(orig2)) - s.send_nowait(orig2) - """), - channels=dict(r=r1, s=s2), - ) - obj2 = r2.recv_nowait() - - self.assertEqual(obj2, b'eggs') - self.assertNotEqual(id(obj2), int(out)) - - def test_recv_timeout(self): - r, _ = interpreters.create_channel() - with self.assertRaises(TimeoutError): - r.recv(timeout=1) - - def test_recv_channel_does_not_exist(self): - ch = interpreters.RecvChannel(1_000_000) - with self.assertRaises(interpreters.ChannelNotFoundError): - ch.recv() - - def test_send_channel_does_not_exist(self): - ch = interpreters.SendChannel(1_000_000) - with self.assertRaises(interpreters.ChannelNotFoundError): - ch.send(b'spam') - - def test_recv_nowait_channel_does_not_exist(self): - ch = interpreters.RecvChannel(1_000_000) - with self.assertRaises(interpreters.ChannelNotFoundError): - ch.recv_nowait() - - def test_send_nowait_channel_does_not_exist(self): - ch = interpreters.SendChannel(1_000_000) - with self.assertRaises(interpreters.ChannelNotFoundError): - ch.send_nowait(b'spam') - - def test_recv_nowait_empty(self): - ch, _ = interpreters.create_channel() - with self.assertRaises(interpreters.ChannelEmptyError): - ch.recv_nowait() - - def test_recv_nowait_default(self): - default = object() - rch, sch = interpreters.create_channel() - obj1 = rch.recv_nowait(default) - sch.send_nowait(None) - sch.send_nowait(1) - sch.send_nowait(b'spam') - sch.send_nowait(b'eggs') - obj2 = rch.recv_nowait(default) - obj3 = rch.recv_nowait(default) - obj4 = rch.recv_nowait() - obj5 = rch.recv_nowait(default) - obj6 = rch.recv_nowait(default) - - self.assertIs(obj1, default) - self.assertIs(obj2, None) - self.assertEqual(obj3, 1) - self.assertEqual(obj4, b'spam') - self.assertEqual(obj5, b'eggs') - self.assertIs(obj6, default) - - def test_send_buffer(self): - buf = bytearray(b'spamspamspam') - obj = None - rch, sch = interpreters.create_channel() - - def f(): - nonlocal obj - while True: - try: - obj = rch.recv() - break - except interpreters.ChannelEmptyError: - time.sleep(0.1) - t = threading.Thread(target=f) - t.start() - - sch.send_buffer(buf) - t.join() - - self.assertIsNot(obj, buf) - self.assertIsInstance(obj, memoryview) - self.assertEqual(obj, buf) - - buf[4:8] = b'eggs' - self.assertEqual(obj, buf) - obj[4:8] = b'ham.' - self.assertEqual(obj, buf) - - def test_send_buffer_nowait(self): - buf = bytearray(b'spamspamspam') - rch, sch = interpreters.create_channel() - sch.send_buffer_nowait(buf) - obj = rch.recv() - - self.assertIsNot(obj, buf) - self.assertIsInstance(obj, memoryview) - self.assertEqual(obj, buf) - - buf[4:8] = b'eggs' - self.assertEqual(obj, buf) - obj[4:8] = b'ham.' - self.assertEqual(obj, buf) |