summaryrefslogtreecommitdiffstats
path: root/Lib/test/test_interpreters/utils.py
diff options
context:
space:
mode:
authorEric Snow <ericsnowcurrently@gmail.com>2024-04-11 00:37:01 (GMT)
committerGitHub <noreply@github.com>2024-04-11 00:37:01 (GMT)
commit993c3cca16ed00a0bfe467f7f26ac4f5f6dfb24c (patch)
tree765b64249a8406226b300c9fc8500baa1afec746 /Lib/test/test_interpreters/utils.py
parent0cc71bde001950d3634c235e2b0d24cda6ce7dce (diff)
downloadcpython-993c3cca16ed00a0bfe467f7f26ac4f5f6dfb24c.zip
cpython-993c3cca16ed00a0bfe467f7f26ac4f5f6dfb24c.tar.gz
cpython-993c3cca16ed00a0bfe467f7f26ac4f5f6dfb24c.tar.bz2
gh-76785: Add More Tests to test_interpreters.test_api (gh-117662)
In addition to the increase test coverage, this is a precursor to sorting out how we handle interpreters created directly via the C-API.
Diffstat (limited to 'Lib/test/test_interpreters/utils.py')
-rw-r--r--Lib/test/test_interpreters/utils.py523
1 files changed, 509 insertions, 14 deletions
diff --git a/Lib/test/test_interpreters/utils.py b/Lib/test/test_interpreters/utils.py
index 5ade676..d921794 100644
--- a/Lib/test/test_interpreters/utils.py
+++ b/Lib/test/test_interpreters/utils.py
@@ -1,30 +1,344 @@
+from collections import namedtuple
import contextlib
+import json
+import io
import os
import os.path
+import pickle
+import queue
+#import select
import subprocess
import sys
import tempfile
-from textwrap import dedent
+from textwrap import dedent, indent
import threading
import types
import unittest
+import warnings
from test import support
from test.support import os_helper
+from test.support import import_helper
+_interpreters = import_helper.import_module('_xxsubinterpreters')
from test.support import interpreters
-def _captured_script(script):
- r, w = os.pipe()
- indented = script.replace('\n', '\n ')
- wrapped = dedent(f"""
- import contextlib
- with open({w}, 'w', encoding='utf-8') as spipe:
- with contextlib.redirect_stdout(spipe):
+try:
+ import _testinternalcapi
+ import _testcapi
+except ImportError:
+ _testinternalcapi = None
+ _testcapi = None
+
+def requires_test_modules(func):
+ return unittest.skipIf(_testinternalcapi is None, "test requires _testinternalcapi module")(func)
+
+
+def _dump_script(text):
+ lines = text.splitlines()
+ print()
+ print('-' * 20)
+ for i, line in enumerate(lines, 1):
+ print(f' {i:>{len(str(len(lines)))}} {line}')
+ print('-' * 20)
+
+
+def _close_file(file):
+ try:
+ if hasattr(file, 'close'):
+ file.close()
+ else:
+ os.close(file)
+ except OSError as exc:
+ if exc.errno != 9:
+ raise # re-raise
+ # It was closed already.
+
+
+def pack_exception(exc=None):
+ captured = _interpreters.capture_exception(exc)
+ data = dict(captured.__dict__)
+ data['type'] = dict(captured.type.__dict__)
+ return json.dumps(data)
+
+
+def unpack_exception(packed):
+ try:
+ data = json.loads(packed)
+ except json.decoder.JSONDecodeError:
+ warnings.warn('incomplete exception data', RuntimeWarning)
+ print(packed if isinstance(packed, str) else packed.decode('utf-8'))
+ return None
+ exc = types.SimpleNamespace(**data)
+ exc.type = types.SimpleNamespace(**exc.type)
+ return exc;
+
+
+class CapturingResults:
+
+ STDIO = dedent("""\
+ with open({w_pipe}, 'wb', buffering=0) as _spipe_{stream}:
+ _captured_std{stream} = io.StringIO()
+ with contextlib.redirect_std{stream}(_captured_std{stream}):
+ #########################
+ # begin wrapped script
+
+ {indented}
+
+ # end wrapped script
+ #########################
+ text = _captured_std{stream}.getvalue()
+ _spipe_{stream}.write(text.encode('utf-8'))
+ """)[:-1]
+ EXC = dedent("""\
+ with open({w_pipe}, 'wb', buffering=0) as _spipe_exc:
+ try:
+ #########################
+ # begin wrapped script
+
{indented}
- """)
- return wrapped, open(r, encoding='utf-8')
+
+ # end wrapped script
+ #########################
+ except Exception as exc:
+ text = _interp_utils.pack_exception(exc)
+ _spipe_exc.write(text.encode('utf-8'))
+ """)[:-1]
+
+ @classmethod
+ def wrap_script(cls, script, *, stdout=True, stderr=False, exc=False):
+ script = dedent(script).strip(os.linesep)
+ imports = [
+ f'import {__name__} as _interp_utils',
+ ]
+ wrapped = script
+
+ # Handle exc.
+ if exc:
+ exc = os.pipe()
+ r_exc, w_exc = exc
+ indented = wrapped.replace('\n', '\n ')
+ wrapped = cls.EXC.format(
+ w_pipe=w_exc,
+ indented=indented,
+ )
+ else:
+ exc = None
+
+ # Handle stdout.
+ if stdout:
+ imports.extend([
+ 'import contextlib, io',
+ ])
+ stdout = os.pipe()
+ r_out, w_out = stdout
+ indented = wrapped.replace('\n', '\n ')
+ wrapped = cls.STDIO.format(
+ w_pipe=w_out,
+ indented=indented,
+ stream='out',
+ )
+ else:
+ stdout = None
+
+ # Handle stderr.
+ if stderr == 'stdout':
+ stderr = None
+ elif stderr:
+ if not stdout:
+ imports.extend([
+ 'import contextlib, io',
+ ])
+ stderr = os.pipe()
+ r_err, w_err = stderr
+ indented = wrapped.replace('\n', '\n ')
+ wrapped = cls.STDIO.format(
+ w_pipe=w_err,
+ indented=indented,
+ stream='err',
+ )
+ else:
+ stderr = None
+
+ if wrapped == script:
+ raise NotImplementedError
+ else:
+ for line in imports:
+ wrapped = f'{line}{os.linesep}{wrapped}'
+
+ results = cls(stdout, stderr, exc)
+ return wrapped, results
+
+ def __init__(self, out, err, exc):
+ self._rf_out = None
+ self._rf_err = None
+ self._rf_exc = None
+ self._w_out = None
+ self._w_err = None
+ self._w_exc = None
+
+ if out is not None:
+ r_out, w_out = out
+ self._rf_out = open(r_out, 'rb', buffering=0)
+ self._w_out = w_out
+
+ if err is not None:
+ r_err, w_err = err
+ self._rf_err = open(r_err, 'rb', buffering=0)
+ self._w_err = w_err
+
+ if exc is not None:
+ r_exc, w_exc = exc
+ self._rf_exc = open(r_exc, 'rb', buffering=0)
+ self._w_exc = w_exc
+
+ self._buf_out = b''
+ self._buf_err = b''
+ self._buf_exc = b''
+ self._exc = None
+
+ self._closed = False
+
+ def __enter__(self):
+ return self
+
+ def __exit__(self, *args):
+ self.close()
+
+ @property
+ def closed(self):
+ return self._closed
+
+ def close(self):
+ if self._closed:
+ return
+ self._closed = True
+
+ if self._w_out is not None:
+ _close_file(self._w_out)
+ self._w_out = None
+ if self._w_err is not None:
+ _close_file(self._w_err)
+ self._w_err = None
+ if self._w_exc is not None:
+ _close_file(self._w_exc)
+ self._w_exc = None
+
+ self._capture()
+
+ if self._rf_out is not None:
+ _close_file(self._rf_out)
+ self._rf_out = None
+ if self._rf_err is not None:
+ _close_file(self._rf_err)
+ self._rf_err = None
+ if self._rf_exc is not None:
+ _close_file(self._rf_exc)
+ self._rf_exc = None
+
+ def _capture(self):
+ # Ideally this is called only after the script finishes
+ # (and thus has closed the write end of the pipe.
+ if self._rf_out is not None:
+ chunk = self._rf_out.read(100)
+ while chunk:
+ self._buf_out += chunk
+ chunk = self._rf_out.read(100)
+ if self._rf_err is not None:
+ chunk = self._rf_err.read(100)
+ while chunk:
+ self._buf_err += chunk
+ chunk = self._rf_err.read(100)
+ if self._rf_exc is not None:
+ chunk = self._rf_exc.read(100)
+ while chunk:
+ self._buf_exc += chunk
+ chunk = self._rf_exc.read(100)
+
+ def _unpack_stdout(self):
+ return self._buf_out.decode('utf-8')
+
+ def _unpack_stderr(self):
+ return self._buf_err.decode('utf-8')
+
+ def _unpack_exc(self):
+ if self._exc is not None:
+ return self._exc
+ if not self._buf_exc:
+ return None
+ self._exc = unpack_exception(self._buf_exc)
+ return self._exc
+
+ def stdout(self):
+ if self.closed:
+ return self.final().stdout
+ self._capture()
+ return self._unpack_stdout()
+
+ def stderr(self):
+ if self.closed:
+ return self.final().stderr
+ self._capture()
+ return self._unpack_stderr()
+
+ def exc(self):
+ if self.closed:
+ return self.final().exc
+ self._capture()
+ return self._unpack_exc()
+
+ def final(self, *, force=False):
+ try:
+ return self._final
+ except AttributeError:
+ if not self._closed:
+ if not force:
+ raise Exception('no final results available yet')
+ else:
+ return CapturedResults.Proxy(self)
+ self._final = CapturedResults(
+ self._unpack_stdout(),
+ self._unpack_stderr(),
+ self._unpack_exc(),
+ )
+ return self._final
+
+
+class CapturedResults(namedtuple('CapturedResults', 'stdout stderr exc')):
+
+ class Proxy:
+ def __init__(self, capturing):
+ self._capturing = capturing
+ def _finish(self):
+ if self._capturing is None:
+ return
+ self._final = self._capturing.final()
+ self._capturing = None
+ def __iter__(self):
+ self._finish()
+ yield from self._final
+ def __len__(self):
+ self._finish()
+ return len(self._final)
+ def __getattr__(self, name):
+ self._finish()
+ if name.startswith('_'):
+ raise AttributeError(name)
+ return getattr(self._final, name)
+
+ def raise_if_failed(self):
+ if self.exc is not None:
+ raise interpreters.ExecutionFailed(self.exc)
+
+
+def _captured_script(script, *, stdout=True, stderr=False, exc=False):
+ return CapturingResults.wrap_script(
+ script,
+ stdout=stdout,
+ stderr=stderr,
+ exc=exc,
+ )
def clean_up_interpreters():
@@ -33,17 +347,17 @@ def clean_up_interpreters():
continue
try:
interp.close()
- except RuntimeError:
+ except _interpreters.InterpreterError:
pass # already destroyed
def _run_output(interp, request, init=None):
- script, rpipe = _captured_script(request)
- with rpipe:
+ script, results = _captured_script(request)
+ with results:
if init:
interp.prepare_main(init)
interp.exec(script)
- return rpipe.read()
+ return results.stdout()
@contextlib.contextmanager
@@ -175,3 +489,184 @@ class TestBase(unittest.TestCase):
diff = f'namespace({diff})'
standardMsg = self._truncateMessage(standardMsg, diff)
self.fail(self._formatMessage(msg, standardMsg))
+
+ def _run_string(self, interp, script):
+ wrapped, results = _captured_script(script, exc=False)
+ #_dump_script(wrapped)
+ with results:
+ if isinstance(interp, interpreters.Interpreter):
+ interp.exec(script)
+ else:
+ err = _interpreters.run_string(interp, wrapped)
+ if err is not None:
+ return None, err
+ return results.stdout(), None
+
+ def run_and_capture(self, interp, script):
+ text, err = self._run_string(interp, script)
+ if err is not None:
+ raise interpreters.ExecutionFailed(err)
+ else:
+ return text
+
+ @requires_test_modules
+ @contextlib.contextmanager
+ def interpreter_from_capi(self, config=None, whence=None):
+ if config is False:
+ if whence is None:
+ whence = _interpreters.WHENCE_LEGACY_CAPI
+ else:
+ assert whence in (_interpreters.WHENCE_LEGACY_CAPI,
+ _interpreters.WHENCE_UNKNOWN), repr(whence)
+ config = None
+ elif config is True:
+ config = _interpreters.new_config('default')
+ elif config is None:
+ if whence not in (
+ _interpreters.WHENCE_LEGACY_CAPI,
+ _interpreters.WHENCE_UNKNOWN,
+ ):
+ config = _interpreters.new_config('legacy')
+ elif isinstance(config, str):
+ config = _interpreters.new_config(config)
+
+ if whence is None:
+ whence = _interpreters.WHENCE_XI
+
+ interpid = _testinternalcapi.create_interpreter(config, whence=whence)
+ try:
+ yield interpid
+ finally:
+ try:
+ _testinternalcapi.destroy_interpreter(interpid)
+ except _interpreters.InterpreterNotFoundError:
+ pass
+
+ @contextlib.contextmanager
+ def interpreter_obj_from_capi(self, config='legacy'):
+ with self.interpreter_from_capi(config) as interpid:
+ yield interpreters.Interpreter(interpid), interpid
+
+ @contextlib.contextmanager
+ def capturing(self, script):
+ wrapped, capturing = _captured_script(script, stdout=True, exc=True)
+ #_dump_script(wrapped)
+ with capturing:
+ yield wrapped, capturing.final(force=True)
+
+ @requires_test_modules
+ def run_from_capi(self, interpid, script, *, main=False):
+ with self.capturing(script) as (wrapped, results):
+ rc = _testinternalcapi.exec_interpreter(interpid, wrapped, main=main)
+ assert rc == 0, rc
+ results.raise_if_failed()
+ return results.stdout
+
+ @contextlib.contextmanager
+ def _running(self, run_interp, exec_interp):
+ token = b'\0'
+ r_in, w_in = self.pipe()
+ r_out, w_out = self.pipe()
+
+ def close():
+ _close_file(r_in)
+ _close_file(w_in)
+ _close_file(r_out)
+ _close_file(w_out)
+
+ # Start running (and wait).
+ script = dedent(f"""
+ import os
+ try:
+ # handshake
+ token = os.read({r_in}, 1)
+ os.write({w_out}, token)
+ # Wait for the "done" message.
+ os.read({r_in}, 1)
+ except BrokenPipeError:
+ pass
+ except OSError as exc:
+ if exc.errno != 9:
+ raise # re-raise
+ # It was closed already.
+ """)
+ failed = None
+ def run():
+ nonlocal failed
+ try:
+ run_interp(script)
+ except Exception as exc:
+ failed = exc
+ close()
+ t = threading.Thread(target=run)
+ t.start()
+
+ # handshake
+ try:
+ os.write(w_in, token)
+ token2 = os.read(r_out, 1)
+ assert token2 == token, (token2, token)
+ except OSError:
+ t.join()
+ if failed is not None:
+ raise failed
+
+ # CM __exit__()
+ try:
+ try:
+ yield
+ finally:
+ # Send "done".
+ os.write(w_in, b'\0')
+ finally:
+ close()
+ t.join()
+ if failed is not None:
+ raise failed
+
+ @contextlib.contextmanager
+ def running(self, interp):
+ if isinstance(interp, int):
+ interpid = interp
+ def exec_interp(script):
+ exc = _interpreters.exec(interpid, script)
+ assert exc is None, exc
+ run_interp = exec_interp
+ else:
+ def run_interp(script):
+ text = self.run_and_capture(interp, script)
+ assert text == '', repr(text)
+ def exec_interp(script):
+ interp.exec(script)
+ with self._running(run_interp, exec_interp):
+ yield
+
+ @requires_test_modules
+ @contextlib.contextmanager
+ def running_from_capi(self, interpid, *, main=False):
+ def run_interp(script):
+ text = self.run_from_capi(interpid, script, main=main)
+ assert text == '', repr(text)
+ def exec_interp(script):
+ rc = _testinternalcapi.exec_interpreter(interpid, script)
+ assert rc == 0, rc
+ with self._running(run_interp, exec_interp):
+ yield
+
+ @requires_test_modules
+ def run_temp_from_capi(self, script, config='legacy'):
+ if config is False:
+ # Force using Py_NewInterpreter().
+ run_in_interp = (lambda s, c: _testcapi.run_in_subinterp(s))
+ config = None
+ else:
+ run_in_interp = _testinternalcapi.run_in_subinterp_with_config
+ if config is True:
+ config = 'default'
+ if isinstance(config, str):
+ config = _interpreters.new_config(config)
+ with self.capturing(script) as (wrapped, results):
+ rc = run_in_interp(wrapped, config)
+ assert rc == 0, rc
+ results.raise_if_failed()
+ return results.stdout