diff options
author | Eric Snow <ericsnowcurrently@gmail.com> | 2023-12-12 15:24:31 (GMT) |
---|---|---|
committer | GitHub <noreply@github.com> | 2023-12-12 15:24:31 (GMT) |
commit | 86a77f4e1a5ceaff1036b0072521e12752b5df47 (patch) | |
tree | cecc78dab93112a3a92ae66fc0156630408063b3 /Lib/test/test_interpreters | |
parent | f26bfe4b25f7e5a4f68fcac26207b7175abad208 (diff) | |
download | cpython-86a77f4e1a5ceaff1036b0072521e12752b5df47.zip cpython-86a77f4e1a5ceaff1036b0072521e12752b5df47.tar.gz cpython-86a77f4e1a5ceaff1036b0072521e12752b5df47.tar.bz2 |
gh-76785: Fixes for test.support.interpreters (gh-112982)
This involves a number of changes for PEP 734.
Diffstat (limited to 'Lib/test/test_interpreters')
-rw-r--r-- | Lib/test/test_interpreters/__init__.py | 5 | ||||
-rw-r--r-- | Lib/test/test_interpreters/__main__.py | 4 | ||||
-rw-r--r-- | Lib/test/test_interpreters/test_api.py | 642 | ||||
-rw-r--r-- | Lib/test/test_interpreters/test_channels.py | 328 | ||||
-rw-r--r-- | Lib/test/test_interpreters/test_lifecycle.py | 189 | ||||
-rw-r--r-- | Lib/test/test_interpreters/test_queues.py | 233 | ||||
-rw-r--r-- | Lib/test/test_interpreters/test_stress.py | 38 | ||||
-rw-r--r-- | Lib/test/test_interpreters/utils.py | 73 |
8 files changed, 1512 insertions, 0 deletions
diff --git a/Lib/test/test_interpreters/__init__.py b/Lib/test/test_interpreters/__init__.py new file mode 100644 index 0000000..4b16ecc --- /dev/null +++ b/Lib/test/test_interpreters/__init__.py @@ -0,0 +1,5 @@ +import os +from test.support import load_package_tests + +def load_tests(*args): + return load_package_tests(os.path.dirname(__file__), *args) diff --git a/Lib/test/test_interpreters/__main__.py b/Lib/test/test_interpreters/__main__.py new file mode 100644 index 0000000..8641229 --- /dev/null +++ b/Lib/test/test_interpreters/__main__.py @@ -0,0 +1,4 @@ +from . import load_tests +import unittest + +nittest.main() diff --git a/Lib/test/test_interpreters/test_api.py b/Lib/test/test_interpreters/test_api.py new file mode 100644 index 0000000..e4ae9d0 --- /dev/null +++ b/Lib/test/test_interpreters/test_api.py @@ -0,0 +1,642 @@ +import os +import threading +from textwrap import dedent +import unittest + +from test import support +from test.support import import_helper +# Raise SkipTest if subinterpreters not supported. +import_helper.import_module('_xxsubinterpreters') +from test.support import interpreters +from test.support.interpreters import InterpreterNotFoundError +from .utils import _captured_script, _run_output, _running, TestBase + + +class ModuleTests(TestBase): + + def test_queue_aliases(self): + first = [ + interpreters.create_queue, + interpreters.Queue, + interpreters.QueueEmpty, + interpreters.QueueFull, + ] + second = [ + interpreters.create_queue, + interpreters.Queue, + interpreters.QueueEmpty, + interpreters.QueueFull, + ] + self.assertEqual(second, first) + + +class CreateTests(TestBase): + + def test_in_main(self): + interp = interpreters.create() + self.assertIsInstance(interp, interpreters.Interpreter) + self.assertIn(interp, interpreters.list_all()) + + def test_in_thread(self): + lock = threading.Lock() + interp = None + def f(): + nonlocal interp + interp = interpreters.create() + lock.acquire() + lock.release() + t = threading.Thread(target=f) + with lock: + t.start() + t.join() + self.assertIn(interp, interpreters.list_all()) + + def test_in_subinterpreter(self): + main, = interpreters.list_all() + interp = interpreters.create() + out = _run_output(interp, dedent(""" + from test.support import interpreters + interp = interpreters.create() + print(interp.id) + """)) + interp2 = interpreters.Interpreter(int(out)) + self.assertEqual(interpreters.list_all(), [main, interp, interp2]) + + def test_after_destroy_all(self): + before = set(interpreters.list_all()) + # Create 3 subinterpreters. + interp_lst = [] + for _ in range(3): + interps = interpreters.create() + interp_lst.append(interps) + # Now destroy them. + for interp in interp_lst: + interp.close() + # Finally, create another. + interp = interpreters.create() + self.assertEqual(set(interpreters.list_all()), before | {interp}) + + def test_after_destroy_some(self): + before = set(interpreters.list_all()) + # Create 3 subinterpreters. + interp1 = interpreters.create() + interp2 = interpreters.create() + interp3 = interpreters.create() + # Now destroy 2 of them. + interp1.close() + interp2.close() + # Finally, create another. + interp = interpreters.create() + self.assertEqual(set(interpreters.list_all()), before | {interp3, interp}) + + +class GetMainTests(TestBase): + + def test_id(self): + main = interpreters.get_main() + self.assertEqual(main.id, 0) + + def test_current(self): + main = interpreters.get_main() + current = interpreters.get_current() + self.assertIs(main, current) + + def test_idempotent(self): + main1 = interpreters.get_main() + main2 = interpreters.get_main() + self.assertIs(main1, main2) + + +class GetCurrentTests(TestBase): + + def test_main(self): + main = interpreters.get_main() + current = interpreters.get_current() + self.assertEqual(current, main) + + def test_subinterpreter(self): + main = interpreters.get_main() + interp = interpreters.create() + out = _run_output(interp, dedent(""" + from test.support import interpreters + cur = interpreters.get_current() + print(cur.id) + """)) + current = interpreters.Interpreter(int(out)) + self.assertEqual(current, interp) + self.assertNotEqual(current, main) + + def test_idempotent(self): + with self.subTest('main'): + cur1 = interpreters.get_current() + cur2 = interpreters.get_current() + self.assertIs(cur1, cur2) + + with self.subTest('subinterpreter'): + interp = interpreters.create() + out = _run_output(interp, dedent(""" + from test.support import interpreters + cur = interpreters.get_current() + print(id(cur)) + cur = interpreters.get_current() + print(id(cur)) + """)) + objid1, objid2 = (int(v) for v in out.splitlines()) + self.assertEqual(objid1, objid2) + + with self.subTest('per-interpreter'): + interp = interpreters.create() + out = _run_output(interp, dedent(""" + from test.support import interpreters + cur = interpreters.get_current() + print(id(cur)) + """)) + id1 = int(out) + id2 = id(interp) + self.assertNotEqual(id1, id2) + + +class ListAllTests(TestBase): + + def test_initial(self): + interps = interpreters.list_all() + self.assertEqual(1, len(interps)) + + def test_after_creating(self): + main = interpreters.get_current() + first = interpreters.create() + second = interpreters.create() + + ids = [] + for interp in interpreters.list_all(): + ids.append(interp.id) + + self.assertEqual(ids, [main.id, first.id, second.id]) + + def test_after_destroying(self): + main = interpreters.get_current() + first = interpreters.create() + second = interpreters.create() + first.close() + + ids = [] + for interp in interpreters.list_all(): + ids.append(interp.id) + + self.assertEqual(ids, [main.id, second.id]) + + def test_idempotent(self): + main = interpreters.get_current() + first = interpreters.create() + second = interpreters.create() + expected = [main, first, second] + + actual = interpreters.list_all() + + self.assertEqual(actual, expected) + for interp1, interp2 in zip(actual, expected): + self.assertIs(interp1, interp2) + + +class InterpreterObjectTests(TestBase): + + def test_init_int(self): + interpid = interpreters.get_current().id + interp = interpreters.Interpreter(interpid) + self.assertEqual(interp.id, interpid) + + def test_init_interpreter_id(self): + interpid = interpreters.get_current()._id + interp = interpreters.Interpreter(interpid) + self.assertEqual(interp._id, interpid) + + def test_init_unsupported(self): + actualid = interpreters.get_current().id + for interpid in [ + str(actualid), + float(actualid), + object(), + None, + '', + ]: + with self.subTest(repr(interpid)): + with self.assertRaises(TypeError): + interpreters.Interpreter(interpid) + + def test_idempotent(self): + main = interpreters.get_main() + interp = interpreters.Interpreter(main.id) + self.assertIs(interp, main) + + def test_init_does_not_exist(self): + with self.assertRaises(InterpreterNotFoundError): + interpreters.Interpreter(1_000_000) + + def test_init_bad_id(self): + with self.assertRaises(ValueError): + interpreters.Interpreter(-1) + + def test_id_type(self): + main = interpreters.get_main() + current = interpreters.get_current() + interp = interpreters.create() + self.assertIsInstance(main.id, int) + self.assertIsInstance(current.id, int) + self.assertIsInstance(interp.id, int) + + def test_id_readonly(self): + interp = interpreters.create() + with self.assertRaises(AttributeError): + interp.id = 1_000_000 + + def test_hashable(self): + interp = interpreters.create() + expected = hash(interp.id) + actual = hash(interp) + self.assertEqual(actual, expected) + + def test_equality(self): + interp1 = interpreters.create() + interp2 = interpreters.create() + self.assertEqual(interp1, interp1) + self.assertNotEqual(interp1, interp2) + + +class TestInterpreterIsRunning(TestBase): + + def test_main(self): + main = interpreters.get_main() + self.assertTrue(main.is_running()) + + @unittest.skip('Fails on FreeBSD') + def test_subinterpreter(self): + interp = interpreters.create() + self.assertFalse(interp.is_running()) + + with _running(interp): + self.assertTrue(interp.is_running()) + self.assertFalse(interp.is_running()) + + def test_finished(self): + r, w = self.pipe() + interp = interpreters.create() + interp.exec_sync(f"""if True: + import os + os.write({w}, b'x') + """) + self.assertFalse(interp.is_running()) + self.assertEqual(os.read(r, 1), b'x') + + def test_from_subinterpreter(self): + interp = interpreters.create() + out = _run_output(interp, dedent(f""" + import _xxsubinterpreters as _interpreters + if _interpreters.is_running({interp.id}): + print(True) + else: + print(False) + """)) + self.assertEqual(out.strip(), 'True') + + def test_already_destroyed(self): + interp = interpreters.create() + interp.close() + with self.assertRaises(InterpreterNotFoundError): + interp.is_running() + + def test_with_only_background_threads(self): + r_interp, w_interp = self.pipe() + r_thread, w_thread = self.pipe() + + DONE = b'D' + FINISHED = b'F' + + interp = interpreters.create() + interp.exec_sync(f"""if True: + import os + import threading + + def task(): + v = os.read({r_thread}, 1) + assert v == {DONE!r} + os.write({w_interp}, {FINISHED!r}) + t = threading.Thread(target=task) + t.start() + """) + self.assertFalse(interp.is_running()) + + os.write(w_thread, DONE) + interp.exec_sync('t.join()') + self.assertEqual(os.read(r_interp, 1), FINISHED) + + +class TestInterpreterClose(TestBase): + + def test_basic(self): + main = interpreters.get_main() + interp1 = interpreters.create() + interp2 = interpreters.create() + interp3 = interpreters.create() + self.assertEqual(set(interpreters.list_all()), + {main, interp1, interp2, interp3}) + interp2.close() + self.assertEqual(set(interpreters.list_all()), + {main, interp1, interp3}) + + def test_all(self): + before = set(interpreters.list_all()) + interps = set() + for _ in range(3): + interp = interpreters.create() + interps.add(interp) + self.assertEqual(set(interpreters.list_all()), before | interps) + for interp in interps: + interp.close() + self.assertEqual(set(interpreters.list_all()), before) + + def test_main(self): + main, = interpreters.list_all() + with self.assertRaises(RuntimeError): + main.close() + + def f(): + with self.assertRaises(RuntimeError): + main.close() + + t = threading.Thread(target=f) + t.start() + t.join() + + def test_already_destroyed(self): + interp = interpreters.create() + interp.close() + with self.assertRaises(InterpreterNotFoundError): + interp.close() + + def test_from_current(self): + main, = interpreters.list_all() + interp = interpreters.create() + out = _run_output(interp, dedent(f""" + from test.support import interpreters + interp = interpreters.Interpreter({interp.id}) + try: + interp.close() + except RuntimeError: + print('failed') + """)) + self.assertEqual(out.strip(), 'failed') + self.assertEqual(set(interpreters.list_all()), {main, interp}) + + def test_from_sibling(self): + main, = interpreters.list_all() + interp1 = interpreters.create() + interp2 = interpreters.create() + self.assertEqual(set(interpreters.list_all()), + {main, interp1, interp2}) + interp1.exec_sync(dedent(f""" + from test.support import interpreters + interp2 = interpreters.Interpreter({interp2.id}) + interp2.close() + interp3 = interpreters.create() + interp3.close() + """)) + self.assertEqual(set(interpreters.list_all()), {main, interp1}) + + def test_from_other_thread(self): + interp = interpreters.create() + def f(): + interp.close() + + t = threading.Thread(target=f) + t.start() + t.join() + + @unittest.skip('Fails on FreeBSD') + def test_still_running(self): + main, = interpreters.list_all() + interp = interpreters.create() + with _running(interp): + with self.assertRaises(RuntimeError): + interp.close() + self.assertTrue(interp.is_running()) + + def test_subthreads_still_running(self): + r_interp, w_interp = self.pipe() + r_thread, w_thread = self.pipe() + + FINISHED = b'F' + + interp = interpreters.create() + interp.exec_sync(f"""if True: + import os + import threading + import time + + done = False + + def notify_fini(): + global done + done = True + t.join() + threading._register_atexit(notify_fini) + + def task(): + while not done: + time.sleep(0.1) + os.write({w_interp}, {FINISHED!r}) + t = threading.Thread(target=task) + t.start() + """) + interp.close() + + self.assertEqual(os.read(r_interp, 1), FINISHED) + + +class TestInterpreterExecSync(TestBase): + + def test_success(self): + interp = interpreters.create() + script, file = _captured_script('print("it worked!", end="")') + with file: + interp.exec_sync(script) + out = file.read() + + self.assertEqual(out, 'it worked!') + + def test_failure(self): + interp = interpreters.create() + with self.assertRaises(interpreters.ExecFailure): + interp.exec_sync('raise Exception') + + def test_in_thread(self): + interp = interpreters.create() + script, file = _captured_script('print("it worked!", end="")') + with file: + def f(): + interp.exec_sync(script) + + t = threading.Thread(target=f) + t.start() + t.join() + out = file.read() + + self.assertEqual(out, 'it worked!') + + @support.requires_fork() + def test_fork(self): + interp = interpreters.create() + import tempfile + with tempfile.NamedTemporaryFile('w+', encoding='utf-8') as file: + file.write('') + file.flush() + + expected = 'spam spam spam spam spam' + script = dedent(f""" + import os + try: + os.fork() + except RuntimeError: + with open('{file.name}', 'w', encoding='utf-8') as out: + out.write('{expected}') + """) + interp.exec_sync(script) + + file.seek(0) + content = file.read() + self.assertEqual(content, expected) + + @unittest.skip('Fails on FreeBSD') + def test_already_running(self): + interp = interpreters.create() + with _running(interp): + with self.assertRaises(RuntimeError): + interp.exec_sync('print("spam")') + + def test_bad_script(self): + interp = interpreters.create() + with self.assertRaises(TypeError): + interp.exec_sync(10) + + def test_bytes_for_script(self): + interp = interpreters.create() + with self.assertRaises(TypeError): + interp.exec_sync(b'print("spam")') + + def test_with_background_threads_still_running(self): + r_interp, w_interp = self.pipe() + r_thread, w_thread = self.pipe() + + RAN = b'R' + DONE = b'D' + FINISHED = b'F' + + interp = interpreters.create() + interp.exec_sync(f"""if True: + import os + import threading + + def task(): + v = os.read({r_thread}, 1) + assert v == {DONE!r} + os.write({w_interp}, {FINISHED!r}) + t = threading.Thread(target=task) + t.start() + os.write({w_interp}, {RAN!r}) + """) + interp.exec_sync(f"""if True: + os.write({w_interp}, {RAN!r}) + """) + + os.write(w_thread, DONE) + interp.exec_sync('t.join()') + self.assertEqual(os.read(r_interp, 1), RAN) + self.assertEqual(os.read(r_interp, 1), RAN) + self.assertEqual(os.read(r_interp, 1), FINISHED) + + # test_xxsubinterpreters covers the remaining + # Interpreter.exec_sync() behavior. + + +class TestInterpreterRun(TestBase): + + def test_success(self): + interp = interpreters.create() + script, file = _captured_script('print("it worked!", end="")') + with file: + t = interp.run(script) + t.join() + out = file.read() + + self.assertEqual(out, 'it worked!') + + def test_failure(self): + caught = False + def excepthook(args): + nonlocal caught + caught = True + threading.excepthook = excepthook + try: + interp = interpreters.create() + t = interp.run('raise Exception') + t.join() + + self.assertTrue(caught) + except BaseException: + threading.excepthook = threading.__excepthook__ + + +class TestIsShareable(TestBase): + + def test_default_shareables(self): + shareables = [ + # singletons + None, + # builtin objects + b'spam', + 'spam', + 10, + -10, + True, + False, + 100.0, + (), + (1, ('spam', 'eggs'), True), + ] + for obj in shareables: + with self.subTest(obj): + shareable = interpreters.is_shareable(obj) + self.assertTrue(shareable) + + def test_not_shareable(self): + class Cheese: + def __init__(self, name): + self.name = name + def __str__(self): + return self.name + + class SubBytes(bytes): + """A subclass of a shareable type.""" + + not_shareables = [ + # singletons + NotImplemented, + ..., + # builtin types and objects + type, + object, + object(), + Exception(), + # user-defined types and objects + Cheese, + Cheese('Wensleydale'), + SubBytes(b'spam'), + ] + for obj in not_shareables: + with self.subTest(repr(obj)): + self.assertFalse( + interpreters.is_shareable(obj)) + + +if __name__ == '__main__': + # Test needs to be a package, so we can do relative imports. + unittest.main() diff --git a/Lib/test/test_interpreters/test_channels.py b/Lib/test/test_interpreters/test_channels.py new file mode 100644 index 0000000..3c3e188 --- /dev/null +++ b/Lib/test/test_interpreters/test_channels.py @@ -0,0 +1,328 @@ +import threading +from textwrap import dedent +import unittest +import time + +from test.support import import_helper +# Raise SkipTest if subinterpreters not supported. +_channels = import_helper.import_module('_xxinterpchannels') +from test.support import interpreters +from test.support.interpreters import channels +from .utils import _run_output, TestBase + + +class TestChannels(TestBase): + + def test_create(self): + r, s = channels.create() + self.assertIsInstance(r, channels.RecvChannel) + self.assertIsInstance(s, channels.SendChannel) + + def test_list_all(self): + self.assertEqual(channels.list_all(), []) + created = set() + for _ in range(3): + ch = channels.create() + created.add(ch) + after = set(channels.list_all()) + self.assertEqual(after, created) + + def test_shareable(self): + rch, sch = channels.create() + + self.assertTrue( + interpreters.is_shareable(rch)) + self.assertTrue( + interpreters.is_shareable(sch)) + + sch.send_nowait(rch) + sch.send_nowait(sch) + rch2 = rch.recv() + sch2 = rch.recv() + + self.assertEqual(rch2, rch) + self.assertEqual(sch2, sch) + + def test_is_closed(self): + rch, sch = channels.create() + rbefore = rch.is_closed + sbefore = sch.is_closed + rch.close() + rafter = rch.is_closed + safter = sch.is_closed + + self.assertFalse(rbefore) + self.assertFalse(sbefore) + self.assertTrue(rafter) + self.assertTrue(safter) + + +class TestRecvChannelAttrs(TestBase): + + def test_id_type(self): + rch, _ = channels.create() + self.assertIsInstance(rch.id, _channels.ChannelID) + + def test_custom_id(self): + rch = channels.RecvChannel(1) + self.assertEqual(rch.id, 1) + + with self.assertRaises(TypeError): + channels.RecvChannel('1') + + def test_id_readonly(self): + rch = channels.RecvChannel(1) + with self.assertRaises(AttributeError): + rch.id = 2 + + def test_equality(self): + ch1, _ = channels.create() + ch2, _ = channels.create() + self.assertEqual(ch1, ch1) + self.assertNotEqual(ch1, ch2) + + +class TestSendChannelAttrs(TestBase): + + def test_id_type(self): + _, sch = channels.create() + self.assertIsInstance(sch.id, _channels.ChannelID) + + def test_custom_id(self): + sch = channels.SendChannel(1) + self.assertEqual(sch.id, 1) + + with self.assertRaises(TypeError): + channels.SendChannel('1') + + def test_id_readonly(self): + sch = channels.SendChannel(1) + with self.assertRaises(AttributeError): + sch.id = 2 + + def test_equality(self): + _, ch1 = channels.create() + _, ch2 = channels.create() + self.assertEqual(ch1, ch1) + self.assertNotEqual(ch1, ch2) + + +class TestSendRecv(TestBase): + + def test_send_recv_main(self): + r, s = channels.create() + orig = b'spam' + s.send_nowait(orig) + obj = r.recv() + + self.assertEqual(obj, orig) + self.assertIsNot(obj, orig) + + def test_send_recv_same_interpreter(self): + interp = interpreters.create() + interp.exec_sync(dedent(""" + from test.support.interpreters import channels + r, s = channels.create() + orig = b'spam' + s.send_nowait(orig) + obj = r.recv() + assert obj == orig, 'expected: obj == orig' + assert obj is not orig, 'expected: obj is not orig' + """)) + + @unittest.skip('broken (see BPO-...)') + def test_send_recv_different_interpreters(self): + r1, s1 = channels.create() + r2, s2 = channels.create() + orig1 = b'spam' + s1.send_nowait(orig1) + out = _run_output( + interpreters.create(), + dedent(f""" + obj1 = r.recv() + assert obj1 == b'spam', 'expected: obj1 == orig1' + # When going to another interpreter we get a copy. + assert id(obj1) != {id(orig1)}, 'expected: obj1 is not orig1' + orig2 = b'eggs' + print(id(orig2)) + s.send_nowait(orig2) + """), + channels=dict(r=r1, s=s2), + ) + obj2 = r2.recv() + + self.assertEqual(obj2, b'eggs') + self.assertNotEqual(id(obj2), int(out)) + + def test_send_recv_different_threads(self): + r, s = channels.create() + + def f(): + while True: + try: + obj = r.recv() + break + except channels.ChannelEmptyError: + time.sleep(0.1) + s.send(obj) + t = threading.Thread(target=f) + t.start() + + orig = b'spam' + s.send(orig) + obj = r.recv() + t.join() + + self.assertEqual(obj, orig) + self.assertIsNot(obj, orig) + + def test_send_recv_nowait_main(self): + r, s = channels.create() + orig = b'spam' + s.send_nowait(orig) + obj = r.recv_nowait() + + self.assertEqual(obj, orig) + self.assertIsNot(obj, orig) + + def test_send_recv_nowait_main_with_default(self): + r, _ = channels.create() + obj = r.recv_nowait(None) + + self.assertIsNone(obj) + + def test_send_recv_nowait_same_interpreter(self): + interp = interpreters.create() + interp.exec_sync(dedent(""" + from test.support.interpreters import channels + r, s = channels.create() + orig = b'spam' + s.send_nowait(orig) + obj = r.recv_nowait() + assert obj == orig, 'expected: obj == orig' + # When going back to the same interpreter we get the same object. + assert obj is not orig, 'expected: obj is not orig' + """)) + + @unittest.skip('broken (see BPO-...)') + def test_send_recv_nowait_different_interpreters(self): + r1, s1 = channels.create() + r2, s2 = channels.create() + orig1 = b'spam' + s1.send_nowait(orig1) + out = _run_output( + interpreters.create(), + dedent(f""" + obj1 = r.recv_nowait() + assert obj1 == b'spam', 'expected: obj1 == orig1' + # When going to another interpreter we get a copy. + assert id(obj1) != {id(orig1)}, 'expected: obj1 is not orig1' + orig2 = b'eggs' + print(id(orig2)) + s.send_nowait(orig2) + """), + channels=dict(r=r1, s=s2), + ) + obj2 = r2.recv_nowait() + + self.assertEqual(obj2, b'eggs') + self.assertNotEqual(id(obj2), int(out)) + + def test_recv_timeout(self): + r, _ = channels.create() + with self.assertRaises(TimeoutError): + r.recv(timeout=1) + + def test_recv_channel_does_not_exist(self): + ch = channels.RecvChannel(1_000_000) + with self.assertRaises(channels.ChannelNotFoundError): + ch.recv() + + def test_send_channel_does_not_exist(self): + ch = channels.SendChannel(1_000_000) + with self.assertRaises(channels.ChannelNotFoundError): + ch.send(b'spam') + + def test_recv_nowait_channel_does_not_exist(self): + ch = channels.RecvChannel(1_000_000) + with self.assertRaises(channels.ChannelNotFoundError): + ch.recv_nowait() + + def test_send_nowait_channel_does_not_exist(self): + ch = channels.SendChannel(1_000_000) + with self.assertRaises(channels.ChannelNotFoundError): + ch.send_nowait(b'spam') + + def test_recv_nowait_empty(self): + ch, _ = channels.create() + with self.assertRaises(channels.ChannelEmptyError): + ch.recv_nowait() + + def test_recv_nowait_default(self): + default = object() + rch, sch = channels.create() + obj1 = rch.recv_nowait(default) + sch.send_nowait(None) + sch.send_nowait(1) + sch.send_nowait(b'spam') + sch.send_nowait(b'eggs') + obj2 = rch.recv_nowait(default) + obj3 = rch.recv_nowait(default) + obj4 = rch.recv_nowait() + obj5 = rch.recv_nowait(default) + obj6 = rch.recv_nowait(default) + + self.assertIs(obj1, default) + self.assertIs(obj2, None) + self.assertEqual(obj3, 1) + self.assertEqual(obj4, b'spam') + self.assertEqual(obj5, b'eggs') + self.assertIs(obj6, default) + + def test_send_buffer(self): + buf = bytearray(b'spamspamspam') + obj = None + rch, sch = channels.create() + + def f(): + nonlocal obj + while True: + try: + obj = rch.recv() + break + except channels.ChannelEmptyError: + time.sleep(0.1) + t = threading.Thread(target=f) + t.start() + + sch.send_buffer(buf) + t.join() + + self.assertIsNot(obj, buf) + self.assertIsInstance(obj, memoryview) + self.assertEqual(obj, buf) + + buf[4:8] = b'eggs' + self.assertEqual(obj, buf) + obj[4:8] = b'ham.' + self.assertEqual(obj, buf) + + def test_send_buffer_nowait(self): + buf = bytearray(b'spamspamspam') + rch, sch = channels.create() + sch.send_buffer_nowait(buf) + obj = rch.recv() + + self.assertIsNot(obj, buf) + self.assertIsInstance(obj, memoryview) + self.assertEqual(obj, buf) + + buf[4:8] = b'eggs' + self.assertEqual(obj, buf) + obj[4:8] = b'ham.' + self.assertEqual(obj, buf) + + +if __name__ == '__main__': + # Test needs to be a package, so we can do relative imports. + unittest.main() diff --git a/Lib/test/test_interpreters/test_lifecycle.py b/Lib/test/test_interpreters/test_lifecycle.py new file mode 100644 index 0000000..c2917d8 --- /dev/null +++ b/Lib/test/test_interpreters/test_lifecycle.py @@ -0,0 +1,189 @@ +import contextlib +import json +import os +import os.path +import sys +from textwrap import dedent +import unittest + +from test import support +from test.support import import_helper +from test.support import os_helper +# Raise SkipTest if subinterpreters not supported. +import_helper.import_module('_xxsubinterpreters') +from .utils import TestBase + + +class StartupTests(TestBase): + + # We want to ensure the initial state of subinterpreters + # matches expectations. + + _subtest_count = 0 + + @contextlib.contextmanager + def subTest(self, *args): + with super().subTest(*args) as ctx: + self._subtest_count += 1 + try: + yield ctx + finally: + if self._debugged_in_subtest: + if self._subtest_count == 1: + # The first subtest adds a leading newline, so we + # compensate here by not printing a trailing newline. + print('### end subtest debug ###', end='') + else: + print('### end subtest debug ###') + self._debugged_in_subtest = False + + def debug(self, msg, *, header=None): + if header: + self._debug(f'--- {header} ---') + if msg: + if msg.endswith(os.linesep): + self._debug(msg[:-len(os.linesep)]) + else: + self._debug(msg) + self._debug('<no newline>') + self._debug('------') + else: + self._debug(msg) + + _debugged = False + _debugged_in_subtest = False + def _debug(self, msg): + if not self._debugged: + print() + self._debugged = True + if self._subtest is not None: + if True: + if not self._debugged_in_subtest: + self._debugged_in_subtest = True + print('### start subtest debug ###') + print(msg) + else: + print(msg) + + def create_temp_dir(self): + import tempfile + tmp = tempfile.mkdtemp(prefix='test_interpreters_') + tmp = os.path.realpath(tmp) + self.addCleanup(os_helper.rmtree, tmp) + return tmp + + def write_script(self, *path, text): + filename = os.path.join(*path) + dirname = os.path.dirname(filename) + if dirname: + os.makedirs(dirname, exist_ok=True) + with open(filename, 'w', encoding='utf-8') as outfile: + outfile.write(dedent(text)) + return filename + + @support.requires_subprocess() + def run_python(self, argv, *, cwd=None): + # This method is inspired by + # EmbeddingTestsMixin.run_embedded_interpreter() in test_embed.py. + import shlex + import subprocess + if isinstance(argv, str): + argv = shlex.split(argv) + argv = [sys.executable, *argv] + try: + proc = subprocess.run( + argv, + cwd=cwd, + capture_output=True, + text=True, + ) + except Exception as exc: + self.debug(f'# cmd: {shlex.join(argv)}') + if isinstance(exc, FileNotFoundError) and not exc.filename: + if os.path.exists(argv[0]): + exists = 'exists' + else: + exists = 'does not exist' + self.debug(f'{argv[0]} {exists}') + raise # re-raise + assert proc.stderr == '' or proc.returncode != 0, proc.stderr + if proc.returncode != 0 and support.verbose: + self.debug(f'# python3 {shlex.join(argv[1:])} failed:') + self.debug(proc.stdout, header='stdout') + self.debug(proc.stderr, header='stderr') + self.assertEqual(proc.returncode, 0) + self.assertEqual(proc.stderr, '') + return proc.stdout + + def test_sys_path_0(self): + # The main interpreter's sys.path[0] should be used by subinterpreters. + script = ''' + import sys + from test.support import interpreters + + orig = sys.path[0] + + interp = interpreters.create() + interp.exec_sync(f"""if True: + import json + import sys + print(json.dumps({{ + 'main': {orig!r}, + 'sub': sys.path[0], + }}, indent=4), flush=True) + """) + ''' + # <tmp>/ + # pkg/ + # __init__.py + # __main__.py + # script.py + # script.py + cwd = self.create_temp_dir() + self.write_script(cwd, 'pkg', '__init__.py', text='') + self.write_script(cwd, 'pkg', '__main__.py', text=script) + self.write_script(cwd, 'pkg', 'script.py', text=script) + self.write_script(cwd, 'script.py', text=script) + + cases = [ + ('script.py', cwd), + ('-m script', cwd), + ('-m pkg', cwd), + ('-m pkg.script', cwd), + ('-c "import script"', ''), + ] + for argv, expected in cases: + with self.subTest(f'python3 {argv}'): + out = self.run_python(argv, cwd=cwd) + data = json.loads(out) + sp0_main, sp0_sub = data['main'], data['sub'] + self.assertEqual(sp0_sub, sp0_main) + self.assertEqual(sp0_sub, expected) + # XXX Also check them all with the -P cmdline flag? + + +class FinalizationTests(TestBase): + + def test_gh_109793(self): + # Make sure finalization finishes and the correct error code + # is reported, even when subinterpreters get cleaned up at the end. + import subprocess + argv = [sys.executable, '-c', '''if True: + from test.support import interpreters + interp = interpreters.create() + raise Exception + '''] + proc = subprocess.run(argv, capture_output=True, text=True) + self.assertIn('Traceback', proc.stderr) + if proc.returncode == 0 and support.verbose: + print() + print("--- cmd unexpected succeeded ---") + print(f"stdout:\n{proc.stdout}") + print(f"stderr:\n{proc.stderr}") + print("------") + self.assertEqual(proc.returncode, 1) + + +if __name__ == '__main__': + # Test needs to be a package, so we can do relative imports. + unittest.main() diff --git a/Lib/test/test_interpreters/test_queues.py b/Lib/test/test_interpreters/test_queues.py new file mode 100644 index 0000000..2af90b1 --- /dev/null +++ b/Lib/test/test_interpreters/test_queues.py @@ -0,0 +1,233 @@ +import threading +from textwrap import dedent +import unittest +import time + +from test.support import import_helper +# Raise SkipTest if subinterpreters not supported. +import_helper.import_module('_xxinterpchannels') +#import_helper.import_module('_xxinterpqueues') +from test.support import interpreters +from test.support.interpreters import queues +from .utils import _run_output, TestBase + + +class QueueTests(TestBase): + + def test_create(self): + with self.subTest('vanilla'): + queue = queues.create() + self.assertEqual(queue.maxsize, 0) + + with self.subTest('small maxsize'): + queue = queues.create(3) + self.assertEqual(queue.maxsize, 3) + + with self.subTest('big maxsize'): + queue = queues.create(100) + self.assertEqual(queue.maxsize, 100) + + with self.subTest('no maxsize'): + queue = queues.create(0) + self.assertEqual(queue.maxsize, 0) + + with self.subTest('negative maxsize'): + queue = queues.create(-1) + self.assertEqual(queue.maxsize, 0) + + with self.subTest('bad maxsize'): + with self.assertRaises(TypeError): + queues.create('1') + + @unittest.expectedFailure + def test_shareable(self): + queue1 = queues.create() + queue2 = queues.create() + queue1.put(queue2) + queue3 = queue1.get() + self.assertIs(queue3, queue1) + + def test_id_type(self): + queue = queues.create() + self.assertIsInstance(queue.id, int) + + def test_custom_id(self): + with self.assertRaises(queues.QueueNotFoundError): + queues.Queue(1_000_000) + + def test_id_readonly(self): + queue = queues.create() + with self.assertRaises(AttributeError): + queue.id = 1_000_000 + + def test_maxsize_readonly(self): + queue = queues.create(10) + with self.assertRaises(AttributeError): + queue.maxsize = 1_000_000 + + def test_hashable(self): + queue = queues.create() + expected = hash(queue.id) + actual = hash(queue) + self.assertEqual(actual, expected) + + def test_equality(self): + queue1 = queues.create() + queue2 = queues.create() + self.assertEqual(queue1, queue1) + self.assertNotEqual(queue1, queue2) + + +class TestQueueOps(TestBase): + + def test_empty(self): + queue = queues.create() + before = queue.empty() + queue.put(None) + during = queue.empty() + queue.get() + after = queue.empty() + + self.assertIs(before, True) + self.assertIs(during, False) + self.assertIs(after, True) + + def test_full(self): + expected = [False, False, False, True, False, False, False] + actual = [] + queue = queues.create(3) + for _ in range(3): + actual.append(queue.full()) + queue.put(None) + actual.append(queue.full()) + for _ in range(3): + queue.get() + actual.append(queue.full()) + + self.assertEqual(actual, expected) + + def test_qsize(self): + expected = [0, 1, 2, 3, 2, 3, 2, 1, 0, 1, 0] + actual = [] + queue = queues.create() + for _ in range(3): + actual.append(queue.qsize()) + queue.put(None) + actual.append(queue.qsize()) + queue.get() + actual.append(queue.qsize()) + queue.put(None) + actual.append(queue.qsize()) + for _ in range(3): + queue.get() + actual.append(queue.qsize()) + queue.put(None) + actual.append(queue.qsize()) + queue.get() + actual.append(queue.qsize()) + + self.assertEqual(actual, expected) + + def test_put_get_main(self): + expected = list(range(20)) + queue = queues.create() + for i in range(20): + queue.put(i) + actual = [queue.get() for _ in range(20)] + + self.assertEqual(actual, expected) + + @unittest.expectedFailure + def test_put_timeout(self): + queue = queues.create(2) + queue.put(None) + queue.put(None) + with self.assertRaises(queues.QueueFull): + queue.put(None, timeout=0.1) + queue.get() + queue.put(None) + + @unittest.expectedFailure + def test_put_nowait(self): + queue = queues.create(2) + queue.put_nowait(None) + queue.put_nowait(None) + with self.assertRaises(queues.QueueFull): + queue.put_nowait(None) + queue.get() + queue.put_nowait(None) + + def test_get_timeout(self): + queue = queues.create() + with self.assertRaises(queues.QueueEmpty): + queue.get(timeout=0.1) + + def test_get_nowait(self): + queue = queues.create() + with self.assertRaises(queues.QueueEmpty): + queue.get_nowait() + + def test_put_get_same_interpreter(self): + interp = interpreters.create() + interp.exec_sync(dedent(""" + from test.support.interpreters import queues + queue = queues.create() + orig = b'spam' + queue.put(orig) + obj = queue.get() + assert obj == orig, 'expected: obj == orig' + assert obj is not orig, 'expected: obj is not orig' + """)) + + @unittest.expectedFailure + def test_put_get_different_interpreters(self): + queue1 = queues.create() + queue2 = queues.create() + obj1 = b'spam' + queue1.put(obj1) + out = _run_output( + interpreters.create(), + dedent(f""" + import test.support.interpreters.queue as queues + queue1 = queues.Queue({queue1.id}) + queue2 = queues.Queue({queue2.id}) + obj = queue1.get() + assert obj == b'spam', 'expected: obj == obj1' + # When going to another interpreter we get a copy. + assert id(obj) != {id(obj1)}, 'expected: obj is not obj1' + obj2 = b'eggs' + print(id(obj2)) + queue2.put(obj2) + """)) + obj2 = queue2.get() + + self.assertEqual(obj2, b'eggs') + self.assertNotEqual(id(obj2), int(out)) + + def test_put_get_different_threads(self): + queue1 = queues.create() + queue2 = queues.create() + + def f(): + while True: + try: + obj = queue1.get(timeout=0.1) + break + except queues.QueueEmpty: + continue + queue2.put(obj) + t = threading.Thread(target=f) + t.start() + + orig = b'spam' + queue1.put(orig) + obj = queue2.get() + t.join() + + self.assertEqual(obj, orig) + self.assertIsNot(obj, orig) + + +if __name__ == '__main__': + # Test needs to be a package, so we can do relative imports. + unittest.main() diff --git a/Lib/test/test_interpreters/test_stress.py b/Lib/test/test_interpreters/test_stress.py new file mode 100644 index 0000000..3cc570b --- /dev/null +++ b/Lib/test/test_interpreters/test_stress.py @@ -0,0 +1,38 @@ +import threading +import unittest + +from test import support +from test.support import import_helper +from test.support import threading_helper +# Raise SkipTest if subinterpreters not supported. +import_helper.import_module('_xxsubinterpreters') +from test.support import interpreters +from .utils import TestBase + + +class StressTests(TestBase): + + # In these tests we generally want a lot of interpreters, + # but not so many that any test takes too long. + + @support.requires_resource('cpu') + def test_create_many_sequential(self): + alive = [] + for _ in range(100): + interp = interpreters.create() + alive.append(interp) + + @support.requires_resource('cpu') + def test_create_many_threaded(self): + alive = [] + def task(): + interp = interpreters.create() + alive.append(interp) + threads = (threading.Thread(target=task) for _ in range(200)) + with threading_helper.start_threads(threads): + pass + + +if __name__ == '__main__': + # Test needs to be a package, so we can do relative imports. + unittest.main() diff --git a/Lib/test/test_interpreters/utils.py b/Lib/test/test_interpreters/utils.py new file mode 100644 index 0000000..623c873 --- /dev/null +++ b/Lib/test/test_interpreters/utils.py @@ -0,0 +1,73 @@ +import contextlib +import os +import threading +from textwrap import dedent +import unittest + +from test.support import interpreters + + +def _captured_script(script): + r, w = os.pipe() + indented = script.replace('\n', '\n ') + wrapped = dedent(f""" + import contextlib + with open({w}, 'w', encoding='utf-8') as spipe: + with contextlib.redirect_stdout(spipe): + {indented} + """) + return wrapped, open(r, encoding='utf-8') + + +def clean_up_interpreters(): + for interp in interpreters.list_all(): + if interp.id == 0: # main + continue + try: + interp.close() + except RuntimeError: + pass # already destroyed + + +def _run_output(interp, request, channels=None): + script, rpipe = _captured_script(request) + with rpipe: + interp.exec_sync(script, channels=channels) + return rpipe.read() + + +@contextlib.contextmanager +def _running(interp): + r, w = os.pipe() + def run(): + interp.exec_sync(dedent(f""" + # wait for "signal" + with open({r}) as rpipe: + rpipe.read() + """)) + + t = threading.Thread(target=run) + t.start() + + yield + + with open(w, 'w') as spipe: + spipe.write('done') + t.join() + + +class TestBase(unittest.TestCase): + + def pipe(self): + def ensure_closed(fd): + try: + os.close(fd) + except OSError: + pass + r, w = os.pipe() + self.addCleanup(lambda: ensure_closed(r)) + self.addCleanup(lambda: ensure_closed(w)) + return r, w + + def tearDown(self): + clean_up_interpreters() |