summaryrefslogtreecommitdiffstats
path: root/Lib/test
diff options
context:
space:
mode:
authorEric Snow <ericsnowcurrently@gmail.com>2024-04-11 00:37:01 (GMT)
committerGitHub <noreply@github.com>2024-04-11 00:37:01 (GMT)
commit993c3cca16ed00a0bfe467f7f26ac4f5f6dfb24c (patch)
tree765b64249a8406226b300c9fc8500baa1afec746 /Lib/test
parent0cc71bde001950d3634c235e2b0d24cda6ce7dce (diff)
downloadcpython-993c3cca16ed00a0bfe467f7f26ac4f5f6dfb24c.zip
cpython-993c3cca16ed00a0bfe467f7f26ac4f5f6dfb24c.tar.gz
cpython-993c3cca16ed00a0bfe467f7f26ac4f5f6dfb24c.tar.bz2
gh-76785: Add More Tests to test_interpreters.test_api (gh-117662)
In addition to the increase test coverage, this is a precursor to sorting out how we handle interpreters created directly via the C-API.
Diffstat (limited to 'Lib/test')
-rw-r--r--Lib/test/support/interpreters/__init__.py7
-rw-r--r--Lib/test/test__xxinterpchannels.py111
-rw-r--r--Lib/test/test__xxsubinterpreters.py338
-rw-r--r--Lib/test/test_capi/test_misc.py8
-rw-r--r--Lib/test/test_interpreters/test_api.py540
-rw-r--r--Lib/test/test_interpreters/utils.py523
6 files changed, 1251 insertions, 276 deletions
diff --git a/Lib/test/support/interpreters/__init__.py b/Lib/test/support/interpreters/__init__.py
index 8be4ee7..60323c9 100644
--- a/Lib/test/support/interpreters/__init__.py
+++ b/Lib/test/support/interpreters/__init__.py
@@ -79,18 +79,19 @@ def create():
def list_all():
"""Return all existing interpreters."""
- return [Interpreter(id) for id in _interpreters.list_all()]
+ return [Interpreter(id)
+ for id, in _interpreters.list_all()]
def get_current():
"""Return the currently running interpreter."""
- id = _interpreters.get_current()
+ id, = _interpreters.get_current()
return Interpreter(id)
def get_main():
"""Return the main interpreter."""
- id = _interpreters.get_main()
+ id, = _interpreters.get_main()
return Interpreter(id)
diff --git a/Lib/test/test__xxinterpchannels.py b/Lib/test/test__xxinterpchannels.py
index c5d29bd..3db0cb7 100644
--- a/Lib/test/test__xxinterpchannels.py
+++ b/Lib/test/test__xxinterpchannels.py
@@ -9,7 +9,7 @@ import unittest
from test.support import import_helper
from test.test__xxsubinterpreters import (
- interpreters,
+ _interpreters,
_run_output,
clean_up_interpreters,
)
@@ -49,14 +49,15 @@ def run_interp(id, source, **shared):
def _run_interp(id, source, shared, _mainns={}):
source = dedent(source)
- main = interpreters.get_main()
+ main, *_ = _interpreters.get_main()
if main == id:
- if interpreters.get_current() != main:
+ cur, *_ = _interpreters.get_current()
+ if cur != main:
raise RuntimeError
# XXX Run a func?
exec(source, _mainns)
else:
- interpreters.run_string(id, source, shared)
+ _interpreters.run_string(id, source, shared)
class Interpreter(namedtuple('Interpreter', 'name id')):
@@ -71,7 +72,7 @@ class Interpreter(namedtuple('Interpreter', 'name id')):
raise NotImplementedError
def __new__(cls, name=None, id=None):
- main = interpreters.get_main()
+ main, *_ = _interpreters.get_main()
if id == main:
if not name:
name = 'main'
@@ -89,7 +90,7 @@ class Interpreter(namedtuple('Interpreter', 'name id')):
name = 'main'
id = main
else:
- id = interpreters.create()
+ id = _interpreters.create()
self = super().__new__(cls, name, id)
return self
@@ -370,7 +371,7 @@ class ChannelTests(TestBase):
self.assertEqual(set(after) - set(before), {id1, id2, id3})
def test_ids_global(self):
- id1 = interpreters.create()
+ id1 = _interpreters.create()
out = _run_output(id1, dedent("""
import _xxinterpchannels as _channels
cid = _channels.create()
@@ -378,7 +379,7 @@ class ChannelTests(TestBase):
"""))
cid1 = int(out.strip())
- id2 = interpreters.create()
+ id2 = _interpreters.create()
out = _run_output(id2, dedent("""
import _xxinterpchannels as _channels
cid = _channels.create()
@@ -390,7 +391,7 @@ class ChannelTests(TestBase):
def test_channel_list_interpreters_none(self):
"""Test listing interpreters for a channel with no associations."""
- # Test for channel with no associated interpreters.
+ # Test for channel with no associated _interpreters.
cid = channels.create()
send_interps = channels.list_interpreters(cid, send=True)
recv_interps = channels.list_interpreters(cid, send=False)
@@ -398,8 +399,8 @@ class ChannelTests(TestBase):
self.assertEqual(recv_interps, [])
def test_channel_list_interpreters_basic(self):
- """Test basic listing channel interpreters."""
- interp0 = interpreters.get_main()
+ """Test basic listing channel _interpreters."""
+ interp0, *_ = _interpreters.get_main()
cid = channels.create()
channels.send(cid, "send", blocking=False)
# Test for a channel that has one end associated to an interpreter.
@@ -408,7 +409,7 @@ class ChannelTests(TestBase):
self.assertEqual(send_interps, [interp0])
self.assertEqual(recv_interps, [])
- interp1 = interpreters.create()
+ interp1 = _interpreters.create()
_run_output(interp1, dedent(f"""
import _xxinterpchannels as _channels
obj = _channels.recv({cid})
@@ -421,10 +422,10 @@ class ChannelTests(TestBase):
def test_channel_list_interpreters_multiple(self):
"""Test listing interpreters for a channel with many associations."""
- interp0 = interpreters.get_main()
- interp1 = interpreters.create()
- interp2 = interpreters.create()
- interp3 = interpreters.create()
+ interp0, *_ = _interpreters.get_main()
+ interp1 = _interpreters.create()
+ interp2 = _interpreters.create()
+ interp3 = _interpreters.create()
cid = channels.create()
channels.send(cid, "send", blocking=False)
@@ -447,8 +448,8 @@ class ChannelTests(TestBase):
def test_channel_list_interpreters_destroyed(self):
"""Test listing channel interpreters with a destroyed interpreter."""
- interp0 = interpreters.get_main()
- interp1 = interpreters.create()
+ interp0, *_ = _interpreters.get_main()
+ interp1 = _interpreters.create()
cid = channels.create()
channels.send(cid, "send", blocking=False)
_run_output(interp1, dedent(f"""
@@ -461,7 +462,7 @@ class ChannelTests(TestBase):
self.assertEqual(send_interps, [interp0])
self.assertEqual(recv_interps, [interp1])
- interpreters.destroy(interp1)
+ _interpreters.destroy(interp1)
# Destroyed interpreter should not be listed.
send_interps = channels.list_interpreters(cid, send=True)
recv_interps = channels.list_interpreters(cid, send=False)
@@ -472,9 +473,9 @@ class ChannelTests(TestBase):
"""Test listing channel interpreters with a released channel."""
# Set up one channel with main interpreter on the send end and two
# subinterpreters on the receive end.
- interp0 = interpreters.get_main()
- interp1 = interpreters.create()
- interp2 = interpreters.create()
+ interp0, *_ = _interpreters.get_main()
+ interp1 = _interpreters.create()
+ interp2 = _interpreters.create()
cid = channels.create()
channels.send(cid, "data", blocking=False)
_run_output(interp1, dedent(f"""
@@ -494,7 +495,7 @@ class ChannelTests(TestBase):
# Release the main interpreter from the send end.
channels.release(cid, send=True)
- # Send end should have no associated interpreters.
+ # Send end should have no associated _interpreters.
send_interps = channels.list_interpreters(cid, send=True)
recv_interps = channels.list_interpreters(cid, send=False)
self.assertEqual(len(send_interps), 0)
@@ -513,8 +514,8 @@ class ChannelTests(TestBase):
def test_channel_list_interpreters_closed(self):
"""Test listing channel interpreters with a closed channel."""
- interp0 = interpreters.get_main()
- interp1 = interpreters.create()
+ interp0, *_ = _interpreters.get_main()
+ interp1 = _interpreters.create()
cid = channels.create()
# Put something in the channel so that it's not empty.
channels.send(cid, "send", blocking=False)
@@ -535,8 +536,8 @@ class ChannelTests(TestBase):
def test_channel_list_interpreters_closed_send_end(self):
"""Test listing channel interpreters with a channel's send end closed."""
- interp0 = interpreters.get_main()
- interp1 = interpreters.create()
+ interp0, *_ = _interpreters.get_main()
+ interp1 = _interpreters.create()
cid = channels.create()
# Put something in the channel so that it's not empty.
channels.send(cid, "send", blocking=False)
@@ -589,9 +590,9 @@ class ChannelTests(TestBase):
def test_run_string_arg_unresolved(self):
cid = channels.create()
- interp = interpreters.create()
+ interp = _interpreters.create()
- interpreters.set___main___attrs(interp, dict(cid=cid.send))
+ _interpreters.set___main___attrs(interp, dict(cid=cid.send))
out = _run_output(interp, dedent("""
import _xxinterpchannels as _channels
print(cid.end)
@@ -609,7 +610,7 @@ class ChannelTests(TestBase):
def test_run_string_arg_resolved(self):
cid = channels.create()
cid = channels._channel_id(cid, _resolve=True)
- interp = interpreters.create()
+ interp = _interpreters.create()
out = _run_output(interp, dedent("""
import _xxinterpchannels as _channels
@@ -635,7 +636,7 @@ class ChannelTests(TestBase):
self.assertIsNot(obj, orig)
def test_send_recv_same_interpreter(self):
- id1 = interpreters.create()
+ id1 = _interpreters.create()
out = _run_output(id1, dedent("""
import _xxinterpchannels as _channels
cid = _channels.create()
@@ -648,7 +649,7 @@ class ChannelTests(TestBase):
def test_send_recv_different_interpreters(self):
cid = channels.create()
- id1 = interpreters.create()
+ id1 = _interpreters.create()
out = _run_output(id1, dedent(f"""
import _xxinterpchannels as _channels
_channels.send({cid}, b'spam', blocking=False)
@@ -674,7 +675,7 @@ class ChannelTests(TestBase):
def test_send_recv_different_interpreters_and_threads(self):
cid = channels.create()
- id1 = interpreters.create()
+ id1 = _interpreters.create()
out = None
def f():
@@ -737,12 +738,12 @@ class ChannelTests(TestBase):
def test_recv_sending_interp_destroyed(self):
with self.subTest('closed'):
cid1 = channels.create()
- interp = interpreters.create()
- interpreters.run_string(interp, dedent(f"""
+ interp = _interpreters.create()
+ _interpreters.run_string(interp, dedent(f"""
import _xxinterpchannels as _channels
_channels.send({cid1}, b'spam', blocking=False)
"""))
- interpreters.destroy(interp)
+ _interpreters.destroy(interp)
with self.assertRaisesRegex(RuntimeError,
f'channel {cid1} is closed'):
@@ -750,13 +751,13 @@ class ChannelTests(TestBase):
del cid1
with self.subTest('still open'):
cid2 = channels.create()
- interp = interpreters.create()
- interpreters.run_string(interp, dedent(f"""
+ interp = _interpreters.create()
+ _interpreters.run_string(interp, dedent(f"""
import _xxinterpchannels as _channels
_channels.send({cid2}, b'spam', blocking=False)
"""))
channels.send(cid2, b'eggs', blocking=False)
- interpreters.destroy(interp)
+ _interpreters.destroy(interp)
channels.recv(cid2)
with self.assertRaisesRegex(RuntimeError,
@@ -1010,24 +1011,24 @@ class ChannelTests(TestBase):
def test_close_multiple_users(self):
cid = channels.create()
- id1 = interpreters.create()
- id2 = interpreters.create()
- interpreters.run_string(id1, dedent(f"""
+ id1 = _interpreters.create()
+ id2 = _interpreters.create()
+ _interpreters.run_string(id1, dedent(f"""
import _xxinterpchannels as _channels
_channels.send({cid}, b'spam', blocking=False)
"""))
- interpreters.run_string(id2, dedent(f"""
+ _interpreters.run_string(id2, dedent(f"""
import _xxinterpchannels as _channels
_channels.recv({cid})
"""))
channels.close(cid)
- excsnap = interpreters.run_string(id1, dedent(f"""
+ excsnap = _interpreters.run_string(id1, dedent(f"""
_channels.send({cid}, b'spam')
"""))
self.assertEqual(excsnap.type.__name__, 'ChannelClosedError')
- excsnap = interpreters.run_string(id2, dedent(f"""
+ excsnap = _interpreters.run_string(id2, dedent(f"""
_channels.send({cid}, b'spam')
"""))
self.assertEqual(excsnap.type.__name__, 'ChannelClosedError')
@@ -1154,8 +1155,8 @@ class ChannelTests(TestBase):
def test_close_by_unassociated_interp(self):
cid = channels.create()
channels.send(cid, b'spam', blocking=False)
- interp = interpreters.create()
- interpreters.run_string(interp, dedent(f"""
+ interp = _interpreters.create()
+ _interpreters.run_string(interp, dedent(f"""
import _xxinterpchannels as _channels
_channels.close({cid}, force=True)
"""))
@@ -1251,9 +1252,9 @@ class ChannelReleaseTests(TestBase):
def test_multiple_users(self):
cid = channels.create()
- id1 = interpreters.create()
- id2 = interpreters.create()
- interpreters.run_string(id1, dedent(f"""
+ id1 = _interpreters.create()
+ id2 = _interpreters.create()
+ _interpreters.run_string(id1, dedent(f"""
import _xxinterpchannels as _channels
_channels.send({cid}, b'spam', blocking=False)
"""))
@@ -1263,7 +1264,7 @@ class ChannelReleaseTests(TestBase):
_channels.release({cid})
print(repr(obj))
"""))
- interpreters.run_string(id1, dedent(f"""
+ _interpreters.run_string(id1, dedent(f"""
_channels.release({cid})
"""))
@@ -1310,8 +1311,8 @@ class ChannelReleaseTests(TestBase):
def test_by_unassociated_interp(self):
cid = channels.create()
channels.send(cid, b'spam', blocking=False)
- interp = interpreters.create()
- interpreters.run_string(interp, dedent(f"""
+ interp = _interpreters.create()
+ _interpreters.run_string(interp, dedent(f"""
import _xxinterpchannels as _channels
_channels.release({cid})
"""))
@@ -1325,8 +1326,8 @@ class ChannelReleaseTests(TestBase):
def test_close_if_unassociated(self):
# XXX Something's not right with this test...
cid = channels.create()
- interp = interpreters.create()
- interpreters.run_string(interp, dedent(f"""
+ interp = _interpreters.create()
+ _interpreters.run_string(interp, dedent(f"""
import _xxinterpchannels as _channels
obj = _channels.send({cid}, b'spam', blocking=False)
_channels.release({cid})
diff --git a/Lib/test/test__xxsubinterpreters.py b/Lib/test/test__xxsubinterpreters.py
index 841077a..c8c964f 100644
--- a/Lib/test/test__xxsubinterpreters.py
+++ b/Lib/test/test__xxsubinterpreters.py
@@ -13,7 +13,7 @@ from test.support import os_helper
from test.support import script_helper
-interpreters = import_helper.import_module('_xxsubinterpreters')
+_interpreters = import_helper.import_module('_xxsubinterpreters')
_testinternalcapi = import_helper.import_module('_testinternalcapi')
from _xxsubinterpreters import InterpreterNotFoundError
@@ -36,7 +36,7 @@ def _captured_script(script):
def _run_output(interp, request):
script, rpipe = _captured_script(request)
with rpipe:
- interpreters.run_string(interp, script)
+ _interpreters.run_string(interp, script)
return rpipe.read()
@@ -47,7 +47,7 @@ def _wait_for_interp_to_run(interp, timeout=None):
if timeout is None:
timeout = support.SHORT_TIMEOUT
for _ in support.sleeping_retry(timeout, error=False):
- if interpreters.is_running(interp):
+ if _interpreters.is_running(interp):
break
else:
raise RuntimeError('interp is not running')
@@ -57,7 +57,7 @@ def _wait_for_interp_to_run(interp, timeout=None):
def _running(interp):
r, w = os.pipe()
def run():
- interpreters.run_string(interp, dedent(f"""
+ _interpreters.run_string(interp, dedent(f"""
# wait for "signal"
with open({r}, encoding="utf-8") as rpipe:
rpipe.read()
@@ -75,12 +75,12 @@ def _running(interp):
def clean_up_interpreters():
- for id in interpreters.list_all():
+ for id, *_ in _interpreters.list_all():
if id == 0: # main
continue
try:
- interpreters.destroy(id)
- except interpreters.InterpreterError:
+ _interpreters.destroy(id)
+ except _interpreters.InterpreterError:
pass # already destroyed
@@ -112,7 +112,7 @@ class IsShareableTests(unittest.TestCase):
for obj in shareables:
with self.subTest(obj):
self.assertTrue(
- interpreters.is_shareable(obj))
+ _interpreters.is_shareable(obj))
def test_not_shareable(self):
class Cheese:
@@ -141,7 +141,7 @@ class IsShareableTests(unittest.TestCase):
for obj in not_shareables:
with self.subTest(repr(obj)):
self.assertFalse(
- interpreters.is_shareable(obj))
+ _interpreters.is_shareable(obj))
class ShareableTypeTests(unittest.TestCase):
@@ -230,7 +230,7 @@ class ModuleTests(TestBase):
def test_import_in_interpreter(self):
_run_output(
- interpreters.create(),
+ _interpreters.create(),
'import _xxsubinterpreters as _interpreters',
)
@@ -241,45 +241,45 @@ class ModuleTests(TestBase):
class ListAllTests(TestBase):
def test_initial(self):
- main = interpreters.get_main()
- ids = interpreters.list_all()
+ main, *_ = _interpreters.get_main()
+ ids = [id for id, *_ in _interpreters.list_all()]
self.assertEqual(ids, [main])
def test_after_creating(self):
- main = interpreters.get_main()
- first = interpreters.create()
- second = interpreters.create()
- ids = interpreters.list_all()
+ main, *_ = _interpreters.get_main()
+ first = _interpreters.create()
+ second = _interpreters.create()
+ ids = [id for id, *_ in _interpreters.list_all()]
self.assertEqual(ids, [main, first, second])
def test_after_destroying(self):
- main = interpreters.get_main()
- first = interpreters.create()
- second = interpreters.create()
- interpreters.destroy(first)
- ids = interpreters.list_all()
+ main, *_ = _interpreters.get_main()
+ first = _interpreters.create()
+ second = _interpreters.create()
+ _interpreters.destroy(first)
+ ids = [id for id, *_ in _interpreters.list_all()]
self.assertEqual(ids, [main, second])
class GetCurrentTests(TestBase):
def test_main(self):
- main = interpreters.get_main()
- cur = interpreters.get_current()
+ main, *_ = _interpreters.get_main()
+ cur, *_ = _interpreters.get_current()
self.assertEqual(cur, main)
self.assertIsInstance(cur, int)
def test_subinterpreter(self):
- main = interpreters.get_main()
- interp = interpreters.create()
+ main, *_ = _interpreters.get_main()
+ interp = _interpreters.create()
out = _run_output(interp, dedent("""
import _xxsubinterpreters as _interpreters
- cur = _interpreters.get_current()
+ cur, *_ = _interpreters.get_current()
print(cur)
assert isinstance(cur, int)
"""))
cur = int(out.strip())
- _, expected = interpreters.list_all()
+ _, expected = [id for id, *_ in _interpreters.list_all()]
self.assertEqual(cur, expected)
self.assertNotEqual(cur, main)
@@ -287,17 +287,17 @@ class GetCurrentTests(TestBase):
class GetMainTests(TestBase):
def test_from_main(self):
- [expected] = interpreters.list_all()
- main = interpreters.get_main()
+ [expected] = [id for id, *_ in _interpreters.list_all()]
+ main, *_ = _interpreters.get_main()
self.assertEqual(main, expected)
self.assertIsInstance(main, int)
def test_from_subinterpreter(self):
- [expected] = interpreters.list_all()
- interp = interpreters.create()
+ [expected] = [id for id, *_ in _interpreters.list_all()]
+ interp = _interpreters.create()
out = _run_output(interp, dedent("""
import _xxsubinterpreters as _interpreters
- main = _interpreters.get_main()
+ main, *_ = _interpreters.get_main()
print(main)
assert isinstance(main, int)
"""))
@@ -308,20 +308,20 @@ class GetMainTests(TestBase):
class IsRunningTests(TestBase):
def test_main(self):
- main = interpreters.get_main()
- self.assertTrue(interpreters.is_running(main))
+ main, *_ = _interpreters.get_main()
+ self.assertTrue(_interpreters.is_running(main))
@unittest.skip('Fails on FreeBSD')
def test_subinterpreter(self):
- interp = interpreters.create()
- self.assertFalse(interpreters.is_running(interp))
+ interp = _interpreters.create()
+ self.assertFalse(_interpreters.is_running(interp))
with _running(interp):
- self.assertTrue(interpreters.is_running(interp))
- self.assertFalse(interpreters.is_running(interp))
+ self.assertTrue(_interpreters.is_running(interp))
+ self.assertFalse(_interpreters.is_running(interp))
def test_from_subinterpreter(self):
- interp = interpreters.create()
+ interp = _interpreters.create()
out = _run_output(interp, dedent(f"""
import _xxsubinterpreters as _interpreters
if _interpreters.is_running({interp}):
@@ -332,34 +332,35 @@ class IsRunningTests(TestBase):
self.assertEqual(out.strip(), 'True')
def test_already_destroyed(self):
- interp = interpreters.create()
- interpreters.destroy(interp)
+ interp = _interpreters.create()
+ _interpreters.destroy(interp)
with self.assertRaises(InterpreterNotFoundError):
- interpreters.is_running(interp)
+ _interpreters.is_running(interp)
def test_does_not_exist(self):
with self.assertRaises(InterpreterNotFoundError):
- interpreters.is_running(1_000_000)
+ _interpreters.is_running(1_000_000)
def test_bad_id(self):
with self.assertRaises(ValueError):
- interpreters.is_running(-1)
+ _interpreters.is_running(-1)
class CreateTests(TestBase):
def test_in_main(self):
- id = interpreters.create()
+ id = _interpreters.create()
self.assertIsInstance(id, int)
- self.assertIn(id, interpreters.list_all())
+ after = [id for id, *_ in _interpreters.list_all()]
+ self.assertIn(id, after)
@unittest.skip('enable this test when working on pystate.c')
def test_unique_id(self):
seen = set()
for _ in range(100):
- id = interpreters.create()
- interpreters.destroy(id)
+ id = _interpreters.create()
+ _interpreters.destroy(id)
seen.add(id)
self.assertEqual(len(seen), 100)
@@ -369,7 +370,7 @@ class CreateTests(TestBase):
id = None
def f():
nonlocal id
- id = interpreters.create()
+ id = _interpreters.create()
lock.acquire()
lock.release()
@@ -377,11 +378,12 @@ class CreateTests(TestBase):
with lock:
t.start()
t.join()
- self.assertIn(id, interpreters.list_all())
+ after = set(id for id, *_ in _interpreters.list_all())
+ self.assertIn(id, after)
def test_in_subinterpreter(self):
- main, = interpreters.list_all()
- id1 = interpreters.create()
+ main, = [id for id, *_ in _interpreters.list_all()]
+ id1 = _interpreters.create()
out = _run_output(id1, dedent("""
import _xxsubinterpreters as _interpreters
id = _interpreters.create()
@@ -390,11 +392,12 @@ class CreateTests(TestBase):
"""))
id2 = int(out.strip())
- self.assertEqual(set(interpreters.list_all()), {main, id1, id2})
+ after = set(id for id, *_ in _interpreters.list_all())
+ self.assertEqual(after, {main, id1, id2})
def test_in_threaded_subinterpreter(self):
- main, = interpreters.list_all()
- id1 = interpreters.create()
+ main, = [id for id, *_ in _interpreters.list_all()]
+ id1 = _interpreters.create()
id2 = None
def f():
nonlocal id2
@@ -409,144 +412,155 @@ class CreateTests(TestBase):
t.start()
t.join()
- self.assertEqual(set(interpreters.list_all()), {main, id1, id2})
+ after = set(id for id, *_ in _interpreters.list_all())
+ self.assertEqual(after, {main, id1, id2})
def test_after_destroy_all(self):
- before = set(interpreters.list_all())
+ before = set(id for id, *_ in _interpreters.list_all())
# Create 3 subinterpreters.
ids = []
for _ in range(3):
- id = interpreters.create()
+ id = _interpreters.create()
ids.append(id)
# Now destroy them.
for id in ids:
- interpreters.destroy(id)
+ _interpreters.destroy(id)
# Finally, create another.
- id = interpreters.create()
- self.assertEqual(set(interpreters.list_all()), before | {id})
+ id = _interpreters.create()
+ after = set(id for id, *_ in _interpreters.list_all())
+ self.assertEqual(after, before | {id})
def test_after_destroy_some(self):
- before = set(interpreters.list_all())
+ before = set(id for id, *_ in _interpreters.list_all())
# Create 3 subinterpreters.
- id1 = interpreters.create()
- id2 = interpreters.create()
- id3 = interpreters.create()
+ id1 = _interpreters.create()
+ id2 = _interpreters.create()
+ id3 = _interpreters.create()
# Now destroy 2 of them.
- interpreters.destroy(id1)
- interpreters.destroy(id3)
+ _interpreters.destroy(id1)
+ _interpreters.destroy(id3)
# Finally, create another.
- id = interpreters.create()
- self.assertEqual(set(interpreters.list_all()), before | {id, id2})
+ id = _interpreters.create()
+ after = set(id for id, *_ in _interpreters.list_all())
+ self.assertEqual(after, before | {id, id2})
class DestroyTests(TestBase):
def test_one(self):
- id1 = interpreters.create()
- id2 = interpreters.create()
- id3 = interpreters.create()
- self.assertIn(id2, interpreters.list_all())
- interpreters.destroy(id2)
- self.assertNotIn(id2, interpreters.list_all())
- self.assertIn(id1, interpreters.list_all())
- self.assertIn(id3, interpreters.list_all())
+ id1 = _interpreters.create()
+ id2 = _interpreters.create()
+ id3 = _interpreters.create()
+ before = set(id for id, *_ in _interpreters.list_all())
+ self.assertIn(id2, before)
+
+ _interpreters.destroy(id2)
+
+ after = set(id for id, *_ in _interpreters.list_all())
+ self.assertNotIn(id2, after)
+ self.assertIn(id1, after)
+ self.assertIn(id3, after)
def test_all(self):
- before = set(interpreters.list_all())
+ initial = set(id for id, *_ in _interpreters.list_all())
ids = set()
for _ in range(3):
- id = interpreters.create()
+ id = _interpreters.create()
ids.add(id)
- self.assertEqual(set(interpreters.list_all()), before | ids)
+ before = set(id for id, *_ in _interpreters.list_all())
+ self.assertEqual(before, initial | ids)
for id in ids:
- interpreters.destroy(id)
- self.assertEqual(set(interpreters.list_all()), before)
+ _interpreters.destroy(id)
+ after = set(id for id, *_ in _interpreters.list_all())
+ self.assertEqual(after, initial)
def test_main(self):
- main, = interpreters.list_all()
- with self.assertRaises(interpreters.InterpreterError):
- interpreters.destroy(main)
+ main, = [id for id, *_ in _interpreters.list_all()]
+ with self.assertRaises(_interpreters.InterpreterError):
+ _interpreters.destroy(main)
def f():
- with self.assertRaises(interpreters.InterpreterError):
- interpreters.destroy(main)
+ with self.assertRaises(_interpreters.InterpreterError):
+ _interpreters.destroy(main)
t = threading.Thread(target=f)
t.start()
t.join()
def test_already_destroyed(self):
- id = interpreters.create()
- interpreters.destroy(id)
+ id = _interpreters.create()
+ _interpreters.destroy(id)
with self.assertRaises(InterpreterNotFoundError):
- interpreters.destroy(id)
+ _interpreters.destroy(id)
def test_does_not_exist(self):
with self.assertRaises(InterpreterNotFoundError):
- interpreters.destroy(1_000_000)
+ _interpreters.destroy(1_000_000)
def test_bad_id(self):
with self.assertRaises(ValueError):
- interpreters.destroy(-1)
+ _interpreters.destroy(-1)
def test_from_current(self):
- main, = interpreters.list_all()
- id = interpreters.create()
+ main, = [id for id, *_ in _interpreters.list_all()]
+ id = _interpreters.create()
script = dedent(f"""
import _xxsubinterpreters as _interpreters
try:
_interpreters.destroy({id})
- except interpreters.InterpreterError:
+ except _interpreters.InterpreterError:
pass
""")
- interpreters.run_string(id, script)
- self.assertEqual(set(interpreters.list_all()), {main, id})
+ _interpreters.run_string(id, script)
+ after = set(id for id, *_ in _interpreters.list_all())
+ self.assertEqual(after, {main, id})
def test_from_sibling(self):
- main, = interpreters.list_all()
- id1 = interpreters.create()
- id2 = interpreters.create()
+ main, = [id for id, *_ in _interpreters.list_all()]
+ id1 = _interpreters.create()
+ id2 = _interpreters.create()
script = dedent(f"""
import _xxsubinterpreters as _interpreters
_interpreters.destroy({id2})
""")
- interpreters.run_string(id1, script)
+ _interpreters.run_string(id1, script)
- self.assertEqual(set(interpreters.list_all()), {main, id1})
+ after = set(id for id, *_ in _interpreters.list_all())
+ self.assertEqual(after, {main, id1})
def test_from_other_thread(self):
- id = interpreters.create()
+ id = _interpreters.create()
def f():
- interpreters.destroy(id)
+ _interpreters.destroy(id)
t = threading.Thread(target=f)
t.start()
t.join()
def test_still_running(self):
- main, = interpreters.list_all()
- interp = interpreters.create()
+ main, = [id for id, *_ in _interpreters.list_all()]
+ interp = _interpreters.create()
with _running(interp):
- self.assertTrue(interpreters.is_running(interp),
+ self.assertTrue(_interpreters.is_running(interp),
msg=f"Interp {interp} should be running before destruction.")
- with self.assertRaises(interpreters.InterpreterError,
+ with self.assertRaises(_interpreters.InterpreterError,
msg=f"Should not be able to destroy interp {interp} while it's still running."):
- interpreters.destroy(interp)
- self.assertTrue(interpreters.is_running(interp))
+ _interpreters.destroy(interp)
+ self.assertTrue(_interpreters.is_running(interp))
class RunStringTests(TestBase):
def setUp(self):
super().setUp()
- self.id = interpreters.create()
+ self.id = _interpreters.create()
def test_success(self):
script, file = _captured_script('print("it worked!", end="")')
with file:
- interpreters.run_string(self.id, script)
+ _interpreters.run_string(self.id, script)
out = file.read()
self.assertEqual(out, 'it worked!')
@@ -555,7 +569,7 @@ class RunStringTests(TestBase):
script, file = _captured_script('print("it worked!", end="")')
with file:
def f():
- interpreters.run_string(self.id, script)
+ _interpreters.run_string(self.id, script)
t = threading.Thread(target=f)
t.start()
@@ -565,7 +579,7 @@ class RunStringTests(TestBase):
self.assertEqual(out, 'it worked!')
def test_create_thread(self):
- subinterp = interpreters.create()
+ subinterp = _interpreters.create()
script, file = _captured_script("""
import threading
def f():
@@ -576,7 +590,7 @@ class RunStringTests(TestBase):
t.join()
""")
with file:
- interpreters.run_string(subinterp, script)
+ _interpreters.run_string(subinterp, script)
out = file.read()
self.assertEqual(out, 'it worked!')
@@ -584,7 +598,7 @@ class RunStringTests(TestBase):
def test_create_daemon_thread(self):
with self.subTest('isolated'):
expected = 'spam spam spam spam spam'
- subinterp = interpreters.create('isolated')
+ subinterp = _interpreters.create('isolated')
script, file = _captured_script(f"""
import threading
def f():
@@ -598,13 +612,13 @@ class RunStringTests(TestBase):
print('{expected}', end='')
""")
with file:
- interpreters.run_string(subinterp, script)
+ _interpreters.run_string(subinterp, script)
out = file.read()
self.assertEqual(out, expected)
with self.subTest('not isolated'):
- subinterp = interpreters.create('legacy')
+ subinterp = _interpreters.create('legacy')
script, file = _captured_script("""
import threading
def f():
@@ -615,13 +629,13 @@ class RunStringTests(TestBase):
t.join()
""")
with file:
- interpreters.run_string(subinterp, script)
+ _interpreters.run_string(subinterp, script)
out = file.read()
self.assertEqual(out, 'it worked!')
def test_shareable_types(self):
- interp = interpreters.create()
+ interp = _interpreters.create()
objects = [
None,
'spam',
@@ -630,15 +644,15 @@ class RunStringTests(TestBase):
]
for obj in objects:
with self.subTest(obj):
- interpreters.set___main___attrs(interp, dict(obj=obj))
- interpreters.run_string(
+ _interpreters.set___main___attrs(interp, dict(obj=obj))
+ _interpreters.run_string(
interp,
f'assert(obj == {obj!r})',
)
def test_os_exec(self):
expected = 'spam spam spam spam spam'
- subinterp = interpreters.create()
+ subinterp = _interpreters.create()
script, file = _captured_script(f"""
import os, sys
try:
@@ -647,7 +661,7 @@ class RunStringTests(TestBase):
print('{expected}', end='')
""")
with file:
- interpreters.run_string(subinterp, script)
+ _interpreters.run_string(subinterp, script)
out = file.read()
self.assertEqual(out, expected)
@@ -668,7 +682,7 @@ class RunStringTests(TestBase):
with open('{file.name}', 'w', encoding='utf-8') as out:
out.write('{expected}')
""")
- interpreters.run_string(self.id, script)
+ _interpreters.run_string(self.id, script)
file.seek(0)
content = file.read()
@@ -676,31 +690,31 @@ class RunStringTests(TestBase):
def test_already_running(self):
with _running(self.id):
- with self.assertRaises(interpreters.InterpreterError):
- interpreters.run_string(self.id, 'print("spam")')
+ with self.assertRaises(_interpreters.InterpreterError):
+ _interpreters.run_string(self.id, 'print("spam")')
def test_does_not_exist(self):
id = 0
- while id in interpreters.list_all():
+ while id in set(id for id, *_ in _interpreters.list_all()):
id += 1
with self.assertRaises(InterpreterNotFoundError):
- interpreters.run_string(id, 'print("spam")')
+ _interpreters.run_string(id, 'print("spam")')
def test_error_id(self):
with self.assertRaises(ValueError):
- interpreters.run_string(-1, 'print("spam")')
+ _interpreters.run_string(-1, 'print("spam")')
def test_bad_id(self):
with self.assertRaises(TypeError):
- interpreters.run_string('spam', 'print("spam")')
+ _interpreters.run_string('spam', 'print("spam")')
def test_bad_script(self):
with self.assertRaises(TypeError):
- interpreters.run_string(self.id, 10)
+ _interpreters.run_string(self.id, 10)
def test_bytes_for_script(self):
with self.assertRaises(TypeError):
- interpreters.run_string(self.id, b'print("spam")')
+ _interpreters.run_string(self.id, b'print("spam")')
def test_with_shared(self):
r, w = os.pipe()
@@ -721,8 +735,8 @@ class RunStringTests(TestBase):
with open({w}, 'wb') as chan:
pickle.dump(ns, chan)
""")
- interpreters.set___main___attrs(self.id, shared)
- interpreters.run_string(self.id, script)
+ _interpreters.set___main___attrs(self.id, shared)
+ _interpreters.run_string(self.id, script)
with open(r, 'rb') as chan:
ns = pickle.load(chan)
@@ -732,7 +746,7 @@ class RunStringTests(TestBase):
self.assertIsNone(ns['cheddar'])
def test_shared_overwrites(self):
- interpreters.run_string(self.id, dedent("""
+ _interpreters.run_string(self.id, dedent("""
spam = 'eggs'
ns1 = dict(vars())
del ns1['__builtins__']
@@ -743,8 +757,8 @@ class RunStringTests(TestBase):
ns2 = dict(vars())
del ns2['__builtins__']
""")
- interpreters.set___main___attrs(self.id, shared)
- interpreters.run_string(self.id, script)
+ _interpreters.set___main___attrs(self.id, shared)
+ _interpreters.run_string(self.id, script)
r, w = os.pipe()
script = dedent(f"""
@@ -754,7 +768,7 @@ class RunStringTests(TestBase):
with open({w}, 'wb') as chan:
pickle.dump(ns, chan)
""")
- interpreters.run_string(self.id, script)
+ _interpreters.run_string(self.id, script)
with open(r, 'rb') as chan:
ns = pickle.load(chan)
@@ -775,8 +789,8 @@ class RunStringTests(TestBase):
with open({w}, 'wb') as chan:
pickle.dump(ns, chan)
""")
- interpreters.set___main___attrs(self.id, shared)
- interpreters.run_string(self.id, script)
+ _interpreters.set___main___attrs(self.id, shared)
+ _interpreters.run_string(self.id, script)
with open(r, 'rb') as chan:
ns = pickle.load(chan)
@@ -784,7 +798,7 @@ class RunStringTests(TestBase):
def test_main_reused(self):
r, w = os.pipe()
- interpreters.run_string(self.id, dedent(f"""
+ _interpreters.run_string(self.id, dedent(f"""
spam = True
ns = dict(vars())
@@ -798,7 +812,7 @@ class RunStringTests(TestBase):
ns1 = pickle.load(chan)
r, w = os.pipe()
- interpreters.run_string(self.id, dedent(f"""
+ _interpreters.run_string(self.id, dedent(f"""
eggs = False
ns = dict(vars())
@@ -827,7 +841,7 @@ class RunStringTests(TestBase):
with open({w}, 'wb') as chan:
pickle.dump(ns, chan)
""")
- interpreters.run_string(self.id, script)
+ _interpreters.run_string(self.id, script)
with open(r, 'rb') as chan:
ns = pickle.load(chan)
@@ -872,13 +886,13 @@ class RunFailedTests(TestBase):
def setUp(self):
super().setUp()
- self.id = interpreters.create()
+ self.id = _interpreters.create()
def add_module(self, modname, text):
import tempfile
tempdir = tempfile.mkdtemp()
self.addCleanup(lambda: os_helper.rmtree(tempdir))
- interpreters.run_string(self.id, dedent(f"""
+ _interpreters.run_string(self.id, dedent(f"""
import sys
sys.path.insert(0, {tempdir!r})
"""))
@@ -900,11 +914,11 @@ class RunFailedTests(TestBase):
raise NeverError # never raised
""").format(dedent(text))
if fails:
- err = interpreters.run_string(self.id, script)
+ err = _interpreters.run_string(self.id, script)
self.assertIsNot(err, None)
return err
else:
- err = interpreters.run_string(self.id, script)
+ err = _interpreters.run_string(self.id, script)
self.assertIs(err, None)
return None
except:
@@ -1029,7 +1043,7 @@ class RunFuncTests(TestBase):
def setUp(self):
super().setUp()
- self.id = interpreters.create()
+ self.id = _interpreters.create()
def test_success(self):
r, w = os.pipe()
@@ -1039,8 +1053,8 @@ class RunFuncTests(TestBase):
with open(w, 'w', encoding="utf-8") as spipe:
with contextlib.redirect_stdout(spipe):
print('it worked!', end='')
- interpreters.set___main___attrs(self.id, dict(w=w))
- interpreters.run_func(self.id, script)
+ _interpreters.set___main___attrs(self.id, dict(w=w))
+ _interpreters.run_func(self.id, script)
with open(r, encoding="utf-8") as outfile:
out = outfile.read()
@@ -1056,8 +1070,8 @@ class RunFuncTests(TestBase):
with contextlib.redirect_stdout(spipe):
print('it worked!', end='')
def f():
- interpreters.set___main___attrs(self.id, dict(w=w))
- interpreters.run_func(self.id, script)
+ _interpreters.set___main___attrs(self.id, dict(w=w))
+ _interpreters.run_func(self.id, script)
t = threading.Thread(target=f)
t.start()
t.join()
@@ -1077,8 +1091,8 @@ class RunFuncTests(TestBase):
with contextlib.redirect_stdout(spipe):
print('it worked!', end='')
code = script.__code__
- interpreters.set___main___attrs(self.id, dict(w=w))
- interpreters.run_func(self.id, code)
+ _interpreters.set___main___attrs(self.id, dict(w=w))
+ _interpreters.run_func(self.id, code)
with open(r, encoding="utf-8") as outfile:
out = outfile.read()
@@ -1091,7 +1105,7 @@ class RunFuncTests(TestBase):
assert spam
with self.assertRaises(ValueError):
- interpreters.run_func(self.id, script)
+ _interpreters.run_func(self.id, script)
# XXX This hasn't been fixed yet.
@unittest.expectedFailure
@@ -1099,38 +1113,38 @@ class RunFuncTests(TestBase):
def script():
return 'spam'
with self.assertRaises(ValueError):
- interpreters.run_func(self.id, script)
+ _interpreters.run_func(self.id, script)
def test_args(self):
with self.subTest('args'):
def script(a, b=0):
assert a == b
with self.assertRaises(ValueError):
- interpreters.run_func(self.id, script)
+ _interpreters.run_func(self.id, script)
with self.subTest('*args'):
def script(*args):
assert not args
with self.assertRaises(ValueError):
- interpreters.run_func(self.id, script)
+ _interpreters.run_func(self.id, script)
with self.subTest('**kwargs'):
def script(**kwargs):
assert not kwargs
with self.assertRaises(ValueError):
- interpreters.run_func(self.id, script)
+ _interpreters.run_func(self.id, script)
with self.subTest('kwonly'):
def script(*, spam=True):
assert spam
with self.assertRaises(ValueError):
- interpreters.run_func(self.id, script)
+ _interpreters.run_func(self.id, script)
with self.subTest('posonly'):
def script(spam, /):
assert spam
with self.assertRaises(ValueError):
- interpreters.run_func(self.id, script)
+ _interpreters.run_func(self.id, script)
if __name__ == '__main__':
diff --git a/Lib/test/test_capi/test_misc.py b/Lib/test/test_capi/test_misc.py
index 2f2bf03..35d6a20 100644
--- a/Lib/test/test_capi/test_misc.py
+++ b/Lib/test/test_capi/test_misc.py
@@ -2065,7 +2065,7 @@ class SubinterpreterTest(unittest.TestCase):
_testinternalcapi.get_interp_settings()
raise NotImplementedError('unreachable')
''')
- with self.assertRaises(RuntimeError):
+ with self.assertRaises(_interpreters.InterpreterError):
support.run_in_subinterp_with_config(script, **kwargs)
@unittest.skipIf(_testsinglephase is None, "test requires _testsinglephase module")
@@ -2403,7 +2403,7 @@ class InterpreterConfigTests(unittest.TestCase):
continue
if match(config, invalid):
with self.subTest(f'invalid: {config}'):
- with self.assertRaises(RuntimeError):
+ with self.assertRaises(_interpreters.InterpreterError):
check(config)
elif match(config, questionable):
with self.subTest(f'questionable: {config}'):
@@ -2427,7 +2427,7 @@ class InterpreterConfigTests(unittest.TestCase):
with self.subTest('main'):
expected = _interpreters.new_config('legacy')
expected.gil = 'own'
- interpid = _interpreters.get_main()
+ interpid, *_ = _interpreters.get_main()
config = _interpreters.get_config(interpid)
self.assert_ns_equal(config, expected)
@@ -2579,7 +2579,7 @@ class InterpreterIDTests(unittest.TestCase):
def test_linked_lifecycle_initial(self):
is_linked = _testinternalcapi.interpreter_refcount_linked
- get_refcount = _testinternalcapi.get_interpreter_refcount
+ get_refcount, _, _ = self.get_refcount_helpers()
# A new interpreter will start out not linked, with a refcount of 0.
interpid = self.new_interpreter()
diff --git a/Lib/test/test_interpreters/test_api.py b/Lib/test/test_interpreters/test_api.py
index a326b39..abf66a7 100644
--- a/Lib/test/test_interpreters/test_api.py
+++ b/Lib/test/test_interpreters/test_api.py
@@ -1,6 +1,7 @@
import os
import pickle
-from textwrap import dedent
+import sys
+from textwrap import dedent, indent
import threading
import types
import unittest
@@ -10,8 +11,13 @@ from test.support import import_helper
# Raise SkipTest if subinterpreters not supported.
_interpreters = import_helper.import_module('_xxsubinterpreters')
from test.support import interpreters
-from test.support.interpreters import InterpreterNotFoundError
-from .utils import _captured_script, _run_output, _running, TestBase
+from test.support.interpreters import (
+ InterpreterError, InterpreterNotFoundError, ExecutionFailed,
+)
+from .utils import (
+ _captured_script, _run_output, _running, TestBase,
+ requires_test_modules, _testinternalcapi,
+)
class ModuleTests(TestBase):
@@ -157,6 +163,20 @@ class GetCurrentTests(TestBase):
id2 = id(interp)
self.assertNotEqual(id1, id2)
+ @requires_test_modules
+ def test_created_with_capi(self):
+ last = 0
+ for id, *_ in _interpreters.list_all():
+ last = max(last, id)
+ expected = _testinternalcapi.next_interpreter_id()
+ text = self.run_temp_from_capi(f"""
+ import {interpreters.__name__} as interpreters
+ interp = interpreters.get_current()
+ print(interp.id)
+ """)
+ interpid = eval(text)
+ self.assertEqual(interpid, expected)
+
class ListAllTests(TestBase):
@@ -199,6 +219,33 @@ class ListAllTests(TestBase):
for interp1, interp2 in zip(actual, expected):
self.assertIs(interp1, interp2)
+ def test_created_with_capi(self):
+ mainid, *_ = _interpreters.get_main()
+ interpid1 = _interpreters.create()
+ interpid2 = _interpreters.create()
+ interpid3 = _interpreters.create()
+ interpid4 = interpid3 + 1
+ interpid5 = interpid4 + 1
+ expected = [
+ (mainid,),
+ (interpid1,),
+ (interpid2,),
+ (interpid3,),
+ (interpid4,),
+ (interpid5,),
+ ]
+ expected2 = expected[:-2]
+ text = self.run_temp_from_capi(f"""
+ import {interpreters.__name__} as interpreters
+ interp = interpreters.create()
+ print(
+ [(i.id,) for i in interpreters.list_all()])
+ """)
+ res = eval(text)
+ res2 = [(i.id,) for i in interpreters.list_all()]
+ self.assertEqual(res, expected)
+ self.assertEqual(res2, expected2)
+
class InterpreterObjectTests(TestBase):
@@ -276,6 +323,7 @@ class TestInterpreterIsRunning(TestBase):
main = interpreters.get_main()
self.assertTrue(main.is_running())
+ # XXX Is this still true?
@unittest.skip('Fails on FreeBSD')
def test_subinterpreter(self):
interp = interpreters.create()
@@ -337,6 +385,55 @@ class TestInterpreterIsRunning(TestBase):
interp.exec('t.join()')
self.assertEqual(os.read(r_interp, 1), FINISHED)
+ def test_created_with_capi(self):
+ script = dedent(f"""
+ import {interpreters.__name__} as interpreters
+ interp = interpreters.get_current()
+ print(interp.is_running())
+ """)
+ def parse_results(text):
+ self.assertNotEqual(text, "")
+ try:
+ return eval(text)
+ except Exception:
+ raise Exception(repr(text))
+
+ with self.subTest('running __main__ (from self)'):
+ with self.interpreter_from_capi() as interpid:
+ text = self.run_from_capi(interpid, script, main=True)
+ running = parse_results(text)
+ self.assertTrue(running)
+
+ with self.subTest('running, but not __main__ (from self)'):
+ text = self.run_temp_from_capi(script)
+ running = parse_results(text)
+ self.assertFalse(running)
+
+ with self.subTest('running __main__ (from other)'):
+ with self.interpreter_obj_from_capi() as (interp, interpid):
+ before = interp.is_running()
+ with self.running_from_capi(interpid, main=True):
+ during = interp.is_running()
+ after = interp.is_running()
+ self.assertFalse(before)
+ self.assertTrue(during)
+ self.assertFalse(after)
+
+ with self.subTest('running, but not __main__ (from other)'):
+ with self.interpreter_obj_from_capi() as (interp, interpid):
+ before = interp.is_running()
+ with self.running_from_capi(interpid, main=False):
+ during = interp.is_running()
+ after = interp.is_running()
+ self.assertFalse(before)
+ self.assertFalse(during)
+ self.assertFalse(after)
+
+ with self.subTest('not running (from other)'):
+ with self.interpreter_obj_from_capi() as (interp, _):
+ running = interp.is_running()
+ self.assertFalse(running)
+
class TestInterpreterClose(TestBase):
@@ -364,11 +461,11 @@ class TestInterpreterClose(TestBase):
def test_main(self):
main, = interpreters.list_all()
- with self.assertRaises(interpreters.InterpreterError):
+ with self.assertRaises(InterpreterError):
main.close()
def f():
- with self.assertRaises(interpreters.InterpreterError):
+ with self.assertRaises(InterpreterError):
main.close()
t = threading.Thread(target=f)
@@ -419,12 +516,13 @@ class TestInterpreterClose(TestBase):
t.start()
t.join()
+ # XXX Is this still true?
@unittest.skip('Fails on FreeBSD')
def test_still_running(self):
main, = interpreters.list_all()
interp = interpreters.create()
with _running(interp):
- with self.assertRaises(interpreters.InterpreterError):
+ with self.assertRaises(InterpreterError):
interp.close()
self.assertTrue(interp.is_running())
@@ -459,6 +557,52 @@ class TestInterpreterClose(TestBase):
self.assertEqual(os.read(r_interp, 1), FINISHED)
+ def test_created_with_capi(self):
+ script = dedent(f"""
+ import {interpreters.__name__} as interpreters
+ interp = interpreters.get_current()
+ interp.close()
+ """)
+
+ with self.subTest('running __main__ (from self)'):
+ with self.interpreter_from_capi() as interpid:
+ with self.assertRaisesRegex(ExecutionFailed,
+ 'InterpreterError.*current'):
+ self.run_from_capi(interpid, script, main=True)
+
+ with self.subTest('running, but not __main__ (from self)'):
+ with self.assertRaisesRegex(ExecutionFailed,
+ 'InterpreterError.*current'):
+ self.run_temp_from_capi(script)
+
+ with self.subTest('running __main__ (from other)'):
+ with self.interpreter_obj_from_capi() as (interp, interpid):
+ with self.running_from_capi(interpid, main=True):
+ with self.assertRaisesRegex(InterpreterError, 'running'):
+ interp.close()
+ # Make sure it wssn't closed.
+ self.assertTrue(
+ interp.is_running())
+
+ # The rest must be skipped until we deal with running threads when
+ # interp.close() is called.
+ return
+
+ with self.subTest('running, but not __main__ (from other)'):
+ with self.interpreter_obj_from_capi() as (interp, interpid):
+ with self.running_from_capi(interpid, main=False):
+ with self.assertRaisesRegex(InterpreterError, 'not managed'):
+ interp.close()
+ # Make sure it wssn't closed.
+ self.assertFalse(interp.is_running())
+
+ with self.subTest('not running (from other)'):
+ with self.interpreter_obj_from_capi() as (interp, _):
+ with self.assertRaisesRegex(InterpreterError, 'not managed'):
+ interp.close()
+ # Make sure it wssn't closed.
+ self.assertFalse(interp.is_running())
+
class TestInterpreterPrepareMain(TestBase):
@@ -511,26 +655,45 @@ class TestInterpreterPrepareMain(TestBase):
interp.prepare_main(spam={'spam': 'eggs', 'foo': 'bar'})
# Make sure neither was actually bound.
- with self.assertRaises(interpreters.ExecutionFailed):
+ with self.assertRaises(ExecutionFailed):
interp.exec('print(foo)')
- with self.assertRaises(interpreters.ExecutionFailed):
+ with self.assertRaises(ExecutionFailed):
interp.exec('print(spam)')
+ def test_running(self):
+ interp = interpreters.create()
+ interp.prepare_main({'spam': True})
+ with self.running(interp):
+ with self.assertRaisesRegex(InterpreterError, 'running'):
+ interp.prepare_main({'spam': False})
+ interp.exec('assert spam is True')
+
+ @requires_test_modules
+ def test_created_with_capi(self):
+ with self.interpreter_from_capi() as interpid:
+ interp = interpreters.Interpreter(interpid)
+ interp.prepare_main({'spam': True})
+ rc = _testinternalcapi.exec_interpreter(interpid,
+ 'assert spam is True')
+ assert rc == 0, rc
+
class TestInterpreterExec(TestBase):
def test_success(self):
interp = interpreters.create()
- script, file = _captured_script('print("it worked!", end="")')
- with file:
+ script, results = _captured_script('print("it worked!", end="")')
+ with results:
interp.exec(script)
- out = file.read()
+ results = results.final()
+ results.raise_if_failed()
+ out = results.stdout
self.assertEqual(out, 'it worked!')
def test_failure(self):
interp = interpreters.create()
- with self.assertRaises(interpreters.ExecutionFailed):
+ with self.assertRaises(ExecutionFailed):
interp.exec('raise Exception')
def test_display_preserved_exception(self):
@@ -583,15 +746,17 @@ class TestInterpreterExec(TestBase):
def test_in_thread(self):
interp = interpreters.create()
- script, file = _captured_script('print("it worked!", end="")')
- with file:
+ script, results = _captured_script('print("it worked!", end="")')
+ with results:
def f():
interp.exec(script)
t = threading.Thread(target=f)
t.start()
t.join()
- out = file.read()
+ results = results.final()
+ results.raise_if_failed()
+ out = results.stdout
self.assertEqual(out, 'it worked!')
@@ -618,6 +783,7 @@ class TestInterpreterExec(TestBase):
content = file.read()
self.assertEqual(content, expected)
+ # XXX Is this still true?
@unittest.skip('Fails on FreeBSD')
def test_already_running(self):
interp = interpreters.create()
@@ -666,6 +832,11 @@ class TestInterpreterExec(TestBase):
self.assertEqual(os.read(r_interp, 1), RAN)
self.assertEqual(os.read(r_interp, 1), FINISHED)
+ def test_created_with_capi(self):
+ with self.interpreter_obj_from_capi() as (interp, _):
+ with self.assertRaisesRegex(ExecutionFailed, 'it worked'):
+ interp.exec('raise Exception("it worked!")')
+
# test_xxsubinterpreters covers the remaining
# Interpreter.exec() behavior.
@@ -830,7 +1001,7 @@ class TestInterpreterCall(TestBase):
raise Exception((args, kwargs))
interp.call(callable)
- with self.assertRaises(interpreters.ExecutionFailed):
+ with self.assertRaises(ExecutionFailed):
interp.call(call_func_failure)
def test_call_in_thread(self):
@@ -942,6 +1113,14 @@ class LowLevelTests(TestBase):
# encountered by the high-level module, thus they
# mostly shouldn't matter as much.
+ def interp_exists(self, interpid):
+ try:
+ _interpreters.is_running(interpid)
+ except InterpreterNotFoundError:
+ return False
+ else:
+ return True
+
def test_new_config(self):
# This test overlaps with
# test.test_capi.test_misc.InterpreterConfigTests.
@@ -1064,46 +1243,107 @@ class LowLevelTests(TestBase):
with self.assertRaises(ValueError):
_interpreters.new_config(gil=value)
- def test_get_config(self):
- # This test overlaps with
- # test.test_capi.test_misc.InterpreterConfigTests.
+ def test_get_main(self):
+ interpid, = _interpreters.get_main()
+ self.assertEqual(interpid, 0)
+ def test_get_current(self):
with self.subTest('main'):
- expected = _interpreters.new_config('legacy')
- expected.gil = 'own'
- interpid = _interpreters.get_main()
- config = _interpreters.get_config(interpid)
- self.assert_ns_equal(config, expected)
+ main, *_ = _interpreters.get_main()
+ interpid, = _interpreters.get_current()
+ self.assertEqual(interpid, main)
+
+ script = f"""
+ import {_interpreters.__name__} as _interpreters
+ interpid, = _interpreters.get_current()
+ print(interpid)
+ """
+ def parse_stdout(text):
+ parts = text.split()
+ assert len(parts) == 1, parts
+ interpid, = parts
+ interpid = int(interpid)
+ return interpid,
+
+ with self.subTest('from _interpreters'):
+ orig = _interpreters.create()
+ text = self.run_and_capture(orig, script)
+ interpid, = parse_stdout(text)
+ self.assertEqual(interpid, orig)
+
+ with self.subTest('from C-API'):
+ last = 0
+ for id, *_ in _interpreters.list_all():
+ last = max(last, id)
+ expected = last + 1
+ text = self.run_temp_from_capi(script)
+ interpid, = parse_stdout(text)
+ self.assertEqual(interpid, expected)
+
+ def test_list_all(self):
+ mainid, *_ = _interpreters.get_main()
+ interpid1 = _interpreters.create()
+ interpid2 = _interpreters.create()
+ interpid3 = _interpreters.create()
+ expected = [
+ (mainid,),
+ (interpid1,),
+ (interpid2,),
+ (interpid3,),
+ ]
- with self.subTest('isolated'):
- expected = _interpreters.new_config('isolated')
- interpid = _interpreters.create('isolated')
- config = _interpreters.get_config(interpid)
- self.assert_ns_equal(config, expected)
+ with self.subTest('main'):
+ res = _interpreters.list_all()
+ self.assertEqual(res, expected)
+
+ with self.subTest('from _interpreters'):
+ text = self.run_and_capture(interpid2, f"""
+ import {_interpreters.__name__} as _interpreters
+ print(
+ _interpreters.list_all())
+ """)
- with self.subTest('legacy'):
- expected = _interpreters.new_config('legacy')
- interpid = _interpreters.create('legacy')
- config = _interpreters.get_config(interpid)
- self.assert_ns_equal(config, expected)
+ res = eval(text)
+ self.assertEqual(res, expected)
+
+ with self.subTest('from C-API'):
+ interpid4 = interpid3 + 1
+ interpid5 = interpid4 + 1
+ expected2 = expected + [
+ (interpid4,),
+ (interpid5,),
+ ]
+ expected3 = expected + [
+ (interpid5,),
+ ]
+ text = self.run_temp_from_capi(f"""
+ import {_interpreters.__name__} as _interpreters
+ _interpreters.create()
+ print(
+ _interpreters.list_all())
+ """)
+ res2 = eval(text)
+ res3 = _interpreters.list_all()
+ self.assertEqual(res2, expected2)
+ self.assertEqual(res3, expected3)
def test_create(self):
isolated = _interpreters.new_config('isolated')
legacy = _interpreters.new_config('legacy')
default = isolated
- with self.subTest('no arg'):
+ with self.subTest('no args'):
interpid = _interpreters.create()
config = _interpreters.get_config(interpid)
self.assert_ns_equal(config, default)
- with self.subTest('arg: None'):
+ with self.subTest('config: None'):
interpid = _interpreters.create(None)
config = _interpreters.get_config(interpid)
self.assert_ns_equal(config, default)
- with self.subTest('arg: \'empty\''):
- with self.assertRaises(interpreters.InterpreterError):
+ with self.subTest('config: \'empty\''):
+ with self.assertRaises(InterpreterError):
# The "empty" config isn't viable on its own.
_interpreters.create('empty')
@@ -1138,6 +1378,230 @@ class LowLevelTests(TestBase):
with self.assertRaises(ValueError):
_interpreters.create(orig)
+ @requires_test_modules
+ def test_destroy(self):
+ with self.subTest('from _interpreters'):
+ interpid = _interpreters.create()
+ before = [id for id, *_ in _interpreters.list_all()]
+ _interpreters.destroy(interpid)
+ after = [id for id, *_ in _interpreters.list_all()]
+
+ self.assertIn(interpid, before)
+ self.assertNotIn(interpid, after)
+ self.assertFalse(
+ self.interp_exists(interpid))
+
+ with self.subTest('main'):
+ interpid, *_ = _interpreters.get_main()
+ with self.assertRaises(InterpreterError):
+ # It is the current interpreter.
+ _interpreters.destroy(interpid)
+
+ with self.subTest('from C-API'):
+ interpid = _testinternalcapi.create_interpreter()
+ _interpreters.destroy(interpid)
+ self.assertFalse(
+ self.interp_exists(interpid))
+
+ def test_get_config(self):
+ # This test overlaps with
+ # test.test_capi.test_misc.InterpreterConfigTests.
+
+ with self.subTest('main'):
+ expected = _interpreters.new_config('legacy')
+ expected.gil = 'own'
+ interpid, *_ = _interpreters.get_main()
+ config = _interpreters.get_config(interpid)
+ self.assert_ns_equal(config, expected)
+
+ with self.subTest('main'):
+ expected = _interpreters.new_config('legacy')
+ expected.gil = 'own'
+ interpid, *_ = _interpreters.get_main()
+ config = _interpreters.get_config(interpid)
+ self.assert_ns_equal(config, expected)
+
+ with self.subTest('isolated'):
+ expected = _interpreters.new_config('isolated')
+ interpid = _interpreters.create('isolated')
+ config = _interpreters.get_config(interpid)
+ self.assert_ns_equal(config, expected)
+
+ with self.subTest('legacy'):
+ expected = _interpreters.new_config('legacy')
+ interpid = _interpreters.create('legacy')
+ config = _interpreters.get_config(interpid)
+ self.assert_ns_equal(config, expected)
+
+ with self.subTest('from C-API'):
+ orig = _interpreters.new_config('isolated')
+ with self.interpreter_from_capi(orig) as interpid:
+ config = _interpreters.get_config(interpid)
+ self.assert_ns_equal(config, orig)
+
+ @requires_test_modules
+ def test_whence(self):
+ with self.subTest('main'):
+ interpid, *_ = _interpreters.get_main()
+ whence = _interpreters.whence(interpid)
+ self.assertEqual(whence, _interpreters.WHENCE_RUNTIME)
+
+ with self.subTest('stdlib'):
+ interpid = _interpreters.create()
+ whence = _interpreters.whence(interpid)
+ self.assertEqual(whence, _interpreters.WHENCE_XI)
+
+ for orig, name in {
+ # XXX Also check WHENCE_UNKNOWN.
+ _interpreters.WHENCE_LEGACY_CAPI: 'legacy C-API',
+ _interpreters.WHENCE_CAPI: 'C-API',
+ _interpreters.WHENCE_XI: 'cross-interpreter C-API',
+ }.items():
+ with self.subTest(f'from C-API ({orig}: {name})'):
+ with self.interpreter_from_capi(whence=orig) as interpid:
+ whence = _interpreters.whence(interpid)
+ self.assertEqual(whence, orig)
+
+ with self.subTest('from C-API, running'):
+ text = self.run_temp_from_capi(dedent(f"""
+ import {_interpreters.__name__} as _interpreters
+ interpid, *_ = _interpreters.get_current()
+ print(_interpreters.whence(interpid))
+ """),
+ config=True)
+ whence = eval(text)
+ self.assertEqual(whence, _interpreters.WHENCE_CAPI)
+
+ with self.subTest('from legacy C-API, running'):
+ ...
+ text = self.run_temp_from_capi(dedent(f"""
+ import {_interpreters.__name__} as _interpreters
+ interpid, *_ = _interpreters.get_current()
+ print(_interpreters.whence(interpid))
+ """),
+ config=False)
+ whence = eval(text)
+ self.assertEqual(whence, _interpreters.WHENCE_LEGACY_CAPI)
+
+ def test_is_running(self):
+ with self.subTest('main'):
+ interpid, *_ = _interpreters.get_main()
+ running = _interpreters.is_running(interpid)
+ self.assertTrue(running)
+
+ with self.subTest('from _interpreters (running)'):
+ interpid = _interpreters.create()
+ with self.running(interpid):
+ running = _interpreters.is_running(interpid)
+ self.assertTrue(running)
+
+ with self.subTest('from _interpreters (not running)'):
+ interpid = _interpreters.create()
+ running = _interpreters.is_running(interpid)
+ self.assertFalse(running)
+
+ with self.subTest('from C-API (running __main__)'):
+ with self.interpreter_from_capi() as interpid:
+ with self.running_from_capi(interpid, main=True):
+ running = _interpreters.is_running(interpid)
+ self.assertTrue(running)
+
+ with self.subTest('from C-API (running, but not __main__)'):
+ with self.interpreter_from_capi() as interpid:
+ with self.running_from_capi(interpid, main=False):
+ running = _interpreters.is_running(interpid)
+ self.assertFalse(running)
+
+ with self.subTest('from C-API (not running)'):
+ with self.interpreter_from_capi() as interpid:
+ running = _interpreters.is_running(interpid)
+ self.assertFalse(running)
+
+ def test_exec(self):
+ with self.subTest('run script'):
+ interpid = _interpreters.create()
+ script, results = _captured_script('print("it worked!", end="")')
+ with results:
+ exc = _interpreters.exec(interpid, script)
+ results = results.final()
+ results.raise_if_failed()
+ out = results.stdout
+ self.assertEqual(out, 'it worked!')
+
+ with self.subTest('uncaught exception'):
+ interpid = _interpreters.create()
+ script, results = _captured_script("""
+ raise Exception('uh-oh!')
+ print("it worked!", end="")
+ """)
+ with results:
+ exc = _interpreters.exec(interpid, script)
+ out = results.stdout()
+ self.assertEqual(out, '')
+ self.assert_ns_equal(exc, types.SimpleNamespace(
+ type=types.SimpleNamespace(
+ __name__='Exception',
+ __qualname__='Exception',
+ __module__='builtins',
+ ),
+ msg='uh-oh!',
+ # We check these in other tests.
+ formatted=exc.formatted,
+ errdisplay=exc.errdisplay,
+ ))
+
+ with self.subTest('from C-API'):
+ with self.interpreter_from_capi() as interpid:
+ exc = _interpreters.exec(interpid, 'raise Exception("it worked!")')
+ self.assertIsNot(exc, None)
+ self.assertEqual(exc.msg, 'it worked!')
+
+ def test_call(self):
+ with self.subTest('no args'):
+ interpid = _interpreters.create()
+ exc = _interpreters.call(interpid, call_func_return_shareable)
+ self.assertIs(exc, None)
+
+ with self.subTest('uncaught exception'):
+ interpid = _interpreters.create()
+ exc = _interpreters.call(interpid, call_func_failure)
+ self.assertEqual(exc, types.SimpleNamespace(
+ type=types.SimpleNamespace(
+ __name__='Exception',
+ __qualname__='Exception',
+ __module__='builtins',
+ ),
+ msg='spam!',
+ # We check these in other tests.
+ formatted=exc.formatted,
+ errdisplay=exc.errdisplay,
+ ))
+
+ def test_set___main___attrs(self):
+ with self.subTest('from _interpreters'):
+ interpid = _interpreters.create()
+ before1 = _interpreters.exec(interpid, 'assert spam == \'eggs\'')
+ before2 = _interpreters.exec(interpid, 'assert ham == 42')
+ self.assertEqual(before1.type.__name__, 'NameError')
+ self.assertEqual(before2.type.__name__, 'NameError')
+
+ _interpreters.set___main___attrs(interpid, dict(
+ spam='eggs',
+ ham=42,
+ ))
+ after1 = _interpreters.exec(interpid, 'assert spam == \'eggs\'')
+ after2 = _interpreters.exec(interpid, 'assert ham == 42')
+ after3 = _interpreters.exec(interpid, 'assert spam == 42')
+ self.assertIs(after1, None)
+ self.assertIs(after2, None)
+ self.assertEqual(after3.type.__name__, 'AssertionError')
+
+ with self.subTest('from C-API'):
+ with self.interpreter_from_capi() as interpid:
+ _interpreters.set___main___attrs(interpid, {'spam': True})
+ exc = _interpreters.exec(interpid, 'assert spam is True')
+ self.assertIsNone(exc)
+
if __name__ == '__main__':
# Test needs to be a package, so we can do relative imports.
diff --git a/Lib/test/test_interpreters/utils.py b/Lib/test/test_interpreters/utils.py
index 5ade676..d921794 100644
--- a/Lib/test/test_interpreters/utils.py
+++ b/Lib/test/test_interpreters/utils.py
@@ -1,30 +1,344 @@
+from collections import namedtuple
import contextlib
+import json
+import io
import os
import os.path
+import pickle
+import queue
+#import select
import subprocess
import sys
import tempfile
-from textwrap import dedent
+from textwrap import dedent, indent
import threading
import types
import unittest
+import warnings
from test import support
from test.support import os_helper
+from test.support import import_helper
+_interpreters = import_helper.import_module('_xxsubinterpreters')
from test.support import interpreters
-def _captured_script(script):
- r, w = os.pipe()
- indented = script.replace('\n', '\n ')
- wrapped = dedent(f"""
- import contextlib
- with open({w}, 'w', encoding='utf-8') as spipe:
- with contextlib.redirect_stdout(spipe):
+try:
+ import _testinternalcapi
+ import _testcapi
+except ImportError:
+ _testinternalcapi = None
+ _testcapi = None
+
+def requires_test_modules(func):
+ return unittest.skipIf(_testinternalcapi is None, "test requires _testinternalcapi module")(func)
+
+
+def _dump_script(text):
+ lines = text.splitlines()
+ print()
+ print('-' * 20)
+ for i, line in enumerate(lines, 1):
+ print(f' {i:>{len(str(len(lines)))}} {line}')
+ print('-' * 20)
+
+
+def _close_file(file):
+ try:
+ if hasattr(file, 'close'):
+ file.close()
+ else:
+ os.close(file)
+ except OSError as exc:
+ if exc.errno != 9:
+ raise # re-raise
+ # It was closed already.
+
+
+def pack_exception(exc=None):
+ captured = _interpreters.capture_exception(exc)
+ data = dict(captured.__dict__)
+ data['type'] = dict(captured.type.__dict__)
+ return json.dumps(data)
+
+
+def unpack_exception(packed):
+ try:
+ data = json.loads(packed)
+ except json.decoder.JSONDecodeError:
+ warnings.warn('incomplete exception data', RuntimeWarning)
+ print(packed if isinstance(packed, str) else packed.decode('utf-8'))
+ return None
+ exc = types.SimpleNamespace(**data)
+ exc.type = types.SimpleNamespace(**exc.type)
+ return exc;
+
+
+class CapturingResults:
+
+ STDIO = dedent("""\
+ with open({w_pipe}, 'wb', buffering=0) as _spipe_{stream}:
+ _captured_std{stream} = io.StringIO()
+ with contextlib.redirect_std{stream}(_captured_std{stream}):
+ #########################
+ # begin wrapped script
+
+ {indented}
+
+ # end wrapped script
+ #########################
+ text = _captured_std{stream}.getvalue()
+ _spipe_{stream}.write(text.encode('utf-8'))
+ """)[:-1]
+ EXC = dedent("""\
+ with open({w_pipe}, 'wb', buffering=0) as _spipe_exc:
+ try:
+ #########################
+ # begin wrapped script
+
{indented}
- """)
- return wrapped, open(r, encoding='utf-8')
+
+ # end wrapped script
+ #########################
+ except Exception as exc:
+ text = _interp_utils.pack_exception(exc)
+ _spipe_exc.write(text.encode('utf-8'))
+ """)[:-1]
+
+ @classmethod
+ def wrap_script(cls, script, *, stdout=True, stderr=False, exc=False):
+ script = dedent(script).strip(os.linesep)
+ imports = [
+ f'import {__name__} as _interp_utils',
+ ]
+ wrapped = script
+
+ # Handle exc.
+ if exc:
+ exc = os.pipe()
+ r_exc, w_exc = exc
+ indented = wrapped.replace('\n', '\n ')
+ wrapped = cls.EXC.format(
+ w_pipe=w_exc,
+ indented=indented,
+ )
+ else:
+ exc = None
+
+ # Handle stdout.
+ if stdout:
+ imports.extend([
+ 'import contextlib, io',
+ ])
+ stdout = os.pipe()
+ r_out, w_out = stdout
+ indented = wrapped.replace('\n', '\n ')
+ wrapped = cls.STDIO.format(
+ w_pipe=w_out,
+ indented=indented,
+ stream='out',
+ )
+ else:
+ stdout = None
+
+ # Handle stderr.
+ if stderr == 'stdout':
+ stderr = None
+ elif stderr:
+ if not stdout:
+ imports.extend([
+ 'import contextlib, io',
+ ])
+ stderr = os.pipe()
+ r_err, w_err = stderr
+ indented = wrapped.replace('\n', '\n ')
+ wrapped = cls.STDIO.format(
+ w_pipe=w_err,
+ indented=indented,
+ stream='err',
+ )
+ else:
+ stderr = None
+
+ if wrapped == script:
+ raise NotImplementedError
+ else:
+ for line in imports:
+ wrapped = f'{line}{os.linesep}{wrapped}'
+
+ results = cls(stdout, stderr, exc)
+ return wrapped, results
+
+ def __init__(self, out, err, exc):
+ self._rf_out = None
+ self._rf_err = None
+ self._rf_exc = None
+ self._w_out = None
+ self._w_err = None
+ self._w_exc = None
+
+ if out is not None:
+ r_out, w_out = out
+ self._rf_out = open(r_out, 'rb', buffering=0)
+ self._w_out = w_out
+
+ if err is not None:
+ r_err, w_err = err
+ self._rf_err = open(r_err, 'rb', buffering=0)
+ self._w_err = w_err
+
+ if exc is not None:
+ r_exc, w_exc = exc
+ self._rf_exc = open(r_exc, 'rb', buffering=0)
+ self._w_exc = w_exc
+
+ self._buf_out = b''
+ self._buf_err = b''
+ self._buf_exc = b''
+ self._exc = None
+
+ self._closed = False
+
+ def __enter__(self):
+ return self
+
+ def __exit__(self, *args):
+ self.close()
+
+ @property
+ def closed(self):
+ return self._closed
+
+ def close(self):
+ if self._closed:
+ return
+ self._closed = True
+
+ if self._w_out is not None:
+ _close_file(self._w_out)
+ self._w_out = None
+ if self._w_err is not None:
+ _close_file(self._w_err)
+ self._w_err = None
+ if self._w_exc is not None:
+ _close_file(self._w_exc)
+ self._w_exc = None
+
+ self._capture()
+
+ if self._rf_out is not None:
+ _close_file(self._rf_out)
+ self._rf_out = None
+ if self._rf_err is not None:
+ _close_file(self._rf_err)
+ self._rf_err = None
+ if self._rf_exc is not None:
+ _close_file(self._rf_exc)
+ self._rf_exc = None
+
+ def _capture(self):
+ # Ideally this is called only after the script finishes
+ # (and thus has closed the write end of the pipe.
+ if self._rf_out is not None:
+ chunk = self._rf_out.read(100)
+ while chunk:
+ self._buf_out += chunk
+ chunk = self._rf_out.read(100)
+ if self._rf_err is not None:
+ chunk = self._rf_err.read(100)
+ while chunk:
+ self._buf_err += chunk
+ chunk = self._rf_err.read(100)
+ if self._rf_exc is not None:
+ chunk = self._rf_exc.read(100)
+ while chunk:
+ self._buf_exc += chunk
+ chunk = self._rf_exc.read(100)
+
+ def _unpack_stdout(self):
+ return self._buf_out.decode('utf-8')
+
+ def _unpack_stderr(self):
+ return self._buf_err.decode('utf-8')
+
+ def _unpack_exc(self):
+ if self._exc is not None:
+ return self._exc
+ if not self._buf_exc:
+ return None
+ self._exc = unpack_exception(self._buf_exc)
+ return self._exc
+
+ def stdout(self):
+ if self.closed:
+ return self.final().stdout
+ self._capture()
+ return self._unpack_stdout()
+
+ def stderr(self):
+ if self.closed:
+ return self.final().stderr
+ self._capture()
+ return self._unpack_stderr()
+
+ def exc(self):
+ if self.closed:
+ return self.final().exc
+ self._capture()
+ return self._unpack_exc()
+
+ def final(self, *, force=False):
+ try:
+ return self._final
+ except AttributeError:
+ if not self._closed:
+ if not force:
+ raise Exception('no final results available yet')
+ else:
+ return CapturedResults.Proxy(self)
+ self._final = CapturedResults(
+ self._unpack_stdout(),
+ self._unpack_stderr(),
+ self._unpack_exc(),
+ )
+ return self._final
+
+
+class CapturedResults(namedtuple('CapturedResults', 'stdout stderr exc')):
+
+ class Proxy:
+ def __init__(self, capturing):
+ self._capturing = capturing
+ def _finish(self):
+ if self._capturing is None:
+ return
+ self._final = self._capturing.final()
+ self._capturing = None
+ def __iter__(self):
+ self._finish()
+ yield from self._final
+ def __len__(self):
+ self._finish()
+ return len(self._final)
+ def __getattr__(self, name):
+ self._finish()
+ if name.startswith('_'):
+ raise AttributeError(name)
+ return getattr(self._final, name)
+
+ def raise_if_failed(self):
+ if self.exc is not None:
+ raise interpreters.ExecutionFailed(self.exc)
+
+
+def _captured_script(script, *, stdout=True, stderr=False, exc=False):
+ return CapturingResults.wrap_script(
+ script,
+ stdout=stdout,
+ stderr=stderr,
+ exc=exc,
+ )
def clean_up_interpreters():
@@ -33,17 +347,17 @@ def clean_up_interpreters():
continue
try:
interp.close()
- except RuntimeError:
+ except _interpreters.InterpreterError:
pass # already destroyed
def _run_output(interp, request, init=None):
- script, rpipe = _captured_script(request)
- with rpipe:
+ script, results = _captured_script(request)
+ with results:
if init:
interp.prepare_main(init)
interp.exec(script)
- return rpipe.read()
+ return results.stdout()
@contextlib.contextmanager
@@ -175,3 +489,184 @@ class TestBase(unittest.TestCase):
diff = f'namespace({diff})'
standardMsg = self._truncateMessage(standardMsg, diff)
self.fail(self._formatMessage(msg, standardMsg))
+
+ def _run_string(self, interp, script):
+ wrapped, results = _captured_script(script, exc=False)
+ #_dump_script(wrapped)
+ with results:
+ if isinstance(interp, interpreters.Interpreter):
+ interp.exec(script)
+ else:
+ err = _interpreters.run_string(interp, wrapped)
+ if err is not None:
+ return None, err
+ return results.stdout(), None
+
+ def run_and_capture(self, interp, script):
+ text, err = self._run_string(interp, script)
+ if err is not None:
+ raise interpreters.ExecutionFailed(err)
+ else:
+ return text
+
+ @requires_test_modules
+ @contextlib.contextmanager
+ def interpreter_from_capi(self, config=None, whence=None):
+ if config is False:
+ if whence is None:
+ whence = _interpreters.WHENCE_LEGACY_CAPI
+ else:
+ assert whence in (_interpreters.WHENCE_LEGACY_CAPI,
+ _interpreters.WHENCE_UNKNOWN), repr(whence)
+ config = None
+ elif config is True:
+ config = _interpreters.new_config('default')
+ elif config is None:
+ if whence not in (
+ _interpreters.WHENCE_LEGACY_CAPI,
+ _interpreters.WHENCE_UNKNOWN,
+ ):
+ config = _interpreters.new_config('legacy')
+ elif isinstance(config, str):
+ config = _interpreters.new_config(config)
+
+ if whence is None:
+ whence = _interpreters.WHENCE_XI
+
+ interpid = _testinternalcapi.create_interpreter(config, whence=whence)
+ try:
+ yield interpid
+ finally:
+ try:
+ _testinternalcapi.destroy_interpreter(interpid)
+ except _interpreters.InterpreterNotFoundError:
+ pass
+
+ @contextlib.contextmanager
+ def interpreter_obj_from_capi(self, config='legacy'):
+ with self.interpreter_from_capi(config) as interpid:
+ yield interpreters.Interpreter(interpid), interpid
+
+ @contextlib.contextmanager
+ def capturing(self, script):
+ wrapped, capturing = _captured_script(script, stdout=True, exc=True)
+ #_dump_script(wrapped)
+ with capturing:
+ yield wrapped, capturing.final(force=True)
+
+ @requires_test_modules
+ def run_from_capi(self, interpid, script, *, main=False):
+ with self.capturing(script) as (wrapped, results):
+ rc = _testinternalcapi.exec_interpreter(interpid, wrapped, main=main)
+ assert rc == 0, rc
+ results.raise_if_failed()
+ return results.stdout
+
+ @contextlib.contextmanager
+ def _running(self, run_interp, exec_interp):
+ token = b'\0'
+ r_in, w_in = self.pipe()
+ r_out, w_out = self.pipe()
+
+ def close():
+ _close_file(r_in)
+ _close_file(w_in)
+ _close_file(r_out)
+ _close_file(w_out)
+
+ # Start running (and wait).
+ script = dedent(f"""
+ import os
+ try:
+ # handshake
+ token = os.read({r_in}, 1)
+ os.write({w_out}, token)
+ # Wait for the "done" message.
+ os.read({r_in}, 1)
+ except BrokenPipeError:
+ pass
+ except OSError as exc:
+ if exc.errno != 9:
+ raise # re-raise
+ # It was closed already.
+ """)
+ failed = None
+ def run():
+ nonlocal failed
+ try:
+ run_interp(script)
+ except Exception as exc:
+ failed = exc
+ close()
+ t = threading.Thread(target=run)
+ t.start()
+
+ # handshake
+ try:
+ os.write(w_in, token)
+ token2 = os.read(r_out, 1)
+ assert token2 == token, (token2, token)
+ except OSError:
+ t.join()
+ if failed is not None:
+ raise failed
+
+ # CM __exit__()
+ try:
+ try:
+ yield
+ finally:
+ # Send "done".
+ os.write(w_in, b'\0')
+ finally:
+ close()
+ t.join()
+ if failed is not None:
+ raise failed
+
+ @contextlib.contextmanager
+ def running(self, interp):
+ if isinstance(interp, int):
+ interpid = interp
+ def exec_interp(script):
+ exc = _interpreters.exec(interpid, script)
+ assert exc is None, exc
+ run_interp = exec_interp
+ else:
+ def run_interp(script):
+ text = self.run_and_capture(interp, script)
+ assert text == '', repr(text)
+ def exec_interp(script):
+ interp.exec(script)
+ with self._running(run_interp, exec_interp):
+ yield
+
+ @requires_test_modules
+ @contextlib.contextmanager
+ def running_from_capi(self, interpid, *, main=False):
+ def run_interp(script):
+ text = self.run_from_capi(interpid, script, main=main)
+ assert text == '', repr(text)
+ def exec_interp(script):
+ rc = _testinternalcapi.exec_interpreter(interpid, script)
+ assert rc == 0, rc
+ with self._running(run_interp, exec_interp):
+ yield
+
+ @requires_test_modules
+ def run_temp_from_capi(self, script, config='legacy'):
+ if config is False:
+ # Force using Py_NewInterpreter().
+ run_in_interp = (lambda s, c: _testcapi.run_in_subinterp(s))
+ config = None
+ else:
+ run_in_interp = _testinternalcapi.run_in_subinterp_with_config
+ if config is True:
+ config = 'default'
+ if isinstance(config, str):
+ config = _interpreters.new_config(config)
+ with self.capturing(script) as (wrapped, results):
+ rc = run_in_interp(wrapped, config)
+ assert rc == 0, rc
+ results.raise_if_failed()
+ return results.stdout