summaryrefslogtreecommitdiffstats
path: root/Lib/test/test__xxsubinterpreters.py
diff options
context:
space:
mode:
Diffstat (limited to 'Lib/test/test__xxsubinterpreters.py')
-rw-r--r--Lib/test/test__xxsubinterpreters.py338
1 files changed, 176 insertions, 162 deletions
diff --git a/Lib/test/test__xxsubinterpreters.py b/Lib/test/test__xxsubinterpreters.py
index 841077a..c8c964f 100644
--- a/Lib/test/test__xxsubinterpreters.py
+++ b/Lib/test/test__xxsubinterpreters.py
@@ -13,7 +13,7 @@ from test.support import os_helper
from test.support import script_helper
-interpreters = import_helper.import_module('_xxsubinterpreters')
+_interpreters = import_helper.import_module('_xxsubinterpreters')
_testinternalcapi = import_helper.import_module('_testinternalcapi')
from _xxsubinterpreters import InterpreterNotFoundError
@@ -36,7 +36,7 @@ def _captured_script(script):
def _run_output(interp, request):
script, rpipe = _captured_script(request)
with rpipe:
- interpreters.run_string(interp, script)
+ _interpreters.run_string(interp, script)
return rpipe.read()
@@ -47,7 +47,7 @@ def _wait_for_interp_to_run(interp, timeout=None):
if timeout is None:
timeout = support.SHORT_TIMEOUT
for _ in support.sleeping_retry(timeout, error=False):
- if interpreters.is_running(interp):
+ if _interpreters.is_running(interp):
break
else:
raise RuntimeError('interp is not running')
@@ -57,7 +57,7 @@ def _wait_for_interp_to_run(interp, timeout=None):
def _running(interp):
r, w = os.pipe()
def run():
- interpreters.run_string(interp, dedent(f"""
+ _interpreters.run_string(interp, dedent(f"""
# wait for "signal"
with open({r}, encoding="utf-8") as rpipe:
rpipe.read()
@@ -75,12 +75,12 @@ def _running(interp):
def clean_up_interpreters():
- for id in interpreters.list_all():
+ for id, *_ in _interpreters.list_all():
if id == 0: # main
continue
try:
- interpreters.destroy(id)
- except interpreters.InterpreterError:
+ _interpreters.destroy(id)
+ except _interpreters.InterpreterError:
pass # already destroyed
@@ -112,7 +112,7 @@ class IsShareableTests(unittest.TestCase):
for obj in shareables:
with self.subTest(obj):
self.assertTrue(
- interpreters.is_shareable(obj))
+ _interpreters.is_shareable(obj))
def test_not_shareable(self):
class Cheese:
@@ -141,7 +141,7 @@ class IsShareableTests(unittest.TestCase):
for obj in not_shareables:
with self.subTest(repr(obj)):
self.assertFalse(
- interpreters.is_shareable(obj))
+ _interpreters.is_shareable(obj))
class ShareableTypeTests(unittest.TestCase):
@@ -230,7 +230,7 @@ class ModuleTests(TestBase):
def test_import_in_interpreter(self):
_run_output(
- interpreters.create(),
+ _interpreters.create(),
'import _xxsubinterpreters as _interpreters',
)
@@ -241,45 +241,45 @@ class ModuleTests(TestBase):
class ListAllTests(TestBase):
def test_initial(self):
- main = interpreters.get_main()
- ids = interpreters.list_all()
+ main, *_ = _interpreters.get_main()
+ ids = [id for id, *_ in _interpreters.list_all()]
self.assertEqual(ids, [main])
def test_after_creating(self):
- main = interpreters.get_main()
- first = interpreters.create()
- second = interpreters.create()
- ids = interpreters.list_all()
+ main, *_ = _interpreters.get_main()
+ first = _interpreters.create()
+ second = _interpreters.create()
+ ids = [id for id, *_ in _interpreters.list_all()]
self.assertEqual(ids, [main, first, second])
def test_after_destroying(self):
- main = interpreters.get_main()
- first = interpreters.create()
- second = interpreters.create()
- interpreters.destroy(first)
- ids = interpreters.list_all()
+ main, *_ = _interpreters.get_main()
+ first = _interpreters.create()
+ second = _interpreters.create()
+ _interpreters.destroy(first)
+ ids = [id for id, *_ in _interpreters.list_all()]
self.assertEqual(ids, [main, second])
class GetCurrentTests(TestBase):
def test_main(self):
- main = interpreters.get_main()
- cur = interpreters.get_current()
+ main, *_ = _interpreters.get_main()
+ cur, *_ = _interpreters.get_current()
self.assertEqual(cur, main)
self.assertIsInstance(cur, int)
def test_subinterpreter(self):
- main = interpreters.get_main()
- interp = interpreters.create()
+ main, *_ = _interpreters.get_main()
+ interp = _interpreters.create()
out = _run_output(interp, dedent("""
import _xxsubinterpreters as _interpreters
- cur = _interpreters.get_current()
+ cur, *_ = _interpreters.get_current()
print(cur)
assert isinstance(cur, int)
"""))
cur = int(out.strip())
- _, expected = interpreters.list_all()
+ _, expected = [id for id, *_ in _interpreters.list_all()]
self.assertEqual(cur, expected)
self.assertNotEqual(cur, main)
@@ -287,17 +287,17 @@ class GetCurrentTests(TestBase):
class GetMainTests(TestBase):
def test_from_main(self):
- [expected] = interpreters.list_all()
- main = interpreters.get_main()
+ [expected] = [id for id, *_ in _interpreters.list_all()]
+ main, *_ = _interpreters.get_main()
self.assertEqual(main, expected)
self.assertIsInstance(main, int)
def test_from_subinterpreter(self):
- [expected] = interpreters.list_all()
- interp = interpreters.create()
+ [expected] = [id for id, *_ in _interpreters.list_all()]
+ interp = _interpreters.create()
out = _run_output(interp, dedent("""
import _xxsubinterpreters as _interpreters
- main = _interpreters.get_main()
+ main, *_ = _interpreters.get_main()
print(main)
assert isinstance(main, int)
"""))
@@ -308,20 +308,20 @@ class GetMainTests(TestBase):
class IsRunningTests(TestBase):
def test_main(self):
- main = interpreters.get_main()
- self.assertTrue(interpreters.is_running(main))
+ main, *_ = _interpreters.get_main()
+ self.assertTrue(_interpreters.is_running(main))
@unittest.skip('Fails on FreeBSD')
def test_subinterpreter(self):
- interp = interpreters.create()
- self.assertFalse(interpreters.is_running(interp))
+ interp = _interpreters.create()
+ self.assertFalse(_interpreters.is_running(interp))
with _running(interp):
- self.assertTrue(interpreters.is_running(interp))
- self.assertFalse(interpreters.is_running(interp))
+ self.assertTrue(_interpreters.is_running(interp))
+ self.assertFalse(_interpreters.is_running(interp))
def test_from_subinterpreter(self):
- interp = interpreters.create()
+ interp = _interpreters.create()
out = _run_output(interp, dedent(f"""
import _xxsubinterpreters as _interpreters
if _interpreters.is_running({interp}):
@@ -332,34 +332,35 @@ class IsRunningTests(TestBase):
self.assertEqual(out.strip(), 'True')
def test_already_destroyed(self):
- interp = interpreters.create()
- interpreters.destroy(interp)
+ interp = _interpreters.create()
+ _interpreters.destroy(interp)
with self.assertRaises(InterpreterNotFoundError):
- interpreters.is_running(interp)
+ _interpreters.is_running(interp)
def test_does_not_exist(self):
with self.assertRaises(InterpreterNotFoundError):
- interpreters.is_running(1_000_000)
+ _interpreters.is_running(1_000_000)
def test_bad_id(self):
with self.assertRaises(ValueError):
- interpreters.is_running(-1)
+ _interpreters.is_running(-1)
class CreateTests(TestBase):
def test_in_main(self):
- id = interpreters.create()
+ id = _interpreters.create()
self.assertIsInstance(id, int)
- self.assertIn(id, interpreters.list_all())
+ after = [id for id, *_ in _interpreters.list_all()]
+ self.assertIn(id, after)
@unittest.skip('enable this test when working on pystate.c')
def test_unique_id(self):
seen = set()
for _ in range(100):
- id = interpreters.create()
- interpreters.destroy(id)
+ id = _interpreters.create()
+ _interpreters.destroy(id)
seen.add(id)
self.assertEqual(len(seen), 100)
@@ -369,7 +370,7 @@ class CreateTests(TestBase):
id = None
def f():
nonlocal id
- id = interpreters.create()
+ id = _interpreters.create()
lock.acquire()
lock.release()
@@ -377,11 +378,12 @@ class CreateTests(TestBase):
with lock:
t.start()
t.join()
- self.assertIn(id, interpreters.list_all())
+ after = set(id for id, *_ in _interpreters.list_all())
+ self.assertIn(id, after)
def test_in_subinterpreter(self):
- main, = interpreters.list_all()
- id1 = interpreters.create()
+ main, = [id for id, *_ in _interpreters.list_all()]
+ id1 = _interpreters.create()
out = _run_output(id1, dedent("""
import _xxsubinterpreters as _interpreters
id = _interpreters.create()
@@ -390,11 +392,12 @@ class CreateTests(TestBase):
"""))
id2 = int(out.strip())
- self.assertEqual(set(interpreters.list_all()), {main, id1, id2})
+ after = set(id for id, *_ in _interpreters.list_all())
+ self.assertEqual(after, {main, id1, id2})
def test_in_threaded_subinterpreter(self):
- main, = interpreters.list_all()
- id1 = interpreters.create()
+ main, = [id for id, *_ in _interpreters.list_all()]
+ id1 = _interpreters.create()
id2 = None
def f():
nonlocal id2
@@ -409,144 +412,155 @@ class CreateTests(TestBase):
t.start()
t.join()
- self.assertEqual(set(interpreters.list_all()), {main, id1, id2})
+ after = set(id for id, *_ in _interpreters.list_all())
+ self.assertEqual(after, {main, id1, id2})
def test_after_destroy_all(self):
- before = set(interpreters.list_all())
+ before = set(id for id, *_ in _interpreters.list_all())
# Create 3 subinterpreters.
ids = []
for _ in range(3):
- id = interpreters.create()
+ id = _interpreters.create()
ids.append(id)
# Now destroy them.
for id in ids:
- interpreters.destroy(id)
+ _interpreters.destroy(id)
# Finally, create another.
- id = interpreters.create()
- self.assertEqual(set(interpreters.list_all()), before | {id})
+ id = _interpreters.create()
+ after = set(id for id, *_ in _interpreters.list_all())
+ self.assertEqual(after, before | {id})
def test_after_destroy_some(self):
- before = set(interpreters.list_all())
+ before = set(id for id, *_ in _interpreters.list_all())
# Create 3 subinterpreters.
- id1 = interpreters.create()
- id2 = interpreters.create()
- id3 = interpreters.create()
+ id1 = _interpreters.create()
+ id2 = _interpreters.create()
+ id3 = _interpreters.create()
# Now destroy 2 of them.
- interpreters.destroy(id1)
- interpreters.destroy(id3)
+ _interpreters.destroy(id1)
+ _interpreters.destroy(id3)
# Finally, create another.
- id = interpreters.create()
- self.assertEqual(set(interpreters.list_all()), before | {id, id2})
+ id = _interpreters.create()
+ after = set(id for id, *_ in _interpreters.list_all())
+ self.assertEqual(after, before | {id, id2})
class DestroyTests(TestBase):
def test_one(self):
- id1 = interpreters.create()
- id2 = interpreters.create()
- id3 = interpreters.create()
- self.assertIn(id2, interpreters.list_all())
- interpreters.destroy(id2)
- self.assertNotIn(id2, interpreters.list_all())
- self.assertIn(id1, interpreters.list_all())
- self.assertIn(id3, interpreters.list_all())
+ id1 = _interpreters.create()
+ id2 = _interpreters.create()
+ id3 = _interpreters.create()
+ before = set(id for id, *_ in _interpreters.list_all())
+ self.assertIn(id2, before)
+
+ _interpreters.destroy(id2)
+
+ after = set(id for id, *_ in _interpreters.list_all())
+ self.assertNotIn(id2, after)
+ self.assertIn(id1, after)
+ self.assertIn(id3, after)
def test_all(self):
- before = set(interpreters.list_all())
+ initial = set(id for id, *_ in _interpreters.list_all())
ids = set()
for _ in range(3):
- id = interpreters.create()
+ id = _interpreters.create()
ids.add(id)
- self.assertEqual(set(interpreters.list_all()), before | ids)
+ before = set(id for id, *_ in _interpreters.list_all())
+ self.assertEqual(before, initial | ids)
for id in ids:
- interpreters.destroy(id)
- self.assertEqual(set(interpreters.list_all()), before)
+ _interpreters.destroy(id)
+ after = set(id for id, *_ in _interpreters.list_all())
+ self.assertEqual(after, initial)
def test_main(self):
- main, = interpreters.list_all()
- with self.assertRaises(interpreters.InterpreterError):
- interpreters.destroy(main)
+ main, = [id for id, *_ in _interpreters.list_all()]
+ with self.assertRaises(_interpreters.InterpreterError):
+ _interpreters.destroy(main)
def f():
- with self.assertRaises(interpreters.InterpreterError):
- interpreters.destroy(main)
+ with self.assertRaises(_interpreters.InterpreterError):
+ _interpreters.destroy(main)
t = threading.Thread(target=f)
t.start()
t.join()
def test_already_destroyed(self):
- id = interpreters.create()
- interpreters.destroy(id)
+ id = _interpreters.create()
+ _interpreters.destroy(id)
with self.assertRaises(InterpreterNotFoundError):
- interpreters.destroy(id)
+ _interpreters.destroy(id)
def test_does_not_exist(self):
with self.assertRaises(InterpreterNotFoundError):
- interpreters.destroy(1_000_000)
+ _interpreters.destroy(1_000_000)
def test_bad_id(self):
with self.assertRaises(ValueError):
- interpreters.destroy(-1)
+ _interpreters.destroy(-1)
def test_from_current(self):
- main, = interpreters.list_all()
- id = interpreters.create()
+ main, = [id for id, *_ in _interpreters.list_all()]
+ id = _interpreters.create()
script = dedent(f"""
import _xxsubinterpreters as _interpreters
try:
_interpreters.destroy({id})
- except interpreters.InterpreterError:
+ except _interpreters.InterpreterError:
pass
""")
- interpreters.run_string(id, script)
- self.assertEqual(set(interpreters.list_all()), {main, id})
+ _interpreters.run_string(id, script)
+ after = set(id for id, *_ in _interpreters.list_all())
+ self.assertEqual(after, {main, id})
def test_from_sibling(self):
- main, = interpreters.list_all()
- id1 = interpreters.create()
- id2 = interpreters.create()
+ main, = [id for id, *_ in _interpreters.list_all()]
+ id1 = _interpreters.create()
+ id2 = _interpreters.create()
script = dedent(f"""
import _xxsubinterpreters as _interpreters
_interpreters.destroy({id2})
""")
- interpreters.run_string(id1, script)
+ _interpreters.run_string(id1, script)
- self.assertEqual(set(interpreters.list_all()), {main, id1})
+ after = set(id for id, *_ in _interpreters.list_all())
+ self.assertEqual(after, {main, id1})
def test_from_other_thread(self):
- id = interpreters.create()
+ id = _interpreters.create()
def f():
- interpreters.destroy(id)
+ _interpreters.destroy(id)
t = threading.Thread(target=f)
t.start()
t.join()
def test_still_running(self):
- main, = interpreters.list_all()
- interp = interpreters.create()
+ main, = [id for id, *_ in _interpreters.list_all()]
+ interp = _interpreters.create()
with _running(interp):
- self.assertTrue(interpreters.is_running(interp),
+ self.assertTrue(_interpreters.is_running(interp),
msg=f"Interp {interp} should be running before destruction.")
- with self.assertRaises(interpreters.InterpreterError,
+ with self.assertRaises(_interpreters.InterpreterError,
msg=f"Should not be able to destroy interp {interp} while it's still running."):
- interpreters.destroy(interp)
- self.assertTrue(interpreters.is_running(interp))
+ _interpreters.destroy(interp)
+ self.assertTrue(_interpreters.is_running(interp))
class RunStringTests(TestBase):
def setUp(self):
super().setUp()
- self.id = interpreters.create()
+ self.id = _interpreters.create()
def test_success(self):
script, file = _captured_script('print("it worked!", end="")')
with file:
- interpreters.run_string(self.id, script)
+ _interpreters.run_string(self.id, script)
out = file.read()
self.assertEqual(out, 'it worked!')
@@ -555,7 +569,7 @@ class RunStringTests(TestBase):
script, file = _captured_script('print("it worked!", end="")')
with file:
def f():
- interpreters.run_string(self.id, script)
+ _interpreters.run_string(self.id, script)
t = threading.Thread(target=f)
t.start()
@@ -565,7 +579,7 @@ class RunStringTests(TestBase):
self.assertEqual(out, 'it worked!')
def test_create_thread(self):
- subinterp = interpreters.create()
+ subinterp = _interpreters.create()
script, file = _captured_script("""
import threading
def f():
@@ -576,7 +590,7 @@ class RunStringTests(TestBase):
t.join()
""")
with file:
- interpreters.run_string(subinterp, script)
+ _interpreters.run_string(subinterp, script)
out = file.read()
self.assertEqual(out, 'it worked!')
@@ -584,7 +598,7 @@ class RunStringTests(TestBase):
def test_create_daemon_thread(self):
with self.subTest('isolated'):
expected = 'spam spam spam spam spam'
- subinterp = interpreters.create('isolated')
+ subinterp = _interpreters.create('isolated')
script, file = _captured_script(f"""
import threading
def f():
@@ -598,13 +612,13 @@ class RunStringTests(TestBase):
print('{expected}', end='')
""")
with file:
- interpreters.run_string(subinterp, script)
+ _interpreters.run_string(subinterp, script)
out = file.read()
self.assertEqual(out, expected)
with self.subTest('not isolated'):
- subinterp = interpreters.create('legacy')
+ subinterp = _interpreters.create('legacy')
script, file = _captured_script("""
import threading
def f():
@@ -615,13 +629,13 @@ class RunStringTests(TestBase):
t.join()
""")
with file:
- interpreters.run_string(subinterp, script)
+ _interpreters.run_string(subinterp, script)
out = file.read()
self.assertEqual(out, 'it worked!')
def test_shareable_types(self):
- interp = interpreters.create()
+ interp = _interpreters.create()
objects = [
None,
'spam',
@@ -630,15 +644,15 @@ class RunStringTests(TestBase):
]
for obj in objects:
with self.subTest(obj):
- interpreters.set___main___attrs(interp, dict(obj=obj))
- interpreters.run_string(
+ _interpreters.set___main___attrs(interp, dict(obj=obj))
+ _interpreters.run_string(
interp,
f'assert(obj == {obj!r})',
)
def test_os_exec(self):
expected = 'spam spam spam spam spam'
- subinterp = interpreters.create()
+ subinterp = _interpreters.create()
script, file = _captured_script(f"""
import os, sys
try:
@@ -647,7 +661,7 @@ class RunStringTests(TestBase):
print('{expected}', end='')
""")
with file:
- interpreters.run_string(subinterp, script)
+ _interpreters.run_string(subinterp, script)
out = file.read()
self.assertEqual(out, expected)
@@ -668,7 +682,7 @@ class RunStringTests(TestBase):
with open('{file.name}', 'w', encoding='utf-8') as out:
out.write('{expected}')
""")
- interpreters.run_string(self.id, script)
+ _interpreters.run_string(self.id, script)
file.seek(0)
content = file.read()
@@ -676,31 +690,31 @@ class RunStringTests(TestBase):
def test_already_running(self):
with _running(self.id):
- with self.assertRaises(interpreters.InterpreterError):
- interpreters.run_string(self.id, 'print("spam")')
+ with self.assertRaises(_interpreters.InterpreterError):
+ _interpreters.run_string(self.id, 'print("spam")')
def test_does_not_exist(self):
id = 0
- while id in interpreters.list_all():
+ while id in set(id for id, *_ in _interpreters.list_all()):
id += 1
with self.assertRaises(InterpreterNotFoundError):
- interpreters.run_string(id, 'print("spam")')
+ _interpreters.run_string(id, 'print("spam")')
def test_error_id(self):
with self.assertRaises(ValueError):
- interpreters.run_string(-1, 'print("spam")')
+ _interpreters.run_string(-1, 'print("spam")')
def test_bad_id(self):
with self.assertRaises(TypeError):
- interpreters.run_string('spam', 'print("spam")')
+ _interpreters.run_string('spam', 'print("spam")')
def test_bad_script(self):
with self.assertRaises(TypeError):
- interpreters.run_string(self.id, 10)
+ _interpreters.run_string(self.id, 10)
def test_bytes_for_script(self):
with self.assertRaises(TypeError):
- interpreters.run_string(self.id, b'print("spam")')
+ _interpreters.run_string(self.id, b'print("spam")')
def test_with_shared(self):
r, w = os.pipe()
@@ -721,8 +735,8 @@ class RunStringTests(TestBase):
with open({w}, 'wb') as chan:
pickle.dump(ns, chan)
""")
- interpreters.set___main___attrs(self.id, shared)
- interpreters.run_string(self.id, script)
+ _interpreters.set___main___attrs(self.id, shared)
+ _interpreters.run_string(self.id, script)
with open(r, 'rb') as chan:
ns = pickle.load(chan)
@@ -732,7 +746,7 @@ class RunStringTests(TestBase):
self.assertIsNone(ns['cheddar'])
def test_shared_overwrites(self):
- interpreters.run_string(self.id, dedent("""
+ _interpreters.run_string(self.id, dedent("""
spam = 'eggs'
ns1 = dict(vars())
del ns1['__builtins__']
@@ -743,8 +757,8 @@ class RunStringTests(TestBase):
ns2 = dict(vars())
del ns2['__builtins__']
""")
- interpreters.set___main___attrs(self.id, shared)
- interpreters.run_string(self.id, script)
+ _interpreters.set___main___attrs(self.id, shared)
+ _interpreters.run_string(self.id, script)
r, w = os.pipe()
script = dedent(f"""
@@ -754,7 +768,7 @@ class RunStringTests(TestBase):
with open({w}, 'wb') as chan:
pickle.dump(ns, chan)
""")
- interpreters.run_string(self.id, script)
+ _interpreters.run_string(self.id, script)
with open(r, 'rb') as chan:
ns = pickle.load(chan)
@@ -775,8 +789,8 @@ class RunStringTests(TestBase):
with open({w}, 'wb') as chan:
pickle.dump(ns, chan)
""")
- interpreters.set___main___attrs(self.id, shared)
- interpreters.run_string(self.id, script)
+ _interpreters.set___main___attrs(self.id, shared)
+ _interpreters.run_string(self.id, script)
with open(r, 'rb') as chan:
ns = pickle.load(chan)
@@ -784,7 +798,7 @@ class RunStringTests(TestBase):
def test_main_reused(self):
r, w = os.pipe()
- interpreters.run_string(self.id, dedent(f"""
+ _interpreters.run_string(self.id, dedent(f"""
spam = True
ns = dict(vars())
@@ -798,7 +812,7 @@ class RunStringTests(TestBase):
ns1 = pickle.load(chan)
r, w = os.pipe()
- interpreters.run_string(self.id, dedent(f"""
+ _interpreters.run_string(self.id, dedent(f"""
eggs = False
ns = dict(vars())
@@ -827,7 +841,7 @@ class RunStringTests(TestBase):
with open({w}, 'wb') as chan:
pickle.dump(ns, chan)
""")
- interpreters.run_string(self.id, script)
+ _interpreters.run_string(self.id, script)
with open(r, 'rb') as chan:
ns = pickle.load(chan)
@@ -872,13 +886,13 @@ class RunFailedTests(TestBase):
def setUp(self):
super().setUp()
- self.id = interpreters.create()
+ self.id = _interpreters.create()
def add_module(self, modname, text):
import tempfile
tempdir = tempfile.mkdtemp()
self.addCleanup(lambda: os_helper.rmtree(tempdir))
- interpreters.run_string(self.id, dedent(f"""
+ _interpreters.run_string(self.id, dedent(f"""
import sys
sys.path.insert(0, {tempdir!r})
"""))
@@ -900,11 +914,11 @@ class RunFailedTests(TestBase):
raise NeverError # never raised
""").format(dedent(text))
if fails:
- err = interpreters.run_string(self.id, script)
+ err = _interpreters.run_string(self.id, script)
self.assertIsNot(err, None)
return err
else:
- err = interpreters.run_string(self.id, script)
+ err = _interpreters.run_string(self.id, script)
self.assertIs(err, None)
return None
except:
@@ -1029,7 +1043,7 @@ class RunFuncTests(TestBase):
def setUp(self):
super().setUp()
- self.id = interpreters.create()
+ self.id = _interpreters.create()
def test_success(self):
r, w = os.pipe()
@@ -1039,8 +1053,8 @@ class RunFuncTests(TestBase):
with open(w, 'w', encoding="utf-8") as spipe:
with contextlib.redirect_stdout(spipe):
print('it worked!', end='')
- interpreters.set___main___attrs(self.id, dict(w=w))
- interpreters.run_func(self.id, script)
+ _interpreters.set___main___attrs(self.id, dict(w=w))
+ _interpreters.run_func(self.id, script)
with open(r, encoding="utf-8") as outfile:
out = outfile.read()
@@ -1056,8 +1070,8 @@ class RunFuncTests(TestBase):
with contextlib.redirect_stdout(spipe):
print('it worked!', end='')
def f():
- interpreters.set___main___attrs(self.id, dict(w=w))
- interpreters.run_func(self.id, script)
+ _interpreters.set___main___attrs(self.id, dict(w=w))
+ _interpreters.run_func(self.id, script)
t = threading.Thread(target=f)
t.start()
t.join()
@@ -1077,8 +1091,8 @@ class RunFuncTests(TestBase):
with contextlib.redirect_stdout(spipe):
print('it worked!', end='')
code = script.__code__
- interpreters.set___main___attrs(self.id, dict(w=w))
- interpreters.run_func(self.id, code)
+ _interpreters.set___main___attrs(self.id, dict(w=w))
+ _interpreters.run_func(self.id, code)
with open(r, encoding="utf-8") as outfile:
out = outfile.read()
@@ -1091,7 +1105,7 @@ class RunFuncTests(TestBase):
assert spam
with self.assertRaises(ValueError):
- interpreters.run_func(self.id, script)
+ _interpreters.run_func(self.id, script)
# XXX This hasn't been fixed yet.
@unittest.expectedFailure
@@ -1099,38 +1113,38 @@ class RunFuncTests(TestBase):
def script():
return 'spam'
with self.assertRaises(ValueError):
- interpreters.run_func(self.id, script)
+ _interpreters.run_func(self.id, script)
def test_args(self):
with self.subTest('args'):
def script(a, b=0):
assert a == b
with self.assertRaises(ValueError):
- interpreters.run_func(self.id, script)
+ _interpreters.run_func(self.id, script)
with self.subTest('*args'):
def script(*args):
assert not args
with self.assertRaises(ValueError):
- interpreters.run_func(self.id, script)
+ _interpreters.run_func(self.id, script)
with self.subTest('**kwargs'):
def script(**kwargs):
assert not kwargs
with self.assertRaises(ValueError):
- interpreters.run_func(self.id, script)
+ _interpreters.run_func(self.id, script)
with self.subTest('kwonly'):
def script(*, spam=True):
assert spam
with self.assertRaises(ValueError):
- interpreters.run_func(self.id, script)
+ _interpreters.run_func(self.id, script)
with self.subTest('posonly'):
def script(spam, /):
assert spam
with self.assertRaises(ValueError):
- interpreters.run_func(self.id, script)
+ _interpreters.run_func(self.id, script)
if __name__ == '__main__':