summaryrefslogtreecommitdiffstats
path: root/Lib/test/test_interpreters
diff options
context:
space:
mode:
Diffstat (limited to 'Lib/test/test_interpreters')
-rw-r--r--Lib/test/test_interpreters/__init__.py5
-rw-r--r--Lib/test/test_interpreters/__main__.py4
-rw-r--r--Lib/test/test_interpreters/test_api.py642
-rw-r--r--Lib/test/test_interpreters/test_channels.py328
-rw-r--r--Lib/test/test_interpreters/test_lifecycle.py189
-rw-r--r--Lib/test/test_interpreters/test_queues.py233
-rw-r--r--Lib/test/test_interpreters/test_stress.py38
-rw-r--r--Lib/test/test_interpreters/utils.py73
8 files changed, 1512 insertions, 0 deletions
diff --git a/Lib/test/test_interpreters/__init__.py b/Lib/test/test_interpreters/__init__.py
new file mode 100644
index 0000000..4b16ecc
--- /dev/null
+++ b/Lib/test/test_interpreters/__init__.py
@@ -0,0 +1,5 @@
+import os
+from test.support import load_package_tests
+
+def load_tests(*args):
+ return load_package_tests(os.path.dirname(__file__), *args)
diff --git a/Lib/test/test_interpreters/__main__.py b/Lib/test/test_interpreters/__main__.py
new file mode 100644
index 0000000..8641229
--- /dev/null
+++ b/Lib/test/test_interpreters/__main__.py
@@ -0,0 +1,4 @@
+from . import load_tests
+import unittest
+
+nittest.main()
diff --git a/Lib/test/test_interpreters/test_api.py b/Lib/test/test_interpreters/test_api.py
new file mode 100644
index 0000000..e4ae9d0
--- /dev/null
+++ b/Lib/test/test_interpreters/test_api.py
@@ -0,0 +1,642 @@
+import os
+import threading
+from textwrap import dedent
+import unittest
+
+from test import support
+from test.support import import_helper
+# Raise SkipTest if subinterpreters not supported.
+import_helper.import_module('_xxsubinterpreters')
+from test.support import interpreters
+from test.support.interpreters import InterpreterNotFoundError
+from .utils import _captured_script, _run_output, _running, TestBase
+
+
+class ModuleTests(TestBase):
+
+ def test_queue_aliases(self):
+ first = [
+ interpreters.create_queue,
+ interpreters.Queue,
+ interpreters.QueueEmpty,
+ interpreters.QueueFull,
+ ]
+ second = [
+ interpreters.create_queue,
+ interpreters.Queue,
+ interpreters.QueueEmpty,
+ interpreters.QueueFull,
+ ]
+ self.assertEqual(second, first)
+
+
+class CreateTests(TestBase):
+
+ def test_in_main(self):
+ interp = interpreters.create()
+ self.assertIsInstance(interp, interpreters.Interpreter)
+ self.assertIn(interp, interpreters.list_all())
+
+ def test_in_thread(self):
+ lock = threading.Lock()
+ interp = None
+ def f():
+ nonlocal interp
+ interp = interpreters.create()
+ lock.acquire()
+ lock.release()
+ t = threading.Thread(target=f)
+ with lock:
+ t.start()
+ t.join()
+ self.assertIn(interp, interpreters.list_all())
+
+ def test_in_subinterpreter(self):
+ main, = interpreters.list_all()
+ interp = interpreters.create()
+ out = _run_output(interp, dedent("""
+ from test.support import interpreters
+ interp = interpreters.create()
+ print(interp.id)
+ """))
+ interp2 = interpreters.Interpreter(int(out))
+ self.assertEqual(interpreters.list_all(), [main, interp, interp2])
+
+ def test_after_destroy_all(self):
+ before = set(interpreters.list_all())
+ # Create 3 subinterpreters.
+ interp_lst = []
+ for _ in range(3):
+ interps = interpreters.create()
+ interp_lst.append(interps)
+ # Now destroy them.
+ for interp in interp_lst:
+ interp.close()
+ # Finally, create another.
+ interp = interpreters.create()
+ self.assertEqual(set(interpreters.list_all()), before | {interp})
+
+ def test_after_destroy_some(self):
+ before = set(interpreters.list_all())
+ # Create 3 subinterpreters.
+ interp1 = interpreters.create()
+ interp2 = interpreters.create()
+ interp3 = interpreters.create()
+ # Now destroy 2 of them.
+ interp1.close()
+ interp2.close()
+ # Finally, create another.
+ interp = interpreters.create()
+ self.assertEqual(set(interpreters.list_all()), before | {interp3, interp})
+
+
+class GetMainTests(TestBase):
+
+ def test_id(self):
+ main = interpreters.get_main()
+ self.assertEqual(main.id, 0)
+
+ def test_current(self):
+ main = interpreters.get_main()
+ current = interpreters.get_current()
+ self.assertIs(main, current)
+
+ def test_idempotent(self):
+ main1 = interpreters.get_main()
+ main2 = interpreters.get_main()
+ self.assertIs(main1, main2)
+
+
+class GetCurrentTests(TestBase):
+
+ def test_main(self):
+ main = interpreters.get_main()
+ current = interpreters.get_current()
+ self.assertEqual(current, main)
+
+ def test_subinterpreter(self):
+ main = interpreters.get_main()
+ interp = interpreters.create()
+ out = _run_output(interp, dedent("""
+ from test.support import interpreters
+ cur = interpreters.get_current()
+ print(cur.id)
+ """))
+ current = interpreters.Interpreter(int(out))
+ self.assertEqual(current, interp)
+ self.assertNotEqual(current, main)
+
+ def test_idempotent(self):
+ with self.subTest('main'):
+ cur1 = interpreters.get_current()
+ cur2 = interpreters.get_current()
+ self.assertIs(cur1, cur2)
+
+ with self.subTest('subinterpreter'):
+ interp = interpreters.create()
+ out = _run_output(interp, dedent("""
+ from test.support import interpreters
+ cur = interpreters.get_current()
+ print(id(cur))
+ cur = interpreters.get_current()
+ print(id(cur))
+ """))
+ objid1, objid2 = (int(v) for v in out.splitlines())
+ self.assertEqual(objid1, objid2)
+
+ with self.subTest('per-interpreter'):
+ interp = interpreters.create()
+ out = _run_output(interp, dedent("""
+ from test.support import interpreters
+ cur = interpreters.get_current()
+ print(id(cur))
+ """))
+ id1 = int(out)
+ id2 = id(interp)
+ self.assertNotEqual(id1, id2)
+
+
+class ListAllTests(TestBase):
+
+ def test_initial(self):
+ interps = interpreters.list_all()
+ self.assertEqual(1, len(interps))
+
+ def test_after_creating(self):
+ main = interpreters.get_current()
+ first = interpreters.create()
+ second = interpreters.create()
+
+ ids = []
+ for interp in interpreters.list_all():
+ ids.append(interp.id)
+
+ self.assertEqual(ids, [main.id, first.id, second.id])
+
+ def test_after_destroying(self):
+ main = interpreters.get_current()
+ first = interpreters.create()
+ second = interpreters.create()
+ first.close()
+
+ ids = []
+ for interp in interpreters.list_all():
+ ids.append(interp.id)
+
+ self.assertEqual(ids, [main.id, second.id])
+
+ def test_idempotent(self):
+ main = interpreters.get_current()
+ first = interpreters.create()
+ second = interpreters.create()
+ expected = [main, first, second]
+
+ actual = interpreters.list_all()
+
+ self.assertEqual(actual, expected)
+ for interp1, interp2 in zip(actual, expected):
+ self.assertIs(interp1, interp2)
+
+
+class InterpreterObjectTests(TestBase):
+
+ def test_init_int(self):
+ interpid = interpreters.get_current().id
+ interp = interpreters.Interpreter(interpid)
+ self.assertEqual(interp.id, interpid)
+
+ def test_init_interpreter_id(self):
+ interpid = interpreters.get_current()._id
+ interp = interpreters.Interpreter(interpid)
+ self.assertEqual(interp._id, interpid)
+
+ def test_init_unsupported(self):
+ actualid = interpreters.get_current().id
+ for interpid in [
+ str(actualid),
+ float(actualid),
+ object(),
+ None,
+ '',
+ ]:
+ with self.subTest(repr(interpid)):
+ with self.assertRaises(TypeError):
+ interpreters.Interpreter(interpid)
+
+ def test_idempotent(self):
+ main = interpreters.get_main()
+ interp = interpreters.Interpreter(main.id)
+ self.assertIs(interp, main)
+
+ def test_init_does_not_exist(self):
+ with self.assertRaises(InterpreterNotFoundError):
+ interpreters.Interpreter(1_000_000)
+
+ def test_init_bad_id(self):
+ with self.assertRaises(ValueError):
+ interpreters.Interpreter(-1)
+
+ def test_id_type(self):
+ main = interpreters.get_main()
+ current = interpreters.get_current()
+ interp = interpreters.create()
+ self.assertIsInstance(main.id, int)
+ self.assertIsInstance(current.id, int)
+ self.assertIsInstance(interp.id, int)
+
+ def test_id_readonly(self):
+ interp = interpreters.create()
+ with self.assertRaises(AttributeError):
+ interp.id = 1_000_000
+
+ def test_hashable(self):
+ interp = interpreters.create()
+ expected = hash(interp.id)
+ actual = hash(interp)
+ self.assertEqual(actual, expected)
+
+ def test_equality(self):
+ interp1 = interpreters.create()
+ interp2 = interpreters.create()
+ self.assertEqual(interp1, interp1)
+ self.assertNotEqual(interp1, interp2)
+
+
+class TestInterpreterIsRunning(TestBase):
+
+ def test_main(self):
+ main = interpreters.get_main()
+ self.assertTrue(main.is_running())
+
+ @unittest.skip('Fails on FreeBSD')
+ def test_subinterpreter(self):
+ interp = interpreters.create()
+ self.assertFalse(interp.is_running())
+
+ with _running(interp):
+ self.assertTrue(interp.is_running())
+ self.assertFalse(interp.is_running())
+
+ def test_finished(self):
+ r, w = self.pipe()
+ interp = interpreters.create()
+ interp.exec_sync(f"""if True:
+ import os
+ os.write({w}, b'x')
+ """)
+ self.assertFalse(interp.is_running())
+ self.assertEqual(os.read(r, 1), b'x')
+
+ def test_from_subinterpreter(self):
+ interp = interpreters.create()
+ out = _run_output(interp, dedent(f"""
+ import _xxsubinterpreters as _interpreters
+ if _interpreters.is_running({interp.id}):
+ print(True)
+ else:
+ print(False)
+ """))
+ self.assertEqual(out.strip(), 'True')
+
+ def test_already_destroyed(self):
+ interp = interpreters.create()
+ interp.close()
+ with self.assertRaises(InterpreterNotFoundError):
+ interp.is_running()
+
+ def test_with_only_background_threads(self):
+ r_interp, w_interp = self.pipe()
+ r_thread, w_thread = self.pipe()
+
+ DONE = b'D'
+ FINISHED = b'F'
+
+ interp = interpreters.create()
+ interp.exec_sync(f"""if True:
+ import os
+ import threading
+
+ def task():
+ v = os.read({r_thread}, 1)
+ assert v == {DONE!r}
+ os.write({w_interp}, {FINISHED!r})
+ t = threading.Thread(target=task)
+ t.start()
+ """)
+ self.assertFalse(interp.is_running())
+
+ os.write(w_thread, DONE)
+ interp.exec_sync('t.join()')
+ self.assertEqual(os.read(r_interp, 1), FINISHED)
+
+
+class TestInterpreterClose(TestBase):
+
+ def test_basic(self):
+ main = interpreters.get_main()
+ interp1 = interpreters.create()
+ interp2 = interpreters.create()
+ interp3 = interpreters.create()
+ self.assertEqual(set(interpreters.list_all()),
+ {main, interp1, interp2, interp3})
+ interp2.close()
+ self.assertEqual(set(interpreters.list_all()),
+ {main, interp1, interp3})
+
+ def test_all(self):
+ before = set(interpreters.list_all())
+ interps = set()
+ for _ in range(3):
+ interp = interpreters.create()
+ interps.add(interp)
+ self.assertEqual(set(interpreters.list_all()), before | interps)
+ for interp in interps:
+ interp.close()
+ self.assertEqual(set(interpreters.list_all()), before)
+
+ def test_main(self):
+ main, = interpreters.list_all()
+ with self.assertRaises(RuntimeError):
+ main.close()
+
+ def f():
+ with self.assertRaises(RuntimeError):
+ main.close()
+
+ t = threading.Thread(target=f)
+ t.start()
+ t.join()
+
+ def test_already_destroyed(self):
+ interp = interpreters.create()
+ interp.close()
+ with self.assertRaises(InterpreterNotFoundError):
+ interp.close()
+
+ def test_from_current(self):
+ main, = interpreters.list_all()
+ interp = interpreters.create()
+ out = _run_output(interp, dedent(f"""
+ from test.support import interpreters
+ interp = interpreters.Interpreter({interp.id})
+ try:
+ interp.close()
+ except RuntimeError:
+ print('failed')
+ """))
+ self.assertEqual(out.strip(), 'failed')
+ self.assertEqual(set(interpreters.list_all()), {main, interp})
+
+ def test_from_sibling(self):
+ main, = interpreters.list_all()
+ interp1 = interpreters.create()
+ interp2 = interpreters.create()
+ self.assertEqual(set(interpreters.list_all()),
+ {main, interp1, interp2})
+ interp1.exec_sync(dedent(f"""
+ from test.support import interpreters
+ interp2 = interpreters.Interpreter({interp2.id})
+ interp2.close()
+ interp3 = interpreters.create()
+ interp3.close()
+ """))
+ self.assertEqual(set(interpreters.list_all()), {main, interp1})
+
+ def test_from_other_thread(self):
+ interp = interpreters.create()
+ def f():
+ interp.close()
+
+ t = threading.Thread(target=f)
+ t.start()
+ t.join()
+
+ @unittest.skip('Fails on FreeBSD')
+ def test_still_running(self):
+ main, = interpreters.list_all()
+ interp = interpreters.create()
+ with _running(interp):
+ with self.assertRaises(RuntimeError):
+ interp.close()
+ self.assertTrue(interp.is_running())
+
+ def test_subthreads_still_running(self):
+ r_interp, w_interp = self.pipe()
+ r_thread, w_thread = self.pipe()
+
+ FINISHED = b'F'
+
+ interp = interpreters.create()
+ interp.exec_sync(f"""if True:
+ import os
+ import threading
+ import time
+
+ done = False
+
+ def notify_fini():
+ global done
+ done = True
+ t.join()
+ threading._register_atexit(notify_fini)
+
+ def task():
+ while not done:
+ time.sleep(0.1)
+ os.write({w_interp}, {FINISHED!r})
+ t = threading.Thread(target=task)
+ t.start()
+ """)
+ interp.close()
+
+ self.assertEqual(os.read(r_interp, 1), FINISHED)
+
+
+class TestInterpreterExecSync(TestBase):
+
+ def test_success(self):
+ interp = interpreters.create()
+ script, file = _captured_script('print("it worked!", end="")')
+ with file:
+ interp.exec_sync(script)
+ out = file.read()
+
+ self.assertEqual(out, 'it worked!')
+
+ def test_failure(self):
+ interp = interpreters.create()
+ with self.assertRaises(interpreters.ExecFailure):
+ interp.exec_sync('raise Exception')
+
+ def test_in_thread(self):
+ interp = interpreters.create()
+ script, file = _captured_script('print("it worked!", end="")')
+ with file:
+ def f():
+ interp.exec_sync(script)
+
+ t = threading.Thread(target=f)
+ t.start()
+ t.join()
+ out = file.read()
+
+ self.assertEqual(out, 'it worked!')
+
+ @support.requires_fork()
+ def test_fork(self):
+ interp = interpreters.create()
+ import tempfile
+ with tempfile.NamedTemporaryFile('w+', encoding='utf-8') as file:
+ file.write('')
+ file.flush()
+
+ expected = 'spam spam spam spam spam'
+ script = dedent(f"""
+ import os
+ try:
+ os.fork()
+ except RuntimeError:
+ with open('{file.name}', 'w', encoding='utf-8') as out:
+ out.write('{expected}')
+ """)
+ interp.exec_sync(script)
+
+ file.seek(0)
+ content = file.read()
+ self.assertEqual(content, expected)
+
+ @unittest.skip('Fails on FreeBSD')
+ def test_already_running(self):
+ interp = interpreters.create()
+ with _running(interp):
+ with self.assertRaises(RuntimeError):
+ interp.exec_sync('print("spam")')
+
+ def test_bad_script(self):
+ interp = interpreters.create()
+ with self.assertRaises(TypeError):
+ interp.exec_sync(10)
+
+ def test_bytes_for_script(self):
+ interp = interpreters.create()
+ with self.assertRaises(TypeError):
+ interp.exec_sync(b'print("spam")')
+
+ def test_with_background_threads_still_running(self):
+ r_interp, w_interp = self.pipe()
+ r_thread, w_thread = self.pipe()
+
+ RAN = b'R'
+ DONE = b'D'
+ FINISHED = b'F'
+
+ interp = interpreters.create()
+ interp.exec_sync(f"""if True:
+ import os
+ import threading
+
+ def task():
+ v = os.read({r_thread}, 1)
+ assert v == {DONE!r}
+ os.write({w_interp}, {FINISHED!r})
+ t = threading.Thread(target=task)
+ t.start()
+ os.write({w_interp}, {RAN!r})
+ """)
+ interp.exec_sync(f"""if True:
+ os.write({w_interp}, {RAN!r})
+ """)
+
+ os.write(w_thread, DONE)
+ interp.exec_sync('t.join()')
+ self.assertEqual(os.read(r_interp, 1), RAN)
+ self.assertEqual(os.read(r_interp, 1), RAN)
+ self.assertEqual(os.read(r_interp, 1), FINISHED)
+
+ # test_xxsubinterpreters covers the remaining
+ # Interpreter.exec_sync() behavior.
+
+
+class TestInterpreterRun(TestBase):
+
+ def test_success(self):
+ interp = interpreters.create()
+ script, file = _captured_script('print("it worked!", end="")')
+ with file:
+ t = interp.run(script)
+ t.join()
+ out = file.read()
+
+ self.assertEqual(out, 'it worked!')
+
+ def test_failure(self):
+ caught = False
+ def excepthook(args):
+ nonlocal caught
+ caught = True
+ threading.excepthook = excepthook
+ try:
+ interp = interpreters.create()
+ t = interp.run('raise Exception')
+ t.join()
+
+ self.assertTrue(caught)
+ except BaseException:
+ threading.excepthook = threading.__excepthook__
+
+
+class TestIsShareable(TestBase):
+
+ def test_default_shareables(self):
+ shareables = [
+ # singletons
+ None,
+ # builtin objects
+ b'spam',
+ 'spam',
+ 10,
+ -10,
+ True,
+ False,
+ 100.0,
+ (),
+ (1, ('spam', 'eggs'), True),
+ ]
+ for obj in shareables:
+ with self.subTest(obj):
+ shareable = interpreters.is_shareable(obj)
+ self.assertTrue(shareable)
+
+ def test_not_shareable(self):
+ class Cheese:
+ def __init__(self, name):
+ self.name = name
+ def __str__(self):
+ return self.name
+
+ class SubBytes(bytes):
+ """A subclass of a shareable type."""
+
+ not_shareables = [
+ # singletons
+ NotImplemented,
+ ...,
+ # builtin types and objects
+ type,
+ object,
+ object(),
+ Exception(),
+ # user-defined types and objects
+ Cheese,
+ Cheese('Wensleydale'),
+ SubBytes(b'spam'),
+ ]
+ for obj in not_shareables:
+ with self.subTest(repr(obj)):
+ self.assertFalse(
+ interpreters.is_shareable(obj))
+
+
+if __name__ == '__main__':
+ # Test needs to be a package, so we can do relative imports.
+ unittest.main()
diff --git a/Lib/test/test_interpreters/test_channels.py b/Lib/test/test_interpreters/test_channels.py
new file mode 100644
index 0000000..3c3e188
--- /dev/null
+++ b/Lib/test/test_interpreters/test_channels.py
@@ -0,0 +1,328 @@
+import threading
+from textwrap import dedent
+import unittest
+import time
+
+from test.support import import_helper
+# Raise SkipTest if subinterpreters not supported.
+_channels = import_helper.import_module('_xxinterpchannels')
+from test.support import interpreters
+from test.support.interpreters import channels
+from .utils import _run_output, TestBase
+
+
+class TestChannels(TestBase):
+
+ def test_create(self):
+ r, s = channels.create()
+ self.assertIsInstance(r, channels.RecvChannel)
+ self.assertIsInstance(s, channels.SendChannel)
+
+ def test_list_all(self):
+ self.assertEqual(channels.list_all(), [])
+ created = set()
+ for _ in range(3):
+ ch = channels.create()
+ created.add(ch)
+ after = set(channels.list_all())
+ self.assertEqual(after, created)
+
+ def test_shareable(self):
+ rch, sch = channels.create()
+
+ self.assertTrue(
+ interpreters.is_shareable(rch))
+ self.assertTrue(
+ interpreters.is_shareable(sch))
+
+ sch.send_nowait(rch)
+ sch.send_nowait(sch)
+ rch2 = rch.recv()
+ sch2 = rch.recv()
+
+ self.assertEqual(rch2, rch)
+ self.assertEqual(sch2, sch)
+
+ def test_is_closed(self):
+ rch, sch = channels.create()
+ rbefore = rch.is_closed
+ sbefore = sch.is_closed
+ rch.close()
+ rafter = rch.is_closed
+ safter = sch.is_closed
+
+ self.assertFalse(rbefore)
+ self.assertFalse(sbefore)
+ self.assertTrue(rafter)
+ self.assertTrue(safter)
+
+
+class TestRecvChannelAttrs(TestBase):
+
+ def test_id_type(self):
+ rch, _ = channels.create()
+ self.assertIsInstance(rch.id, _channels.ChannelID)
+
+ def test_custom_id(self):
+ rch = channels.RecvChannel(1)
+ self.assertEqual(rch.id, 1)
+
+ with self.assertRaises(TypeError):
+ channels.RecvChannel('1')
+
+ def test_id_readonly(self):
+ rch = channels.RecvChannel(1)
+ with self.assertRaises(AttributeError):
+ rch.id = 2
+
+ def test_equality(self):
+ ch1, _ = channels.create()
+ ch2, _ = channels.create()
+ self.assertEqual(ch1, ch1)
+ self.assertNotEqual(ch1, ch2)
+
+
+class TestSendChannelAttrs(TestBase):
+
+ def test_id_type(self):
+ _, sch = channels.create()
+ self.assertIsInstance(sch.id, _channels.ChannelID)
+
+ def test_custom_id(self):
+ sch = channels.SendChannel(1)
+ self.assertEqual(sch.id, 1)
+
+ with self.assertRaises(TypeError):
+ channels.SendChannel('1')
+
+ def test_id_readonly(self):
+ sch = channels.SendChannel(1)
+ with self.assertRaises(AttributeError):
+ sch.id = 2
+
+ def test_equality(self):
+ _, ch1 = channels.create()
+ _, ch2 = channels.create()
+ self.assertEqual(ch1, ch1)
+ self.assertNotEqual(ch1, ch2)
+
+
+class TestSendRecv(TestBase):
+
+ def test_send_recv_main(self):
+ r, s = channels.create()
+ orig = b'spam'
+ s.send_nowait(orig)
+ obj = r.recv()
+
+ self.assertEqual(obj, orig)
+ self.assertIsNot(obj, orig)
+
+ def test_send_recv_same_interpreter(self):
+ interp = interpreters.create()
+ interp.exec_sync(dedent("""
+ from test.support.interpreters import channels
+ r, s = channels.create()
+ orig = b'spam'
+ s.send_nowait(orig)
+ obj = r.recv()
+ assert obj == orig, 'expected: obj == orig'
+ assert obj is not orig, 'expected: obj is not orig'
+ """))
+
+ @unittest.skip('broken (see BPO-...)')
+ def test_send_recv_different_interpreters(self):
+ r1, s1 = channels.create()
+ r2, s2 = channels.create()
+ orig1 = b'spam'
+ s1.send_nowait(orig1)
+ out = _run_output(
+ interpreters.create(),
+ dedent(f"""
+ obj1 = r.recv()
+ assert obj1 == b'spam', 'expected: obj1 == orig1'
+ # When going to another interpreter we get a copy.
+ assert id(obj1) != {id(orig1)}, 'expected: obj1 is not orig1'
+ orig2 = b'eggs'
+ print(id(orig2))
+ s.send_nowait(orig2)
+ """),
+ channels=dict(r=r1, s=s2),
+ )
+ obj2 = r2.recv()
+
+ self.assertEqual(obj2, b'eggs')
+ self.assertNotEqual(id(obj2), int(out))
+
+ def test_send_recv_different_threads(self):
+ r, s = channels.create()
+
+ def f():
+ while True:
+ try:
+ obj = r.recv()
+ break
+ except channels.ChannelEmptyError:
+ time.sleep(0.1)
+ s.send(obj)
+ t = threading.Thread(target=f)
+ t.start()
+
+ orig = b'spam'
+ s.send(orig)
+ obj = r.recv()
+ t.join()
+
+ self.assertEqual(obj, orig)
+ self.assertIsNot(obj, orig)
+
+ def test_send_recv_nowait_main(self):
+ r, s = channels.create()
+ orig = b'spam'
+ s.send_nowait(orig)
+ obj = r.recv_nowait()
+
+ self.assertEqual(obj, orig)
+ self.assertIsNot(obj, orig)
+
+ def test_send_recv_nowait_main_with_default(self):
+ r, _ = channels.create()
+ obj = r.recv_nowait(None)
+
+ self.assertIsNone(obj)
+
+ def test_send_recv_nowait_same_interpreter(self):
+ interp = interpreters.create()
+ interp.exec_sync(dedent("""
+ from test.support.interpreters import channels
+ r, s = channels.create()
+ orig = b'spam'
+ s.send_nowait(orig)
+ obj = r.recv_nowait()
+ assert obj == orig, 'expected: obj == orig'
+ # When going back to the same interpreter we get the same object.
+ assert obj is not orig, 'expected: obj is not orig'
+ """))
+
+ @unittest.skip('broken (see BPO-...)')
+ def test_send_recv_nowait_different_interpreters(self):
+ r1, s1 = channels.create()
+ r2, s2 = channels.create()
+ orig1 = b'spam'
+ s1.send_nowait(orig1)
+ out = _run_output(
+ interpreters.create(),
+ dedent(f"""
+ obj1 = r.recv_nowait()
+ assert obj1 == b'spam', 'expected: obj1 == orig1'
+ # When going to another interpreter we get a copy.
+ assert id(obj1) != {id(orig1)}, 'expected: obj1 is not orig1'
+ orig2 = b'eggs'
+ print(id(orig2))
+ s.send_nowait(orig2)
+ """),
+ channels=dict(r=r1, s=s2),
+ )
+ obj2 = r2.recv_nowait()
+
+ self.assertEqual(obj2, b'eggs')
+ self.assertNotEqual(id(obj2), int(out))
+
+ def test_recv_timeout(self):
+ r, _ = channels.create()
+ with self.assertRaises(TimeoutError):
+ r.recv(timeout=1)
+
+ def test_recv_channel_does_not_exist(self):
+ ch = channels.RecvChannel(1_000_000)
+ with self.assertRaises(channels.ChannelNotFoundError):
+ ch.recv()
+
+ def test_send_channel_does_not_exist(self):
+ ch = channels.SendChannel(1_000_000)
+ with self.assertRaises(channels.ChannelNotFoundError):
+ ch.send(b'spam')
+
+ def test_recv_nowait_channel_does_not_exist(self):
+ ch = channels.RecvChannel(1_000_000)
+ with self.assertRaises(channels.ChannelNotFoundError):
+ ch.recv_nowait()
+
+ def test_send_nowait_channel_does_not_exist(self):
+ ch = channels.SendChannel(1_000_000)
+ with self.assertRaises(channels.ChannelNotFoundError):
+ ch.send_nowait(b'spam')
+
+ def test_recv_nowait_empty(self):
+ ch, _ = channels.create()
+ with self.assertRaises(channels.ChannelEmptyError):
+ ch.recv_nowait()
+
+ def test_recv_nowait_default(self):
+ default = object()
+ rch, sch = channels.create()
+ obj1 = rch.recv_nowait(default)
+ sch.send_nowait(None)
+ sch.send_nowait(1)
+ sch.send_nowait(b'spam')
+ sch.send_nowait(b'eggs')
+ obj2 = rch.recv_nowait(default)
+ obj3 = rch.recv_nowait(default)
+ obj4 = rch.recv_nowait()
+ obj5 = rch.recv_nowait(default)
+ obj6 = rch.recv_nowait(default)
+
+ self.assertIs(obj1, default)
+ self.assertIs(obj2, None)
+ self.assertEqual(obj3, 1)
+ self.assertEqual(obj4, b'spam')
+ self.assertEqual(obj5, b'eggs')
+ self.assertIs(obj6, default)
+
+ def test_send_buffer(self):
+ buf = bytearray(b'spamspamspam')
+ obj = None
+ rch, sch = channels.create()
+
+ def f():
+ nonlocal obj
+ while True:
+ try:
+ obj = rch.recv()
+ break
+ except channels.ChannelEmptyError:
+ time.sleep(0.1)
+ t = threading.Thread(target=f)
+ t.start()
+
+ sch.send_buffer(buf)
+ t.join()
+
+ self.assertIsNot(obj, buf)
+ self.assertIsInstance(obj, memoryview)
+ self.assertEqual(obj, buf)
+
+ buf[4:8] = b'eggs'
+ self.assertEqual(obj, buf)
+ obj[4:8] = b'ham.'
+ self.assertEqual(obj, buf)
+
+ def test_send_buffer_nowait(self):
+ buf = bytearray(b'spamspamspam')
+ rch, sch = channels.create()
+ sch.send_buffer_nowait(buf)
+ obj = rch.recv()
+
+ self.assertIsNot(obj, buf)
+ self.assertIsInstance(obj, memoryview)
+ self.assertEqual(obj, buf)
+
+ buf[4:8] = b'eggs'
+ self.assertEqual(obj, buf)
+ obj[4:8] = b'ham.'
+ self.assertEqual(obj, buf)
+
+
+if __name__ == '__main__':
+ # Test needs to be a package, so we can do relative imports.
+ unittest.main()
diff --git a/Lib/test/test_interpreters/test_lifecycle.py b/Lib/test/test_interpreters/test_lifecycle.py
new file mode 100644
index 0000000..c2917d8
--- /dev/null
+++ b/Lib/test/test_interpreters/test_lifecycle.py
@@ -0,0 +1,189 @@
+import contextlib
+import json
+import os
+import os.path
+import sys
+from textwrap import dedent
+import unittest
+
+from test import support
+from test.support import import_helper
+from test.support import os_helper
+# Raise SkipTest if subinterpreters not supported.
+import_helper.import_module('_xxsubinterpreters')
+from .utils import TestBase
+
+
+class StartupTests(TestBase):
+
+ # We want to ensure the initial state of subinterpreters
+ # matches expectations.
+
+ _subtest_count = 0
+
+ @contextlib.contextmanager
+ def subTest(self, *args):
+ with super().subTest(*args) as ctx:
+ self._subtest_count += 1
+ try:
+ yield ctx
+ finally:
+ if self._debugged_in_subtest:
+ if self._subtest_count == 1:
+ # The first subtest adds a leading newline, so we
+ # compensate here by not printing a trailing newline.
+ print('### end subtest debug ###', end='')
+ else:
+ print('### end subtest debug ###')
+ self._debugged_in_subtest = False
+
+ def debug(self, msg, *, header=None):
+ if header:
+ self._debug(f'--- {header} ---')
+ if msg:
+ if msg.endswith(os.linesep):
+ self._debug(msg[:-len(os.linesep)])
+ else:
+ self._debug(msg)
+ self._debug('<no newline>')
+ self._debug('------')
+ else:
+ self._debug(msg)
+
+ _debugged = False
+ _debugged_in_subtest = False
+ def _debug(self, msg):
+ if not self._debugged:
+ print()
+ self._debugged = True
+ if self._subtest is not None:
+ if True:
+ if not self._debugged_in_subtest:
+ self._debugged_in_subtest = True
+ print('### start subtest debug ###')
+ print(msg)
+ else:
+ print(msg)
+
+ def create_temp_dir(self):
+ import tempfile
+ tmp = tempfile.mkdtemp(prefix='test_interpreters_')
+ tmp = os.path.realpath(tmp)
+ self.addCleanup(os_helper.rmtree, tmp)
+ return tmp
+
+ def write_script(self, *path, text):
+ filename = os.path.join(*path)
+ dirname = os.path.dirname(filename)
+ if dirname:
+ os.makedirs(dirname, exist_ok=True)
+ with open(filename, 'w', encoding='utf-8') as outfile:
+ outfile.write(dedent(text))
+ return filename
+
+ @support.requires_subprocess()
+ def run_python(self, argv, *, cwd=None):
+ # This method is inspired by
+ # EmbeddingTestsMixin.run_embedded_interpreter() in test_embed.py.
+ import shlex
+ import subprocess
+ if isinstance(argv, str):
+ argv = shlex.split(argv)
+ argv = [sys.executable, *argv]
+ try:
+ proc = subprocess.run(
+ argv,
+ cwd=cwd,
+ capture_output=True,
+ text=True,
+ )
+ except Exception as exc:
+ self.debug(f'# cmd: {shlex.join(argv)}')
+ if isinstance(exc, FileNotFoundError) and not exc.filename:
+ if os.path.exists(argv[0]):
+ exists = 'exists'
+ else:
+ exists = 'does not exist'
+ self.debug(f'{argv[0]} {exists}')
+ raise # re-raise
+ assert proc.stderr == '' or proc.returncode != 0, proc.stderr
+ if proc.returncode != 0 and support.verbose:
+ self.debug(f'# python3 {shlex.join(argv[1:])} failed:')
+ self.debug(proc.stdout, header='stdout')
+ self.debug(proc.stderr, header='stderr')
+ self.assertEqual(proc.returncode, 0)
+ self.assertEqual(proc.stderr, '')
+ return proc.stdout
+
+ def test_sys_path_0(self):
+ # The main interpreter's sys.path[0] should be used by subinterpreters.
+ script = '''
+ import sys
+ from test.support import interpreters
+
+ orig = sys.path[0]
+
+ interp = interpreters.create()
+ interp.exec_sync(f"""if True:
+ import json
+ import sys
+ print(json.dumps({{
+ 'main': {orig!r},
+ 'sub': sys.path[0],
+ }}, indent=4), flush=True)
+ """)
+ '''
+ # <tmp>/
+ # pkg/
+ # __init__.py
+ # __main__.py
+ # script.py
+ # script.py
+ cwd = self.create_temp_dir()
+ self.write_script(cwd, 'pkg', '__init__.py', text='')
+ self.write_script(cwd, 'pkg', '__main__.py', text=script)
+ self.write_script(cwd, 'pkg', 'script.py', text=script)
+ self.write_script(cwd, 'script.py', text=script)
+
+ cases = [
+ ('script.py', cwd),
+ ('-m script', cwd),
+ ('-m pkg', cwd),
+ ('-m pkg.script', cwd),
+ ('-c "import script"', ''),
+ ]
+ for argv, expected in cases:
+ with self.subTest(f'python3 {argv}'):
+ out = self.run_python(argv, cwd=cwd)
+ data = json.loads(out)
+ sp0_main, sp0_sub = data['main'], data['sub']
+ self.assertEqual(sp0_sub, sp0_main)
+ self.assertEqual(sp0_sub, expected)
+ # XXX Also check them all with the -P cmdline flag?
+
+
+class FinalizationTests(TestBase):
+
+ def test_gh_109793(self):
+ # Make sure finalization finishes and the correct error code
+ # is reported, even when subinterpreters get cleaned up at the end.
+ import subprocess
+ argv = [sys.executable, '-c', '''if True:
+ from test.support import interpreters
+ interp = interpreters.create()
+ raise Exception
+ ''']
+ proc = subprocess.run(argv, capture_output=True, text=True)
+ self.assertIn('Traceback', proc.stderr)
+ if proc.returncode == 0 and support.verbose:
+ print()
+ print("--- cmd unexpected succeeded ---")
+ print(f"stdout:\n{proc.stdout}")
+ print(f"stderr:\n{proc.stderr}")
+ print("------")
+ self.assertEqual(proc.returncode, 1)
+
+
+if __name__ == '__main__':
+ # Test needs to be a package, so we can do relative imports.
+ unittest.main()
diff --git a/Lib/test/test_interpreters/test_queues.py b/Lib/test/test_interpreters/test_queues.py
new file mode 100644
index 0000000..2af90b1
--- /dev/null
+++ b/Lib/test/test_interpreters/test_queues.py
@@ -0,0 +1,233 @@
+import threading
+from textwrap import dedent
+import unittest
+import time
+
+from test.support import import_helper
+# Raise SkipTest if subinterpreters not supported.
+import_helper.import_module('_xxinterpchannels')
+#import_helper.import_module('_xxinterpqueues')
+from test.support import interpreters
+from test.support.interpreters import queues
+from .utils import _run_output, TestBase
+
+
+class QueueTests(TestBase):
+
+ def test_create(self):
+ with self.subTest('vanilla'):
+ queue = queues.create()
+ self.assertEqual(queue.maxsize, 0)
+
+ with self.subTest('small maxsize'):
+ queue = queues.create(3)
+ self.assertEqual(queue.maxsize, 3)
+
+ with self.subTest('big maxsize'):
+ queue = queues.create(100)
+ self.assertEqual(queue.maxsize, 100)
+
+ with self.subTest('no maxsize'):
+ queue = queues.create(0)
+ self.assertEqual(queue.maxsize, 0)
+
+ with self.subTest('negative maxsize'):
+ queue = queues.create(-1)
+ self.assertEqual(queue.maxsize, 0)
+
+ with self.subTest('bad maxsize'):
+ with self.assertRaises(TypeError):
+ queues.create('1')
+
+ @unittest.expectedFailure
+ def test_shareable(self):
+ queue1 = queues.create()
+ queue2 = queues.create()
+ queue1.put(queue2)
+ queue3 = queue1.get()
+ self.assertIs(queue3, queue1)
+
+ def test_id_type(self):
+ queue = queues.create()
+ self.assertIsInstance(queue.id, int)
+
+ def test_custom_id(self):
+ with self.assertRaises(queues.QueueNotFoundError):
+ queues.Queue(1_000_000)
+
+ def test_id_readonly(self):
+ queue = queues.create()
+ with self.assertRaises(AttributeError):
+ queue.id = 1_000_000
+
+ def test_maxsize_readonly(self):
+ queue = queues.create(10)
+ with self.assertRaises(AttributeError):
+ queue.maxsize = 1_000_000
+
+ def test_hashable(self):
+ queue = queues.create()
+ expected = hash(queue.id)
+ actual = hash(queue)
+ self.assertEqual(actual, expected)
+
+ def test_equality(self):
+ queue1 = queues.create()
+ queue2 = queues.create()
+ self.assertEqual(queue1, queue1)
+ self.assertNotEqual(queue1, queue2)
+
+
+class TestQueueOps(TestBase):
+
+ def test_empty(self):
+ queue = queues.create()
+ before = queue.empty()
+ queue.put(None)
+ during = queue.empty()
+ queue.get()
+ after = queue.empty()
+
+ self.assertIs(before, True)
+ self.assertIs(during, False)
+ self.assertIs(after, True)
+
+ def test_full(self):
+ expected = [False, False, False, True, False, False, False]
+ actual = []
+ queue = queues.create(3)
+ for _ in range(3):
+ actual.append(queue.full())
+ queue.put(None)
+ actual.append(queue.full())
+ for _ in range(3):
+ queue.get()
+ actual.append(queue.full())
+
+ self.assertEqual(actual, expected)
+
+ def test_qsize(self):
+ expected = [0, 1, 2, 3, 2, 3, 2, 1, 0, 1, 0]
+ actual = []
+ queue = queues.create()
+ for _ in range(3):
+ actual.append(queue.qsize())
+ queue.put(None)
+ actual.append(queue.qsize())
+ queue.get()
+ actual.append(queue.qsize())
+ queue.put(None)
+ actual.append(queue.qsize())
+ for _ in range(3):
+ queue.get()
+ actual.append(queue.qsize())
+ queue.put(None)
+ actual.append(queue.qsize())
+ queue.get()
+ actual.append(queue.qsize())
+
+ self.assertEqual(actual, expected)
+
+ def test_put_get_main(self):
+ expected = list(range(20))
+ queue = queues.create()
+ for i in range(20):
+ queue.put(i)
+ actual = [queue.get() for _ in range(20)]
+
+ self.assertEqual(actual, expected)
+
+ @unittest.expectedFailure
+ def test_put_timeout(self):
+ queue = queues.create(2)
+ queue.put(None)
+ queue.put(None)
+ with self.assertRaises(queues.QueueFull):
+ queue.put(None, timeout=0.1)
+ queue.get()
+ queue.put(None)
+
+ @unittest.expectedFailure
+ def test_put_nowait(self):
+ queue = queues.create(2)
+ queue.put_nowait(None)
+ queue.put_nowait(None)
+ with self.assertRaises(queues.QueueFull):
+ queue.put_nowait(None)
+ queue.get()
+ queue.put_nowait(None)
+
+ def test_get_timeout(self):
+ queue = queues.create()
+ with self.assertRaises(queues.QueueEmpty):
+ queue.get(timeout=0.1)
+
+ def test_get_nowait(self):
+ queue = queues.create()
+ with self.assertRaises(queues.QueueEmpty):
+ queue.get_nowait()
+
+ def test_put_get_same_interpreter(self):
+ interp = interpreters.create()
+ interp.exec_sync(dedent("""
+ from test.support.interpreters import queues
+ queue = queues.create()
+ orig = b'spam'
+ queue.put(orig)
+ obj = queue.get()
+ assert obj == orig, 'expected: obj == orig'
+ assert obj is not orig, 'expected: obj is not orig'
+ """))
+
+ @unittest.expectedFailure
+ def test_put_get_different_interpreters(self):
+ queue1 = queues.create()
+ queue2 = queues.create()
+ obj1 = b'spam'
+ queue1.put(obj1)
+ out = _run_output(
+ interpreters.create(),
+ dedent(f"""
+ import test.support.interpreters.queue as queues
+ queue1 = queues.Queue({queue1.id})
+ queue2 = queues.Queue({queue2.id})
+ obj = queue1.get()
+ assert obj == b'spam', 'expected: obj == obj1'
+ # When going to another interpreter we get a copy.
+ assert id(obj) != {id(obj1)}, 'expected: obj is not obj1'
+ obj2 = b'eggs'
+ print(id(obj2))
+ queue2.put(obj2)
+ """))
+ obj2 = queue2.get()
+
+ self.assertEqual(obj2, b'eggs')
+ self.assertNotEqual(id(obj2), int(out))
+
+ def test_put_get_different_threads(self):
+ queue1 = queues.create()
+ queue2 = queues.create()
+
+ def f():
+ while True:
+ try:
+ obj = queue1.get(timeout=0.1)
+ break
+ except queues.QueueEmpty:
+ continue
+ queue2.put(obj)
+ t = threading.Thread(target=f)
+ t.start()
+
+ orig = b'spam'
+ queue1.put(orig)
+ obj = queue2.get()
+ t.join()
+
+ self.assertEqual(obj, orig)
+ self.assertIsNot(obj, orig)
+
+
+if __name__ == '__main__':
+ # Test needs to be a package, so we can do relative imports.
+ unittest.main()
diff --git a/Lib/test/test_interpreters/test_stress.py b/Lib/test/test_interpreters/test_stress.py
new file mode 100644
index 0000000..3cc570b
--- /dev/null
+++ b/Lib/test/test_interpreters/test_stress.py
@@ -0,0 +1,38 @@
+import threading
+import unittest
+
+from test import support
+from test.support import import_helper
+from test.support import threading_helper
+# Raise SkipTest if subinterpreters not supported.
+import_helper.import_module('_xxsubinterpreters')
+from test.support import interpreters
+from .utils import TestBase
+
+
+class StressTests(TestBase):
+
+ # In these tests we generally want a lot of interpreters,
+ # but not so many that any test takes too long.
+
+ @support.requires_resource('cpu')
+ def test_create_many_sequential(self):
+ alive = []
+ for _ in range(100):
+ interp = interpreters.create()
+ alive.append(interp)
+
+ @support.requires_resource('cpu')
+ def test_create_many_threaded(self):
+ alive = []
+ def task():
+ interp = interpreters.create()
+ alive.append(interp)
+ threads = (threading.Thread(target=task) for _ in range(200))
+ with threading_helper.start_threads(threads):
+ pass
+
+
+if __name__ == '__main__':
+ # Test needs to be a package, so we can do relative imports.
+ unittest.main()
diff --git a/Lib/test/test_interpreters/utils.py b/Lib/test/test_interpreters/utils.py
new file mode 100644
index 0000000..623c873
--- /dev/null
+++ b/Lib/test/test_interpreters/utils.py
@@ -0,0 +1,73 @@
+import contextlib
+import os
+import threading
+from textwrap import dedent
+import unittest
+
+from test.support import interpreters
+
+
+def _captured_script(script):
+ r, w = os.pipe()
+ indented = script.replace('\n', '\n ')
+ wrapped = dedent(f"""
+ import contextlib
+ with open({w}, 'w', encoding='utf-8') as spipe:
+ with contextlib.redirect_stdout(spipe):
+ {indented}
+ """)
+ return wrapped, open(r, encoding='utf-8')
+
+
+def clean_up_interpreters():
+ for interp in interpreters.list_all():
+ if interp.id == 0: # main
+ continue
+ try:
+ interp.close()
+ except RuntimeError:
+ pass # already destroyed
+
+
+def _run_output(interp, request, channels=None):
+ script, rpipe = _captured_script(request)
+ with rpipe:
+ interp.exec_sync(script, channels=channels)
+ return rpipe.read()
+
+
+@contextlib.contextmanager
+def _running(interp):
+ r, w = os.pipe()
+ def run():
+ interp.exec_sync(dedent(f"""
+ # wait for "signal"
+ with open({r}) as rpipe:
+ rpipe.read()
+ """))
+
+ t = threading.Thread(target=run)
+ t.start()
+
+ yield
+
+ with open(w, 'w') as spipe:
+ spipe.write('done')
+ t.join()
+
+
+class TestBase(unittest.TestCase):
+
+ def pipe(self):
+ def ensure_closed(fd):
+ try:
+ os.close(fd)
+ except OSError:
+ pass
+ r, w = os.pipe()
+ self.addCleanup(lambda: ensure_closed(r))
+ self.addCleanup(lambda: ensure_closed(w))
+ return r, w
+
+ def tearDown(self):
+ clean_up_interpreters()