diff options
Diffstat (limited to 'Lib/test/test_interpreters/test_api.py')
-rw-r--r-- | Lib/test/test_interpreters/test_api.py | 540 |
1 files changed, 502 insertions, 38 deletions
diff --git a/Lib/test/test_interpreters/test_api.py b/Lib/test/test_interpreters/test_api.py index a326b39..abf66a7 100644 --- a/Lib/test/test_interpreters/test_api.py +++ b/Lib/test/test_interpreters/test_api.py @@ -1,6 +1,7 @@ import os import pickle -from textwrap import dedent +import sys +from textwrap import dedent, indent import threading import types import unittest @@ -10,8 +11,13 @@ from test.support import import_helper # Raise SkipTest if subinterpreters not supported. _interpreters = import_helper.import_module('_xxsubinterpreters') from test.support import interpreters -from test.support.interpreters import InterpreterNotFoundError -from .utils import _captured_script, _run_output, _running, TestBase +from test.support.interpreters import ( + InterpreterError, InterpreterNotFoundError, ExecutionFailed, +) +from .utils import ( + _captured_script, _run_output, _running, TestBase, + requires_test_modules, _testinternalcapi, +) class ModuleTests(TestBase): @@ -157,6 +163,20 @@ class GetCurrentTests(TestBase): id2 = id(interp) self.assertNotEqual(id1, id2) + @requires_test_modules + def test_created_with_capi(self): + last = 0 + for id, *_ in _interpreters.list_all(): + last = max(last, id) + expected = _testinternalcapi.next_interpreter_id() + text = self.run_temp_from_capi(f""" + import {interpreters.__name__} as interpreters + interp = interpreters.get_current() + print(interp.id) + """) + interpid = eval(text) + self.assertEqual(interpid, expected) + class ListAllTests(TestBase): @@ -199,6 +219,33 @@ class ListAllTests(TestBase): for interp1, interp2 in zip(actual, expected): self.assertIs(interp1, interp2) + def test_created_with_capi(self): + mainid, *_ = _interpreters.get_main() + interpid1 = _interpreters.create() + interpid2 = _interpreters.create() + interpid3 = _interpreters.create() + interpid4 = interpid3 + 1 + interpid5 = interpid4 + 1 + expected = [ + (mainid,), + (interpid1,), + (interpid2,), + (interpid3,), + (interpid4,), + (interpid5,), + ] + expected2 = expected[:-2] + text = self.run_temp_from_capi(f""" + import {interpreters.__name__} as interpreters + interp = interpreters.create() + print( + [(i.id,) for i in interpreters.list_all()]) + """) + res = eval(text) + res2 = [(i.id,) for i in interpreters.list_all()] + self.assertEqual(res, expected) + self.assertEqual(res2, expected2) + class InterpreterObjectTests(TestBase): @@ -276,6 +323,7 @@ class TestInterpreterIsRunning(TestBase): main = interpreters.get_main() self.assertTrue(main.is_running()) + # XXX Is this still true? @unittest.skip('Fails on FreeBSD') def test_subinterpreter(self): interp = interpreters.create() @@ -337,6 +385,55 @@ class TestInterpreterIsRunning(TestBase): interp.exec('t.join()') self.assertEqual(os.read(r_interp, 1), FINISHED) + def test_created_with_capi(self): + script = dedent(f""" + import {interpreters.__name__} as interpreters + interp = interpreters.get_current() + print(interp.is_running()) + """) + def parse_results(text): + self.assertNotEqual(text, "") + try: + return eval(text) + except Exception: + raise Exception(repr(text)) + + with self.subTest('running __main__ (from self)'): + with self.interpreter_from_capi() as interpid: + text = self.run_from_capi(interpid, script, main=True) + running = parse_results(text) + self.assertTrue(running) + + with self.subTest('running, but not __main__ (from self)'): + text = self.run_temp_from_capi(script) + running = parse_results(text) + self.assertFalse(running) + + with self.subTest('running __main__ (from other)'): + with self.interpreter_obj_from_capi() as (interp, interpid): + before = interp.is_running() + with self.running_from_capi(interpid, main=True): + during = interp.is_running() + after = interp.is_running() + self.assertFalse(before) + self.assertTrue(during) + self.assertFalse(after) + + with self.subTest('running, but not __main__ (from other)'): + with self.interpreter_obj_from_capi() as (interp, interpid): + before = interp.is_running() + with self.running_from_capi(interpid, main=False): + during = interp.is_running() + after = interp.is_running() + self.assertFalse(before) + self.assertFalse(during) + self.assertFalse(after) + + with self.subTest('not running (from other)'): + with self.interpreter_obj_from_capi() as (interp, _): + running = interp.is_running() + self.assertFalse(running) + class TestInterpreterClose(TestBase): @@ -364,11 +461,11 @@ class TestInterpreterClose(TestBase): def test_main(self): main, = interpreters.list_all() - with self.assertRaises(interpreters.InterpreterError): + with self.assertRaises(InterpreterError): main.close() def f(): - with self.assertRaises(interpreters.InterpreterError): + with self.assertRaises(InterpreterError): main.close() t = threading.Thread(target=f) @@ -419,12 +516,13 @@ class TestInterpreterClose(TestBase): t.start() t.join() + # XXX Is this still true? @unittest.skip('Fails on FreeBSD') def test_still_running(self): main, = interpreters.list_all() interp = interpreters.create() with _running(interp): - with self.assertRaises(interpreters.InterpreterError): + with self.assertRaises(InterpreterError): interp.close() self.assertTrue(interp.is_running()) @@ -459,6 +557,52 @@ class TestInterpreterClose(TestBase): self.assertEqual(os.read(r_interp, 1), FINISHED) + def test_created_with_capi(self): + script = dedent(f""" + import {interpreters.__name__} as interpreters + interp = interpreters.get_current() + interp.close() + """) + + with self.subTest('running __main__ (from self)'): + with self.interpreter_from_capi() as interpid: + with self.assertRaisesRegex(ExecutionFailed, + 'InterpreterError.*current'): + self.run_from_capi(interpid, script, main=True) + + with self.subTest('running, but not __main__ (from self)'): + with self.assertRaisesRegex(ExecutionFailed, + 'InterpreterError.*current'): + self.run_temp_from_capi(script) + + with self.subTest('running __main__ (from other)'): + with self.interpreter_obj_from_capi() as (interp, interpid): + with self.running_from_capi(interpid, main=True): + with self.assertRaisesRegex(InterpreterError, 'running'): + interp.close() + # Make sure it wssn't closed. + self.assertTrue( + interp.is_running()) + + # The rest must be skipped until we deal with running threads when + # interp.close() is called. + return + + with self.subTest('running, but not __main__ (from other)'): + with self.interpreter_obj_from_capi() as (interp, interpid): + with self.running_from_capi(interpid, main=False): + with self.assertRaisesRegex(InterpreterError, 'not managed'): + interp.close() + # Make sure it wssn't closed. + self.assertFalse(interp.is_running()) + + with self.subTest('not running (from other)'): + with self.interpreter_obj_from_capi() as (interp, _): + with self.assertRaisesRegex(InterpreterError, 'not managed'): + interp.close() + # Make sure it wssn't closed. + self.assertFalse(interp.is_running()) + class TestInterpreterPrepareMain(TestBase): @@ -511,26 +655,45 @@ class TestInterpreterPrepareMain(TestBase): interp.prepare_main(spam={'spam': 'eggs', 'foo': 'bar'}) # Make sure neither was actually bound. - with self.assertRaises(interpreters.ExecutionFailed): + with self.assertRaises(ExecutionFailed): interp.exec('print(foo)') - with self.assertRaises(interpreters.ExecutionFailed): + with self.assertRaises(ExecutionFailed): interp.exec('print(spam)') + def test_running(self): + interp = interpreters.create() + interp.prepare_main({'spam': True}) + with self.running(interp): + with self.assertRaisesRegex(InterpreterError, 'running'): + interp.prepare_main({'spam': False}) + interp.exec('assert spam is True') + + @requires_test_modules + def test_created_with_capi(self): + with self.interpreter_from_capi() as interpid: + interp = interpreters.Interpreter(interpid) + interp.prepare_main({'spam': True}) + rc = _testinternalcapi.exec_interpreter(interpid, + 'assert spam is True') + assert rc == 0, rc + class TestInterpreterExec(TestBase): def test_success(self): interp = interpreters.create() - script, file = _captured_script('print("it worked!", end="")') - with file: + script, results = _captured_script('print("it worked!", end="")') + with results: interp.exec(script) - out = file.read() + results = results.final() + results.raise_if_failed() + out = results.stdout self.assertEqual(out, 'it worked!') def test_failure(self): interp = interpreters.create() - with self.assertRaises(interpreters.ExecutionFailed): + with self.assertRaises(ExecutionFailed): interp.exec('raise Exception') def test_display_preserved_exception(self): @@ -583,15 +746,17 @@ class TestInterpreterExec(TestBase): def test_in_thread(self): interp = interpreters.create() - script, file = _captured_script('print("it worked!", end="")') - with file: + script, results = _captured_script('print("it worked!", end="")') + with results: def f(): interp.exec(script) t = threading.Thread(target=f) t.start() t.join() - out = file.read() + results = results.final() + results.raise_if_failed() + out = results.stdout self.assertEqual(out, 'it worked!') @@ -618,6 +783,7 @@ class TestInterpreterExec(TestBase): content = file.read() self.assertEqual(content, expected) + # XXX Is this still true? @unittest.skip('Fails on FreeBSD') def test_already_running(self): interp = interpreters.create() @@ -666,6 +832,11 @@ class TestInterpreterExec(TestBase): self.assertEqual(os.read(r_interp, 1), RAN) self.assertEqual(os.read(r_interp, 1), FINISHED) + def test_created_with_capi(self): + with self.interpreter_obj_from_capi() as (interp, _): + with self.assertRaisesRegex(ExecutionFailed, 'it worked'): + interp.exec('raise Exception("it worked!")') + # test_xxsubinterpreters covers the remaining # Interpreter.exec() behavior. @@ -830,7 +1001,7 @@ class TestInterpreterCall(TestBase): raise Exception((args, kwargs)) interp.call(callable) - with self.assertRaises(interpreters.ExecutionFailed): + with self.assertRaises(ExecutionFailed): interp.call(call_func_failure) def test_call_in_thread(self): @@ -942,6 +1113,14 @@ class LowLevelTests(TestBase): # encountered by the high-level module, thus they # mostly shouldn't matter as much. + def interp_exists(self, interpid): + try: + _interpreters.is_running(interpid) + except InterpreterNotFoundError: + return False + else: + return True + def test_new_config(self): # This test overlaps with # test.test_capi.test_misc.InterpreterConfigTests. @@ -1064,46 +1243,107 @@ class LowLevelTests(TestBase): with self.assertRaises(ValueError): _interpreters.new_config(gil=value) - def test_get_config(self): - # This test overlaps with - # test.test_capi.test_misc.InterpreterConfigTests. + def test_get_main(self): + interpid, = _interpreters.get_main() + self.assertEqual(interpid, 0) + def test_get_current(self): with self.subTest('main'): - expected = _interpreters.new_config('legacy') - expected.gil = 'own' - interpid = _interpreters.get_main() - config = _interpreters.get_config(interpid) - self.assert_ns_equal(config, expected) + main, *_ = _interpreters.get_main() + interpid, = _interpreters.get_current() + self.assertEqual(interpid, main) + + script = f""" + import {_interpreters.__name__} as _interpreters + interpid, = _interpreters.get_current() + print(interpid) + """ + def parse_stdout(text): + parts = text.split() + assert len(parts) == 1, parts + interpid, = parts + interpid = int(interpid) + return interpid, + + with self.subTest('from _interpreters'): + orig = _interpreters.create() + text = self.run_and_capture(orig, script) + interpid, = parse_stdout(text) + self.assertEqual(interpid, orig) + + with self.subTest('from C-API'): + last = 0 + for id, *_ in _interpreters.list_all(): + last = max(last, id) + expected = last + 1 + text = self.run_temp_from_capi(script) + interpid, = parse_stdout(text) + self.assertEqual(interpid, expected) + + def test_list_all(self): + mainid, *_ = _interpreters.get_main() + interpid1 = _interpreters.create() + interpid2 = _interpreters.create() + interpid3 = _interpreters.create() + expected = [ + (mainid,), + (interpid1,), + (interpid2,), + (interpid3,), + ] - with self.subTest('isolated'): - expected = _interpreters.new_config('isolated') - interpid = _interpreters.create('isolated') - config = _interpreters.get_config(interpid) - self.assert_ns_equal(config, expected) + with self.subTest('main'): + res = _interpreters.list_all() + self.assertEqual(res, expected) + + with self.subTest('from _interpreters'): + text = self.run_and_capture(interpid2, f""" + import {_interpreters.__name__} as _interpreters + print( + _interpreters.list_all()) + """) - with self.subTest('legacy'): - expected = _interpreters.new_config('legacy') - interpid = _interpreters.create('legacy') - config = _interpreters.get_config(interpid) - self.assert_ns_equal(config, expected) + res = eval(text) + self.assertEqual(res, expected) + + with self.subTest('from C-API'): + interpid4 = interpid3 + 1 + interpid5 = interpid4 + 1 + expected2 = expected + [ + (interpid4,), + (interpid5,), + ] + expected3 = expected + [ + (interpid5,), + ] + text = self.run_temp_from_capi(f""" + import {_interpreters.__name__} as _interpreters + _interpreters.create() + print( + _interpreters.list_all()) + """) + res2 = eval(text) + res3 = _interpreters.list_all() + self.assertEqual(res2, expected2) + self.assertEqual(res3, expected3) def test_create(self): isolated = _interpreters.new_config('isolated') legacy = _interpreters.new_config('legacy') default = isolated - with self.subTest('no arg'): + with self.subTest('no args'): interpid = _interpreters.create() config = _interpreters.get_config(interpid) self.assert_ns_equal(config, default) - with self.subTest('arg: None'): + with self.subTest('config: None'): interpid = _interpreters.create(None) config = _interpreters.get_config(interpid) self.assert_ns_equal(config, default) - with self.subTest('arg: \'empty\''): - with self.assertRaises(interpreters.InterpreterError): + with self.subTest('config: \'empty\''): + with self.assertRaises(InterpreterError): # The "empty" config isn't viable on its own. _interpreters.create('empty') @@ -1138,6 +1378,230 @@ class LowLevelTests(TestBase): with self.assertRaises(ValueError): _interpreters.create(orig) + @requires_test_modules + def test_destroy(self): + with self.subTest('from _interpreters'): + interpid = _interpreters.create() + before = [id for id, *_ in _interpreters.list_all()] + _interpreters.destroy(interpid) + after = [id for id, *_ in _interpreters.list_all()] + + self.assertIn(interpid, before) + self.assertNotIn(interpid, after) + self.assertFalse( + self.interp_exists(interpid)) + + with self.subTest('main'): + interpid, *_ = _interpreters.get_main() + with self.assertRaises(InterpreterError): + # It is the current interpreter. + _interpreters.destroy(interpid) + + with self.subTest('from C-API'): + interpid = _testinternalcapi.create_interpreter() + _interpreters.destroy(interpid) + self.assertFalse( + self.interp_exists(interpid)) + + def test_get_config(self): + # This test overlaps with + # test.test_capi.test_misc.InterpreterConfigTests. + + with self.subTest('main'): + expected = _interpreters.new_config('legacy') + expected.gil = 'own' + interpid, *_ = _interpreters.get_main() + config = _interpreters.get_config(interpid) + self.assert_ns_equal(config, expected) + + with self.subTest('main'): + expected = _interpreters.new_config('legacy') + expected.gil = 'own' + interpid, *_ = _interpreters.get_main() + config = _interpreters.get_config(interpid) + self.assert_ns_equal(config, expected) + + with self.subTest('isolated'): + expected = _interpreters.new_config('isolated') + interpid = _interpreters.create('isolated') + config = _interpreters.get_config(interpid) + self.assert_ns_equal(config, expected) + + with self.subTest('legacy'): + expected = _interpreters.new_config('legacy') + interpid = _interpreters.create('legacy') + config = _interpreters.get_config(interpid) + self.assert_ns_equal(config, expected) + + with self.subTest('from C-API'): + orig = _interpreters.new_config('isolated') + with self.interpreter_from_capi(orig) as interpid: + config = _interpreters.get_config(interpid) + self.assert_ns_equal(config, orig) + + @requires_test_modules + def test_whence(self): + with self.subTest('main'): + interpid, *_ = _interpreters.get_main() + whence = _interpreters.whence(interpid) + self.assertEqual(whence, _interpreters.WHENCE_RUNTIME) + + with self.subTest('stdlib'): + interpid = _interpreters.create() + whence = _interpreters.whence(interpid) + self.assertEqual(whence, _interpreters.WHENCE_XI) + + for orig, name in { + # XXX Also check WHENCE_UNKNOWN. + _interpreters.WHENCE_LEGACY_CAPI: 'legacy C-API', + _interpreters.WHENCE_CAPI: 'C-API', + _interpreters.WHENCE_XI: 'cross-interpreter C-API', + }.items(): + with self.subTest(f'from C-API ({orig}: {name})'): + with self.interpreter_from_capi(whence=orig) as interpid: + whence = _interpreters.whence(interpid) + self.assertEqual(whence, orig) + + with self.subTest('from C-API, running'): + text = self.run_temp_from_capi(dedent(f""" + import {_interpreters.__name__} as _interpreters + interpid, *_ = _interpreters.get_current() + print(_interpreters.whence(interpid)) + """), + config=True) + whence = eval(text) + self.assertEqual(whence, _interpreters.WHENCE_CAPI) + + with self.subTest('from legacy C-API, running'): + ... + text = self.run_temp_from_capi(dedent(f""" + import {_interpreters.__name__} as _interpreters + interpid, *_ = _interpreters.get_current() + print(_interpreters.whence(interpid)) + """), + config=False) + whence = eval(text) + self.assertEqual(whence, _interpreters.WHENCE_LEGACY_CAPI) + + def test_is_running(self): + with self.subTest('main'): + interpid, *_ = _interpreters.get_main() + running = _interpreters.is_running(interpid) + self.assertTrue(running) + + with self.subTest('from _interpreters (running)'): + interpid = _interpreters.create() + with self.running(interpid): + running = _interpreters.is_running(interpid) + self.assertTrue(running) + + with self.subTest('from _interpreters (not running)'): + interpid = _interpreters.create() + running = _interpreters.is_running(interpid) + self.assertFalse(running) + + with self.subTest('from C-API (running __main__)'): + with self.interpreter_from_capi() as interpid: + with self.running_from_capi(interpid, main=True): + running = _interpreters.is_running(interpid) + self.assertTrue(running) + + with self.subTest('from C-API (running, but not __main__)'): + with self.interpreter_from_capi() as interpid: + with self.running_from_capi(interpid, main=False): + running = _interpreters.is_running(interpid) + self.assertFalse(running) + + with self.subTest('from C-API (not running)'): + with self.interpreter_from_capi() as interpid: + running = _interpreters.is_running(interpid) + self.assertFalse(running) + + def test_exec(self): + with self.subTest('run script'): + interpid = _interpreters.create() + script, results = _captured_script('print("it worked!", end="")') + with results: + exc = _interpreters.exec(interpid, script) + results = results.final() + results.raise_if_failed() + out = results.stdout + self.assertEqual(out, 'it worked!') + + with self.subTest('uncaught exception'): + interpid = _interpreters.create() + script, results = _captured_script(""" + raise Exception('uh-oh!') + print("it worked!", end="") + """) + with results: + exc = _interpreters.exec(interpid, script) + out = results.stdout() + self.assertEqual(out, '') + self.assert_ns_equal(exc, types.SimpleNamespace( + type=types.SimpleNamespace( + __name__='Exception', + __qualname__='Exception', + __module__='builtins', + ), + msg='uh-oh!', + # We check these in other tests. + formatted=exc.formatted, + errdisplay=exc.errdisplay, + )) + + with self.subTest('from C-API'): + with self.interpreter_from_capi() as interpid: + exc = _interpreters.exec(interpid, 'raise Exception("it worked!")') + self.assertIsNot(exc, None) + self.assertEqual(exc.msg, 'it worked!') + + def test_call(self): + with self.subTest('no args'): + interpid = _interpreters.create() + exc = _interpreters.call(interpid, call_func_return_shareable) + self.assertIs(exc, None) + + with self.subTest('uncaught exception'): + interpid = _interpreters.create() + exc = _interpreters.call(interpid, call_func_failure) + self.assertEqual(exc, types.SimpleNamespace( + type=types.SimpleNamespace( + __name__='Exception', + __qualname__='Exception', + __module__='builtins', + ), + msg='spam!', + # We check these in other tests. + formatted=exc.formatted, + errdisplay=exc.errdisplay, + )) + + def test_set___main___attrs(self): + with self.subTest('from _interpreters'): + interpid = _interpreters.create() + before1 = _interpreters.exec(interpid, 'assert spam == \'eggs\'') + before2 = _interpreters.exec(interpid, 'assert ham == 42') + self.assertEqual(before1.type.__name__, 'NameError') + self.assertEqual(before2.type.__name__, 'NameError') + + _interpreters.set___main___attrs(interpid, dict( + spam='eggs', + ham=42, + )) + after1 = _interpreters.exec(interpid, 'assert spam == \'eggs\'') + after2 = _interpreters.exec(interpid, 'assert ham == 42') + after3 = _interpreters.exec(interpid, 'assert spam == 42') + self.assertIs(after1, None) + self.assertIs(after2, None) + self.assertEqual(after3.type.__name__, 'AssertionError') + + with self.subTest('from C-API'): + with self.interpreter_from_capi() as interpid: + _interpreters.set___main___attrs(interpid, {'spam': True}) + exc = _interpreters.exec(interpid, 'assert spam is True') + self.assertIsNone(exc) + if __name__ == '__main__': # Test needs to be a package, so we can do relative imports. |