diff options
| author | Eric Snow <ericsnowcurrently@gmail.com> | 2024-04-11 00:37:01 (GMT) |
|---|---|---|
| committer | GitHub <noreply@github.com> | 2024-04-11 00:37:01 (GMT) |
| commit | 993c3cca16ed00a0bfe467f7f26ac4f5f6dfb24c (patch) | |
| tree | 765b64249a8406226b300c9fc8500baa1afec746 /Lib/test/test_interpreters | |
| parent | 0cc71bde001950d3634c235e2b0d24cda6ce7dce (diff) | |
| download | cpython-993c3cca16ed00a0bfe467f7f26ac4f5f6dfb24c.zip cpython-993c3cca16ed00a0bfe467f7f26ac4f5f6dfb24c.tar.gz cpython-993c3cca16ed00a0bfe467f7f26ac4f5f6dfb24c.tar.bz2 | |
gh-76785: Add More Tests to test_interpreters.test_api (gh-117662)
In addition to the increase test coverage, this is a precursor to sorting out how we handle interpreters created directly via the C-API.
Diffstat (limited to 'Lib/test/test_interpreters')
| -rw-r--r-- | Lib/test/test_interpreters/test_api.py | 540 | ||||
| -rw-r--r-- | Lib/test/test_interpreters/utils.py | 523 |
2 files changed, 1011 insertions, 52 deletions
diff --git a/Lib/test/test_interpreters/test_api.py b/Lib/test/test_interpreters/test_api.py index a326b39..abf66a7 100644 --- a/Lib/test/test_interpreters/test_api.py +++ b/Lib/test/test_interpreters/test_api.py @@ -1,6 +1,7 @@ import os import pickle -from textwrap import dedent +import sys +from textwrap import dedent, indent import threading import types import unittest @@ -10,8 +11,13 @@ from test.support import import_helper # Raise SkipTest if subinterpreters not supported. _interpreters = import_helper.import_module('_xxsubinterpreters') from test.support import interpreters -from test.support.interpreters import InterpreterNotFoundError -from .utils import _captured_script, _run_output, _running, TestBase +from test.support.interpreters import ( + InterpreterError, InterpreterNotFoundError, ExecutionFailed, +) +from .utils import ( + _captured_script, _run_output, _running, TestBase, + requires_test_modules, _testinternalcapi, +) class ModuleTests(TestBase): @@ -157,6 +163,20 @@ class GetCurrentTests(TestBase): id2 = id(interp) self.assertNotEqual(id1, id2) + @requires_test_modules + def test_created_with_capi(self): + last = 0 + for id, *_ in _interpreters.list_all(): + last = max(last, id) + expected = _testinternalcapi.next_interpreter_id() + text = self.run_temp_from_capi(f""" + import {interpreters.__name__} as interpreters + interp = interpreters.get_current() + print(interp.id) + """) + interpid = eval(text) + self.assertEqual(interpid, expected) + class ListAllTests(TestBase): @@ -199,6 +219,33 @@ class ListAllTests(TestBase): for interp1, interp2 in zip(actual, expected): self.assertIs(interp1, interp2) + def test_created_with_capi(self): + mainid, *_ = _interpreters.get_main() + interpid1 = _interpreters.create() + interpid2 = _interpreters.create() + interpid3 = _interpreters.create() + interpid4 = interpid3 + 1 + interpid5 = interpid4 + 1 + expected = [ + (mainid,), + (interpid1,), + (interpid2,), + (interpid3,), + (interpid4,), + (interpid5,), + ] + expected2 = expected[:-2] + text = self.run_temp_from_capi(f""" + import {interpreters.__name__} as interpreters + interp = interpreters.create() + print( + [(i.id,) for i in interpreters.list_all()]) + """) + res = eval(text) + res2 = [(i.id,) for i in interpreters.list_all()] + self.assertEqual(res, expected) + self.assertEqual(res2, expected2) + class InterpreterObjectTests(TestBase): @@ -276,6 +323,7 @@ class TestInterpreterIsRunning(TestBase): main = interpreters.get_main() self.assertTrue(main.is_running()) + # XXX Is this still true? @unittest.skip('Fails on FreeBSD') def test_subinterpreter(self): interp = interpreters.create() @@ -337,6 +385,55 @@ class TestInterpreterIsRunning(TestBase): interp.exec('t.join()') self.assertEqual(os.read(r_interp, 1), FINISHED) + def test_created_with_capi(self): + script = dedent(f""" + import {interpreters.__name__} as interpreters + interp = interpreters.get_current() + print(interp.is_running()) + """) + def parse_results(text): + self.assertNotEqual(text, "") + try: + return eval(text) + except Exception: + raise Exception(repr(text)) + + with self.subTest('running __main__ (from self)'): + with self.interpreter_from_capi() as interpid: + text = self.run_from_capi(interpid, script, main=True) + running = parse_results(text) + self.assertTrue(running) + + with self.subTest('running, but not __main__ (from self)'): + text = self.run_temp_from_capi(script) + running = parse_results(text) + self.assertFalse(running) + + with self.subTest('running __main__ (from other)'): + with self.interpreter_obj_from_capi() as (interp, interpid): + before = interp.is_running() + with self.running_from_capi(interpid, main=True): + during = interp.is_running() + after = interp.is_running() + self.assertFalse(before) + self.assertTrue(during) + self.assertFalse(after) + + with self.subTest('running, but not __main__ (from other)'): + with self.interpreter_obj_from_capi() as (interp, interpid): + before = interp.is_running() + with self.running_from_capi(interpid, main=False): + during = interp.is_running() + after = interp.is_running() + self.assertFalse(before) + self.assertFalse(during) + self.assertFalse(after) + + with self.subTest('not running (from other)'): + with self.interpreter_obj_from_capi() as (interp, _): + running = interp.is_running() + self.assertFalse(running) + class TestInterpreterClose(TestBase): @@ -364,11 +461,11 @@ class TestInterpreterClose(TestBase): def test_main(self): main, = interpreters.list_all() - with self.assertRaises(interpreters.InterpreterError): + with self.assertRaises(InterpreterError): main.close() def f(): - with self.assertRaises(interpreters.InterpreterError): + with self.assertRaises(InterpreterError): main.close() t = threading.Thread(target=f) @@ -419,12 +516,13 @@ class TestInterpreterClose(TestBase): t.start() t.join() + # XXX Is this still true? @unittest.skip('Fails on FreeBSD') def test_still_running(self): main, = interpreters.list_all() interp = interpreters.create() with _running(interp): - with self.assertRaises(interpreters.InterpreterError): + with self.assertRaises(InterpreterError): interp.close() self.assertTrue(interp.is_running()) @@ -459,6 +557,52 @@ class TestInterpreterClose(TestBase): self.assertEqual(os.read(r_interp, 1), FINISHED) + def test_created_with_capi(self): + script = dedent(f""" + import {interpreters.__name__} as interpreters + interp = interpreters.get_current() + interp.close() + """) + + with self.subTest('running __main__ (from self)'): + with self.interpreter_from_capi() as interpid: + with self.assertRaisesRegex(ExecutionFailed, + 'InterpreterError.*current'): + self.run_from_capi(interpid, script, main=True) + + with self.subTest('running, but not __main__ (from self)'): + with self.assertRaisesRegex(ExecutionFailed, + 'InterpreterError.*current'): + self.run_temp_from_capi(script) + + with self.subTest('running __main__ (from other)'): + with self.interpreter_obj_from_capi() as (interp, interpid): + with self.running_from_capi(interpid, main=True): + with self.assertRaisesRegex(InterpreterError, 'running'): + interp.close() + # Make sure it wssn't closed. + self.assertTrue( + interp.is_running()) + + # The rest must be skipped until we deal with running threads when + # interp.close() is called. + return + + with self.subTest('running, but not __main__ (from other)'): + with self.interpreter_obj_from_capi() as (interp, interpid): + with self.running_from_capi(interpid, main=False): + with self.assertRaisesRegex(InterpreterError, 'not managed'): + interp.close() + # Make sure it wssn't closed. + self.assertFalse(interp.is_running()) + + with self.subTest('not running (from other)'): + with self.interpreter_obj_from_capi() as (interp, _): + with self.assertRaisesRegex(InterpreterError, 'not managed'): + interp.close() + # Make sure it wssn't closed. + self.assertFalse(interp.is_running()) + class TestInterpreterPrepareMain(TestBase): @@ -511,26 +655,45 @@ class TestInterpreterPrepareMain(TestBase): interp.prepare_main(spam={'spam': 'eggs', 'foo': 'bar'}) # Make sure neither was actually bound. - with self.assertRaises(interpreters.ExecutionFailed): + with self.assertRaises(ExecutionFailed): interp.exec('print(foo)') - with self.assertRaises(interpreters.ExecutionFailed): + with self.assertRaises(ExecutionFailed): interp.exec('print(spam)') + def test_running(self): + interp = interpreters.create() + interp.prepare_main({'spam': True}) + with self.running(interp): + with self.assertRaisesRegex(InterpreterError, 'running'): + interp.prepare_main({'spam': False}) + interp.exec('assert spam is True') + + @requires_test_modules + def test_created_with_capi(self): + with self.interpreter_from_capi() as interpid: + interp = interpreters.Interpreter(interpid) + interp.prepare_main({'spam': True}) + rc = _testinternalcapi.exec_interpreter(interpid, + 'assert spam is True') + assert rc == 0, rc + class TestInterpreterExec(TestBase): def test_success(self): interp = interpreters.create() - script, file = _captured_script('print("it worked!", end="")') - with file: + script, results = _captured_script('print("it worked!", end="")') + with results: interp.exec(script) - out = file.read() + results = results.final() + results.raise_if_failed() + out = results.stdout self.assertEqual(out, 'it worked!') def test_failure(self): interp = interpreters.create() - with self.assertRaises(interpreters.ExecutionFailed): + with self.assertRaises(ExecutionFailed): interp.exec('raise Exception') def test_display_preserved_exception(self): @@ -583,15 +746,17 @@ class TestInterpreterExec(TestBase): def test_in_thread(self): interp = interpreters.create() - script, file = _captured_script('print("it worked!", end="")') - with file: + script, results = _captured_script('print("it worked!", end="")') + with results: def f(): interp.exec(script) t = threading.Thread(target=f) t.start() t.join() - out = file.read() + results = results.final() + results.raise_if_failed() + out = results.stdout self.assertEqual(out, 'it worked!') @@ -618,6 +783,7 @@ class TestInterpreterExec(TestBase): content = file.read() self.assertEqual(content, expected) + # XXX Is this still true? @unittest.skip('Fails on FreeBSD') def test_already_running(self): interp = interpreters.create() @@ -666,6 +832,11 @@ class TestInterpreterExec(TestBase): self.assertEqual(os.read(r_interp, 1), RAN) self.assertEqual(os.read(r_interp, 1), FINISHED) + def test_created_with_capi(self): + with self.interpreter_obj_from_capi() as (interp, _): + with self.assertRaisesRegex(ExecutionFailed, 'it worked'): + interp.exec('raise Exception("it worked!")') + # test_xxsubinterpreters covers the remaining # Interpreter.exec() behavior. @@ -830,7 +1001,7 @@ class TestInterpreterCall(TestBase): raise Exception((args, kwargs)) interp.call(callable) - with self.assertRaises(interpreters.ExecutionFailed): + with self.assertRaises(ExecutionFailed): interp.call(call_func_failure) def test_call_in_thread(self): @@ -942,6 +1113,14 @@ class LowLevelTests(TestBase): # encountered by the high-level module, thus they # mostly shouldn't matter as much. + def interp_exists(self, interpid): + try: + _interpreters.is_running(interpid) + except InterpreterNotFoundError: + return False + else: + return True + def test_new_config(self): # This test overlaps with # test.test_capi.test_misc.InterpreterConfigTests. @@ -1064,46 +1243,107 @@ class LowLevelTests(TestBase): with self.assertRaises(ValueError): _interpreters.new_config(gil=value) - def test_get_config(self): - # This test overlaps with - # test.test_capi.test_misc.InterpreterConfigTests. + def test_get_main(self): + interpid, = _interpreters.get_main() + self.assertEqual(interpid, 0) + def test_get_current(self): with self.subTest('main'): - expected = _interpreters.new_config('legacy') - expected.gil = 'own' - interpid = _interpreters.get_main() - config = _interpreters.get_config(interpid) - self.assert_ns_equal(config, expected) + main, *_ = _interpreters.get_main() + interpid, = _interpreters.get_current() + self.assertEqual(interpid, main) + + script = f""" + import {_interpreters.__name__} as _interpreters + interpid, = _interpreters.get_current() + print(interpid) + """ + def parse_stdout(text): + parts = text.split() + assert len(parts) == 1, parts + interpid, = parts + interpid = int(interpid) + return interpid, + + with self.subTest('from _interpreters'): + orig = _interpreters.create() + text = self.run_and_capture(orig, script) + interpid, = parse_stdout(text) + self.assertEqual(interpid, orig) + + with self.subTest('from C-API'): + last = 0 + for id, *_ in _interpreters.list_all(): + last = max(last, id) + expected = last + 1 + text = self.run_temp_from_capi(script) + interpid, = parse_stdout(text) + self.assertEqual(interpid, expected) + + def test_list_all(self): + mainid, *_ = _interpreters.get_main() + interpid1 = _interpreters.create() + interpid2 = _interpreters.create() + interpid3 = _interpreters.create() + expected = [ + (mainid,), + (interpid1,), + (interpid2,), + (interpid3,), + ] - with self.subTest('isolated'): - expected = _interpreters.new_config('isolated') - interpid = _interpreters.create('isolated') - config = _interpreters.get_config(interpid) - self.assert_ns_equal(config, expected) + with self.subTest('main'): + res = _interpreters.list_all() + self.assertEqual(res, expected) + + with self.subTest('from _interpreters'): + text = self.run_and_capture(interpid2, f""" + import {_interpreters.__name__} as _interpreters + print( + _interpreters.list_all()) + """) - with self.subTest('legacy'): - expected = _interpreters.new_config('legacy') - interpid = _interpreters.create('legacy') - config = _interpreters.get_config(interpid) - self.assert_ns_equal(config, expected) + res = eval(text) + self.assertEqual(res, expected) + + with self.subTest('from C-API'): + interpid4 = interpid3 + 1 + interpid5 = interpid4 + 1 + expected2 = expected + [ + (interpid4,), + (interpid5,), + ] + expected3 = expected + [ + (interpid5,), + ] + text = self.run_temp_from_capi(f""" + import {_interpreters.__name__} as _interpreters + _interpreters.create() + print( + _interpreters.list_all()) + """) + res2 = eval(text) + res3 = _interpreters.list_all() + self.assertEqual(res2, expected2) + self.assertEqual(res3, expected3) def test_create(self): isolated = _interpreters.new_config('isolated') legacy = _interpreters.new_config('legacy') default = isolated - with self.subTest('no arg'): + with self.subTest('no args'): interpid = _interpreters.create() config = _interpreters.get_config(interpid) self.assert_ns_equal(config, default) - with self.subTest('arg: None'): + with self.subTest('config: None'): interpid = _interpreters.create(None) config = _interpreters.get_config(interpid) self.assert_ns_equal(config, default) - with self.subTest('arg: \'empty\''): - with self.assertRaises(interpreters.InterpreterError): + with self.subTest('config: \'empty\''): + with self.assertRaises(InterpreterError): # The "empty" config isn't viable on its own. _interpreters.create('empty') @@ -1138,6 +1378,230 @@ class LowLevelTests(TestBase): with self.assertRaises(ValueError): _interpreters.create(orig) + @requires_test_modules + def test_destroy(self): + with self.subTest('from _interpreters'): + interpid = _interpreters.create() + before = [id for id, *_ in _interpreters.list_all()] + _interpreters.destroy(interpid) + after = [id for id, *_ in _interpreters.list_all()] + + self.assertIn(interpid, before) + self.assertNotIn(interpid, after) + self.assertFalse( + self.interp_exists(interpid)) + + with self.subTest('main'): + interpid, *_ = _interpreters.get_main() + with self.assertRaises(InterpreterError): + # It is the current interpreter. + _interpreters.destroy(interpid) + + with self.subTest('from C-API'): + interpid = _testinternalcapi.create_interpreter() + _interpreters.destroy(interpid) + self.assertFalse( + self.interp_exists(interpid)) + + def test_get_config(self): + # This test overlaps with + # test.test_capi.test_misc.InterpreterConfigTests. + + with self.subTest('main'): + expected = _interpreters.new_config('legacy') + expected.gil = 'own' + interpid, *_ = _interpreters.get_main() + config = _interpreters.get_config(interpid) + self.assert_ns_equal(config, expected) + + with self.subTest('main'): + expected = _interpreters.new_config('legacy') + expected.gil = 'own' + interpid, *_ = _interpreters.get_main() + config = _interpreters.get_config(interpid) + self.assert_ns_equal(config, expected) + + with self.subTest('isolated'): + expected = _interpreters.new_config('isolated') + interpid = _interpreters.create('isolated') + config = _interpreters.get_config(interpid) + self.assert_ns_equal(config, expected) + + with self.subTest('legacy'): + expected = _interpreters.new_config('legacy') + interpid = _interpreters.create('legacy') + config = _interpreters.get_config(interpid) + self.assert_ns_equal(config, expected) + + with self.subTest('from C-API'): + orig = _interpreters.new_config('isolated') + with self.interpreter_from_capi(orig) as interpid: + config = _interpreters.get_config(interpid) + self.assert_ns_equal(config, orig) + + @requires_test_modules + def test_whence(self): + with self.subTest('main'): + interpid, *_ = _interpreters.get_main() + whence = _interpreters.whence(interpid) + self.assertEqual(whence, _interpreters.WHENCE_RUNTIME) + + with self.subTest('stdlib'): + interpid = _interpreters.create() + whence = _interpreters.whence(interpid) + self.assertEqual(whence, _interpreters.WHENCE_XI) + + for orig, name in { + # XXX Also check WHENCE_UNKNOWN. + _interpreters.WHENCE_LEGACY_CAPI: 'legacy C-API', + _interpreters.WHENCE_CAPI: 'C-API', + _interpreters.WHENCE_XI: 'cross-interpreter C-API', + }.items(): + with self.subTest(f'from C-API ({orig}: {name})'): + with self.interpreter_from_capi(whence=orig) as interpid: + whence = _interpreters.whence(interpid) + self.assertEqual(whence, orig) + + with self.subTest('from C-API, running'): + text = self.run_temp_from_capi(dedent(f""" + import {_interpreters.__name__} as _interpreters + interpid, *_ = _interpreters.get_current() + print(_interpreters.whence(interpid)) + """), + config=True) + whence = eval(text) + self.assertEqual(whence, _interpreters.WHENCE_CAPI) + + with self.subTest('from legacy C-API, running'): + ... + text = self.run_temp_from_capi(dedent(f""" + import {_interpreters.__name__} as _interpreters + interpid, *_ = _interpreters.get_current() + print(_interpreters.whence(interpid)) + """), + config=False) + whence = eval(text) + self.assertEqual(whence, _interpreters.WHENCE_LEGACY_CAPI) + + def test_is_running(self): + with self.subTest('main'): + interpid, *_ = _interpreters.get_main() + running = _interpreters.is_running(interpid) + self.assertTrue(running) + + with self.subTest('from _interpreters (running)'): + interpid = _interpreters.create() + with self.running(interpid): + running = _interpreters.is_running(interpid) + self.assertTrue(running) + + with self.subTest('from _interpreters (not running)'): + interpid = _interpreters.create() + running = _interpreters.is_running(interpid) + self.assertFalse(running) + + with self.subTest('from C-API (running __main__)'): + with self.interpreter_from_capi() as interpid: + with self.running_from_capi(interpid, main=True): + running = _interpreters.is_running(interpid) + self.assertTrue(running) + + with self.subTest('from C-API (running, but not __main__)'): + with self.interpreter_from_capi() as interpid: + with self.running_from_capi(interpid, main=False): + running = _interpreters.is_running(interpid) + self.assertFalse(running) + + with self.subTest('from C-API (not running)'): + with self.interpreter_from_capi() as interpid: + running = _interpreters.is_running(interpid) + self.assertFalse(running) + + def test_exec(self): + with self.subTest('run script'): + interpid = _interpreters.create() + script, results = _captured_script('print("it worked!", end="")') + with results: + exc = _interpreters.exec(interpid, script) + results = results.final() + results.raise_if_failed() + out = results.stdout + self.assertEqual(out, 'it worked!') + + with self.subTest('uncaught exception'): + interpid = _interpreters.create() + script, results = _captured_script(""" + raise Exception('uh-oh!') + print("it worked!", end="") + """) + with results: + exc = _interpreters.exec(interpid, script) + out = results.stdout() + self.assertEqual(out, '') + self.assert_ns_equal(exc, types.SimpleNamespace( + type=types.SimpleNamespace( + __name__='Exception', + __qualname__='Exception', + __module__='builtins', + ), + msg='uh-oh!', + # We check these in other tests. + formatted=exc.formatted, + errdisplay=exc.errdisplay, + )) + + with self.subTest('from C-API'): + with self.interpreter_from_capi() as interpid: + exc = _interpreters.exec(interpid, 'raise Exception("it worked!")') + self.assertIsNot(exc, None) + self.assertEqual(exc.msg, 'it worked!') + + def test_call(self): + with self.subTest('no args'): + interpid = _interpreters.create() + exc = _interpreters.call(interpid, call_func_return_shareable) + self.assertIs(exc, None) + + with self.subTest('uncaught exception'): + interpid = _interpreters.create() + exc = _interpreters.call(interpid, call_func_failure) + self.assertEqual(exc, types.SimpleNamespace( + type=types.SimpleNamespace( + __name__='Exception', + __qualname__='Exception', + __module__='builtins', + ), + msg='spam!', + # We check these in other tests. + formatted=exc.formatted, + errdisplay=exc.errdisplay, + )) + + def test_set___main___attrs(self): + with self.subTest('from _interpreters'): + interpid = _interpreters.create() + before1 = _interpreters.exec(interpid, 'assert spam == \'eggs\'') + before2 = _interpreters.exec(interpid, 'assert ham == 42') + self.assertEqual(before1.type.__name__, 'NameError') + self.assertEqual(before2.type.__name__, 'NameError') + + _interpreters.set___main___attrs(interpid, dict( + spam='eggs', + ham=42, + )) + after1 = _interpreters.exec(interpid, 'assert spam == \'eggs\'') + after2 = _interpreters.exec(interpid, 'assert ham == 42') + after3 = _interpreters.exec(interpid, 'assert spam == 42') + self.assertIs(after1, None) + self.assertIs(after2, None) + self.assertEqual(after3.type.__name__, 'AssertionError') + + with self.subTest('from C-API'): + with self.interpreter_from_capi() as interpid: + _interpreters.set___main___attrs(interpid, {'spam': True}) + exc = _interpreters.exec(interpid, 'assert spam is True') + self.assertIsNone(exc) + if __name__ == '__main__': # Test needs to be a package, so we can do relative imports. diff --git a/Lib/test/test_interpreters/utils.py b/Lib/test/test_interpreters/utils.py index 5ade676..d921794 100644 --- a/Lib/test/test_interpreters/utils.py +++ b/Lib/test/test_interpreters/utils.py @@ -1,30 +1,344 @@ +from collections import namedtuple import contextlib +import json +import io import os import os.path +import pickle +import queue +#import select import subprocess import sys import tempfile -from textwrap import dedent +from textwrap import dedent, indent import threading import types import unittest +import warnings from test import support from test.support import os_helper +from test.support import import_helper +_interpreters = import_helper.import_module('_xxsubinterpreters') from test.support import interpreters -def _captured_script(script): - r, w = os.pipe() - indented = script.replace('\n', '\n ') - wrapped = dedent(f""" - import contextlib - with open({w}, 'w', encoding='utf-8') as spipe: - with contextlib.redirect_stdout(spipe): +try: + import _testinternalcapi + import _testcapi +except ImportError: + _testinternalcapi = None + _testcapi = None + +def requires_test_modules(func): + return unittest.skipIf(_testinternalcapi is None, "test requires _testinternalcapi module")(func) + + +def _dump_script(text): + lines = text.splitlines() + print() + print('-' * 20) + for i, line in enumerate(lines, 1): + print(f' {i:>{len(str(len(lines)))}} {line}') + print('-' * 20) + + +def _close_file(file): + try: + if hasattr(file, 'close'): + file.close() + else: + os.close(file) + except OSError as exc: + if exc.errno != 9: + raise # re-raise + # It was closed already. + + +def pack_exception(exc=None): + captured = _interpreters.capture_exception(exc) + data = dict(captured.__dict__) + data['type'] = dict(captured.type.__dict__) + return json.dumps(data) + + +def unpack_exception(packed): + try: + data = json.loads(packed) + except json.decoder.JSONDecodeError: + warnings.warn('incomplete exception data', RuntimeWarning) + print(packed if isinstance(packed, str) else packed.decode('utf-8')) + return None + exc = types.SimpleNamespace(**data) + exc.type = types.SimpleNamespace(**exc.type) + return exc; + + +class CapturingResults: + + STDIO = dedent("""\ + with open({w_pipe}, 'wb', buffering=0) as _spipe_{stream}: + _captured_std{stream} = io.StringIO() + with contextlib.redirect_std{stream}(_captured_std{stream}): + ######################### + # begin wrapped script + + {indented} + + # end wrapped script + ######################### + text = _captured_std{stream}.getvalue() + _spipe_{stream}.write(text.encode('utf-8')) + """)[:-1] + EXC = dedent("""\ + with open({w_pipe}, 'wb', buffering=0) as _spipe_exc: + try: + ######################### + # begin wrapped script + {indented} - """) - return wrapped, open(r, encoding='utf-8') + + # end wrapped script + ######################### + except Exception as exc: + text = _interp_utils.pack_exception(exc) + _spipe_exc.write(text.encode('utf-8')) + """)[:-1] + + @classmethod + def wrap_script(cls, script, *, stdout=True, stderr=False, exc=False): + script = dedent(script).strip(os.linesep) + imports = [ + f'import {__name__} as _interp_utils', + ] + wrapped = script + + # Handle exc. + if exc: + exc = os.pipe() + r_exc, w_exc = exc + indented = wrapped.replace('\n', '\n ') + wrapped = cls.EXC.format( + w_pipe=w_exc, + indented=indented, + ) + else: + exc = None + + # Handle stdout. + if stdout: + imports.extend([ + 'import contextlib, io', + ]) + stdout = os.pipe() + r_out, w_out = stdout + indented = wrapped.replace('\n', '\n ') + wrapped = cls.STDIO.format( + w_pipe=w_out, + indented=indented, + stream='out', + ) + else: + stdout = None + + # Handle stderr. + if stderr == 'stdout': + stderr = None + elif stderr: + if not stdout: + imports.extend([ + 'import contextlib, io', + ]) + stderr = os.pipe() + r_err, w_err = stderr + indented = wrapped.replace('\n', '\n ') + wrapped = cls.STDIO.format( + w_pipe=w_err, + indented=indented, + stream='err', + ) + else: + stderr = None + + if wrapped == script: + raise NotImplementedError + else: + for line in imports: + wrapped = f'{line}{os.linesep}{wrapped}' + + results = cls(stdout, stderr, exc) + return wrapped, results + + def __init__(self, out, err, exc): + self._rf_out = None + self._rf_err = None + self._rf_exc = None + self._w_out = None + self._w_err = None + self._w_exc = None + + if out is not None: + r_out, w_out = out + self._rf_out = open(r_out, 'rb', buffering=0) + self._w_out = w_out + + if err is not None: + r_err, w_err = err + self._rf_err = open(r_err, 'rb', buffering=0) + self._w_err = w_err + + if exc is not None: + r_exc, w_exc = exc + self._rf_exc = open(r_exc, 'rb', buffering=0) + self._w_exc = w_exc + + self._buf_out = b'' + self._buf_err = b'' + self._buf_exc = b'' + self._exc = None + + self._closed = False + + def __enter__(self): + return self + + def __exit__(self, *args): + self.close() + + @property + def closed(self): + return self._closed + + def close(self): + if self._closed: + return + self._closed = True + + if self._w_out is not None: + _close_file(self._w_out) + self._w_out = None + if self._w_err is not None: + _close_file(self._w_err) + self._w_err = None + if self._w_exc is not None: + _close_file(self._w_exc) + self._w_exc = None + + self._capture() + + if self._rf_out is not None: + _close_file(self._rf_out) + self._rf_out = None + if self._rf_err is not None: + _close_file(self._rf_err) + self._rf_err = None + if self._rf_exc is not None: + _close_file(self._rf_exc) + self._rf_exc = None + + def _capture(self): + # Ideally this is called only after the script finishes + # (and thus has closed the write end of the pipe. + if self._rf_out is not None: + chunk = self._rf_out.read(100) + while chunk: + self._buf_out += chunk + chunk = self._rf_out.read(100) + if self._rf_err is not None: + chunk = self._rf_err.read(100) + while chunk: + self._buf_err += chunk + chunk = self._rf_err.read(100) + if self._rf_exc is not None: + chunk = self._rf_exc.read(100) + while chunk: + self._buf_exc += chunk + chunk = self._rf_exc.read(100) + + def _unpack_stdout(self): + return self._buf_out.decode('utf-8') + + def _unpack_stderr(self): + return self._buf_err.decode('utf-8') + + def _unpack_exc(self): + if self._exc is not None: + return self._exc + if not self._buf_exc: + return None + self._exc = unpack_exception(self._buf_exc) + return self._exc + + def stdout(self): + if self.closed: + return self.final().stdout + self._capture() + return self._unpack_stdout() + + def stderr(self): + if self.closed: + return self.final().stderr + self._capture() + return self._unpack_stderr() + + def exc(self): + if self.closed: + return self.final().exc + self._capture() + return self._unpack_exc() + + def final(self, *, force=False): + try: + return self._final + except AttributeError: + if not self._closed: + if not force: + raise Exception('no final results available yet') + else: + return CapturedResults.Proxy(self) + self._final = CapturedResults( + self._unpack_stdout(), + self._unpack_stderr(), + self._unpack_exc(), + ) + return self._final + + +class CapturedResults(namedtuple('CapturedResults', 'stdout stderr exc')): + + class Proxy: + def __init__(self, capturing): + self._capturing = capturing + def _finish(self): + if self._capturing is None: + return + self._final = self._capturing.final() + self._capturing = None + def __iter__(self): + self._finish() + yield from self._final + def __len__(self): + self._finish() + return len(self._final) + def __getattr__(self, name): + self._finish() + if name.startswith('_'): + raise AttributeError(name) + return getattr(self._final, name) + + def raise_if_failed(self): + if self.exc is not None: + raise interpreters.ExecutionFailed(self.exc) + + +def _captured_script(script, *, stdout=True, stderr=False, exc=False): + return CapturingResults.wrap_script( + script, + stdout=stdout, + stderr=stderr, + exc=exc, + ) def clean_up_interpreters(): @@ -33,17 +347,17 @@ def clean_up_interpreters(): continue try: interp.close() - except RuntimeError: + except _interpreters.InterpreterError: pass # already destroyed def _run_output(interp, request, init=None): - script, rpipe = _captured_script(request) - with rpipe: + script, results = _captured_script(request) + with results: if init: interp.prepare_main(init) interp.exec(script) - return rpipe.read() + return results.stdout() @contextlib.contextmanager @@ -175,3 +489,184 @@ class TestBase(unittest.TestCase): diff = f'namespace({diff})' standardMsg = self._truncateMessage(standardMsg, diff) self.fail(self._formatMessage(msg, standardMsg)) + + def _run_string(self, interp, script): + wrapped, results = _captured_script(script, exc=False) + #_dump_script(wrapped) + with results: + if isinstance(interp, interpreters.Interpreter): + interp.exec(script) + else: + err = _interpreters.run_string(interp, wrapped) + if err is not None: + return None, err + return results.stdout(), None + + def run_and_capture(self, interp, script): + text, err = self._run_string(interp, script) + if err is not None: + raise interpreters.ExecutionFailed(err) + else: + return text + + @requires_test_modules + @contextlib.contextmanager + def interpreter_from_capi(self, config=None, whence=None): + if config is False: + if whence is None: + whence = _interpreters.WHENCE_LEGACY_CAPI + else: + assert whence in (_interpreters.WHENCE_LEGACY_CAPI, + _interpreters.WHENCE_UNKNOWN), repr(whence) + config = None + elif config is True: + config = _interpreters.new_config('default') + elif config is None: + if whence not in ( + _interpreters.WHENCE_LEGACY_CAPI, + _interpreters.WHENCE_UNKNOWN, + ): + config = _interpreters.new_config('legacy') + elif isinstance(config, str): + config = _interpreters.new_config(config) + + if whence is None: + whence = _interpreters.WHENCE_XI + + interpid = _testinternalcapi.create_interpreter(config, whence=whence) + try: + yield interpid + finally: + try: + _testinternalcapi.destroy_interpreter(interpid) + except _interpreters.InterpreterNotFoundError: + pass + + @contextlib.contextmanager + def interpreter_obj_from_capi(self, config='legacy'): + with self.interpreter_from_capi(config) as interpid: + yield interpreters.Interpreter(interpid), interpid + + @contextlib.contextmanager + def capturing(self, script): + wrapped, capturing = _captured_script(script, stdout=True, exc=True) + #_dump_script(wrapped) + with capturing: + yield wrapped, capturing.final(force=True) + + @requires_test_modules + def run_from_capi(self, interpid, script, *, main=False): + with self.capturing(script) as (wrapped, results): + rc = _testinternalcapi.exec_interpreter(interpid, wrapped, main=main) + assert rc == 0, rc + results.raise_if_failed() + return results.stdout + + @contextlib.contextmanager + def _running(self, run_interp, exec_interp): + token = b'\0' + r_in, w_in = self.pipe() + r_out, w_out = self.pipe() + + def close(): + _close_file(r_in) + _close_file(w_in) + _close_file(r_out) + _close_file(w_out) + + # Start running (and wait). + script = dedent(f""" + import os + try: + # handshake + token = os.read({r_in}, 1) + os.write({w_out}, token) + # Wait for the "done" message. + os.read({r_in}, 1) + except BrokenPipeError: + pass + except OSError as exc: + if exc.errno != 9: + raise # re-raise + # It was closed already. + """) + failed = None + def run(): + nonlocal failed + try: + run_interp(script) + except Exception as exc: + failed = exc + close() + t = threading.Thread(target=run) + t.start() + + # handshake + try: + os.write(w_in, token) + token2 = os.read(r_out, 1) + assert token2 == token, (token2, token) + except OSError: + t.join() + if failed is not None: + raise failed + + # CM __exit__() + try: + try: + yield + finally: + # Send "done". + os.write(w_in, b'\0') + finally: + close() + t.join() + if failed is not None: + raise failed + + @contextlib.contextmanager + def running(self, interp): + if isinstance(interp, int): + interpid = interp + def exec_interp(script): + exc = _interpreters.exec(interpid, script) + assert exc is None, exc + run_interp = exec_interp + else: + def run_interp(script): + text = self.run_and_capture(interp, script) + assert text == '', repr(text) + def exec_interp(script): + interp.exec(script) + with self._running(run_interp, exec_interp): + yield + + @requires_test_modules + @contextlib.contextmanager + def running_from_capi(self, interpid, *, main=False): + def run_interp(script): + text = self.run_from_capi(interpid, script, main=main) + assert text == '', repr(text) + def exec_interp(script): + rc = _testinternalcapi.exec_interpreter(interpid, script) + assert rc == 0, rc + with self._running(run_interp, exec_interp): + yield + + @requires_test_modules + def run_temp_from_capi(self, script, config='legacy'): + if config is False: + # Force using Py_NewInterpreter(). + run_in_interp = (lambda s, c: _testcapi.run_in_subinterp(s)) + config = None + else: + run_in_interp = _testinternalcapi.run_in_subinterp_with_config + if config is True: + config = 'default' + if isinstance(config, str): + config = _interpreters.new_config(config) + with self.capturing(script) as (wrapped, results): + rc = run_in_interp(wrapped, config) + assert rc == 0, rc + results.raise_if_failed() + return results.stdout |
