summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorVictor Stinner <vstinner@python.org>2023-10-12 21:24:12 (GMT)
committerGitHub <noreply@github.com>2023-10-12 21:24:12 (GMT)
commit80f958529bc5c466add532df4c8dd314995f689f (patch)
treef57fccb54062c882ba1b389cf14b2135b76f6401
parent4b7a12db5488755fa5c2f2f879332205d799fe32 (diff)
downloadcpython-80f958529bc5c466add532df4c8dd314995f689f.zip
cpython-80f958529bc5c466add532df4c8dd314995f689f.tar.gz
cpython-80f958529bc5c466add532df4c8dd314995f689f.tar.bz2
[3.12] gh-110756: Sync regrtest with main branch (#110779)
gh-110756: Sync regrtest with main branch * Remove runtest.py and runtest_mp.py of Lib/test/libregrtest/. * Backport support._parse_memlimit().
-rw-r--r--Lib/test/libregrtest/runtest.py577
-rw-r--r--Lib/test/libregrtest/runtest_mp.py631
-rw-r--r--Lib/test/support/__init__.py40
-rw-r--r--Lib/test/test_support.py41
4 files changed, 61 insertions, 1228 deletions
diff --git a/Lib/test/libregrtest/runtest.py b/Lib/test/libregrtest/runtest.py
deleted file mode 100644
index 8b7844c..0000000
--- a/Lib/test/libregrtest/runtest.py
+++ /dev/null
@@ -1,577 +0,0 @@
-import dataclasses
-import doctest
-import faulthandler
-import gc
-import importlib
-import io
-import os
-import sys
-import time
-import traceback
-import unittest
-
-from test import support
-from test.support import TestStats
-from test.support import os_helper
-from test.support import threading_helper
-from test.libregrtest.cmdline import Namespace
-from test.libregrtest.save_env import saved_test_environment
-from test.libregrtest.utils import clear_caches, format_duration, print_warning
-
-
-MatchTests = list[str]
-MatchTestsDict = dict[str, MatchTests]
-
-
-# Avoid enum.Enum to reduce the number of imports when tests are run
-class State:
- PASSED = "PASSED"
- FAILED = "FAILED"
- SKIPPED = "SKIPPED"
- UNCAUGHT_EXC = "UNCAUGHT_EXC"
- REFLEAK = "REFLEAK"
- ENV_CHANGED = "ENV_CHANGED"
- RESOURCE_DENIED = "RESOURCE_DENIED"
- INTERRUPTED = "INTERRUPTED"
- MULTIPROCESSING_ERROR = "MULTIPROCESSING_ERROR"
- DID_NOT_RUN = "DID_NOT_RUN"
- TIMEOUT = "TIMEOUT"
-
- @staticmethod
- def is_failed(state):
- return state in {
- State.FAILED,
- State.UNCAUGHT_EXC,
- State.REFLEAK,
- State.MULTIPROCESSING_ERROR,
- State.TIMEOUT}
-
- @staticmethod
- def has_meaningful_duration(state):
- # Consider that the duration is meaningless for these cases.
- # For example, if a whole test file is skipped, its duration
- # is unlikely to be the duration of executing its tests,
- # but just the duration to execute code which skips the test.
- return state not in {
- State.SKIPPED,
- State.RESOURCE_DENIED,
- State.INTERRUPTED,
- State.MULTIPROCESSING_ERROR,
- State.DID_NOT_RUN}
-
- @staticmethod
- def must_stop(state):
- return state in {
- State.INTERRUPTED,
- State.MULTIPROCESSING_ERROR}
-
-
-# gh-90681: When rerunning tests, we might need to rerun the whole
-# class or module suite if some its life-cycle hooks fail.
-# Test level hooks are not affected.
-_TEST_LIFECYCLE_HOOKS = frozenset((
- 'setUpClass', 'tearDownClass',
- 'setUpModule', 'tearDownModule',
-))
-
-def normalize_test_name(test_full_name, *, is_error=False):
- short_name = test_full_name.split(" ")[0]
- if is_error and short_name in _TEST_LIFECYCLE_HOOKS:
- if test_full_name.startswith(('setUpModule (', 'tearDownModule (')):
- # if setUpModule() or tearDownModule() failed, don't filter
- # tests with the test file name, don't use use filters.
- return None
-
- # This means that we have a failure in a life-cycle hook,
- # we need to rerun the whole module or class suite.
- # Basically the error looks like this:
- # ERROR: setUpClass (test.test_reg_ex.RegTest)
- # or
- # ERROR: setUpModule (test.test_reg_ex)
- # So, we need to parse the class / module name.
- lpar = test_full_name.index('(')
- rpar = test_full_name.index(')')
- return test_full_name[lpar + 1: rpar].split('.')[-1]
- return short_name
-
-
-@dataclasses.dataclass(slots=True)
-class TestResult:
- test_name: str
- state: str | None = None
- # Test duration in seconds
- duration: float | None = None
- xml_data: list[str] | None = None
- stats: TestStats | None = None
-
- # errors and failures copied from support.TestFailedWithDetails
- errors: list[tuple[str, str]] | None = None
- failures: list[tuple[str, str]] | None = None
-
- def is_failed(self, fail_env_changed: bool) -> bool:
- if self.state == State.ENV_CHANGED:
- return fail_env_changed
- return State.is_failed(self.state)
-
- def _format_failed(self):
- if self.errors and self.failures:
- le = len(self.errors)
- lf = len(self.failures)
- error_s = "error" + ("s" if le > 1 else "")
- failure_s = "failure" + ("s" if lf > 1 else "")
- return f"{self.test_name} failed ({le} {error_s}, {lf} {failure_s})"
-
- if self.errors:
- le = len(self.errors)
- error_s = "error" + ("s" if le > 1 else "")
- return f"{self.test_name} failed ({le} {error_s})"
-
- if self.failures:
- lf = len(self.failures)
- failure_s = "failure" + ("s" if lf > 1 else "")
- return f"{self.test_name} failed ({lf} {failure_s})"
-
- return f"{self.test_name} failed"
-
- def __str__(self) -> str:
- match self.state:
- case State.PASSED:
- return f"{self.test_name} passed"
- case State.FAILED:
- return self._format_failed()
- case State.SKIPPED:
- return f"{self.test_name} skipped"
- case State.UNCAUGHT_EXC:
- return f"{self.test_name} failed (uncaught exception)"
- case State.REFLEAK:
- return f"{self.test_name} failed (reference leak)"
- case State.ENV_CHANGED:
- return f"{self.test_name} failed (env changed)"
- case State.RESOURCE_DENIED:
- return f"{self.test_name} skipped (resource denied)"
- case State.INTERRUPTED:
- return f"{self.test_name} interrupted"
- case State.MULTIPROCESSING_ERROR:
- return f"{self.test_name} process crashed"
- case State.DID_NOT_RUN:
- return f"{self.test_name} ran no tests"
- case State.TIMEOUT:
- return f"{self.test_name} timed out ({format_duration(self.duration)})"
- case _:
- raise ValueError("unknown result state: {state!r}")
-
- def has_meaningful_duration(self):
- return State.has_meaningful_duration(self.state)
-
- def set_env_changed(self):
- if self.state is None or self.state == State.PASSED:
- self.state = State.ENV_CHANGED
-
- def must_stop(self, fail_fast: bool, fail_env_changed: bool) -> bool:
- if State.must_stop(self.state):
- return True
- if fail_fast and self.is_failed(fail_env_changed):
- return True
- return False
-
- def get_rerun_match_tests(self):
- match_tests = []
-
- errors = self.errors or []
- failures = self.failures or []
- for error_list, is_error in (
- (errors, True),
- (failures, False),
- ):
- for full_name, *_ in error_list:
- match_name = normalize_test_name(full_name, is_error=is_error)
- if match_name is None:
- # 'setUpModule (test.test_sys)': don't filter tests
- return None
- if not match_name:
- error_type = "ERROR" if is_error else "FAIL"
- print_warning(f"rerun failed to parse {error_type} test name: "
- f"{full_name!r}: don't filter tests")
- return None
- match_tests.append(match_name)
-
- return match_tests
-
-
-@dataclasses.dataclass(slots=True, frozen=True)
-class RunTests:
- tests: list[str]
- match_tests: MatchTestsDict | None = None
- rerun: bool = False
- forever: bool = False
-
- def get_match_tests(self, test_name) -> MatchTests | None:
- if self.match_tests is not None:
- return self.match_tests.get(test_name, None)
- else:
- return None
-
- def iter_tests(self):
- tests = tuple(self.tests)
- if self.forever:
- while True:
- yield from tests
- else:
- yield from tests
-
-
-# Minimum duration of a test to display its duration or to mention that
-# the test is running in background
-PROGRESS_MIN_TIME = 30.0 # seconds
-
-#If these test directories are encountered recurse into them and treat each
-# test_ .py or dir as a separate test module. This can increase parallelism.
-# Beware this can't generally be done for any directory with sub-tests as the
-# __init__.py may do things which alter what tests are to be run.
-
-SPLITTESTDIRS = {
- "test_asyncio",
- "test_concurrent_futures",
- "test_future_stmt",
- "test_gdb",
- "test_multiprocessing_fork",
- "test_multiprocessing_forkserver",
- "test_multiprocessing_spawn",
-}
-
-
-def findtestdir(path=None):
- return path or os.path.dirname(os.path.dirname(__file__)) or os.curdir
-
-
-def findtests(*, testdir=None, exclude=(),
- split_test_dirs=SPLITTESTDIRS, base_mod=""):
- """Return a list of all applicable test modules."""
- testdir = findtestdir(testdir)
- tests = []
- for name in os.listdir(testdir):
- mod, ext = os.path.splitext(name)
- if (not mod.startswith("test_")) or (mod in exclude):
- continue
- if mod in split_test_dirs:
- subdir = os.path.join(testdir, mod)
- mod = f"{base_mod or 'test'}.{mod}"
- tests.extend(findtests(testdir=subdir, exclude=exclude,
- split_test_dirs=split_test_dirs, base_mod=mod))
- elif ext in (".py", ""):
- tests.append(f"{base_mod}.{mod}" if base_mod else mod)
- return sorted(tests)
-
-
-def split_test_packages(tests, *, testdir=None, exclude=(),
- split_test_dirs=SPLITTESTDIRS):
- testdir = findtestdir(testdir)
- splitted = []
- for name in tests:
- if name in split_test_dirs:
- subdir = os.path.join(testdir, name)
- splitted.extend(findtests(testdir=subdir, exclude=exclude,
- split_test_dirs=split_test_dirs,
- base_mod=name))
- else:
- splitted.append(name)
- return splitted
-
-
-def abs_module_name(test_name: str, test_dir: str | None) -> str:
- if test_name.startswith('test.') or test_dir:
- return test_name
- else:
- # Import it from the test package
- return 'test.' + test_name
-
-
-def setup_support(ns: Namespace):
- support.PGO = ns.pgo
- support.PGO_EXTENDED = ns.pgo_extended
- support.set_match_tests(ns.match_tests, ns.ignore_tests)
- support.failfast = ns.failfast
- support.verbose = ns.verbose
- if ns.xmlpath:
- support.junit_xml_list = []
- else:
- support.junit_xml_list = None
-
-
-def _runtest(result: TestResult, ns: Namespace) -> None:
- # Capture stdout and stderr, set faulthandler timeout,
- # and create JUnit XML report.
- verbose = ns.verbose
- output_on_failure = ns.verbose3
- timeout = ns.timeout
-
- use_timeout = (
- timeout is not None and threading_helper.can_start_thread
- )
- if use_timeout:
- faulthandler.dump_traceback_later(timeout, exit=True)
-
- try:
- setup_support(ns)
-
- if output_on_failure:
- support.verbose = True
-
- stream = io.StringIO()
- orig_stdout = sys.stdout
- orig_stderr = sys.stderr
- print_warning = support.print_warning
- orig_print_warnings_stderr = print_warning.orig_stderr
-
- output = None
- try:
- sys.stdout = stream
- sys.stderr = stream
- # print_warning() writes into the temporary stream to preserve
- # messages order. If support.environment_altered becomes true,
- # warnings will be written to sys.stderr below.
- print_warning.orig_stderr = stream
-
- _runtest_env_changed_exc(result, ns, display_failure=False)
- # Ignore output if the test passed successfully
- if result.state != State.PASSED:
- output = stream.getvalue()
- finally:
- sys.stdout = orig_stdout
- sys.stderr = orig_stderr
- print_warning.orig_stderr = orig_print_warnings_stderr
-
- if output is not None:
- sys.stderr.write(output)
- sys.stderr.flush()
- else:
- # Tell tests to be moderately quiet
- support.verbose = verbose
- _runtest_env_changed_exc(result, ns, display_failure=not verbose)
-
- xml_list = support.junit_xml_list
- if xml_list:
- import xml.etree.ElementTree as ET
- result.xml_data = [ET.tostring(x).decode('us-ascii')
- for x in xml_list]
- finally:
- if use_timeout:
- faulthandler.cancel_dump_traceback_later()
- support.junit_xml_list = None
-
-
-def runtest(ns: Namespace, test_name: str) -> TestResult:
- """Run a single test.
-
- ns -- regrtest namespace of options
- test_name -- the name of the test
-
- Returns a TestResult.
-
- If ns.xmlpath is not None, xml_data is a list containing each
- generated testsuite element.
- """
- start_time = time.perf_counter()
- result = TestResult(test_name)
- try:
- _runtest(result, ns)
- except:
- if not ns.pgo:
- msg = traceback.format_exc()
- print(f"test {test_name} crashed -- {msg}",
- file=sys.stderr, flush=True)
- result.state = State.UNCAUGHT_EXC
- result.duration = time.perf_counter() - start_time
- return result
-
-
-def run_unittest(test_mod):
- loader = unittest.TestLoader()
- tests = loader.loadTestsFromModule(test_mod)
- for error in loader.errors:
- print(error, file=sys.stderr)
- if loader.errors:
- raise Exception("errors while loading tests")
- return support.run_unittest(tests)
-
-
-def save_env(ns: Namespace, test_name: str):
- return saved_test_environment(test_name, ns.verbose, ns.quiet, pgo=ns.pgo)
-
-
-def regrtest_runner(result, test_func, ns) -> None:
- # Run test_func(), collect statistics, and detect reference and memory
- # leaks.
- if ns.huntrleaks:
- from test.libregrtest.refleak import dash_R
- refleak, test_result = dash_R(ns, result.test_name, test_func)
- else:
- test_result = test_func()
- refleak = False
-
- if refleak:
- result.state = State.REFLEAK
-
- match test_result:
- case TestStats():
- stats = test_result
- case unittest.TestResult():
- stats = TestStats.from_unittest(test_result)
- case doctest.TestResults():
- stats = TestStats.from_doctest(test_result)
- case None:
- print_warning(f"{result.test_name} test runner returned None: {test_func}")
- stats = None
- case _:
- print_warning(f"Unknown test result type: {type(test_result)}")
- stats = None
-
- result.stats = stats
-
-
-# Storage of uncollectable objects
-FOUND_GARBAGE = []
-
-
-def _load_run_test(result: TestResult, ns: Namespace) -> None:
- # Load the test function, run the test function.
- module_name = abs_module_name(result.test_name, ns.testdir)
-
- # Remove the module from sys.module to reload it if it was already imported
- sys.modules.pop(module_name, None)
-
- test_mod = importlib.import_module(module_name)
-
- if hasattr(test_mod, "test_main"):
- # https://github.com/python/cpython/issues/89392
- raise Exception(f"Module {result.test_name} defines test_main() which is no longer supported by regrtest")
- def test_func():
- return run_unittest(test_mod)
-
- try:
- with save_env(ns, result.test_name):
- regrtest_runner(result, test_func, ns)
- finally:
- # First kill any dangling references to open files etc.
- # This can also issue some ResourceWarnings which would otherwise get
- # triggered during the following test run, and possibly produce
- # failures.
- support.gc_collect()
-
- remove_testfn(result.test_name, ns.verbose)
-
- if gc.garbage:
- support.environment_altered = True
- print_warning(f"{result.test_name} created {len(gc.garbage)} "
- f"uncollectable object(s)")
-
- # move the uncollectable objects somewhere,
- # so we don't see them again
- FOUND_GARBAGE.extend(gc.garbage)
- gc.garbage.clear()
-
- support.reap_children()
-
-
-def _runtest_env_changed_exc(result: TestResult, ns: Namespace,
- display_failure: bool = True) -> None:
- # Detect environment changes, handle exceptions.
-
- # Reset the environment_altered flag to detect if a test altered
- # the environment
- support.environment_altered = False
-
- if ns.pgo:
- display_failure = False
-
- test_name = result.test_name
- try:
- clear_caches()
- support.gc_collect()
-
- with save_env(ns, test_name):
- _load_run_test(result, ns)
- except support.ResourceDenied as msg:
- if not ns.quiet and not ns.pgo:
- print(f"{test_name} skipped -- {msg}", flush=True)
- result.state = State.RESOURCE_DENIED
- return
- except unittest.SkipTest as msg:
- if not ns.quiet and not ns.pgo:
- print(f"{test_name} skipped -- {msg}", flush=True)
- result.state = State.SKIPPED
- return
- except support.TestFailedWithDetails as exc:
- msg = f"test {test_name} failed"
- if display_failure:
- msg = f"{msg} -- {exc}"
- print(msg, file=sys.stderr, flush=True)
- result.state = State.FAILED
- result.errors = exc.errors
- result.failures = exc.failures
- result.stats = exc.stats
- return
- except support.TestFailed as exc:
- msg = f"test {test_name} failed"
- if display_failure:
- msg = f"{msg} -- {exc}"
- print(msg, file=sys.stderr, flush=True)
- result.state = State.FAILED
- result.stats = exc.stats
- return
- except support.TestDidNotRun:
- result.state = State.DID_NOT_RUN
- return
- except KeyboardInterrupt:
- print()
- result.state = State.INTERRUPTED
- return
- except:
- if not ns.pgo:
- msg = traceback.format_exc()
- print(f"test {test_name} crashed -- {msg}",
- file=sys.stderr, flush=True)
- result.state = State.UNCAUGHT_EXC
- return
-
- if support.environment_altered:
- result.set_env_changed()
- # Don't override the state if it was already set (REFLEAK or ENV_CHANGED)
- if result.state is None:
- result.state = State.PASSED
-
-
-def remove_testfn(test_name: str, verbose: int) -> None:
- # Try to clean up os_helper.TESTFN if left behind.
- #
- # While tests shouldn't leave any files or directories behind, when a test
- # fails that can be tedious for it to arrange. The consequences can be
- # especially nasty on Windows, since if a test leaves a file open, it
- # cannot be deleted by name (while there's nothing we can do about that
- # here either, we can display the name of the offending test, which is a
- # real help).
- name = os_helper.TESTFN
- if not os.path.exists(name):
- return
-
- if os.path.isdir(name):
- import shutil
- kind, nuker = "directory", shutil.rmtree
- elif os.path.isfile(name):
- kind, nuker = "file", os.unlink
- else:
- raise RuntimeError(f"os.path says {name!r} exists but is neither "
- f"directory nor file")
-
- if verbose:
- print_warning(f"{test_name} left behind {kind} {name!r}")
- support.environment_altered = True
-
- try:
- import stat
- # fix possible permissions problems that might prevent cleanup
- os.chmod(name, stat.S_IRWXU | stat.S_IRWXG | stat.S_IRWXO)
- nuker(name)
- except Exception as exc:
- print_warning(f"{test_name} left behind {kind} {name!r} "
- f"and it couldn't be removed: {exc}")
diff --git a/Lib/test/libregrtest/runtest_mp.py b/Lib/test/libregrtest/runtest_mp.py
deleted file mode 100644
index 6008955..0000000
--- a/Lib/test/libregrtest/runtest_mp.py
+++ /dev/null
@@ -1,631 +0,0 @@
-import dataclasses
-import faulthandler
-import json
-import os.path
-import queue
-import signal
-import subprocess
-import sys
-import tempfile
-import threading
-import time
-import traceback
-from typing import NamedTuple, NoReturn, Literal, Any, TextIO
-
-from test import support
-from test.support import os_helper
-from test.support import TestStats
-
-from test.libregrtest.cmdline import Namespace
-from test.libregrtest.main import Regrtest
-from test.libregrtest.runtest import (
- runtest, TestResult, State, PROGRESS_MIN_TIME,
- MatchTests, RunTests)
-from test.libregrtest.setup import setup_tests
-from test.libregrtest.utils import format_duration, print_warning
-
-if sys.platform == 'win32':
- import locale
-
-
-# Display the running tests if nothing happened last N seconds
-PROGRESS_UPDATE = 30.0 # seconds
-assert PROGRESS_UPDATE >= PROGRESS_MIN_TIME
-
-# Kill the main process after 5 minutes. It is supposed to write an update
-# every PROGRESS_UPDATE seconds. Tolerate 5 minutes for Python slowest
-# buildbot workers.
-MAIN_PROCESS_TIMEOUT = 5 * 60.0
-assert MAIN_PROCESS_TIMEOUT >= PROGRESS_UPDATE
-
-# Time to wait until a worker completes: should be immediate
-JOIN_TIMEOUT = 30.0 # seconds
-
-USE_PROCESS_GROUP = (hasattr(os, "setsid") and hasattr(os, "killpg"))
-
-
-@dataclasses.dataclass(slots=True)
-class WorkerJob:
- test_name: str
- namespace: Namespace
- rerun: bool = False
- match_tests: MatchTests | None = None
-
-
-class _EncodeWorkerJob(json.JSONEncoder):
- def default(self, o: Any) -> dict[str, Any]:
- match o:
- case WorkerJob():
- result = dataclasses.asdict(o)
- result["__worker_job__"] = True
- return result
- case Namespace():
- result = vars(o)
- result["__namespace__"] = True
- return result
- case _:
- return super().default(o)
-
-
-def _decode_worker_job(d: dict[str, Any]) -> WorkerJob | dict[str, Any]:
- if "__worker_job__" in d:
- d.pop('__worker_job__')
- return WorkerJob(**d)
- if "__namespace__" in d:
- d.pop('__namespace__')
- return Namespace(**d)
- else:
- return d
-
-
-def _parse_worker_args(worker_json: str) -> tuple[Namespace, str]:
- return json.loads(worker_json,
- object_hook=_decode_worker_job)
-
-
-def run_test_in_subprocess(worker_job: WorkerJob,
- output_file: TextIO,
- tmp_dir: str | None = None) -> subprocess.Popen:
- ns = worker_job.namespace
- python = ns.python
- worker_args = json.dumps(worker_job, cls=_EncodeWorkerJob)
-
- if python is not None:
- executable = python
- else:
- executable = [sys.executable]
- cmd = [*executable, *support.args_from_interpreter_flags(),
- '-u', # Unbuffered stdout and stderr
- '-m', 'test.regrtest',
- '--worker-args', worker_args]
-
- env = dict(os.environ)
- if tmp_dir is not None:
- env['TMPDIR'] = tmp_dir
- env['TEMP'] = tmp_dir
- env['TMP'] = tmp_dir
-
- # Running the child from the same working directory as regrtest's original
- # invocation ensures that TEMPDIR for the child is the same when
- # sysconfig.is_python_build() is true. See issue 15300.
- kw = dict(
- env=env,
- stdout=output_file,
- # bpo-45410: Write stderr into stdout to keep messages order
- stderr=output_file,
- text=True,
- close_fds=(os.name != 'nt'),
- cwd=os_helper.SAVEDCWD,
- )
- if USE_PROCESS_GROUP:
- kw['start_new_session'] = True
- return subprocess.Popen(cmd, **kw)
-
-
-def run_tests_worker(worker_json: str) -> NoReturn:
- worker_job = _parse_worker_args(worker_json)
- ns = worker_job.namespace
- test_name = worker_job.test_name
- rerun = worker_job.rerun
- match_tests = worker_job.match_tests
-
- setup_tests(ns)
-
- if rerun:
- if match_tests:
- matching = "matching: " + ", ".join(match_tests)
- print(f"Re-running {test_name} in verbose mode ({matching})", flush=True)
- else:
- print(f"Re-running {test_name} in verbose mode", flush=True)
- ns.verbose = True
-
- if match_tests is not None:
- ns.match_tests = match_tests
-
- result = runtest(ns, test_name)
- print() # Force a newline (just in case)
-
- # Serialize TestResult as dict in JSON
- print(json.dumps(result, cls=EncodeTestResult), flush=True)
- sys.exit(0)
-
-
-# We do not use a generator so multiple threads can call next().
-class MultiprocessIterator:
-
- """A thread-safe iterator over tests for multiprocess mode."""
-
- def __init__(self, tests_iter):
- self.lock = threading.Lock()
- self.tests_iter = tests_iter
-
- def __iter__(self):
- return self
-
- def __next__(self):
- with self.lock:
- if self.tests_iter is None:
- raise StopIteration
- return next(self.tests_iter)
-
- def stop(self):
- with self.lock:
- self.tests_iter = None
-
-
-class MultiprocessResult(NamedTuple):
- result: TestResult
- # bpo-45410: stderr is written into stdout to keep messages order
- worker_stdout: str | None = None
- err_msg: str | None = None
-
-
-ExcStr = str
-QueueOutput = tuple[Literal[False], MultiprocessResult] | tuple[Literal[True], ExcStr]
-
-
-class ExitThread(Exception):
- pass
-
-
-class TestWorkerProcess(threading.Thread):
- def __init__(self, worker_id: int, runner: "MultiprocessTestRunner") -> None:
- super().__init__()
- self.worker_id = worker_id
- self.runtests = runner.runtests
- self.pending = runner.pending
- self.output = runner.output
- self.ns = runner.ns
- self.timeout = runner.worker_timeout
- self.regrtest = runner.regrtest
- self.rerun = runner.rerun
- self.current_test_name = None
- self.start_time = None
- self._popen = None
- self._killed = False
- self._stopped = False
-
- def __repr__(self) -> str:
- info = [f'TestWorkerProcess #{self.worker_id}']
- if self.is_alive():
- info.append("running")
- else:
- info.append('stopped')
- test = self.current_test_name
- if test:
- info.append(f'test={test}')
- popen = self._popen
- if popen is not None:
- dt = time.monotonic() - self.start_time
- info.extend((f'pid={self._popen.pid}',
- f'time={format_duration(dt)}'))
- return '<%s>' % ' '.join(info)
-
- def _kill(self) -> None:
- popen = self._popen
- if popen is None:
- return
-
- if self._killed:
- return
- self._killed = True
-
- if USE_PROCESS_GROUP:
- what = f"{self} process group"
- else:
- what = f"{self}"
-
- print(f"Kill {what}", file=sys.stderr, flush=True)
- try:
- if USE_PROCESS_GROUP:
- os.killpg(popen.pid, signal.SIGKILL)
- else:
- popen.kill()
- except ProcessLookupError:
- # popen.kill(): the process completed, the TestWorkerProcess thread
- # read its exit status, but Popen.send_signal() read the returncode
- # just before Popen.wait() set returncode.
- pass
- except OSError as exc:
- print_warning(f"Failed to kill {what}: {exc!r}")
-
- def stop(self) -> None:
- # Method called from a different thread to stop this thread
- self._stopped = True
- self._kill()
-
- def mp_result_error(
- self,
- test_result: TestResult,
- stdout: str | None = None,
- err_msg=None
- ) -> MultiprocessResult:
- return MultiprocessResult(test_result, stdout, err_msg)
-
- def _run_process(self, worker_job, output_file: TextIO,
- tmp_dir: str | None = None) -> int:
- self.current_test_name = worker_job.test_name
- try:
- popen = run_test_in_subprocess(worker_job, output_file, tmp_dir)
-
- self._killed = False
- self._popen = popen
- except:
- self.current_test_name = None
- raise
-
- try:
- if self._stopped:
- # If kill() has been called before self._popen is set,
- # self._popen is still running. Call again kill()
- # to ensure that the process is killed.
- self._kill()
- raise ExitThread
-
- try:
- # gh-94026: stdout+stderr are written to tempfile
- retcode = popen.wait(timeout=self.timeout)
- assert retcode is not None
- return retcode
- except subprocess.TimeoutExpired:
- if self._stopped:
- # kill() has been called: communicate() fails on reading
- # closed stdout
- raise ExitThread
-
- # On timeout, kill the process
- self._kill()
-
- # None means TIMEOUT for the caller
- retcode = None
- # bpo-38207: Don't attempt to call communicate() again: on it
- # can hang until all child processes using stdout
- # pipes completes.
- except OSError:
- if self._stopped:
- # kill() has been called: communicate() fails
- # on reading closed stdout
- raise ExitThread
- raise
- except:
- self._kill()
- raise
- finally:
- self._wait_completed()
- self._popen = None
- self.current_test_name = None
-
- def _runtest(self, test_name: str) -> MultiprocessResult:
- if sys.platform == 'win32':
- # gh-95027: When stdout is not a TTY, Python uses the ANSI code
- # page for the sys.stdout encoding. If the main process runs in a
- # terminal, sys.stdout uses WindowsConsoleIO with UTF-8 encoding.
- encoding = locale.getencoding()
- else:
- encoding = sys.stdout.encoding
-
- match_tests = self.runtests.get_match_tests(test_name)
-
- # gh-94026: Write stdout+stderr to a tempfile as workaround for
- # non-blocking pipes on Emscripten with NodeJS.
- with tempfile.TemporaryFile('w+', encoding=encoding) as stdout_file:
- worker_job = WorkerJob(test_name,
- namespace=self.ns,
- rerun=self.rerun,
- match_tests=match_tests)
- # gh-93353: Check for leaked temporary files in the parent process,
- # since the deletion of temporary files can happen late during
- # Python finalization: too late for libregrtest.
- if not support.is_wasi:
- # Don't check for leaked temporary files and directories if Python is
- # run on WASI. WASI don't pass environment variables like TMPDIR to
- # worker processes.
- tmp_dir = tempfile.mkdtemp(prefix="test_python_")
- tmp_dir = os.path.abspath(tmp_dir)
- try:
- retcode = self._run_process(worker_job, stdout_file, tmp_dir)
- finally:
- tmp_files = os.listdir(tmp_dir)
- os_helper.rmtree(tmp_dir)
- else:
- retcode = self._run_process(worker_job, stdout_file)
- tmp_files = ()
- stdout_file.seek(0)
-
- try:
- stdout = stdout_file.read().strip()
- except Exception as exc:
- # gh-101634: Catch UnicodeDecodeError if stdout cannot be
- # decoded from encoding
- err_msg = f"Cannot read process stdout: {exc}"
- result = TestResult(test_name, state=State.MULTIPROCESSING_ERROR)
- return self.mp_result_error(result, err_msg=err_msg)
-
- if retcode is None:
- result = TestResult(test_name, state=State.TIMEOUT)
- return self.mp_result_error(result, stdout)
-
- err_msg = None
- if retcode != 0:
- err_msg = "Exit code %s" % retcode
- else:
- stdout, _, worker_json = stdout.rpartition("\n")
- stdout = stdout.rstrip()
- if not worker_json:
- err_msg = "Failed to parse worker stdout"
- else:
- try:
- # deserialize run_tests_worker() output
- result = json.loads(worker_json,
- object_hook=decode_test_result)
- except Exception as exc:
- err_msg = "Failed to parse worker JSON: %s" % exc
-
- if err_msg:
- result = TestResult(test_name, state=State.MULTIPROCESSING_ERROR)
- return self.mp_result_error(result, stdout, err_msg)
-
- if tmp_files:
- msg = (f'\n\n'
- f'Warning -- {test_name} leaked temporary files '
- f'({len(tmp_files)}): {", ".join(sorted(tmp_files))}')
- stdout += msg
- result.set_env_changed()
-
- return MultiprocessResult(result, stdout)
-
- def run(self) -> None:
- fail_fast = self.ns.failfast
- fail_env_changed = self.ns.fail_env_changed
- while not self._stopped:
- try:
- try:
- test_name = next(self.pending)
- except StopIteration:
- break
-
- self.start_time = time.monotonic()
- mp_result = self._runtest(test_name)
- mp_result.result.duration = time.monotonic() - self.start_time
- self.output.put((False, mp_result))
-
- if mp_result.result.must_stop(fail_fast, fail_env_changed):
- break
- except ExitThread:
- break
- except BaseException:
- self.output.put((True, traceback.format_exc()))
- break
-
- def _wait_completed(self) -> None:
- popen = self._popen
-
- try:
- popen.wait(JOIN_TIMEOUT)
- except (subprocess.TimeoutExpired, OSError) as exc:
- print_warning(f"Failed to wait for {self} completion "
- f"(timeout={format_duration(JOIN_TIMEOUT)}): "
- f"{exc!r}")
-
- def wait_stopped(self, start_time: float) -> None:
- # bpo-38207: MultiprocessTestRunner.stop_workers() called self.stop()
- # which killed the process. Sometimes, killing the process from the
- # main thread does not interrupt popen.communicate() in
- # TestWorkerProcess thread. This loop with a timeout is a workaround
- # for that.
- #
- # Moreover, if this method fails to join the thread, it is likely
- # that Python will hang at exit while calling threading._shutdown()
- # which tries again to join the blocked thread. Regrtest.main()
- # uses EXIT_TIMEOUT to workaround this second bug.
- while True:
- # Write a message every second
- self.join(1.0)
- if not self.is_alive():
- break
- dt = time.monotonic() - start_time
- self.regrtest.log(f"Waiting for {self} thread "
- f"for {format_duration(dt)}")
- if dt > JOIN_TIMEOUT:
- print_warning(f"Failed to join {self} in {format_duration(dt)}")
- break
-
-
-def get_running(workers: list[TestWorkerProcess]) -> list[TestWorkerProcess]:
- running = []
- for worker in workers:
- current_test_name = worker.current_test_name
- if not current_test_name:
- continue
- dt = time.monotonic() - worker.start_time
- if dt >= PROGRESS_MIN_TIME:
- text = '%s (%s)' % (current_test_name, format_duration(dt))
- running.append(text)
- return running
-
-
-class MultiprocessTestRunner:
- def __init__(self, regrtest: Regrtest, runtests: RunTests) -> None:
- ns = regrtest.ns
- timeout = ns.timeout
-
- self.regrtest = regrtest
- self.runtests = runtests
- self.rerun = runtests.rerun
- self.log = self.regrtest.log
- self.ns = ns
- self.output: queue.Queue[QueueOutput] = queue.Queue()
- tests_iter = runtests.iter_tests()
- self.pending = MultiprocessIterator(tests_iter)
- if timeout is not None:
- # Rely on faulthandler to kill a worker process. This timouet is
- # when faulthandler fails to kill a worker process. Give a maximum
- # of 5 minutes to faulthandler to kill the worker.
- self.worker_timeout = min(timeout * 1.5, timeout + 5 * 60)
- else:
- self.worker_timeout = None
- self.workers = None
-
- def start_workers(self) -> None:
- use_mp = self.ns.use_mp
- timeout = self.ns.timeout
- self.workers = [TestWorkerProcess(index, self)
- for index in range(1, use_mp + 1)]
- msg = f"Run tests in parallel using {len(self.workers)} child processes"
- if timeout:
- msg += (" (timeout: %s, worker timeout: %s)"
- % (format_duration(timeout),
- format_duration(self.worker_timeout)))
- self.log(msg)
- for worker in self.workers:
- worker.start()
-
- def stop_workers(self) -> None:
- start_time = time.monotonic()
- for worker in self.workers:
- worker.stop()
- for worker in self.workers:
- worker.wait_stopped(start_time)
-
- def _get_result(self) -> QueueOutput | None:
- pgo = self.ns.pgo
- use_faulthandler = (self.ns.timeout is not None)
- timeout = PROGRESS_UPDATE
-
- # bpo-46205: check the status of workers every iteration to avoid
- # waiting forever on an empty queue.
- while any(worker.is_alive() for worker in self.workers):
- if use_faulthandler:
- faulthandler.dump_traceback_later(MAIN_PROCESS_TIMEOUT,
- exit=True)
-
- # wait for a thread
- try:
- return self.output.get(timeout=timeout)
- except queue.Empty:
- pass
-
- # display progress
- running = get_running(self.workers)
- if running and not pgo:
- self.log('running: %s' % ', '.join(running))
-
- # all worker threads are done: consume pending results
- try:
- return self.output.get(timeout=0)
- except queue.Empty:
- return None
-
- def display_result(self, mp_result: MultiprocessResult) -> None:
- result = mp_result.result
- pgo = self.ns.pgo
-
- text = str(result)
- if mp_result.err_msg:
- # MULTIPROCESSING_ERROR
- text += ' (%s)' % mp_result.err_msg
- elif (result.duration >= PROGRESS_MIN_TIME and not pgo):
- text += ' (%s)' % format_duration(result.duration)
- running = get_running(self.workers)
- if running and not pgo:
- text += ' -- running: %s' % ', '.join(running)
- self.regrtest.display_progress(self.test_index, text)
-
- def _process_result(self, item: QueueOutput) -> bool:
- """Returns True if test runner must stop."""
- rerun = self.runtests.rerun
- if item[0]:
- # Thread got an exception
- format_exc = item[1]
- print_warning(f"regrtest worker thread failed: {format_exc}")
- result = TestResult("<regrtest worker>", state=State.MULTIPROCESSING_ERROR)
- self.regrtest.accumulate_result(result, rerun=rerun)
- return result
-
- self.test_index += 1
- mp_result = item[1]
- result = mp_result.result
- self.regrtest.accumulate_result(result, rerun=rerun)
- self.display_result(mp_result)
-
- if mp_result.worker_stdout:
- print(mp_result.worker_stdout, flush=True)
-
- return result
-
- def run_tests(self) -> None:
- fail_fast = self.ns.failfast
- fail_env_changed = self.ns.fail_env_changed
- timeout = self.ns.timeout
-
- self.start_workers()
-
- self.test_index = 0
- try:
- while True:
- item = self._get_result()
- if item is None:
- break
-
- result = self._process_result(item)
- if result.must_stop(fail_fast, fail_env_changed):
- break
- except KeyboardInterrupt:
- print()
- self.regrtest.interrupted = True
- finally:
- if timeout is not None:
- faulthandler.cancel_dump_traceback_later()
-
- # Always ensure that all worker processes are no longer
- # worker when we exit this function
- self.pending.stop()
- self.stop_workers()
-
-
-def run_tests_multiprocess(regrtest: Regrtest, runtests: RunTests) -> None:
- MultiprocessTestRunner(regrtest, runtests).run_tests()
-
-
-class EncodeTestResult(json.JSONEncoder):
- """Encode a TestResult (sub)class object into a JSON dict."""
-
- def default(self, o: Any) -> dict[str, Any]:
- if isinstance(o, TestResult):
- result = dataclasses.asdict(o)
- result["__test_result__"] = o.__class__.__name__
- return result
-
- return super().default(o)
-
-
-def decode_test_result(d: dict[str, Any]) -> TestResult | dict[str, Any]:
- """Decode a TestResult (sub)class object from a JSON dict."""
-
- if "__test_result__" not in d:
- return d
-
- d.pop('__test_result__')
- if d['stats'] is not None:
- d['stats'] = TestStats(**d['stats'])
- return TestResult(**d)
diff --git a/Lib/test/support/__init__.py b/Lib/test/support/__init__.py
index 37fd944..ded8ad9 100644
--- a/Lib/test/support/__init__.py
+++ b/Lib/test/support/__init__.py
@@ -400,19 +400,19 @@ def check_sanitizer(*, address=False, memory=False, ub=False):
raise ValueError('At least one of address, memory, or ub must be True')
- _cflags = sysconfig.get_config_var('CFLAGS') or ''
- _config_args = sysconfig.get_config_var('CONFIG_ARGS') or ''
+ cflags = sysconfig.get_config_var('CFLAGS') or ''
+ config_args = sysconfig.get_config_var('CONFIG_ARGS') or ''
memory_sanitizer = (
- '-fsanitize=memory' in _cflags or
- '--with-memory-sanitizer' in _config_args
+ '-fsanitize=memory' in cflags or
+ '--with-memory-sanitizer' in config_args
)
address_sanitizer = (
- '-fsanitize=address' in _cflags or
- '--with-address-sanitizer' in _config_args
+ '-fsanitize=address' in cflags or
+ '--with-address-sanitizer' in config_args
)
ub_sanitizer = (
- '-fsanitize=undefined' in _cflags or
- '--with-undefined-behavior-sanitizer' in _config_args
+ '-fsanitize=undefined' in cflags or
+ '--with-undefined-behavior-sanitizer' in config_args
)
return (
(memory and memory_sanitizer) or
@@ -916,27 +916,31 @@ _4G = 4 * _1G
MAX_Py_ssize_t = sys.maxsize
-def set_memlimit(limit):
- global max_memuse
- global real_max_memuse
+def _parse_memlimit(limit: str) -> int:
sizes = {
'k': 1024,
'm': _1M,
'g': _1G,
't': 1024*_1G,
}
- m = re.match(r'(\d+(\.\d+)?) (K|M|G|T)b?$', limit,
+ m = re.match(r'(\d+(?:\.\d+)?) (K|M|G|T)b?$', limit,
re.IGNORECASE | re.VERBOSE)
if m is None:
- raise ValueError('Invalid memory limit %r' % (limit,))
- memlimit = int(float(m.group(1)) * sizes[m.group(3).lower()])
- real_max_memuse = memlimit
- if memlimit > MAX_Py_ssize_t:
- memlimit = MAX_Py_ssize_t
+ raise ValueError(f'Invalid memory limit: {limit!r}')
+ return int(float(m.group(1)) * sizes[m.group(2).lower()])
+
+def set_memlimit(limit: str) -> None:
+ global max_memuse
+ global real_max_memuse
+ memlimit = _parse_memlimit(limit)
if memlimit < _2G - 1:
- raise ValueError('Memory limit %r too low to be useful' % (limit,))
+ raise ValueError('Memory limit {limit!r} too low to be useful')
+
+ real_max_memuse = memlimit
+ memlimit = min(memlimit, MAX_Py_ssize_t)
max_memuse = memlimit
+
class _MemoryWatchdog:
"""An object which periodically watches the process' memory consumption
and prints it out.
diff --git a/Lib/test/test_support.py b/Lib/test/test_support.py
index b9b05fc..4a93249 100644
--- a/Lib/test/test_support.py
+++ b/Lib/test/test_support.py
@@ -764,7 +764,45 @@ class TestSupport(unittest.TestCase):
else:
self.fail("RecursionError was not raised")
- #self.assertEqual(available, 2)
+ def test_parse_memlimit(self):
+ parse = support._parse_memlimit
+ KiB = 1024
+ MiB = KiB * 1024
+ GiB = MiB * 1024
+ TiB = GiB * 1024
+ self.assertEqual(parse('0k'), 0)
+ self.assertEqual(parse('3k'), 3 * KiB)
+ self.assertEqual(parse('2.4m'), int(2.4 * MiB))
+ self.assertEqual(parse('4g'), int(4 * GiB))
+ self.assertEqual(parse('1t'), TiB)
+
+ for limit in ('', '3', '3.5.10k', '10x'):
+ with self.subTest(limit=limit):
+ with self.assertRaises(ValueError):
+ parse(limit)
+
+ def test_set_memlimit(self):
+ _4GiB = 4 * 1024 ** 3
+ TiB = 1024 ** 4
+ old_max_memuse = support.max_memuse
+ old_real_max_memuse = support.real_max_memuse
+ try:
+ if sys.maxsize > 2**32:
+ support.set_memlimit('4g')
+ self.assertEqual(support.max_memuse, _4GiB)
+ self.assertEqual(support.real_max_memuse, _4GiB)
+
+ big = 2**100 // TiB
+ support.set_memlimit(f'{big}t')
+ self.assertEqual(support.max_memuse, sys.maxsize)
+ self.assertEqual(support.real_max_memuse, big * TiB)
+ else:
+ support.set_memlimit('4g')
+ self.assertEqual(support.max_memuse, sys.maxsize)
+ self.assertEqual(support.real_max_memuse, _4GiB)
+ finally:
+ support.max_memuse = old_max_memuse
+ support.real_max_memuse = old_real_max_memuse
def test_copy_python_src_ignore(self):
# Get source directory
@@ -813,7 +851,6 @@ class TestSupport(unittest.TestCase):
# EnvironmentVarGuard
# transient_internet
# run_with_locale
- # set_memlimit
# bigmemtest
# precisionbigmemtest
# bigaddrspacetest