From 26748ed4f61520c59af15547792d1e73144a4314 Mon Sep 17 00:00:00 2001 From: Victor Stinner Date: Thu, 12 Oct 2023 23:45:36 +0200 Subject: gh-110756: Sync regrtest with main branch (#110758) (#110781) Copy files from main to this branch: * Lib/test/libregrtest/*.py * Lib/test/__init__.py * Lib/test/__main__.py * Lib/test/autotest.py * Lib/test/pythoninfo.py * Lib/test/regrtest.py * Lib/test/test_regrtest.py Copy also changes from: * Lib/test/support/__init__.py * Lib/test/support/os_helper.py * Lib/test/support/testresult.py * Lib/test/support/threading_helper.py * Lib/test/test_support.py Do not modify scripts running tests such as Makefile.pre.in, .github/workflows/build.yml or Tools/scripts/run_tests.py: do not use --fast-ci and --slow-ci in this change. Changes: * SPLITTESTDIRS: don't include test_inspect. * Add utils.process_cpu_count() using len(os.sched_getaffinity(0)). * test_regrtest doesn't use @support.without_optimizer which doesn't exist in Python 3.11. * Add support.set_sanitizer_env_var(). * Update test_faulthandler to use support.set_sanitizer_env_var(). * @support.without_optimizer doesn't exist in 3.11. * Add support.Py_DEBUG. * regrtest.refleak: 3.11 doesn't have sys.getunicodeinternedsize. --- Lib/test/__main__.py | 4 +- Lib/test/autotest.py | 2 +- Lib/test/libregrtest/__init__.py | 2 - Lib/test/libregrtest/cmdline.py | 125 +++- Lib/test/libregrtest/findtests.py | 105 +++ Lib/test/libregrtest/logger.py | 86 +++ Lib/test/libregrtest/main.py | 1199 ++++++++++++++-------------------- Lib/test/libregrtest/pgo.py | 8 +- Lib/test/libregrtest/refleak.py | 35 +- Lib/test/libregrtest/result.py | 190 ++++++ Lib/test/libregrtest/results.py | 261 ++++++++ Lib/test/libregrtest/run_workers.py | 607 +++++++++++++++++ Lib/test/libregrtest/runtest.py | 479 -------------- Lib/test/libregrtest/runtest_mp.py | 564 ---------------- Lib/test/libregrtest/runtests.py | 162 +++++ Lib/test/libregrtest/save_env.py | 14 +- Lib/test/libregrtest/setup.py | 139 ++-- Lib/test/libregrtest/single.py | 278 ++++++++ Lib/test/libregrtest/utils.py | 409 +++++++++++- Lib/test/libregrtest/worker.py | 116 ++++ Lib/test/pythoninfo.py | 122 +++- Lib/test/regrtest.py | 2 +- Lib/test/support/__init__.py | 73 ++- Lib/test/support/os_helper.py | 2 +- Lib/test/support/testresult.py | 3 + Lib/test/support/threading_helper.py | 18 +- Lib/test/test_faulthandler.py | 17 +- Lib/test/test_regrtest.py | 823 +++++++++++++++++------ Lib/test/test_support.py | 43 +- 29 files changed, 3712 insertions(+), 2176 deletions(-) create mode 100644 Lib/test/libregrtest/findtests.py create mode 100644 Lib/test/libregrtest/logger.py create mode 100644 Lib/test/libregrtest/result.py create mode 100644 Lib/test/libregrtest/results.py create mode 100644 Lib/test/libregrtest/run_workers.py delete mode 100644 Lib/test/libregrtest/runtest.py delete mode 100644 Lib/test/libregrtest/runtest_mp.py create mode 100644 Lib/test/libregrtest/runtests.py create mode 100644 Lib/test/libregrtest/single.py create mode 100644 Lib/test/libregrtest/worker.py diff --git a/Lib/test/__main__.py b/Lib/test/__main__.py index 19a6b2b..82b50ad 100644 --- a/Lib/test/__main__.py +++ b/Lib/test/__main__.py @@ -1,2 +1,2 @@ -from test.libregrtest import main -main() +from test.libregrtest.main import main +main(_add_python_opts=True) diff --git a/Lib/test/autotest.py b/Lib/test/autotest.py index fa85cc1..b5a1fab 100644 --- a/Lib/test/autotest.py +++ b/Lib/test/autotest.py @@ -1,5 +1,5 @@ # This should be equivalent to running regrtest.py from the cmdline. # It can be especially handy if you're in an interactive shell, e.g., # from test import autotest. -from test.libregrtest import main +from test.libregrtest.main import main main() diff --git a/Lib/test/libregrtest/__init__.py b/Lib/test/libregrtest/__init__.py index 5e8dba5..e69de29 100644 --- a/Lib/test/libregrtest/__init__.py +++ b/Lib/test/libregrtest/__init__.py @@ -1,2 +0,0 @@ -from test.libregrtest.cmdline import _parse_args, RESOURCE_NAMES, ALL_RESOURCES -from test.libregrtest.main import main diff --git a/Lib/test/libregrtest/cmdline.py b/Lib/test/libregrtest/cmdline.py index 40ae8fc..dd4cd33 100644 --- a/Lib/test/libregrtest/cmdline.py +++ b/Lib/test/libregrtest/cmdline.py @@ -1,8 +1,9 @@ import argparse -import os +import os.path import shlex import sys from test.support import os_helper +from .utils import ALL_RESOURCES, RESOURCE_NAMES USAGE = """\ @@ -27,8 +28,10 @@ EPILOG = """\ Additional option details: -r randomizes test execution order. You can use --randseed=int to provide an -int seed value for the randomizer; this is useful for reproducing troublesome -test orders. +int seed value for the randomizer. The randseed value will be used +to set seeds for all random usages in tests +(including randomizing the tests order if -r is set). +By default we always set random seed, but do not randomize test order. -s On the first invocation of regrtest using -s, the first test file found or the first test file given on the command line is run, and the name of @@ -130,25 +133,17 @@ Pattern examples: """ -ALL_RESOURCES = ('audio', 'curses', 'largefile', 'network', - 'decimal', 'cpu', 'subprocess', 'urlfetch', 'gui', 'walltime') - -# Other resources excluded from --use=all: -# -# - extralagefile (ex: test_zipfile64): really too slow to be enabled -# "by default" -# - tzdata: while needed to validate fully test_datetime, it makes -# test_datetime too slow (15-20 min on some buildbots) and so is disabled by -# default (see bpo-30822). -RESOURCE_NAMES = ALL_RESOURCES + ('extralargefile', 'tzdata') - - class Namespace(argparse.Namespace): def __init__(self, **kwargs) -> None: + self.ci = False self.testdir = None self.verbose = 0 self.quiet = False self.exclude = False + self.cleanup = False + self.wait = False + self.list_cases = False + self.list_tests = False self.single = False self.randomize = False self.fromfile = None @@ -157,8 +152,8 @@ class Namespace(argparse.Namespace): self.trace = False self.coverdir = 'coverage' self.runleaks = False - self.huntrleaks = False - self.verbose2 = False + self.huntrleaks: tuple[int, int, str] | None = None + self.rerun = False self.verbose3 = False self.print_slow = False self.random_seed = None @@ -170,6 +165,14 @@ class Namespace(argparse.Namespace): self.ignore_tests = None self.pgo = False self.pgo_extended = False + self.worker_json = None + self.start = None + self.timeout = None + self.memlimit = None + self.threshold = None + self.fail_rerun = False + self.tempdir = None + self._add_python_opts = True super().__init__(**kwargs) @@ -198,25 +201,35 @@ def _create_parser(): # We add help explicitly to control what argument group it renders under. group.add_argument('-h', '--help', action='help', help='show this help message and exit') - group.add_argument('--timeout', metavar='TIMEOUT', type=float, + group.add_argument('--fast-ci', action='store_true', + help='Fast Continuous Integration (CI) mode used by ' + 'GitHub Actions') + group.add_argument('--slow-ci', action='store_true', + help='Slow Continuous Integration (CI) mode used by ' + 'buildbot workers') + group.add_argument('--timeout', metavar='TIMEOUT', help='dump the traceback and exit if a test takes ' 'more than TIMEOUT seconds; disabled if TIMEOUT ' 'is negative or equals to zero') group.add_argument('--wait', action='store_true', help='wait for user input, e.g., allow a debugger ' 'to be attached') - group.add_argument('--worker-args', metavar='ARGS') group.add_argument('-S', '--start', metavar='START', help='the name of the test at which to start.' + more_details) group.add_argument('-p', '--python', metavar='PYTHON', help='Command to run Python test subprocesses with.') + group.add_argument('--randseed', metavar='SEED', + dest='random_seed', type=int, + help='pass a global random seed') group = parser.add_argument_group('Verbosity') group.add_argument('-v', '--verbose', action='count', help='run tests in verbose mode with output to stdout') - group.add_argument('-w', '--verbose2', action='store_true', + group.add_argument('-w', '--rerun', action='store_true', help='re-run failed tests in verbose mode') + group.add_argument('--verbose2', action='store_true', dest='rerun', + help='deprecated alias to --rerun') group.add_argument('-W', '--verbose3', action='store_true', help='display test output on failure') group.add_argument('-q', '--quiet', action='store_true', @@ -229,10 +242,6 @@ def _create_parser(): group = parser.add_argument_group('Selecting tests') group.add_argument('-r', '--randomize', action='store_true', help='randomize test execution order.' + more_details) - group.add_argument('--randseed', metavar='SEED', - dest='random_seed', type=int, - help='pass a random seed to reproduce a previous ' - 'random run') group.add_argument('-f', '--fromfile', metavar='FILE', help='read names of tests to run from a file.' + more_details) @@ -311,6 +320,9 @@ def _create_parser(): group.add_argument('--fail-env-changed', action='store_true', help='if a test file alters the environment, mark ' 'the test as failed') + group.add_argument('--fail-rerun', action='store_true', + help='if a test failed and then passed when re-run, ' + 'mark the tests as failed') group.add_argument('--junit-xml', dest='xmlpath', metavar='FILENAME', help='writes JUnit-style XML results to the specified ' @@ -319,6 +331,9 @@ def _create_parser(): help='override the working directory for the test run') group.add_argument('--cleanup', action='store_true', help='remove old test_python_* directories') + group.add_argument('--dont-add-python-opts', dest='_add_python_opts', + action='store_false', + help="internal option, don't use it") return parser @@ -369,7 +384,50 @@ def _parse_args(args, **kwargs): for arg in ns.args: if arg.startswith('-'): parser.error("unrecognized arguments: %s" % arg) - sys.exit(1) + + if ns.timeout is not None: + # Support "--timeout=" (no value) so Makefile.pre.pre TESTTIMEOUT + # can be used by "make buildbottest" and "make test". + if ns.timeout != "": + try: + ns.timeout = float(ns.timeout) + except ValueError: + parser.error(f"invalid timeout value: {ns.timeout!r}") + else: + ns.timeout = None + + # Continuous Integration (CI): common options for fast/slow CI modes + if ns.slow_ci or ns.fast_ci: + # Similar to options: + # + # -j0 --randomize --fail-env-changed --fail-rerun --rerun + # --slowest --verbose3 + if ns.use_mp is None: + ns.use_mp = 0 + ns.randomize = True + ns.fail_env_changed = True + ns.fail_rerun = True + if ns.python is None: + ns.rerun = True + ns.print_slow = True + ns.verbose3 = True + else: + ns._add_python_opts = False + + # When both --slow-ci and --fast-ci options are present, + # --slow-ci has the priority + if ns.slow_ci: + # Similar to: -u "all" --timeout=1200 + if not ns.use: + ns.use = [['all']] + if ns.timeout is None: + ns.timeout = 1200 # 20 minutes + elif ns.fast_ci: + # Similar to: -u "all,-cpu" --timeout=600 + if not ns.use: + ns.use = [['all', '-cpu']] + if ns.timeout is None: + ns.timeout = 600 # 10 minutes if ns.single and ns.fromfile: parser.error("-s and -f don't go together!") @@ -382,7 +440,7 @@ def _parse_args(args, **kwargs): ns.python = shlex.split(ns.python) if ns.failfast and not (ns.verbose or ns.verbose3): parser.error("-G/--failfast needs either -v or -W") - if ns.pgo and (ns.verbose or ns.verbose2 or ns.verbose3): + if ns.pgo and (ns.verbose or ns.rerun or ns.verbose3): parser.error("--pgo/-v don't go together!") if ns.pgo_extended: ns.pgo = True # pgo_extended implies pgo @@ -396,10 +454,6 @@ def _parse_args(args, **kwargs): if ns.timeout is not None: if ns.timeout <= 0: ns.timeout = None - if ns.use_mp is not None: - if ns.use_mp <= 0: - # Use all cores + extras for tests that like to sleep - ns.use_mp = 2 + (os.cpu_count() or 1) if ns.use: for a in ns.use: for r in a: @@ -443,4 +497,13 @@ def _parse_args(args, **kwargs): # --forever implies --failfast ns.failfast = True + if ns.huntrleaks: + warmup, repetitions, _ = ns.huntrleaks + if warmup < 1 or repetitions < 1: + msg = ("Invalid values for the --huntrleaks/-R parameters. The " + "number of warmups and repetitions must be at least 1 " + "each (1:1).") + print(msg, file=sys.stderr, flush=True) + sys.exit(2) + return ns diff --git a/Lib/test/libregrtest/findtests.py b/Lib/test/libregrtest/findtests.py new file mode 100644 index 0000000..96cc3e0 --- /dev/null +++ b/Lib/test/libregrtest/findtests.py @@ -0,0 +1,105 @@ +import os +import sys +import unittest + +from test import support + +from .utils import ( + StrPath, TestName, TestTuple, TestList, FilterTuple, + abs_module_name, count, printlist) + + +# If these test directories are encountered recurse into them and treat each +# "test_*.py" file or each sub-directory as a separate test module. This can +# increase parallelism. +# +# Beware this can't generally be done for any directory with sub-tests as the +# __init__.py may do things which alter what tests are to be run. +SPLITTESTDIRS: set[TestName] = { + "test_asyncio", + "test_concurrent_futures", + "test_future_stmt", + "test_gdb", + "test_multiprocessing_fork", + "test_multiprocessing_forkserver", + "test_multiprocessing_spawn", +} + + +def findtestdir(path: StrPath | None = None) -> StrPath: + return path or os.path.dirname(os.path.dirname(__file__)) or os.curdir + + +def findtests(*, testdir: StrPath | None = None, exclude=(), + split_test_dirs: set[TestName] = SPLITTESTDIRS, + base_mod: str = "") -> TestList: + """Return a list of all applicable test modules.""" + testdir = findtestdir(testdir) + tests = [] + for name in os.listdir(testdir): + mod, ext = os.path.splitext(name) + if (not mod.startswith("test_")) or (mod in exclude): + continue + if base_mod: + fullname = f"{base_mod}.{mod}" + else: + fullname = mod + if fullname in split_test_dirs: + subdir = os.path.join(testdir, mod) + if not base_mod: + fullname = f"test.{mod}" + tests.extend(findtests(testdir=subdir, exclude=exclude, + split_test_dirs=split_test_dirs, + base_mod=fullname)) + elif ext in (".py", ""): + tests.append(fullname) + return sorted(tests) + + +def split_test_packages(tests, *, testdir: StrPath | None = None, exclude=(), + split_test_dirs=SPLITTESTDIRS): + testdir = findtestdir(testdir) + splitted = [] + for name in tests: + if name in split_test_dirs: + subdir = os.path.join(testdir, name) + splitted.extend(findtests(testdir=subdir, exclude=exclude, + split_test_dirs=split_test_dirs, + base_mod=name)) + else: + splitted.append(name) + return splitted + + +def _list_cases(suite): + for test in suite: + if isinstance(test, unittest.loader._FailedTest): + continue + if isinstance(test, unittest.TestSuite): + _list_cases(test) + elif isinstance(test, unittest.TestCase): + if support.match_test(test): + print(test.id()) + +def list_cases(tests: TestTuple, *, + match_tests: FilterTuple | None = None, + ignore_tests: FilterTuple | None = None, + test_dir: StrPath | None = None): + support.verbose = False + support.set_match_tests(match_tests, ignore_tests) + + skipped = [] + for test_name in tests: + module_name = abs_module_name(test_name, test_dir) + try: + suite = unittest.defaultTestLoader.loadTestsFromName(module_name) + _list_cases(suite) + except unittest.SkipTest: + skipped.append(test_name) + + if skipped: + sys.stdout.flush() + stderr = sys.stderr + print(file=stderr) + print(count(len(skipped), "test"), "skipped:", file=stderr) + printlist(skipped, file=stderr) diff --git a/Lib/test/libregrtest/logger.py b/Lib/test/libregrtest/logger.py new file mode 100644 index 0000000..a125706 --- /dev/null +++ b/Lib/test/libregrtest/logger.py @@ -0,0 +1,86 @@ +import os +import time + +from test.support import MS_WINDOWS +from .results import TestResults +from .runtests import RunTests +from .utils import print_warning + +if MS_WINDOWS: + from .win_utils import WindowsLoadTracker + + +class Logger: + def __init__(self, results: TestResults, quiet: bool, pgo: bool): + self.start_time = time.perf_counter() + self.test_count_text = '' + self.test_count_width = 3 + self.win_load_tracker: WindowsLoadTracker | None = None + self._results: TestResults = results + self._quiet: bool = quiet + self._pgo: bool = pgo + + def log(self, line: str = '') -> None: + empty = not line + + # add the system load prefix: "load avg: 1.80 " + load_avg = self.get_load_avg() + if load_avg is not None: + line = f"load avg: {load_avg:.2f} {line}" + + # add the timestamp prefix: "0:01:05 " + log_time = time.perf_counter() - self.start_time + + mins, secs = divmod(int(log_time), 60) + hours, mins = divmod(mins, 60) + formatted_log_time = "%d:%02d:%02d" % (hours, mins, secs) + + line = f"{formatted_log_time} {line}" + if empty: + line = line[:-1] + + print(line, flush=True) + + def get_load_avg(self) -> float | None: + if hasattr(os, 'getloadavg'): + return os.getloadavg()[0] + if self.win_load_tracker is not None: + return self.win_load_tracker.getloadavg() + return None + + def display_progress(self, test_index: int, text: str) -> None: + if self._quiet: + return + results = self._results + + # "[ 51/405/1] test_tcl passed" + line = f"{test_index:{self.test_count_width}}{self.test_count_text}" + fails = len(results.bad) + len(results.env_changed) + if fails and not self._pgo: + line = f"{line}/{fails}" + self.log(f"[{line}] {text}") + + def set_tests(self, runtests: RunTests) -> None: + if runtests.forever: + self.test_count_text = '' + self.test_count_width = 3 + else: + self.test_count_text = '/{}'.format(len(runtests.tests)) + self.test_count_width = len(self.test_count_text) - 1 + + def start_load_tracker(self) -> None: + if not MS_WINDOWS: + return + + try: + self.win_load_tracker = WindowsLoadTracker() + except PermissionError as error: + # Standard accounts may not have access to the performance + # counters. + print_warning(f'Failed to create WindowsLoadTracker: {error}') + + def stop_load_tracker(self) -> None: + if self.win_load_tracker is None: + return + self.win_load_tracker.close() + self.win_load_tracker = None diff --git a/Lib/test/libregrtest/main.py b/Lib/test/libregrtest/main.py index 7192244..fe35df0 100644 --- a/Lib/test/libregrtest/main.py +++ b/Lib/test/libregrtest/main.py @@ -1,45 +1,30 @@ -import faulthandler -import locale import os -import platform import random import re +import shlex import sys import sysconfig -import tempfile import time -import unittest -from test.libregrtest.cmdline import _parse_args -from test.libregrtest.runtest import ( - findtests, split_test_packages, runtest, get_abs_module, - PROGRESS_MIN_TIME, State) -from test.libregrtest.setup import setup_tests -from test.libregrtest.pgo import setup_pgo_tests -from test.libregrtest.utils import (removepy, count, format_duration, - printlist, get_build_info) -from test import support -from test.support import TestStats -from test.support import os_helper -from test.support import threading_helper - - -# bpo-38203: Maximum delay in seconds to exit Python (call Py_Finalize()). -# Used to protect against threading._shutdown() hang. -# Must be smaller than buildbot "1200 seconds without output" limit. -EXIT_TIMEOUT = 120.0 -# gh-90681: When rerunning tests, we might need to rerun the whole -# class or module suite if some its life-cycle hooks fail. -# Test level hooks are not affected. -_TEST_LIFECYCLE_HOOKS = frozenset(( - 'setUpClass', 'tearDownClass', - 'setUpModule', 'tearDownModule', -)) - -EXITCODE_BAD_TEST = 2 -EXITCODE_INTERRUPTED = 130 -EXITCODE_ENV_CHANGED = 3 -EXITCODE_NO_TESTS_RAN = 4 +from test import support +from test.support import os_helper, MS_WINDOWS + +from .cmdline import _parse_args, Namespace +from .findtests import findtests, split_test_packages, list_cases +from .logger import Logger +from .pgo import setup_pgo_tests +from .result import State +from .results import TestResults, EXITCODE_INTERRUPTED +from .runtests import RunTests, HuntRefleak +from .setup import setup_process, setup_test_dir +from .single import run_single_test, PROGRESS_MIN_TIME +from .utils import ( + StrPath, StrJSON, TestName, TestList, TestTuple, FilterTuple, + strip_py_suffix, count, format_duration, + printlist, get_temp_dir, get_work_dir, exit_timeout, + display_header, cleanup_temp_dir, print_warning, + is_cross_compiled, get_host_runner, process_cpu_count, + EXIT_TIMEOUT) class Regrtest: @@ -65,266 +50,212 @@ class Regrtest: directly to set the values that would normally be set by flags on the command line. """ - def __init__(self): - # Namespace of command line options - self.ns = None + def __init__(self, ns: Namespace, _add_python_opts: bool = False): + # Log verbosity + self.verbose: int = int(ns.verbose) + self.quiet: bool = ns.quiet + self.pgo: bool = ns.pgo + self.pgo_extended: bool = ns.pgo_extended + + # Test results + self.results: TestResults = TestResults() + self.first_state: str | None = None + + # Logger + self.logger = Logger(self.results, self.quiet, self.pgo) + + # Actions + self.want_header: bool = ns.header + self.want_list_tests: bool = ns.list_tests + self.want_list_cases: bool = ns.list_cases + self.want_wait: bool = ns.wait + self.want_cleanup: bool = ns.cleanup + self.want_rerun: bool = ns.rerun + self.want_run_leaks: bool = ns.runleaks + + self.ci_mode: bool = (ns.fast_ci or ns.slow_ci) + self.want_add_python_opts: bool = (_add_python_opts + and ns._add_python_opts) + + # Select tests + if ns.match_tests: + self.match_tests: FilterTuple | None = tuple(ns.match_tests) + else: + self.match_tests = None + if ns.ignore_tests: + self.ignore_tests: FilterTuple | None = tuple(ns.ignore_tests) + else: + self.ignore_tests = None + self.exclude: bool = ns.exclude + self.fromfile: StrPath | None = ns.fromfile + self.starting_test: TestName | None = ns.start + self.cmdline_args: TestList = ns.args + + # Workers + if ns.use_mp is None: + num_workers = 0 # run sequentially + elif ns.use_mp <= 0: + num_workers = -1 # use the number of CPUs + else: + num_workers = ns.use_mp + self.num_workers: int = num_workers + self.worker_json: StrJSON | None = ns.worker_json + + # Options to run tests + self.fail_fast: bool = ns.failfast + self.fail_env_changed: bool = ns.fail_env_changed + self.fail_rerun: bool = ns.fail_rerun + self.forever: bool = ns.forever + self.output_on_failure: bool = ns.verbose3 + self.timeout: float | None = ns.timeout + if ns.huntrleaks: + warmups, runs, filename = ns.huntrleaks + filename = os.path.abspath(filename) + self.hunt_refleak: HuntRefleak | None = HuntRefleak(warmups, runs, filename) + else: + self.hunt_refleak = None + self.test_dir: StrPath | None = ns.testdir + self.junit_filename: StrPath | None = ns.xmlpath + self.memory_limit: str | None = ns.memlimit + self.gc_threshold: int | None = ns.threshold + self.use_resources: tuple[str, ...] = tuple(ns.use_resources) + if ns.python: + self.python_cmd: tuple[str, ...] | None = tuple(ns.python) + else: + self.python_cmd = None + self.coverage: bool = ns.trace + self.coverage_dir: StrPath | None = ns.coverdir + self.tmp_dir: StrPath | None = ns.tempdir + + # Randomize + self.randomize: bool = ns.randomize + self.random_seed: int | None = ( + ns.random_seed + if ns.random_seed is not None + else random.getrandbits(32) + ) + if 'SOURCE_DATE_EPOCH' in os.environ: + self.randomize = False + self.random_seed = None # tests - self.tests = [] - self.selected = [] - - # test results - self.good = [] - self.bad = [] - self.skipped = [] - self.resource_denied = [] - self.environment_changed = [] - self.run_no_tests = [] - self.need_rerun = [] - self.rerun = [] - self.first_result = None - self.interrupted = False - self.stats_dict: dict[str, TestStats] = {} - - # used by --slow - self.test_times = [] - - # used by --coverage, trace.Trace instance - self.tracer = None + self.first_runtests: RunTests | None = None + + # used by --slowest + self.print_slowest: bool = ns.print_slow # used to display the progress bar "[ 3/100]" self.start_time = time.perf_counter() - self.test_count = '' - self.test_count_width = 1 # used by --single - self.next_single_test = None - self.next_single_filename = None - - # used by --junit-xml - self.testsuite_xml = None - - # misc - self.win_load_tracker = None - self.tmp_dir = None - self.worker_test_name = None - - def get_executed(self): - return (set(self.good) | set(self.bad) | set(self.skipped) - | set(self.resource_denied) | set(self.environment_changed) - | set(self.run_no_tests)) - - def accumulate_result(self, result, rerun=False): - test_name = result.test_name - - if result.has_meaningful_duration() and not rerun: - self.test_times.append((result.duration, test_name)) - - match result.state: - case State.PASSED: - self.good.append(test_name) - case State.ENV_CHANGED: - self.environment_changed.append(test_name) - case State.SKIPPED: - self.skipped.append(test_name) - case State.RESOURCE_DENIED: - self.skipped.append(test_name) - self.resource_denied.append(test_name) - case State.INTERRUPTED: - self.interrupted = True - case State.DID_NOT_RUN: - self.run_no_tests.append(test_name) - case _: - if result.is_failed(self.ns.fail_env_changed): - if not rerun: - self.bad.append(test_name) - self.need_rerun.append(result) - else: - raise ValueError(f"invalid test state: {state!r}") - - if result.stats is not None: - self.stats_dict[result.test_name] = result.stats - - if rerun and not(result.is_failed(False) or result.state == State.INTERRUPTED): - self.bad.remove(test_name) - - xml_data = result.xml_data - if xml_data: - import xml.etree.ElementTree as ET - for e in xml_data: - try: - self.testsuite_xml.append(ET.fromstring(e)) - except ET.ParseError: - print(xml_data, file=sys.__stderr__) - raise + self.single_test_run: bool = ns.single + self.next_single_test: TestName | None = None + self.next_single_filename: StrPath | None = None def log(self, line=''): - empty = not line - - # add the system load prefix: "load avg: 1.80 " - load_avg = self.getloadavg() - if load_avg is not None: - line = f"load avg: {load_avg:.2f} {line}" - - # add the timestamp prefix: "0:01:05 " - test_time = time.perf_counter() - self.start_time - - mins, secs = divmod(int(test_time), 60) - hours, mins = divmod(mins, 60) - test_time = "%d:%02d:%02d" % (hours, mins, secs) - - line = f"{test_time} {line}" - if empty: - line = line[:-1] - - print(line, flush=True) - - def display_progress(self, test_index, text): - if self.ns.quiet: - return - - # "[ 51/405/1] test_tcl passed" - line = f"{test_index:{self.test_count_width}}{self.test_count}" - fails = len(self.bad) + len(self.environment_changed) - if fails and not self.ns.pgo: - line = f"{line}/{fails}" - self.log(f"[{line}] {text}") - - def parse_args(self, kwargs): - ns = _parse_args(sys.argv[1:], **kwargs) - - if ns.xmlpath: - support.junit_xml_list = self.testsuite_xml = [] - - worker_args = ns.worker_args - if worker_args is not None: - from test.libregrtest.runtest_mp import parse_worker_args - ns, test_name = parse_worker_args(ns.worker_args) - ns.worker_args = worker_args - self.worker_test_name = test_name - - # Strip .py extensions. - removepy(ns.args) + self.logger.log(line) - if ns.huntrleaks: - warmup, repetitions, _ = ns.huntrleaks - if warmup < 1 or repetitions < 1: - msg = ("Invalid values for the --huntrleaks/-R parameters. The " - "number of warmups and repetitions must be at least 1 " - "each (1:1).") - print(msg, file=sys.stderr, flush=True) - sys.exit(2) - - if ns.tempdir: - ns.tempdir = os.path.expanduser(ns.tempdir) - - self.ns = ns - - def find_tests(self, tests): - self.tests = tests - - if self.ns.single: + def find_tests(self, tests: TestList | None = None) -> tuple[TestTuple, TestList | None]: + if self.single_test_run: self.next_single_filename = os.path.join(self.tmp_dir, 'pynexttest') try: with open(self.next_single_filename, 'r') as fp: next_test = fp.read().strip() - self.tests = [next_test] + tests = [next_test] except OSError: pass - if self.ns.fromfile: - self.tests = [] + if self.fromfile: + tests = [] # regex to match 'test_builtin' in line: # '0:00:00 [ 4/400] test_builtin -- test_dict took 1 sec' regex = re.compile(r'\btest_[a-zA-Z0-9_]+\b') - with open(os.path.join(os_helper.SAVEDCWD, self.ns.fromfile)) as fp: + with open(os.path.join(os_helper.SAVEDCWD, self.fromfile)) as fp: for line in fp: line = line.split('#', 1)[0] line = line.strip() match = regex.search(line) if match is not None: - self.tests.append(match.group()) + tests.append(match.group()) - removepy(self.tests) + strip_py_suffix(tests) - if self.ns.pgo: + if self.pgo: # add default PGO tests if no tests are specified - setup_pgo_tests(self.ns) + setup_pgo_tests(self.cmdline_args, self.pgo_extended) - exclude = set() - if self.ns.exclude: - for arg in self.ns.args: - exclude.add(arg) - self.ns.args = [] + exclude_tests = set() + if self.exclude: + for arg in self.cmdline_args: + exclude_tests.add(arg) + self.cmdline_args = [] - alltests = findtests(testdir=self.ns.testdir, exclude=exclude) + alltests = findtests(testdir=self.test_dir, + exclude=exclude_tests) - if not self.ns.fromfile: - self.selected = self.tests or self.ns.args - if self.selected: - self.selected = split_test_packages(self.selected) + if not self.fromfile: + selected = tests or self.cmdline_args + if selected: + selected = split_test_packages(selected) else: - self.selected = alltests + selected = alltests else: - self.selected = self.tests + selected = tests - if self.ns.single: - self.selected = self.selected[:1] + if self.single_test_run: + selected = selected[:1] try: - pos = alltests.index(self.selected[0]) + pos = alltests.index(selected[0]) self.next_single_test = alltests[pos + 1] except IndexError: pass # Remove all the selected tests that precede start if it's set. - if self.ns.start: + if self.starting_test: try: - del self.selected[:self.selected.index(self.ns.start)] + del selected[:selected.index(self.starting_test)] except ValueError: - print("Couldn't find starting test (%s), using all tests" - % self.ns.start, file=sys.stderr) - - if self.ns.randomize: - if self.ns.random_seed is None: - self.ns.random_seed = random.randrange(10000000) - random.seed(self.ns.random_seed) - random.shuffle(self.selected) + print(f"Cannot find starting test: {self.starting_test}") + sys.exit(1) - def list_tests(self): - for name in self.selected: - print(name) - - def _list_cases(self, suite): - for test in suite: - if isinstance(test, unittest.loader._FailedTest): - continue - if isinstance(test, unittest.TestSuite): - self._list_cases(test) - elif isinstance(test, unittest.TestCase): - if support.match_test(test): - print(test.id()) - - def list_cases(self): - support.verbose = False - support.set_match_tests(self.ns.match_tests, self.ns.ignore_tests) - - for test_name in self.selected: - abstest = get_abs_module(self.ns, test_name) - try: - suite = unittest.defaultTestLoader.loadTestsFromName(abstest) - self._list_cases(suite) - except unittest.SkipTest: - self.skipped.append(test_name) + random.seed(self.random_seed) + if self.randomize: + random.shuffle(selected) - if self.skipped: - print(file=sys.stderr) - print(count(len(self.skipped), "test"), "skipped:", file=sys.stderr) - printlist(self.skipped, file=sys.stderr) + return (tuple(selected), tests) - def rerun_failed_tests(self): - self.log() + @staticmethod + def list_tests(tests: TestTuple): + for name in tests: + print(name) - if self.ns.python: + def _rerun_failed_tests(self, runtests: RunTests): + # Configure the runner to re-run tests + if self.num_workers == 0: + # Always run tests in fresh processes to have more deterministic + # initial state. Don't re-run tests in parallel but limit to a + # single worker process to have side effects (on the system load + # and timings) between tests. + self.num_workers = 1 + + tests, match_tests_dict = self.results.prepare_rerun() + + # Re-run failed tests + self.log(f"Re-running {len(tests)} failed tests in verbose mode in subprocesses") + runtests = runtests.copy( + tests=tests, + rerun=True, + verbose=True, + forever=False, + fail_fast=False, + match_tests_dict=match_tests_dict, + output_on_failure=False) + self.logger.set_tests(runtests) + self._run_tests_mp(runtests, self.num_workers) + return runtests + + def rerun_failed_tests(self, runtests: RunTests): + if self.python_cmd: # Temp patch for https://github.com/python/cpython/issues/94052 self.log( "Re-running failed tests is not supported with --python " @@ -332,160 +263,81 @@ class Regrtest: ) return - self.ns.verbose = True - self.ns.failfast = False - self.ns.verbose3 = False - - self.first_result = self.get_tests_result() - - self.log("Re-running failed tests in verbose mode") - rerun_list = list(self.need_rerun) - self.need_rerun.clear() - for result in rerun_list: - test_name = result.test_name - self.rerun.append(test_name) - - errors = result.errors or [] - failures = result.failures or [] - error_names = [ - self.normalize_test_name(test_full_name, is_error=True) - for (test_full_name, *_) in errors] - failure_names = [ - self.normalize_test_name(test_full_name) - for (test_full_name, *_) in failures] - self.ns.verbose = True - orig_match_tests = self.ns.match_tests - if errors or failures: - if self.ns.match_tests is None: - self.ns.match_tests = [] - self.ns.match_tests.extend(error_names) - self.ns.match_tests.extend(failure_names) - matching = "matching: " + ", ".join(self.ns.match_tests) - self.log(f"Re-running {test_name} in verbose mode ({matching})") - else: - self.log(f"Re-running {test_name} in verbose mode") - result = runtest(self.ns, test_name) - self.ns.match_tests = orig_match_tests + self.first_state = self.get_state() - self.accumulate_result(result, rerun=True) + print() + rerun_runtests = self._rerun_failed_tests(runtests) - if result.state == State.INTERRUPTED: - break + if self.results.bad: + print(count(len(self.results.bad), 'test'), "failed again:") + printlist(self.results.bad) + + self.display_result(rerun_runtests) - if self.bad: - print(count(len(self.bad), 'test'), "failed again:") - printlist(self.bad) - - self.display_result() - - def normalize_test_name(self, test_full_name, *, is_error=False): - short_name = test_full_name.split(" ")[0] - if is_error and short_name in _TEST_LIFECYCLE_HOOKS: - # This means that we have a failure in a life-cycle hook, - # we need to rerun the whole module or class suite. - # Basically the error looks like this: - # ERROR: setUpClass (test.test_reg_ex.RegTest) - # or - # ERROR: setUpModule (test.test_reg_ex) - # So, we need to parse the class / module name. - lpar = test_full_name.index('(') - rpar = test_full_name.index(')') - return test_full_name[lpar + 1: rpar].split('.')[-1] - return short_name - - def display_result(self): + def display_result(self, runtests): # If running the test suite for PGO then no one cares about results. - if self.ns.pgo: + if runtests.pgo: return + state = self.get_state() print() - print("== Tests result: %s ==" % self.get_tests_result()) - - if self.interrupted: - print("Test suite interrupted by signal SIGINT.") - - omitted = set(self.selected) - self.get_executed() - if omitted: - print() - print(count(len(omitted), "test"), "omitted:") - printlist(omitted) - - if self.good and not self.ns.quiet: - print() - if (not self.bad - and not self.skipped - and not self.interrupted - and len(self.good) > 1): - print("All", end=' ') - print(count(len(self.good), "test"), "OK.") - - if self.ns.print_slow: - self.test_times.sort(reverse=True) - print() - print("10 slowest tests:") - for test_time, test in self.test_times[:10]: - print("- %s: %s" % (test, format_duration(test_time))) - - if self.bad: - print() - print(count(len(self.bad), "test"), "failed:") - printlist(self.bad) - - if self.environment_changed: - print() - print("{} altered the execution environment:".format( - count(len(self.environment_changed), "test"))) - printlist(self.environment_changed) - - if self.skipped and not self.ns.quiet: - print() - print(count(len(self.skipped), "test"), "skipped:") - printlist(self.skipped) - - if self.rerun: - print() - print("%s:" % count(len(self.rerun), "re-run test")) - printlist(self.rerun) - - if self.run_no_tests: - print() - print(count(len(self.run_no_tests), "test"), "run no tests:") - printlist(self.run_no_tests) - - def run_tests_sequential(self): - if self.ns.trace: + print(f"== Tests result: {state} ==") + + self.results.display_result(runtests.tests, + self.quiet, self.print_slowest) + + def run_test(self, test_name: TestName, runtests: RunTests, tracer): + if tracer is not None: + # If we're tracing code coverage, then we don't exit with status + # if on a false return value from main. + cmd = ('result = run_single_test(test_name, runtests)') + namespace = dict(locals()) + tracer.runctx(cmd, globals=globals(), locals=namespace) + result = namespace['result'] + else: + result = run_single_test(test_name, runtests) + + self.results.accumulate_result(result, runtests) + + return result + + def run_tests_sequentially(self, runtests): + if self.coverage: import trace - self.tracer = trace.Trace(trace=False, count=True) + tracer = trace.Trace(trace=False, count=True) + else: + tracer = None save_modules = sys.modules.keys() - msg = "Run tests sequentially" - if self.ns.timeout: - msg += " (timeout: %s)" % format_duration(self.ns.timeout) + jobs = runtests.get_jobs() + if jobs is not None: + tests = count(jobs, 'test') + else: + tests = 'tests' + msg = f"Run {tests} sequentially" + if runtests.timeout: + msg += " (timeout: %s)" % format_duration(runtests.timeout) self.log(msg) previous_test = None - for test_index, test_name in enumerate(self.tests, 1): + tests_iter = runtests.iter_tests() + for test_index, test_name in enumerate(tests_iter, 1): start_time = time.perf_counter() text = test_name if previous_test: text = '%s -- %s' % (text, previous_test) - self.display_progress(test_index, text) - - if self.tracer: - # If we're tracing code coverage, then we don't exit with status - # if on a false return value from main. - cmd = ('result = runtest(self.ns, test_name); ' - 'self.accumulate_result(result)') - ns = dict(locals()) - self.tracer.runctx(cmd, globals=globals(), locals=ns) - result = ns['result'] - else: - result = runtest(self.ns, test_name) - self.accumulate_result(result) + self.logger.display_progress(test_index, text) + + result = self.run_test(test_name, runtests, tracer) - if result.state == State.INTERRUPTED: + # Unload the newly imported modules (best effort finalization) + for module in sys.modules.keys(): + if module not in save_modules and module.startswith("test."): + support.unload(module) + + if result.must_stop(self.fail_fast, self.fail_env_changed): break previous_test = str(result) @@ -496,140 +348,22 @@ class Regrtest: # be quiet: say nothing if the test passed shortly previous_test = None - # Unload the newly imported modules (best effort finalization) - for module in sys.modules.keys(): - if module not in save_modules and module.startswith("test."): - support.unload(module) - - if self.ns.failfast and result.is_failed(self.ns.fail_env_changed): - break - if previous_test: print(previous_test) - def _test_forever(self, tests): - while True: - for test_name in tests: - yield test_name - if self.bad: - return - if self.ns.fail_env_changed and self.environment_changed: - return - - def display_header(self): - # Print basic platform information - print("==", platform.python_implementation(), *sys.version.split()) - print("==", platform.platform(aliased=True), - "%s-endian" % sys.byteorder) - print("== Python build:", ' '.join(get_build_info())) - print("== cwd:", os.getcwd()) - cpu_count = os.cpu_count() - if cpu_count: - print("== CPU count:", cpu_count) - print("== encodings: locale=%s, FS=%s" - % (locale.getencoding(), sys.getfilesystemencoding())) - self.display_sanitizers() - - def display_sanitizers(self): - # This makes it easier to remember what to set in your local - # environment when trying to reproduce a sanitizer failure. - asan = support.check_sanitizer(address=True) - msan = support.check_sanitizer(memory=True) - ubsan = support.check_sanitizer(ub=True) - sanitizers = [] - if asan: - sanitizers.append("address") - if msan: - sanitizers.append("memory") - if ubsan: - sanitizers.append("undefined behavior") - if not sanitizers: - return - - print(f"== sanitizers: {', '.join(sanitizers)}") - for sanitizer, env_var in ( - (asan, "ASAN_OPTIONS"), - (msan, "MSAN_OPTIONS"), - (ubsan, "UBSAN_OPTIONS"), - ): - options= os.environ.get(env_var) - if sanitizer and options is not None: - print(f"== {env_var}={options!r}") - - def no_tests_run(self): - return not any((self.good, self.bad, self.skipped, self.interrupted, - self.environment_changed)) - - def get_tests_result(self): - result = [] - if self.bad: - result.append("FAILURE") - elif self.ns.fail_env_changed and self.environment_changed: - result.append("ENV CHANGED") - elif self.no_tests_run(): - result.append("NO TESTS RAN") - - if self.interrupted: - result.append("INTERRUPTED") - - if not result: - result.append("SUCCESS") - - result = ', '.join(result) - if self.first_result: - result = '%s then %s' % (self.first_result, result) - return result + return tracer - def run_tests(self): - # For a partial run, we do not need to clutter the output. - if (self.ns.header - or not(self.ns.pgo or self.ns.quiet or self.ns.single - or self.tests or self.ns.args)): - self.display_header() - - if self.ns.huntrleaks: - warmup, repetitions, _ = self.ns.huntrleaks - if warmup < 3: - msg = ("WARNING: Running tests with --huntrleaks/-R and less than " - "3 warmup repetitions can give false positives!") - print(msg, file=sys.stdout, flush=True) - - if self.ns.randomize: - print("Using random seed", self.ns.random_seed) - - if self.ns.forever: - self.tests = self._test_forever(list(self.selected)) - self.test_count = '' - self.test_count_width = 3 - else: - self.tests = iter(self.selected) - self.test_count = '/{}'.format(len(self.selected)) - self.test_count_width = len(self.test_count) - 1 - - if self.ns.use_mp: - from test.libregrtest.runtest_mp import run_tests_multiprocess - # If we're on windows and this is the parent runner (not a worker), - # track the load average. - if sys.platform == 'win32' and self.worker_test_name is None: - from test.libregrtest.win_utils import WindowsLoadTracker - - try: - self.win_load_tracker = WindowsLoadTracker() - except PermissionError as error: - # Standard accounts may not have access to the performance - # counters. - print(f'Failed to create WindowsLoadTracker: {error}') + def get_state(self): + state = self.results.get_state(self.fail_env_changed) + if self.first_state: + state = f'{self.first_state} then {state}' + return state - try: - run_tests_multiprocess(self) - finally: - if self.win_load_tracker is not None: - self.win_load_tracker.close() - self.win_load_tracker = None - else: - self.run_tests_sequential() + def _run_tests_mp(self, runtests: RunTests, num_workers: int) -> None: + from .run_workers import RunWorkers + RunWorkers(num_workers, runtests, self.logger, self.results).run() - def finalize(self): + def finalize_tests(self, tracer): if self.next_single_filename: if self.next_single_test: with open(self.next_single_filename, 'w') as fp: @@ -637,232 +371,299 @@ class Regrtest: else: os.unlink(self.next_single_filename) - if self.tracer: - r = self.tracer.results() - r.write_results(show_missing=True, summary=True, - coverdir=self.ns.coverdir) - - print() - self.display_summary() + if tracer is not None: + results = tracer.results() + results.write_results(show_missing=True, summary=True, + coverdir=self.coverage_dir) - if self.ns.runleaks: + if self.want_run_leaks: os.system("leaks %d" % os.getpid()) + if self.junit_filename: + self.results.write_junit(self.junit_filename) + def display_summary(self): - duration = time.perf_counter() - self.start_time + duration = time.perf_counter() - self.logger.start_time + filtered = bool(self.match_tests) or bool(self.ignore_tests) # Total duration + print() print("Total duration: %s" % format_duration(duration)) - # Total tests - total = TestStats() - for stats in self.stats_dict.values(): - total.accumulate(stats) - stats = [f'run={total.tests_run:,}'] - if total.failures: - stats.append(f'failures={total.failures:,}') - if total.skipped: - stats.append(f'skipped={total.skipped:,}') - print(f"Total tests: {' '.join(stats)}") - - # Total test files - report = [f'success={len(self.good)}'] - if self.bad: - report.append(f'failed={len(self.bad)}') - if self.environment_changed: - report.append(f'env_changed={len(self.environment_changed)}') - if self.skipped: - report.append(f'skipped={len(self.skipped)}') - if self.resource_denied: - report.append(f'resource_denied={len(self.resource_denied)}') - if self.rerun: - report.append(f'rerun={len(self.rerun)}') - if self.run_no_tests: - report.append(f'run_no_tests={len(self.run_no_tests)}') - print(f"Total test files: {' '.join(report)}") + self.results.display_summary(self.first_runtests, filtered) # Result - result = self.get_tests_result() - print(f"Result: {result}") + state = self.get_state() + print(f"Result: {state}") + + def create_run_tests(self, tests: TestTuple): + return RunTests( + tests, + fail_fast=self.fail_fast, + fail_env_changed=self.fail_env_changed, + match_tests=self.match_tests, + ignore_tests=self.ignore_tests, + match_tests_dict=None, + rerun=False, + forever=self.forever, + pgo=self.pgo, + pgo_extended=self.pgo_extended, + output_on_failure=self.output_on_failure, + timeout=self.timeout, + verbose=self.verbose, + quiet=self.quiet, + hunt_refleak=self.hunt_refleak, + test_dir=self.test_dir, + use_junit=(self.junit_filename is not None), + memory_limit=self.memory_limit, + gc_threshold=self.gc_threshold, + use_resources=self.use_resources, + python_cmd=self.python_cmd, + randomize=self.randomize, + random_seed=self.random_seed, + json_file=None, + ) + + def _run_tests(self, selected: TestTuple, tests: TestList | None) -> int: + if self.hunt_refleak and self.hunt_refleak.warmups < 3: + msg = ("WARNING: Running tests with --huntrleaks/-R and " + "less than 3 warmup repetitions can give false positives!") + print(msg, file=sys.stdout, flush=True) + + if self.num_workers < 0: + # Use all CPUs + 2 extra worker processes for tests + # that like to sleep + self.num_workers = (process_cpu_count() or 1) + 2 - def save_xml_result(self): - if not self.ns.xmlpath and not self.testsuite_xml: - return + # For a partial run, we do not need to clutter the output. + if (self.want_header + or not(self.pgo or self.quiet or self.single_test_run + or tests or self.cmdline_args)): + display_header(self.use_resources, self.python_cmd) - import xml.etree.ElementTree as ET - root = ET.Element("testsuites") - - # Manually count the totals for the overall summary - totals = {'tests': 0, 'errors': 0, 'failures': 0} - for suite in self.testsuite_xml: - root.append(suite) - for k in totals: - try: - totals[k] += int(suite.get(k, 0)) - except ValueError: - pass - - for k, v in totals.items(): - root.set(k, str(v)) - - xmlpath = os.path.join(os_helper.SAVEDCWD, self.ns.xmlpath) - with open(xmlpath, 'wb') as f: - for s in ET.tostringlist(root): - f.write(s) - - def fix_umask(self): - if support.is_emscripten: - # Emscripten has default umask 0o777, which breaks some tests. - # see https://github.com/emscripten-core/emscripten/issues/17269 - old_mask = os.umask(0) - if old_mask == 0o777: - os.umask(0o027) - else: - os.umask(old_mask) - - def set_temp_dir(self): - if self.ns.tempdir: - self.tmp_dir = self.ns.tempdir - - if not self.tmp_dir: - # When tests are run from the Python build directory, it is best practice - # to keep the test files in a subfolder. This eases the cleanup of leftover - # files using the "make distclean" command. - if sysconfig.is_python_build(): - self.tmp_dir = sysconfig.get_config_var('abs_builddir') - if self.tmp_dir is None: - self.tmp_dir = sysconfig.get_config_var('abs_srcdir') - if not self.tmp_dir: - # gh-74470: On Windows, only srcdir is available. Using - # abs_builddir mostly matters on UNIX when building - # Python out of the source tree, especially when the - # source tree is read only. - self.tmp_dir = sysconfig.get_config_var('srcdir') - self.tmp_dir = os.path.join(self.tmp_dir, 'build') - else: - self.tmp_dir = tempfile.gettempdir() + print("Using random seed", self.random_seed) - self.tmp_dir = os.path.abspath(self.tmp_dir) + runtests = self.create_run_tests(selected) + self.first_runtests = runtests + self.logger.set_tests(runtests) - def create_temp_dir(self): - os.makedirs(self.tmp_dir, exist_ok=True) + setup_process() - # Define a writable temp dir that will be used as cwd while running - # the tests. The name of the dir includes the pid to allow parallel - # testing (see the -j option). - # Emscripten and WASI have stubbed getpid(), Emscripten has only - # milisecond clock resolution. Use randint() instead. - if sys.platform in {"emscripten", "wasi"}: - nounce = random.randint(0, 1_000_000) - else: - nounce = os.getpid() - if self.worker_test_name is not None: - test_cwd = 'test_python_worker_{}'.format(nounce) + if self.hunt_refleak and not self.num_workers: + # gh-109739: WindowsLoadTracker thread interfers with refleak check + use_load_tracker = False else: - test_cwd = 'test_python_{}'.format(nounce) - test_cwd += os_helper.FS_NONASCII - test_cwd = os.path.join(self.tmp_dir, test_cwd) - return test_cwd - - def cleanup(self): - import glob - - path = os.path.join(glob.escape(self.tmp_dir), 'test_python_*') - print("Cleanup %s directory" % self.tmp_dir) - for name in glob.glob(path): - if os.path.isdir(name): - print("Remove directory: %s" % name) - os_helper.rmtree(name) + # WindowsLoadTracker is only needed on Windows + use_load_tracker = MS_WINDOWS + + if use_load_tracker: + self.logger.start_load_tracker() + try: + if self.num_workers: + self._run_tests_mp(runtests, self.num_workers) + tracer = None else: - print("Remove file: %s" % name) - os_helper.unlink(name) + tracer = self.run_tests_sequentially(runtests) - def main(self, tests=None, **kwargs): - self.parse_args(kwargs) + self.display_result(runtests) - self.set_temp_dir() + if self.want_rerun and self.results.need_rerun(): + self.rerun_failed_tests(runtests) + finally: + if use_load_tracker: + self.logger.stop_load_tracker() - self.fix_umask() + self.display_summary() + self.finalize_tests(tracer) - if self.ns.cleanup: - self.cleanup() - sys.exit(0) + return self.results.get_exitcode(self.fail_env_changed, + self.fail_rerun) - test_cwd = self.create_temp_dir() + def run_tests(self, selected: TestTuple, tests: TestList | None) -> int: + os.makedirs(self.tmp_dir, exist_ok=True) + work_dir = get_work_dir(self.tmp_dir) - try: - # Run the tests in a context manager that temporarily changes the CWD - # to a temporary and writable directory. If it's not possible to - # create or change the CWD, the original CWD will be used. + # Put a timeout on Python exit + with exit_timeout(): + # Run the tests in a context manager that temporarily changes the + # CWD to a temporary and writable directory. If it's not possible + # to create or change the CWD, the original CWD will be used. # The original CWD is available from os_helper.SAVEDCWD. - with os_helper.temp_cwd(test_cwd, quiet=True): - # When using multiprocessing, worker processes will use test_cwd - # as their parent temporary directory. So when the main process - # exit, it removes also subdirectories of worker processes. - self.ns.tempdir = test_cwd - - self._main(tests, kwargs) - except SystemExit as exc: - # bpo-38203: Python can hang at exit in Py_Finalize(), especially - # on threading._shutdown() call: put a timeout - if threading_helper.can_start_thread: - faulthandler.dump_traceback_later(EXIT_TIMEOUT, exit=True) + with os_helper.temp_cwd(work_dir, quiet=True): + # When using multiprocessing, worker processes will use + # work_dir as their parent temporary directory. So when the + # main process exit, it removes also subdirectories of worker + # processes. + return self._run_tests(selected, tests) + + def _add_cross_compile_opts(self, regrtest_opts): + # WASM/WASI buildbot builders pass multiple PYTHON environment + # variables such as PYTHONPATH and _PYTHON_HOSTRUNNER. + keep_environ = bool(self.python_cmd) + environ = None + + # Are we using cross-compilation? + cross_compile = is_cross_compiled() + + # Get HOSTRUNNER + hostrunner = get_host_runner() + + if cross_compile: + # emulate -E, but keep PYTHONPATH + cross compile env vars, + # so test executable can load correct sysconfigdata file. + keep = { + '_PYTHON_PROJECT_BASE', + '_PYTHON_HOST_PLATFORM', + '_PYTHON_SYSCONFIGDATA_NAME', + 'PYTHONPATH' + } + old_environ = os.environ + new_environ = { + name: value for name, value in os.environ.items() + if not name.startswith(('PYTHON', '_PYTHON')) or name in keep + } + # Only set environ if at least one variable was removed + if new_environ != old_environ: + environ = new_environ + keep_environ = True + + if cross_compile and hostrunner: + if self.num_workers == 0: + # For now use only two cores for cross-compiled builds; + # hostrunner can be expensive. + regrtest_opts.extend(['-j', '2']) + + # If HOSTRUNNER is set and -p/--python option is not given, then + # use hostrunner to execute python binary for tests. + if not self.python_cmd: + buildpython = sysconfig.get_config_var("BUILDPYTHON") + python_cmd = f"{hostrunner} {buildpython}" + regrtest_opts.extend(["--python", python_cmd]) + keep_environ = True + + return (environ, keep_environ) + + def _add_ci_python_opts(self, python_opts, keep_environ): + # --fast-ci and --slow-ci add options to Python: + # "-u -W default -bb -E" + + # Unbuffered stdout and stderr + if not sys.stdout.write_through: + python_opts.append('-u') + + # Add warnings filter 'default' + if 'default' not in sys.warnoptions: + python_opts.extend(('-W', 'default')) + + # Error on bytes/str comparison + if sys.flags.bytes_warning < 2: + python_opts.append('-bb') + + if not keep_environ: + # Ignore PYTHON* environment variables + if not sys.flags.ignore_environment: + python_opts.append('-E') + + def _execute_python(self, cmd, environ): + # Make sure that messages before execv() are logged + sys.stdout.flush() + sys.stderr.flush() + + cmd_text = shlex.join(cmd) + try: + print(f"+ {cmd_text}", flush=True) - sys.exit(exc.code) + if hasattr(os, 'execv') and not MS_WINDOWS: + os.execv(cmd[0], cmd) + # On success, execv() do no return. + # On error, it raises an OSError. + else: + import subprocess + with subprocess.Popen(cmd, env=environ) as proc: + try: + proc.wait() + except KeyboardInterrupt: + # There is no need to call proc.terminate(): on CTRL+C, + # SIGTERM is also sent to the child process. + try: + proc.wait(timeout=EXIT_TIMEOUT) + except subprocess.TimeoutExpired: + proc.kill() + proc.wait() + sys.exit(EXITCODE_INTERRUPTED) + + sys.exit(proc.returncode) + except Exception as exc: + print_warning(f"Failed to change Python options: {exc!r}\n" + f"Command: {cmd_text}") + # continue executing main() + + def _add_python_opts(self): + python_opts = [] + regrtest_opts = [] + + environ, keep_environ = self._add_cross_compile_opts(regrtest_opts) + if self.ci_mode: + self._add_ci_python_opts(python_opts, keep_environ) + + if (not python_opts) and (not regrtest_opts) and (environ is None): + # Nothing changed: nothing to do + return - def getloadavg(self): - if self.win_load_tracker is not None: - return self.win_load_tracker.getloadavg() + # Create new command line + cmd = list(sys.orig_argv) + if python_opts: + cmd[1:1] = python_opts + if regrtest_opts: + cmd.extend(regrtest_opts) + cmd.append("--dont-add-python-opts") - if hasattr(os, 'getloadavg'): - return os.getloadavg()[0] + self._execute_python(cmd, environ) - return None + def _init(self): + # Set sys.stdout encoder error handler to backslashreplace, + # similar to sys.stderr error handler, to avoid UnicodeEncodeError + # when printing a traceback or any other non-encodable character. + sys.stdout.reconfigure(errors="backslashreplace") - def _main(self, tests, kwargs): - if self.worker_test_name is not None: - from test.libregrtest.runtest_mp import run_tests_worker - run_tests_worker(self.ns, self.worker_test_name) + if self.junit_filename and not os.path.isabs(self.junit_filename): + self.junit_filename = os.path.abspath(self.junit_filename) - if self.ns.wait: - input("Press any key to continue...") + strip_py_suffix(self.cmdline_args) - support.PGO = self.ns.pgo - support.PGO_EXTENDED = self.ns.pgo_extended + self.tmp_dir = get_temp_dir(self.tmp_dir) - setup_tests(self.ns) + def main(self, tests: TestList | None = None): + if self.want_add_python_opts: + self._add_python_opts() - self.find_tests(tests) + self._init() - if self.ns.list_tests: - self.list_tests() + if self.want_cleanup: + cleanup_temp_dir(self.tmp_dir) sys.exit(0) - if self.ns.list_cases: - self.list_cases() - sys.exit(0) - - self.run_tests() - self.display_result() - - if self.ns.verbose2 and self.bad: - self.rerun_failed_tests() - - self.finalize() + if self.want_wait: + input("Press any key to continue...") - self.save_xml_result() + setup_test_dir(self.test_dir) + selected, tests = self.find_tests(tests) + + exitcode = 0 + if self.want_list_tests: + self.list_tests(selected) + elif self.want_list_cases: + list_cases(selected, + match_tests=self.match_tests, + ignore_tests=self.ignore_tests, + test_dir=self.test_dir) + else: + exitcode = self.run_tests(selected, tests) - if self.bad: - sys.exit(EXITCODE_BAD_TEST) - if self.interrupted: - sys.exit(EXITCODE_INTERRUPTED) - if self.ns.fail_env_changed and self.environment_changed: - sys.exit(EXITCODE_ENV_CHANGED) - if self.no_tests_run(): - sys.exit(EXITCODE_NO_TESTS_RAN) - sys.exit(0) + sys.exit(exitcode) -def main(tests=None, **kwargs): +def main(tests=None, _add_python_opts=False, **kwargs): """Run the Python suite.""" - Regrtest().main(tests=tests, **kwargs) + ns = _parse_args(sys.argv[1:], **kwargs) + Regrtest(ns, _add_python_opts=_add_python_opts).main(tests=tests) diff --git a/Lib/test/libregrtest/pgo.py b/Lib/test/libregrtest/pgo.py index 42ce5fb..e3a6927 100644 --- a/Lib/test/libregrtest/pgo.py +++ b/Lib/test/libregrtest/pgo.py @@ -42,15 +42,15 @@ PGO_TESTS = [ 'test_set', 'test_sqlite3', 'test_statistics', + 'test_str', 'test_struct', 'test_tabnanny', 'test_time', - 'test_unicode', 'test_xml_etree', 'test_xml_etree_c', ] -def setup_pgo_tests(ns): - if not ns.args and not ns.pgo_extended: +def setup_pgo_tests(cmdline_args, pgo_extended: bool): + if not cmdline_args and not pgo_extended: # run default set of tests for PGO training - ns.args = PGO_TESTS[:] + cmdline_args[:] = PGO_TESTS[:] diff --git a/Lib/test/libregrtest/refleak.py b/Lib/test/libregrtest/refleak.py index 58a1419..59f48bd 100644 --- a/Lib/test/libregrtest/refleak.py +++ b/Lib/test/libregrtest/refleak.py @@ -1,10 +1,13 @@ -import os import sys import warnings from inspect import isabstract +from typing import Any + from test import support from test.support import os_helper -from test.libregrtest.utils import clear_caches + +from .runtests import HuntRefleak +from .utils import clear_caches try: from _abc import _get_dump @@ -19,7 +22,9 @@ except ImportError: cls._abc_negative_cache, cls._abc_negative_cache_version) -def dash_R(ns, test_name, test_func): +def runtest_refleak(test_name, test_func, + hunt_refleak: HuntRefleak, + quiet: bool): """Run a test multiple times, looking for reference leaks. Returns: @@ -41,6 +46,7 @@ def dash_R(ns, test_name, test_func): fs = warnings.filters[:] ps = copyreg.dispatch_table.copy() pic = sys.path_importer_cache.copy() + zdc: dict[str, Any] | None try: import zipimport except ImportError: @@ -62,9 +68,10 @@ def dash_R(ns, test_name, test_func): def get_pooled_int(value): return int_pool.setdefault(value, value) - nwarmup, ntracked, fname = ns.huntrleaks - fname = os.path.join(os_helper.SAVEDCWD, fname) - repcount = nwarmup + ntracked + warmups = hunt_refleak.warmups + runs = hunt_refleak.runs + filename = hunt_refleak.filename + repcount = warmups + runs # Pre-allocate to ensure that the loop doesn't allocate anything new rep_range = list(range(repcount)) @@ -73,12 +80,11 @@ def dash_R(ns, test_name, test_func): fd_deltas = [0] * repcount getallocatedblocks = sys.getallocatedblocks gettotalrefcount = sys.gettotalrefcount - _getquickenedcount = sys._getquickenedcount fd_count = os_helper.fd_count # initialize variables to make pyflakes quiet rc_before = alloc_before = fd_before = 0 - if not ns.quiet: + if not quiet: print("beginning", repcount, "repetitions", file=sys.stderr) print(("1234567890"*(repcount//10 + 1))[:repcount], file=sys.stderr, flush=True) @@ -93,12 +99,12 @@ def dash_R(ns, test_name, test_func): dash_R_cleanup(fs, ps, pic, zdc, abcs) support.gc_collect() - # Read memory statistics immediately after the garbage collection - alloc_after = getallocatedblocks() - _getquickenedcount() + # Read memory statistics immediately after the garbage collection. + alloc_after = getallocatedblocks() rc_after = gettotalrefcount() fd_after = fd_count() - if not ns.quiet: + if not quiet: print('.', end='', file=sys.stderr, flush=True) rc_deltas[i] = get_pooled_int(rc_after - rc_before) @@ -109,7 +115,7 @@ def dash_R(ns, test_name, test_func): rc_before = rc_after fd_before = fd_after - if not ns.quiet: + if not quiet: print(file=sys.stderr) # These checkers return False on success, True on failure @@ -138,12 +144,12 @@ def dash_R(ns, test_name, test_func): (fd_deltas, 'file descriptors', check_fd_deltas) ]: # ignore warmup runs - deltas = deltas[nwarmup:] + deltas = deltas[warmups:] if checker(deltas): msg = '%s leaked %s %s, sum=%s' % ( test_name, deltas, item_name, sum(deltas)) print(msg, file=sys.stderr, flush=True) - with open(fname, "a", encoding="utf-8") as refrep: + with open(filename, "a", encoding="utf-8") as refrep: print(msg, file=refrep) refrep.flush() failed = True @@ -169,6 +175,7 @@ def dash_R_cleanup(fs, ps, pic, zdc, abcs): zipimport._zip_directory_cache.update(zdc) # Clear ABC registries, restoring previously saved ABC registries. + # ignore deprecation warning for collections.abc.ByteString abs_classes = [getattr(collections.abc, a) for a in collections.abc.__all__] abs_classes = filter(isabstract, abs_classes) for abc in abs_classes: diff --git a/Lib/test/libregrtest/result.py b/Lib/test/libregrtest/result.py new file mode 100644 index 0000000..d6b0d5a --- /dev/null +++ b/Lib/test/libregrtest/result.py @@ -0,0 +1,190 @@ +import dataclasses +import json +from typing import Any + +from test.support import TestStats + +from .utils import ( + StrJSON, TestName, FilterTuple, + format_duration, normalize_test_name, print_warning) + + +# Avoid enum.Enum to reduce the number of imports when tests are run +class State: + PASSED = "PASSED" + FAILED = "FAILED" + SKIPPED = "SKIPPED" + UNCAUGHT_EXC = "UNCAUGHT_EXC" + REFLEAK = "REFLEAK" + ENV_CHANGED = "ENV_CHANGED" + RESOURCE_DENIED = "RESOURCE_DENIED" + INTERRUPTED = "INTERRUPTED" + WORKER_FAILED = "WORKER_FAILED" # non-zero worker process exit code + WORKER_BUG = "WORKER_BUG" # exception when running a worker + DID_NOT_RUN = "DID_NOT_RUN" + TIMEOUT = "TIMEOUT" + + @staticmethod + def is_failed(state): + return state in { + State.FAILED, + State.UNCAUGHT_EXC, + State.REFLEAK, + State.WORKER_FAILED, + State.WORKER_BUG, + State.TIMEOUT} + + @staticmethod + def has_meaningful_duration(state): + # Consider that the duration is meaningless for these cases. + # For example, if a whole test file is skipped, its duration + # is unlikely to be the duration of executing its tests, + # but just the duration to execute code which skips the test. + return state not in { + State.SKIPPED, + State.RESOURCE_DENIED, + State.INTERRUPTED, + State.WORKER_FAILED, + State.WORKER_BUG, + State.DID_NOT_RUN} + + @staticmethod + def must_stop(state): + return state in { + State.INTERRUPTED, + State.WORKER_BUG, + } + + +@dataclasses.dataclass(slots=True) +class TestResult: + test_name: TestName + state: str | None = None + # Test duration in seconds + duration: float | None = None + xml_data: list[str] | None = None + stats: TestStats | None = None + + # errors and failures copied from support.TestFailedWithDetails + errors: list[tuple[str, str]] | None = None + failures: list[tuple[str, str]] | None = None + + def is_failed(self, fail_env_changed: bool) -> bool: + if self.state == State.ENV_CHANGED: + return fail_env_changed + return State.is_failed(self.state) + + def _format_failed(self): + if self.errors and self.failures: + le = len(self.errors) + lf = len(self.failures) + error_s = "error" + ("s" if le > 1 else "") + failure_s = "failure" + ("s" if lf > 1 else "") + return f"{self.test_name} failed ({le} {error_s}, {lf} {failure_s})" + + if self.errors: + le = len(self.errors) + error_s = "error" + ("s" if le > 1 else "") + return f"{self.test_name} failed ({le} {error_s})" + + if self.failures: + lf = len(self.failures) + failure_s = "failure" + ("s" if lf > 1 else "") + return f"{self.test_name} failed ({lf} {failure_s})" + + return f"{self.test_name} failed" + + def __str__(self) -> str: + match self.state: + case State.PASSED: + return f"{self.test_name} passed" + case State.FAILED: + return self._format_failed() + case State.SKIPPED: + return f"{self.test_name} skipped" + case State.UNCAUGHT_EXC: + return f"{self.test_name} failed (uncaught exception)" + case State.REFLEAK: + return f"{self.test_name} failed (reference leak)" + case State.ENV_CHANGED: + return f"{self.test_name} failed (env changed)" + case State.RESOURCE_DENIED: + return f"{self.test_name} skipped (resource denied)" + case State.INTERRUPTED: + return f"{self.test_name} interrupted" + case State.WORKER_FAILED: + return f"{self.test_name} worker non-zero exit code" + case State.WORKER_BUG: + return f"{self.test_name} worker bug" + case State.DID_NOT_RUN: + return f"{self.test_name} ran no tests" + case State.TIMEOUT: + return f"{self.test_name} timed out ({format_duration(self.duration)})" + case _: + raise ValueError("unknown result state: {state!r}") + + def has_meaningful_duration(self): + return State.has_meaningful_duration(self.state) + + def set_env_changed(self): + if self.state is None or self.state == State.PASSED: + self.state = State.ENV_CHANGED + + def must_stop(self, fail_fast: bool, fail_env_changed: bool) -> bool: + if State.must_stop(self.state): + return True + if fail_fast and self.is_failed(fail_env_changed): + return True + return False + + def get_rerun_match_tests(self) -> FilterTuple | None: + match_tests = [] + + errors = self.errors or [] + failures = self.failures or [] + for error_list, is_error in ( + (errors, True), + (failures, False), + ): + for full_name, *_ in error_list: + match_name = normalize_test_name(full_name, is_error=is_error) + if match_name is None: + # 'setUpModule (test.test_sys)': don't filter tests + return None + if not match_name: + error_type = "ERROR" if is_error else "FAIL" + print_warning(f"rerun failed to parse {error_type} test name: " + f"{full_name!r}: don't filter tests") + return None + match_tests.append(match_name) + + if not match_tests: + return None + return tuple(match_tests) + + def write_json_into(self, file) -> None: + json.dump(self, file, cls=_EncodeTestResult) + + @staticmethod + def from_json(worker_json: StrJSON) -> 'TestResult': + return json.loads(worker_json, object_hook=_decode_test_result) + + +class _EncodeTestResult(json.JSONEncoder): + def default(self, o: Any) -> dict[str, Any]: + if isinstance(o, TestResult): + result = dataclasses.asdict(o) + result["__test_result__"] = o.__class__.__name__ + return result + else: + return super().default(o) + + +def _decode_test_result(data: dict[str, Any]) -> TestResult | dict[str, Any]: + if "__test_result__" in data: + data.pop('__test_result__') + if data['stats'] is not None: + data['stats'] = TestStats(**data['stats']) + return TestResult(**data) + else: + return data diff --git a/Lib/test/libregrtest/results.py b/Lib/test/libregrtest/results.py new file mode 100644 index 0000000..3708078 --- /dev/null +++ b/Lib/test/libregrtest/results.py @@ -0,0 +1,261 @@ +import sys +from test.support import TestStats + +from .runtests import RunTests +from .result import State, TestResult +from .utils import ( + StrPath, TestName, TestTuple, TestList, FilterDict, + printlist, count, format_duration) + + +# Python uses exit code 1 when an exception is not catched +# argparse.ArgumentParser.error() uses exit code 2 +EXITCODE_BAD_TEST = 2 +EXITCODE_ENV_CHANGED = 3 +EXITCODE_NO_TESTS_RAN = 4 +EXITCODE_RERUN_FAIL = 5 +EXITCODE_INTERRUPTED = 130 # 128 + signal.SIGINT=2 + + +class TestResults: + def __init__(self): + self.bad: TestList = [] + self.good: TestList = [] + self.rerun_bad: TestList = [] + self.skipped: TestList = [] + self.resource_denied: TestList = [] + self.env_changed: TestList = [] + self.run_no_tests: TestList = [] + self.rerun: TestList = [] + self.rerun_results: list[TestResult] = [] + + self.interrupted: bool = False + self.worker_bug: bool = False + self.test_times: list[tuple[float, TestName]] = [] + self.stats = TestStats() + # used by --junit-xml + self.testsuite_xml: list[str] = [] + + def is_all_good(self): + return (not self.bad + and not self.skipped + and not self.interrupted + and not self.worker_bug) + + def get_executed(self): + return (set(self.good) | set(self.bad) | set(self.skipped) + | set(self.resource_denied) | set(self.env_changed) + | set(self.run_no_tests)) + + def no_tests_run(self): + return not any((self.good, self.bad, self.skipped, self.interrupted, + self.env_changed)) + + def get_state(self, fail_env_changed): + state = [] + if self.bad: + state.append("FAILURE") + elif fail_env_changed and self.env_changed: + state.append("ENV CHANGED") + elif self.no_tests_run(): + state.append("NO TESTS RAN") + + if self.interrupted: + state.append("INTERRUPTED") + if self.worker_bug: + state.append("WORKER BUG") + if not state: + state.append("SUCCESS") + + return ', '.join(state) + + def get_exitcode(self, fail_env_changed, fail_rerun): + exitcode = 0 + if self.bad: + exitcode = EXITCODE_BAD_TEST + elif self.interrupted: + exitcode = EXITCODE_INTERRUPTED + elif fail_env_changed and self.env_changed: + exitcode = EXITCODE_ENV_CHANGED + elif self.no_tests_run(): + exitcode = EXITCODE_NO_TESTS_RAN + elif fail_rerun and self.rerun: + exitcode = EXITCODE_RERUN_FAIL + elif self.worker_bug: + exitcode = EXITCODE_BAD_TEST + return exitcode + + def accumulate_result(self, result: TestResult, runtests: RunTests): + test_name = result.test_name + rerun = runtests.rerun + fail_env_changed = runtests.fail_env_changed + + match result.state: + case State.PASSED: + self.good.append(test_name) + case State.ENV_CHANGED: + self.env_changed.append(test_name) + self.rerun_results.append(result) + case State.SKIPPED: + self.skipped.append(test_name) + case State.RESOURCE_DENIED: + self.resource_denied.append(test_name) + case State.INTERRUPTED: + self.interrupted = True + case State.DID_NOT_RUN: + self.run_no_tests.append(test_name) + case _: + if result.is_failed(fail_env_changed): + self.bad.append(test_name) + self.rerun_results.append(result) + else: + raise ValueError(f"invalid test state: {result.state!r}") + + if result.state == State.WORKER_BUG: + self.worker_bug = True + + if result.has_meaningful_duration() and not rerun: + self.test_times.append((result.duration, test_name)) + if result.stats is not None: + self.stats.accumulate(result.stats) + if rerun: + self.rerun.append(test_name) + + xml_data = result.xml_data + if xml_data: + self.add_junit(xml_data) + + def need_rerun(self): + return bool(self.rerun_results) + + def prepare_rerun(self) -> tuple[TestTuple, FilterDict]: + tests: TestList = [] + match_tests_dict = {} + for result in self.rerun_results: + tests.append(result.test_name) + + match_tests = result.get_rerun_match_tests() + # ignore empty match list + if match_tests: + match_tests_dict[result.test_name] = match_tests + + # Clear previously failed tests + self.rerun_bad.extend(self.bad) + self.bad.clear() + self.env_changed.clear() + self.rerun_results.clear() + + return (tuple(tests), match_tests_dict) + + def add_junit(self, xml_data: list[str]): + import xml.etree.ElementTree as ET + for e in xml_data: + try: + self.testsuite_xml.append(ET.fromstring(e)) + except ET.ParseError: + print(xml_data, file=sys.__stderr__) + raise + + def write_junit(self, filename: StrPath): + if not self.testsuite_xml: + # Don't create empty XML file + return + + import xml.etree.ElementTree as ET + root = ET.Element("testsuites") + + # Manually count the totals for the overall summary + totals = {'tests': 0, 'errors': 0, 'failures': 0} + for suite in self.testsuite_xml: + root.append(suite) + for k in totals: + try: + totals[k] += int(suite.get(k, 0)) + except ValueError: + pass + + for k, v in totals.items(): + root.set(k, str(v)) + + with open(filename, 'wb') as f: + for s in ET.tostringlist(root): + f.write(s) + + def display_result(self, tests: TestTuple, quiet: bool, print_slowest: bool): + if print_slowest: + self.test_times.sort(reverse=True) + print() + print("10 slowest tests:") + for test_time, test in self.test_times[:10]: + print("- %s: %s" % (test, format_duration(test_time))) + + all_tests = [] + omitted = set(tests) - self.get_executed() + + # less important + all_tests.append((omitted, "test", "{} omitted:")) + if not quiet: + all_tests.append((self.skipped, "test", "{} skipped:")) + all_tests.append((self.resource_denied, "test", "{} skipped (resource denied):")) + all_tests.append((self.run_no_tests, "test", "{} run no tests:")) + + # more important + all_tests.append((self.env_changed, "test", "{} altered the execution environment (env changed):")) + all_tests.append((self.rerun, "re-run test", "{}:")) + all_tests.append((self.bad, "test", "{} failed:")) + + for tests_list, count_text, title_format in all_tests: + if tests_list: + print() + count_text = count(len(tests_list), count_text) + print(title_format.format(count_text)) + printlist(tests_list) + + if self.good and not quiet: + print() + text = count(len(self.good), "test") + text = f"{text} OK." + if (self.is_all_good() and len(self.good) > 1): + text = f"All {text}" + print(text) + + if self.interrupted: + print() + print("Test suite interrupted by signal SIGINT.") + + def display_summary(self, first_runtests: RunTests, filtered: bool): + # Total tests + stats = self.stats + text = f'run={stats.tests_run:,}' + if filtered: + text = f"{text} (filtered)" + report = [text] + if stats.failures: + report.append(f'failures={stats.failures:,}') + if stats.skipped: + report.append(f'skipped={stats.skipped:,}') + print(f"Total tests: {' '.join(report)}") + + # Total test files + all_tests = [self.good, self.bad, self.rerun, + self.skipped, + self.env_changed, self.run_no_tests] + run = sum(map(len, all_tests)) + text = f'run={run}' + if not first_runtests.forever: + ntest = len(first_runtests.tests) + text = f"{text}/{ntest}" + if filtered: + text = f"{text} (filtered)" + report = [text] + for name, tests in ( + ('failed', self.bad), + ('env_changed', self.env_changed), + ('skipped', self.skipped), + ('resource_denied', self.resource_denied), + ('rerun', self.rerun), + ('run_no_tests', self.run_no_tests), + ): + if tests: + report.append(f'{name}={len(tests)}') + print(f"Total test files: {' '.join(report)}") diff --git a/Lib/test/libregrtest/run_workers.py b/Lib/test/libregrtest/run_workers.py new file mode 100644 index 0000000..16f8331 --- /dev/null +++ b/Lib/test/libregrtest/run_workers.py @@ -0,0 +1,607 @@ +import contextlib +import dataclasses +import faulthandler +import os.path +import queue +import signal +import subprocess +import sys +import tempfile +import threading +import time +import traceback +from typing import Literal, TextIO + +from test import support +from test.support import os_helper, MS_WINDOWS + +from .logger import Logger +from .result import TestResult, State +from .results import TestResults +from .runtests import RunTests, JsonFile, JsonFileType +from .single import PROGRESS_MIN_TIME +from .utils import ( + StrPath, TestName, + format_duration, print_warning, count, plural, get_signal_name) +from .worker import create_worker_process, USE_PROCESS_GROUP + +if MS_WINDOWS: + import locale + import msvcrt + + + +# Display the running tests if nothing happened last N seconds +PROGRESS_UPDATE = 30.0 # seconds +assert PROGRESS_UPDATE >= PROGRESS_MIN_TIME + +# Kill the main process after 5 minutes. It is supposed to write an update +# every PROGRESS_UPDATE seconds. Tolerate 5 minutes for Python slowest +# buildbot workers. +MAIN_PROCESS_TIMEOUT = 5 * 60.0 +assert MAIN_PROCESS_TIMEOUT >= PROGRESS_UPDATE + +# Time to wait until a worker completes: should be immediate +WAIT_COMPLETED_TIMEOUT = 30.0 # seconds + +# Time to wait a killed process (in seconds) +WAIT_KILLED_TIMEOUT = 60.0 + + +# We do not use a generator so multiple threads can call next(). +class MultiprocessIterator: + + """A thread-safe iterator over tests for multiprocess mode.""" + + def __init__(self, tests_iter): + self.lock = threading.Lock() + self.tests_iter = tests_iter + + def __iter__(self): + return self + + def __next__(self): + with self.lock: + if self.tests_iter is None: + raise StopIteration + return next(self.tests_iter) + + def stop(self): + with self.lock: + self.tests_iter = None + + +@dataclasses.dataclass(slots=True, frozen=True) +class MultiprocessResult: + result: TestResult + # bpo-45410: stderr is written into stdout to keep messages order + worker_stdout: str | None = None + err_msg: str | None = None + + +ExcStr = str +QueueOutput = tuple[Literal[False], MultiprocessResult] | tuple[Literal[True], ExcStr] + + +class ExitThread(Exception): + pass + + +class WorkerError(Exception): + def __init__(self, + test_name: TestName, + err_msg: str | None, + stdout: str | None, + state: str): + result = TestResult(test_name, state=state) + self.mp_result = MultiprocessResult(result, stdout, err_msg) + super().__init__() + + +class WorkerThread(threading.Thread): + def __init__(self, worker_id: int, runner: "RunWorkers") -> None: + super().__init__() + self.worker_id = worker_id + self.runtests = runner.runtests + self.pending = runner.pending + self.output = runner.output + self.timeout = runner.worker_timeout + self.log = runner.log + self.test_name: TestName | None = None + self.start_time: float | None = None + self._popen: subprocess.Popen[str] | None = None + self._killed = False + self._stopped = False + + def __repr__(self) -> str: + info = [f'WorkerThread #{self.worker_id}'] + if self.is_alive(): + info.append("running") + else: + info.append('stopped') + test = self.test_name + if test: + info.append(f'test={test}') + popen = self._popen + if popen is not None: + dt = time.monotonic() - self.start_time + info.extend((f'pid={self._popen.pid}', + f'time={format_duration(dt)}')) + return '<%s>' % ' '.join(info) + + def _kill(self) -> None: + popen = self._popen + if popen is None: + return + + if self._killed: + return + self._killed = True + + if USE_PROCESS_GROUP: + what = f"{self} process group" + else: + what = f"{self} process" + + print(f"Kill {what}", file=sys.stderr, flush=True) + try: + if USE_PROCESS_GROUP: + os.killpg(popen.pid, signal.SIGKILL) + else: + popen.kill() + except ProcessLookupError: + # popen.kill(): the process completed, the WorkerThread thread + # read its exit status, but Popen.send_signal() read the returncode + # just before Popen.wait() set returncode. + pass + except OSError as exc: + print_warning(f"Failed to kill {what}: {exc!r}") + + def stop(self) -> None: + # Method called from a different thread to stop this thread + self._stopped = True + self._kill() + + def _run_process(self, runtests: RunTests, output_fd: int, + tmp_dir: StrPath | None = None) -> int | None: + popen = create_worker_process(runtests, output_fd, tmp_dir) + self._popen = popen + self._killed = False + + try: + if self._stopped: + # If kill() has been called before self._popen is set, + # self._popen is still running. Call again kill() + # to ensure that the process is killed. + self._kill() + raise ExitThread + + try: + # gh-94026: stdout+stderr are written to tempfile + retcode = popen.wait(timeout=self.timeout) + assert retcode is not None + return retcode + except subprocess.TimeoutExpired: + if self._stopped: + # kill() has been called: communicate() fails on reading + # closed stdout + raise ExitThread + + # On timeout, kill the process + self._kill() + + # None means TIMEOUT for the caller + retcode = None + # bpo-38207: Don't attempt to call communicate() again: on it + # can hang until all child processes using stdout + # pipes completes. + except OSError: + if self._stopped: + # kill() has been called: communicate() fails + # on reading closed stdout + raise ExitThread + raise + except: + self._kill() + raise + finally: + self._wait_completed() + self._popen = None + + def create_stdout(self, stack: contextlib.ExitStack) -> TextIO: + """Create stdout temporay file (file descriptor).""" + + if MS_WINDOWS: + # gh-95027: When stdout is not a TTY, Python uses the ANSI code + # page for the sys.stdout encoding. If the main process runs in a + # terminal, sys.stdout uses WindowsConsoleIO with UTF-8 encoding. + encoding = locale.getencoding() + else: + encoding = sys.stdout.encoding + + # gh-94026: Write stdout+stderr to a tempfile as workaround for + # non-blocking pipes on Emscripten with NodeJS. + # gh-109425: Use "backslashreplace" error handler: log corrupted + # stdout+stderr, instead of failing with a UnicodeDecodeError and not + # logging stdout+stderr at all. + stdout_file = tempfile.TemporaryFile('w+', + encoding=encoding, + errors='backslashreplace') + stack.enter_context(stdout_file) + return stdout_file + + def create_json_file(self, stack: contextlib.ExitStack) -> tuple[JsonFile, TextIO | None]: + """Create JSON file.""" + + json_file_use_stdout = self.runtests.json_file_use_stdout() + if json_file_use_stdout: + json_file = JsonFile(None, JsonFileType.STDOUT) + json_tmpfile = None + else: + json_tmpfile = tempfile.TemporaryFile('w+', encoding='utf8') + stack.enter_context(json_tmpfile) + + json_fd = json_tmpfile.fileno() + if MS_WINDOWS: + json_handle = msvcrt.get_osfhandle(json_fd) + json_file = JsonFile(json_handle, + JsonFileType.WINDOWS_HANDLE) + else: + json_file = JsonFile(json_fd, JsonFileType.UNIX_FD) + return (json_file, json_tmpfile) + + def create_worker_runtests(self, test_name: TestName, json_file: JsonFile) -> RunTests: + """Create the worker RunTests.""" + + tests = (test_name,) + if self.runtests.rerun: + match_tests = self.runtests.get_match_tests(test_name) + else: + match_tests = None + + kwargs = {} + if match_tests: + kwargs['match_tests'] = match_tests + if self.runtests.output_on_failure: + kwargs['verbose'] = True + kwargs['output_on_failure'] = False + return self.runtests.copy( + tests=tests, + json_file=json_file, + **kwargs) + + def run_tmp_files(self, worker_runtests: RunTests, + stdout_fd: int) -> tuple[int | None, list[StrPath]]: + # gh-93353: Check for leaked temporary files in the parent process, + # since the deletion of temporary files can happen late during + # Python finalization: too late for libregrtest. + if not support.is_wasi: + # Don't check for leaked temporary files and directories if Python is + # run on WASI. WASI don't pass environment variables like TMPDIR to + # worker processes. + tmp_dir = tempfile.mkdtemp(prefix="test_python_") + tmp_dir = os.path.abspath(tmp_dir) + try: + retcode = self._run_process(worker_runtests, + stdout_fd, tmp_dir) + finally: + tmp_files = os.listdir(tmp_dir) + os_helper.rmtree(tmp_dir) + else: + retcode = self._run_process(worker_runtests, stdout_fd) + tmp_files = [] + + return (retcode, tmp_files) + + def read_stdout(self, stdout_file: TextIO) -> str: + stdout_file.seek(0) + try: + return stdout_file.read().strip() + except Exception as exc: + # gh-101634: Catch UnicodeDecodeError if stdout cannot be + # decoded from encoding + raise WorkerError(self.test_name, + f"Cannot read process stdout: {exc}", + stdout=None, + state=State.WORKER_BUG) + + def read_json(self, json_file: JsonFile, json_tmpfile: TextIO | None, + stdout: str) -> tuple[TestResult, str]: + try: + if json_tmpfile is not None: + json_tmpfile.seek(0) + worker_json = json_tmpfile.read() + elif json_file.file_type == JsonFileType.STDOUT: + stdout, _, worker_json = stdout.rpartition("\n") + stdout = stdout.rstrip() + else: + with json_file.open(encoding='utf8') as json_fp: + worker_json = json_fp.read() + except Exception as exc: + # gh-101634: Catch UnicodeDecodeError if stdout cannot be + # decoded from encoding + err_msg = f"Failed to read worker process JSON: {exc}" + raise WorkerError(self.test_name, err_msg, stdout, + state=State.WORKER_BUG) + + if not worker_json: + raise WorkerError(self.test_name, "empty JSON", stdout, + state=State.WORKER_BUG) + + try: + result = TestResult.from_json(worker_json) + except Exception as exc: + # gh-101634: Catch UnicodeDecodeError if stdout cannot be + # decoded from encoding + err_msg = f"Failed to parse worker process JSON: {exc}" + raise WorkerError(self.test_name, err_msg, stdout, + state=State.WORKER_BUG) + + return (result, stdout) + + def _runtest(self, test_name: TestName) -> MultiprocessResult: + with contextlib.ExitStack() as stack: + stdout_file = self.create_stdout(stack) + json_file, json_tmpfile = self.create_json_file(stack) + worker_runtests = self.create_worker_runtests(test_name, json_file) + + retcode, tmp_files = self.run_tmp_files(worker_runtests, + stdout_file.fileno()) + + stdout = self.read_stdout(stdout_file) + + if retcode is None: + raise WorkerError(self.test_name, stdout=stdout, + err_msg=None, + state=State.TIMEOUT) + if retcode != 0: + name = get_signal_name(retcode) + if name: + retcode = f"{retcode} ({name})" + raise WorkerError(self.test_name, f"Exit code {retcode}", stdout, + state=State.WORKER_FAILED) + + result, stdout = self.read_json(json_file, json_tmpfile, stdout) + + if tmp_files: + msg = (f'\n\n' + f'Warning -- {test_name} leaked temporary files ' + f'({len(tmp_files)}): {", ".join(sorted(tmp_files))}') + stdout += msg + result.set_env_changed() + + return MultiprocessResult(result, stdout) + + def run(self) -> None: + fail_fast = self.runtests.fail_fast + fail_env_changed = self.runtests.fail_env_changed + while not self._stopped: + try: + try: + test_name = next(self.pending) + except StopIteration: + break + + self.start_time = time.monotonic() + self.test_name = test_name + try: + mp_result = self._runtest(test_name) + except WorkerError as exc: + mp_result = exc.mp_result + finally: + self.test_name = None + mp_result.result.duration = time.monotonic() - self.start_time + self.output.put((False, mp_result)) + + if mp_result.result.must_stop(fail_fast, fail_env_changed): + break + except ExitThread: + break + except BaseException: + self.output.put((True, traceback.format_exc())) + break + + def _wait_completed(self) -> None: + popen = self._popen + + try: + popen.wait(WAIT_COMPLETED_TIMEOUT) + except (subprocess.TimeoutExpired, OSError) as exc: + print_warning(f"Failed to wait for {self} completion " + f"(timeout={format_duration(WAIT_COMPLETED_TIMEOUT)}): " + f"{exc!r}") + + def wait_stopped(self, start_time: float) -> None: + # bpo-38207: RunWorkers.stop_workers() called self.stop() + # which killed the process. Sometimes, killing the process from the + # main thread does not interrupt popen.communicate() in + # WorkerThread thread. This loop with a timeout is a workaround + # for that. + # + # Moreover, if this method fails to join the thread, it is likely + # that Python will hang at exit while calling threading._shutdown() + # which tries again to join the blocked thread. Regrtest.main() + # uses EXIT_TIMEOUT to workaround this second bug. + while True: + # Write a message every second + self.join(1.0) + if not self.is_alive(): + break + dt = time.monotonic() - start_time + self.log(f"Waiting for {self} thread for {format_duration(dt)}") + if dt > WAIT_KILLED_TIMEOUT: + print_warning(f"Failed to join {self} in {format_duration(dt)}") + break + + +def get_running(workers: list[WorkerThread]) -> str | None: + running: list[str] = [] + for worker in workers: + test_name = worker.test_name + if not test_name: + continue + dt = time.monotonic() - worker.start_time + if dt >= PROGRESS_MIN_TIME: + text = f'{test_name} ({format_duration(dt)})' + running.append(text) + if not running: + return None + return f"running ({len(running)}): {', '.join(running)}" + + +class RunWorkers: + def __init__(self, num_workers: int, runtests: RunTests, + logger: Logger, results: TestResults) -> None: + self.num_workers = num_workers + self.runtests = runtests + self.log = logger.log + self.display_progress = logger.display_progress + self.results: TestResults = results + + self.output: queue.Queue[QueueOutput] = queue.Queue() + tests_iter = runtests.iter_tests() + self.pending = MultiprocessIterator(tests_iter) + self.timeout = runtests.timeout + if self.timeout is not None: + # Rely on faulthandler to kill a worker process. This timouet is + # when faulthandler fails to kill a worker process. Give a maximum + # of 5 minutes to faulthandler to kill the worker. + self.worker_timeout: float | None = min(self.timeout * 1.5, self.timeout + 5 * 60) + else: + self.worker_timeout = None + self.workers: list[WorkerThread] | None = None + + jobs = self.runtests.get_jobs() + if jobs is not None: + # Don't spawn more threads than the number of jobs: + # these worker threads would never get anything to do. + self.num_workers = min(self.num_workers, jobs) + + def start_workers(self) -> None: + self.workers = [WorkerThread(index, self) + for index in range(1, self.num_workers + 1)] + jobs = self.runtests.get_jobs() + if jobs is not None: + tests = count(jobs, 'test') + else: + tests = 'tests' + nworkers = len(self.workers) + processes = plural(nworkers, "process", "processes") + msg = (f"Run {tests} in parallel using " + f"{nworkers} worker {processes}") + if self.timeout: + msg += (" (timeout: %s, worker timeout: %s)" + % (format_duration(self.timeout), + format_duration(self.worker_timeout))) + self.log(msg) + for worker in self.workers: + worker.start() + + def stop_workers(self) -> None: + start_time = time.monotonic() + for worker in self.workers: + worker.stop() + for worker in self.workers: + worker.wait_stopped(start_time) + + def _get_result(self) -> QueueOutput | None: + pgo = self.runtests.pgo + use_faulthandler = (self.timeout is not None) + + # bpo-46205: check the status of workers every iteration to avoid + # waiting forever on an empty queue. + while any(worker.is_alive() for worker in self.workers): + if use_faulthandler: + faulthandler.dump_traceback_later(MAIN_PROCESS_TIMEOUT, + exit=True) + + # wait for a thread + try: + return self.output.get(timeout=PROGRESS_UPDATE) + except queue.Empty: + pass + + if not pgo: + # display progress + running = get_running(self.workers) + if running: + self.log(running) + + # all worker threads are done: consume pending results + try: + return self.output.get(timeout=0) + except queue.Empty: + return None + + def display_result(self, mp_result: MultiprocessResult) -> None: + result = mp_result.result + pgo = self.runtests.pgo + + text = str(result) + if mp_result.err_msg: + # WORKER_BUG + text += ' (%s)' % mp_result.err_msg + elif (result.duration >= PROGRESS_MIN_TIME and not pgo): + text += ' (%s)' % format_duration(result.duration) + if not pgo: + running = get_running(self.workers) + if running: + text += f' -- {running}' + self.display_progress(self.test_index, text) + + def _process_result(self, item: QueueOutput) -> TestResult: + """Returns True if test runner must stop.""" + if item[0]: + # Thread got an exception + format_exc = item[1] + print_warning(f"regrtest worker thread failed: {format_exc}") + result = TestResult("", state=State.WORKER_BUG) + self.results.accumulate_result(result, self.runtests) + return result + + self.test_index += 1 + mp_result = item[1] + result = mp_result.result + self.results.accumulate_result(result, self.runtests) + self.display_result(mp_result) + + # Display worker stdout + if not self.runtests.output_on_failure: + show_stdout = True + else: + # --verbose3 ignores stdout on success + show_stdout = (result.state != State.PASSED) + if show_stdout: + stdout = mp_result.worker_stdout + if stdout: + print(stdout, flush=True) + + return result + + def run(self) -> None: + fail_fast = self.runtests.fail_fast + fail_env_changed = self.runtests.fail_env_changed + + self.start_workers() + + self.test_index = 0 + try: + while True: + item = self._get_result() + if item is None: + break + + result = self._process_result(item) + if result.must_stop(fail_fast, fail_env_changed): + break + except KeyboardInterrupt: + print() + self.results.interrupted = True + finally: + if self.timeout is not None: + faulthandler.cancel_dump_traceback_later() + + # Always ensure that all worker processes are no longer + # worker when we exit this function + self.pending.stop() + self.stop_workers() diff --git a/Lib/test/libregrtest/runtest.py b/Lib/test/libregrtest/runtest.py deleted file mode 100644 index f37093b..0000000 --- a/Lib/test/libregrtest/runtest.py +++ /dev/null @@ -1,479 +0,0 @@ -import dataclasses -import doctest -import faulthandler -import functools -import gc -import importlib -import io -import os -import sys -import time -import traceback -import unittest - -from test import support -from test.support import TestStats -from test.support import os_helper -from test.support import threading_helper -from test.libregrtest.cmdline import Namespace -from test.libregrtest.save_env import saved_test_environment -from test.libregrtest.utils import clear_caches, format_duration, print_warning - - -# Avoid enum.Enum to reduce the number of imports when tests are run -class State: - PASSED = "PASSED" - FAILED = "FAILED" - SKIPPED = "SKIPPED" - UNCAUGHT_EXC = "UNCAUGHT_EXC" - REFLEAK = "REFLEAK" - ENV_CHANGED = "ENV_CHANGED" - RESOURCE_DENIED = "RESOURCE_DENIED" - INTERRUPTED = "INTERRUPTED" - MULTIPROCESSING_ERROR = "MULTIPROCESSING_ERROR" - DID_NOT_RUN = "DID_NOT_RUN" - TIMEOUT = "TIMEOUT" - - @staticmethod - def is_failed(state): - return state in { - State.FAILED, - State.UNCAUGHT_EXC, - State.REFLEAK, - State.MULTIPROCESSING_ERROR, - State.TIMEOUT} - - @staticmethod - def has_meaningful_duration(state): - # Consider that the duration is meaningless for these cases. - # For example, if a whole test file is skipped, its duration - # is unlikely to be the duration of executing its tests, - # but just the duration to execute code which skips the test. - return state not in { - State.SKIPPED, - State.RESOURCE_DENIED, - State.INTERRUPTED, - State.MULTIPROCESSING_ERROR, - State.DID_NOT_RUN} - - -@dataclasses.dataclass(slots=True) -class TestResult: - test_name: str - state: str | None = None - # Test duration in seconds - duration: float | None = None - xml_data: list[str] | None = None - stats: TestStats | None = None - - # errors and failures copied from support.TestFailedWithDetails - errors: list[tuple[str, str]] | None = None - failures: list[tuple[str, str]] | None = None - - def is_failed(self, fail_env_changed: bool) -> bool: - if self.state == State.ENV_CHANGED: - return fail_env_changed - return State.is_failed(self.state) - - def _format_failed(self): - if self.errors and self.failures: - le = len(self.errors) - lf = len(self.failures) - error_s = "error" + ("s" if le > 1 else "") - failure_s = "failure" + ("s" if lf > 1 else "") - return f"{self.test_name} failed ({le} {error_s}, {lf} {failure_s})" - - if self.errors: - le = len(self.errors) - error_s = "error" + ("s" if le > 1 else "") - return f"{self.test_name} failed ({le} {error_s})" - - if self.failures: - lf = len(self.failures) - failure_s = "failure" + ("s" if lf > 1 else "") - return f"{self.test_name} failed ({lf} {failure_s})" - - return f"{self.test_name} failed" - - def __str__(self) -> str: - match self.state: - case State.PASSED: - return f"{self.test_name} passed" - case State.FAILED: - return self._format_failed() - case State.SKIPPED: - return f"{self.test_name} skipped" - case State.UNCAUGHT_EXC: - return f"{self.test_name} failed (uncaught exception)" - case State.REFLEAK: - return f"{self.test_name} failed (reference leak)" - case State.ENV_CHANGED: - return f"{self.test_name} failed (env changed)" - case State.RESOURCE_DENIED: - return f"{self.test_name} skipped (resource denied)" - case State.INTERRUPTED: - return f"{self.test_name} interrupted" - case State.MULTIPROCESSING_ERROR: - return f"{self.test_name} process crashed" - case State.DID_NOT_RUN: - return f"{self.test_name} ran no tests" - case State.TIMEOUT: - return f"{self.test_name} timed out ({format_duration(self.duration)})" - case _: - raise ValueError("unknown result state: {state!r}") - - def has_meaningful_duration(self): - return State.has_meaningful_duration(self.state) - - def set_env_changed(self): - if self.state is None or self.state == State.PASSED: - self.state = State.ENV_CHANGED - - -# Minimum duration of a test to display its duration or to mention that -# the test is running in background -PROGRESS_MIN_TIME = 30.0 # seconds - -#If these test directories are encountered recurse into them and treat each -# test_ .py or dir as a separate test module. This can increase parallelism. -# Beware this can't generally be done for any directory with sub-tests as the -# __init__.py may do things which alter what tests are to be run. - -SPLITTESTDIRS = { - "test_asyncio", - "test_concurrent_futures", - "test_future_stmt", - "test_gdb", - "test_multiprocessing_fork", - "test_multiprocessing_forkserver", - "test_multiprocessing_spawn", -} - -# Storage of uncollectable objects -FOUND_GARBAGE = [] - - -def findtestdir(path=None): - return path or os.path.dirname(os.path.dirname(__file__)) or os.curdir - - -def findtests(*, testdir=None, exclude=(), - split_test_dirs=SPLITTESTDIRS, base_mod=""): - """Return a list of all applicable test modules.""" - testdir = findtestdir(testdir) - tests = [] - for name in os.listdir(testdir): - mod, ext = os.path.splitext(name) - if (not mod.startswith("test_")) or (mod in exclude): - continue - if mod in split_test_dirs: - subdir = os.path.join(testdir, mod) - mod = f"{base_mod or 'test'}.{mod}" - tests.extend(findtests(testdir=subdir, exclude=exclude, - split_test_dirs=split_test_dirs, base_mod=mod)) - elif ext in (".py", ""): - tests.append(f"{base_mod}.{mod}" if base_mod else mod) - return sorted(tests) - - -def split_test_packages(tests, *, testdir=None, exclude=(), - split_test_dirs=SPLITTESTDIRS): - testdir = findtestdir(testdir) - splitted = [] - for name in tests: - if name in split_test_dirs: - subdir = os.path.join(testdir, name) - splitted.extend(findtests(testdir=subdir, exclude=exclude, - split_test_dirs=split_test_dirs, - base_mod=name)) - else: - splitted.append(name) - return splitted - - -def get_abs_module(ns: Namespace, test_name: str) -> str: - if test_name.startswith('test.') or ns.testdir: - return test_name - else: - # Import it from the test package - return 'test.' + test_name - - -def _runtest_capture_output_timeout_junit(result: TestResult, ns: Namespace) -> None: - # Capture stdout and stderr, set faulthandler timeout, - # and create JUnit XML report. - - output_on_failure = ns.verbose3 - - use_timeout = ( - ns.timeout is not None and threading_helper.can_start_thread - ) - if use_timeout: - faulthandler.dump_traceback_later(ns.timeout, exit=True) - - try: - support.set_match_tests(ns.match_tests, ns.ignore_tests) - support.junit_xml_list = xml_list = [] if ns.xmlpath else None - if ns.failfast: - support.failfast = True - - if output_on_failure: - support.verbose = True - - stream = io.StringIO() - orig_stdout = sys.stdout - orig_stderr = sys.stderr - print_warning = support.print_warning - orig_print_warnings_stderr = print_warning.orig_stderr - - output = None - try: - sys.stdout = stream - sys.stderr = stream - # print_warning() writes into the temporary stream to preserve - # messages order. If support.environment_altered becomes true, - # warnings will be written to sys.stderr below. - print_warning.orig_stderr = stream - - _runtest_env_changed_exc(result, ns, display_failure=False) - # Ignore output if the test passed successfully - if result.state != State.PASSED: - output = stream.getvalue() - finally: - sys.stdout = orig_stdout - sys.stderr = orig_stderr - print_warning.orig_stderr = orig_print_warnings_stderr - - if output is not None: - sys.stderr.write(output) - sys.stderr.flush() - else: - # Tell tests to be moderately quiet - support.verbose = ns.verbose - - _runtest_env_changed_exc(result, ns, - display_failure=not ns.verbose) - - if xml_list: - import xml.etree.ElementTree as ET - result.xml_data = [ET.tostring(x).decode('us-ascii') - for x in xml_list] - finally: - if use_timeout: - faulthandler.cancel_dump_traceback_later() - support.junit_xml_list = None - - -def runtest(ns: Namespace, test_name: str) -> TestResult: - """Run a single test. - - ns -- regrtest namespace of options - test_name -- the name of the test - - Returns a TestResult. - - If ns.xmlpath is not None, xml_data is a list containing each - generated testsuite element. - """ - start_time = time.perf_counter() - result = TestResult(test_name) - try: - _runtest_capture_output_timeout_junit(result, ns) - except: - if not ns.pgo: - msg = traceback.format_exc() - print(f"test {test_name} crashed -- {msg}", - file=sys.stderr, flush=True) - result.state = State.UNCAUGHT_EXC - result.duration = time.perf_counter() - start_time - return result - - -def _test_module(the_module): - loader = unittest.TestLoader() - tests = loader.loadTestsFromModule(the_module) - for error in loader.errors: - print(error, file=sys.stderr) - if loader.errors: - raise Exception("errors while loading tests") - return support.run_unittest(tests) - - -def save_env(ns: Namespace, test_name: str): - return saved_test_environment(test_name, ns.verbose, ns.quiet, pgo=ns.pgo) - - -def regrtest_runner(result, test_func, ns) -> None: - # Run test_func(), collect statistics, and detect reference and memory - # leaks. - - if ns.huntrleaks: - from test.libregrtest.refleak import dash_R - refleak, test_result = dash_R(ns, result.test_name, test_func) - else: - test_result = test_func() - refleak = False - - if refleak: - result.state = State.REFLEAK - - match test_result: - case TestStats(): - stats = test_result - case unittest.TestResult(): - stats = TestStats.from_unittest(test_result) - case doctest.TestResults(): - stats = TestStats.from_doctest(test_result) - case None: - print_warning(f"{result.test_name} test runner returned None: {test_func}") - stats = None - case _: - print_warning(f"Unknown test result type: {type(test_result)}") - stats = None - - result.stats = stats - - -def _load_run_test(result: TestResult, ns: Namespace) -> None: - # Load the test function, run the test function. - - abstest = get_abs_module(ns, result.test_name) - - # remove the module from sys.module to reload it if it was already imported - try: - del sys.modules[abstest] - except KeyError: - pass - - the_module = importlib.import_module(abstest) - - if hasattr(the_module, "test_main"): - # https://github.com/python/cpython/issues/89392 - raise Exception(f"Module {result.test_name} defines test_main() which is no longer supported by regrtest") - test_func = functools.partial(_test_module, the_module) - - try: - with save_env(ns, result.test_name): - regrtest_runner(result, test_func, ns) - finally: - # First kill any dangling references to open files etc. - # This can also issue some ResourceWarnings which would otherwise get - # triggered during the following test run, and possibly produce - # failures. - support.gc_collect() - - cleanup_test_droppings(result.test_name, ns.verbose) - - if gc.garbage: - support.environment_altered = True - print_warning(f"{result.test_name} created {len(gc.garbage)} " - f"uncollectable object(s).") - - # move the uncollectable objects somewhere, - # so we don't see them again - FOUND_GARBAGE.extend(gc.garbage) - gc.garbage.clear() - - support.reap_children() - - -def _runtest_env_changed_exc(result: TestResult, ns: Namespace, - display_failure: bool = True) -> None: - # Detect environment changes, handle exceptions. - - # Reset the environment_altered flag to detect if a test altered - # the environment - support.environment_altered = False - - if ns.pgo: - display_failure = False - - test_name = result.test_name - try: - clear_caches() - support.gc_collect() - - with save_env(ns, test_name): - _load_run_test(result, ns) - except support.ResourceDenied as msg: - if not ns.quiet and not ns.pgo: - print(f"{test_name} skipped -- {msg}", flush=True) - result.state = State.RESOURCE_DENIED - return - except unittest.SkipTest as msg: - if not ns.quiet and not ns.pgo: - print(f"{test_name} skipped -- {msg}", flush=True) - result.state = State.SKIPPED - return - except support.TestFailedWithDetails as exc: - msg = f"test {test_name} failed" - if display_failure: - msg = f"{msg} -- {exc}" - print(msg, file=sys.stderr, flush=True) - result.state = State.FAILED - result.errors = exc.errors - result.failures = exc.failures - result.stats = exc.stats - return - except support.TestFailed as exc: - msg = f"test {test_name} failed" - if display_failure: - msg = f"{msg} -- {exc}" - print(msg, file=sys.stderr, flush=True) - result.state = State.FAILED - result.stats = exc.stats - return - except support.TestDidNotRun: - result.state = State.DID_NOT_RUN - return - except KeyboardInterrupt: - print() - result.state = State.INTERRUPTED - return - except: - if not ns.pgo: - msg = traceback.format_exc() - print(f"test {test_name} crashed -- {msg}", - file=sys.stderr, flush=True) - result.state = State.UNCAUGHT_EXC - return - - if support.environment_altered: - result.set_env_changed() - # Don't override the state if it was already set (REFLEAK or ENV_CHANGED) - if result.state is None: - result.state = State.PASSED - - -def cleanup_test_droppings(test_name: str, verbose: int) -> None: - # Try to clean up junk commonly left behind. While tests shouldn't leave - # any files or directories behind, when a test fails that can be tedious - # for it to arrange. The consequences can be especially nasty on Windows, - # since if a test leaves a file open, it cannot be deleted by name (while - # there's nothing we can do about that here either, we can display the - # name of the offending test, which is a real help). - for name in (os_helper.TESTFN,): - if not os.path.exists(name): - continue - - if os.path.isdir(name): - import shutil - kind, nuker = "directory", shutil.rmtree - elif os.path.isfile(name): - kind, nuker = "file", os.unlink - else: - raise RuntimeError(f"os.path says {name!r} exists but is neither " - f"directory nor file") - - if verbose: - print_warning(f"{test_name} left behind {kind} {name!r}") - support.environment_altered = True - - try: - import stat - # fix possible permissions problems that might prevent cleanup - os.chmod(name, stat.S_IRWXU | stat.S_IRWXG | stat.S_IRWXO) - nuker(name) - except Exception as exc: - print_warning(f"{test_name} left behind {kind} {name!r} " - f"and it couldn't be removed: {exc}") diff --git a/Lib/test/libregrtest/runtest_mp.py b/Lib/test/libregrtest/runtest_mp.py deleted file mode 100644 index fb1f80b..0000000 --- a/Lib/test/libregrtest/runtest_mp.py +++ /dev/null @@ -1,564 +0,0 @@ -import dataclasses -import faulthandler -import json -import os.path -import queue -import signal -import subprocess -import sys -import tempfile -import threading -import time -import traceback -from typing import NamedTuple, NoReturn, Literal, Any, TextIO - -from test import support -from test.support import os_helper -from test.support import TestStats - -from test.libregrtest.cmdline import Namespace -from test.libregrtest.main import Regrtest -from test.libregrtest.runtest import ( - runtest, TestResult, State, - PROGRESS_MIN_TIME) -from test.libregrtest.setup import setup_tests -from test.libregrtest.utils import format_duration, print_warning - -if sys.platform == 'win32': - import locale - - -# Display the running tests if nothing happened last N seconds -PROGRESS_UPDATE = 30.0 # seconds -assert PROGRESS_UPDATE >= PROGRESS_MIN_TIME - -# Kill the main process after 5 minutes. It is supposed to write an update -# every PROGRESS_UPDATE seconds. Tolerate 5 minutes for Python slowest -# buildbot workers. -MAIN_PROCESS_TIMEOUT = 5 * 60.0 -assert MAIN_PROCESS_TIMEOUT >= PROGRESS_UPDATE - -# Time to wait until a worker completes: should be immediate -JOIN_TIMEOUT = 30.0 # seconds - -USE_PROCESS_GROUP = (hasattr(os, "setsid") and hasattr(os, "killpg")) - - -def must_stop(result: TestResult, ns: Namespace) -> bool: - if result.state == State.INTERRUPTED: - return True - if ns.failfast and result.is_failed(ns.fail_env_changed): - return True - return False - - -def parse_worker_args(worker_args) -> tuple[Namespace, str]: - ns_dict, test_name = json.loads(worker_args) - ns = Namespace(**ns_dict) - return (ns, test_name) - - -def run_test_in_subprocess(testname: str, ns: Namespace, tmp_dir: str, stdout_fh: TextIO) -> subprocess.Popen: - ns_dict = vars(ns) - worker_args = (ns_dict, testname) - worker_args = json.dumps(worker_args) - if ns.python is not None: - executable = ns.python - else: - executable = [sys.executable] - cmd = [*executable, *support.args_from_interpreter_flags(), - '-u', # Unbuffered stdout and stderr - '-m', 'test.regrtest', - '--worker-args', worker_args] - - env = dict(os.environ) - if tmp_dir is not None: - env['TMPDIR'] = tmp_dir - env['TEMP'] = tmp_dir - env['TMP'] = tmp_dir - - # Running the child from the same working directory as regrtest's original - # invocation ensures that TEMPDIR for the child is the same when - # sysconfig.is_python_build() is true. See issue 15300. - kw = dict( - env=env, - stdout=stdout_fh, - # bpo-45410: Write stderr into stdout to keep messages order - stderr=stdout_fh, - text=True, - close_fds=(os.name != 'nt'), - cwd=os_helper.SAVEDCWD, - ) - if USE_PROCESS_GROUP: - kw['start_new_session'] = True - return subprocess.Popen(cmd, **kw) - - -def run_tests_worker(ns: Namespace, test_name: str) -> NoReturn: - setup_tests(ns) - - result = runtest(ns, test_name) - - print() # Force a newline (just in case) - - # Serialize TestResult as dict in JSON - print(json.dumps(result, cls=EncodeTestResult), flush=True) - sys.exit(0) - - -# We do not use a generator so multiple threads can call next(). -class MultiprocessIterator: - - """A thread-safe iterator over tests for multiprocess mode.""" - - def __init__(self, tests_iter): - self.lock = threading.Lock() - self.tests_iter = tests_iter - - def __iter__(self): - return self - - def __next__(self): - with self.lock: - if self.tests_iter is None: - raise StopIteration - return next(self.tests_iter) - - def stop(self): - with self.lock: - self.tests_iter = None - - -class MultiprocessResult(NamedTuple): - result: TestResult - # bpo-45410: stderr is written into stdout to keep messages order - worker_stdout: str | None = None - err_msg: str | None = None - - -ExcStr = str -QueueOutput = tuple[Literal[False], MultiprocessResult] | tuple[Literal[True], ExcStr] - - -class ExitThread(Exception): - pass - - -class TestWorkerProcess(threading.Thread): - def __init__(self, worker_id: int, runner: "MultiprocessTestRunner") -> None: - super().__init__() - self.worker_id = worker_id - self.pending = runner.pending - self.output = runner.output - self.ns = runner.ns - self.timeout = runner.worker_timeout - self.regrtest = runner.regrtest - self.current_test_name = None - self.start_time = None - self._popen = None - self._killed = False - self._stopped = False - - def __repr__(self) -> str: - info = [f'TestWorkerProcess #{self.worker_id}'] - if self.is_alive(): - info.append("running") - else: - info.append('stopped') - test = self.current_test_name - if test: - info.append(f'test={test}') - popen = self._popen - if popen is not None: - dt = time.monotonic() - self.start_time - info.extend((f'pid={self._popen.pid}', - f'time={format_duration(dt)}')) - return '<%s>' % ' '.join(info) - - def _kill(self) -> None: - popen = self._popen - if popen is None: - return - - if self._killed: - return - self._killed = True - - if USE_PROCESS_GROUP: - what = f"{self} process group" - else: - what = f"{self}" - - print(f"Kill {what}", file=sys.stderr, flush=True) - try: - if USE_PROCESS_GROUP: - os.killpg(popen.pid, signal.SIGKILL) - else: - popen.kill() - except ProcessLookupError: - # popen.kill(): the process completed, the TestWorkerProcess thread - # read its exit status, but Popen.send_signal() read the returncode - # just before Popen.wait() set returncode. - pass - except OSError as exc: - print_warning(f"Failed to kill {what}: {exc!r}") - - def stop(self) -> None: - # Method called from a different thread to stop this thread - self._stopped = True - self._kill() - - def mp_result_error( - self, - test_result: TestResult, - stdout: str | None = None, - err_msg=None - ) -> MultiprocessResult: - return MultiprocessResult(test_result, stdout, err_msg) - - def _run_process(self, test_name: str, tmp_dir: str, stdout_fh: TextIO) -> int: - self.current_test_name = test_name - try: - popen = run_test_in_subprocess(test_name, self.ns, tmp_dir, stdout_fh) - - self._killed = False - self._popen = popen - except: - self.current_test_name = None - raise - - try: - if self._stopped: - # If kill() has been called before self._popen is set, - # self._popen is still running. Call again kill() - # to ensure that the process is killed. - self._kill() - raise ExitThread - - try: - # gh-94026: stdout+stderr are written to tempfile - retcode = popen.wait(timeout=self.timeout) - assert retcode is not None - return retcode - except subprocess.TimeoutExpired: - if self._stopped: - # kill() has been called: communicate() fails on reading - # closed stdout - raise ExitThread - - # On timeout, kill the process - self._kill() - - # None means TIMEOUT for the caller - retcode = None - # bpo-38207: Don't attempt to call communicate() again: on it - # can hang until all child processes using stdout - # pipes completes. - except OSError: - if self._stopped: - # kill() has been called: communicate() fails - # on reading closed stdout - raise ExitThread - raise - except: - self._kill() - raise - finally: - self._wait_completed() - self._popen = None - self.current_test_name = None - - def _runtest(self, test_name: str) -> MultiprocessResult: - if sys.platform == 'win32': - # gh-95027: When stdout is not a TTY, Python uses the ANSI code - # page for the sys.stdout encoding. If the main process runs in a - # terminal, sys.stdout uses WindowsConsoleIO with UTF-8 encoding. - encoding = locale.getencoding() - else: - encoding = sys.stdout.encoding - - # gh-94026: Write stdout+stderr to a tempfile as workaround for - # non-blocking pipes on Emscripten with NodeJS. - with tempfile.TemporaryFile('w+', encoding=encoding) as stdout_fh: - # gh-93353: Check for leaked temporary files in the parent process, - # since the deletion of temporary files can happen late during - # Python finalization: too late for libregrtest. - if not support.is_wasi: - # Don't check for leaked temporary files and directories if Python is - # run on WASI. WASI don't pass environment variables like TMPDIR to - # worker processes. - tmp_dir = tempfile.mkdtemp(prefix="test_python_") - tmp_dir = os.path.abspath(tmp_dir) - try: - retcode = self._run_process(test_name, tmp_dir, stdout_fh) - finally: - tmp_files = os.listdir(tmp_dir) - os_helper.rmtree(tmp_dir) - else: - retcode = self._run_process(test_name, None, stdout_fh) - tmp_files = () - stdout_fh.seek(0) - - try: - stdout = stdout_fh.read().strip() - except Exception as exc: - # gh-101634: Catch UnicodeDecodeError if stdout cannot be - # decoded from encoding - err_msg = f"Cannot read process stdout: {exc}" - result = TestResult(test_name, state=State.MULTIPROCESSING_ERROR) - return self.mp_result_error(result, err_msg=err_msg) - - if retcode is None: - result = TestResult(test_name, state=State.TIMEOUT) - return self.mp_result_error(result, stdout) - - err_msg = None - if retcode != 0: - err_msg = "Exit code %s" % retcode - else: - stdout, _, worker_json = stdout.rpartition("\n") - stdout = stdout.rstrip() - if not worker_json: - err_msg = "Failed to parse worker stdout" - else: - try: - # deserialize run_tests_worker() output - result = json.loads(worker_json, - object_hook=decode_test_result) - except Exception as exc: - err_msg = "Failed to parse worker JSON: %s" % exc - - if err_msg: - result = TestResult(test_name, state=State.MULTIPROCESSING_ERROR) - return self.mp_result_error(result, stdout, err_msg) - - if tmp_files: - msg = (f'\n\n' - f'Warning -- {test_name} leaked temporary files ' - f'({len(tmp_files)}): {", ".join(sorted(tmp_files))}') - stdout += msg - result.set_env_changed() - - return MultiprocessResult(result, stdout) - - def run(self) -> None: - while not self._stopped: - try: - try: - test_name = next(self.pending) - except StopIteration: - break - - self.start_time = time.monotonic() - mp_result = self._runtest(test_name) - mp_result.result.duration = time.monotonic() - self.start_time - self.output.put((False, mp_result)) - - if must_stop(mp_result.result, self.ns): - break - except ExitThread: - break - except BaseException: - self.output.put((True, traceback.format_exc())) - break - - def _wait_completed(self) -> None: - popen = self._popen - - try: - popen.wait(JOIN_TIMEOUT) - except (subprocess.TimeoutExpired, OSError) as exc: - print_warning(f"Failed to wait for {self} completion " - f"(timeout={format_duration(JOIN_TIMEOUT)}): " - f"{exc!r}") - - def wait_stopped(self, start_time: float) -> None: - # bpo-38207: MultiprocessTestRunner.stop_workers() called self.stop() - # which killed the process. Sometimes, killing the process from the - # main thread does not interrupt popen.communicate() in - # TestWorkerProcess thread. This loop with a timeout is a workaround - # for that. - # - # Moreover, if this method fails to join the thread, it is likely - # that Python will hang at exit while calling threading._shutdown() - # which tries again to join the blocked thread. Regrtest.main() - # uses EXIT_TIMEOUT to workaround this second bug. - while True: - # Write a message every second - self.join(1.0) - if not self.is_alive(): - break - dt = time.monotonic() - start_time - self.regrtest.log(f"Waiting for {self} thread " - f"for {format_duration(dt)}") - if dt > JOIN_TIMEOUT: - print_warning(f"Failed to join {self} in {format_duration(dt)}") - break - - -def get_running(workers: list[TestWorkerProcess]) -> list[TestWorkerProcess]: - running = [] - for worker in workers: - current_test_name = worker.current_test_name - if not current_test_name: - continue - dt = time.monotonic() - worker.start_time - if dt >= PROGRESS_MIN_TIME: - text = '%s (%s)' % (current_test_name, format_duration(dt)) - running.append(text) - return running - - -class MultiprocessTestRunner: - def __init__(self, regrtest: Regrtest) -> None: - self.regrtest = regrtest - self.log = self.regrtest.log - self.ns = regrtest.ns - self.output: queue.Queue[QueueOutput] = queue.Queue() - self.pending = MultiprocessIterator(self.regrtest.tests) - if self.ns.timeout is not None: - # Rely on faulthandler to kill a worker process. This timouet is - # when faulthandler fails to kill a worker process. Give a maximum - # of 5 minutes to faulthandler to kill the worker. - self.worker_timeout = min(self.ns.timeout * 1.5, - self.ns.timeout + 5 * 60) - else: - self.worker_timeout = None - self.workers = None - - def start_workers(self) -> None: - self.workers = [TestWorkerProcess(index, self) - for index in range(1, self.ns.use_mp + 1)] - msg = f"Run tests in parallel using {len(self.workers)} child processes" - if self.ns.timeout: - msg += (" (timeout: %s, worker timeout: %s)" - % (format_duration(self.ns.timeout), - format_duration(self.worker_timeout))) - self.log(msg) - for worker in self.workers: - worker.start() - - def stop_workers(self) -> None: - start_time = time.monotonic() - for worker in self.workers: - worker.stop() - for worker in self.workers: - worker.wait_stopped(start_time) - - def _get_result(self) -> QueueOutput | None: - use_faulthandler = (self.ns.timeout is not None) - timeout = PROGRESS_UPDATE - - # bpo-46205: check the status of workers every iteration to avoid - # waiting forever on an empty queue. - while any(worker.is_alive() for worker in self.workers): - if use_faulthandler: - faulthandler.dump_traceback_later(MAIN_PROCESS_TIMEOUT, - exit=True) - - # wait for a thread - try: - return self.output.get(timeout=timeout) - except queue.Empty: - pass - - # display progress - running = get_running(self.workers) - if running and not self.ns.pgo: - self.log('running: %s' % ', '.join(running)) - - # all worker threads are done: consume pending results - try: - return self.output.get(timeout=0) - except queue.Empty: - return None - - def display_result(self, mp_result: MultiprocessResult) -> None: - result = mp_result.result - - text = str(result) - if mp_result.err_msg: - # MULTIPROCESSING_ERROR - text += ' (%s)' % mp_result.err_msg - elif (result.duration >= PROGRESS_MIN_TIME and not self.ns.pgo): - text += ' (%s)' % format_duration(result.duration) - running = get_running(self.workers) - if running and not self.ns.pgo: - text += ' -- running: %s' % ', '.join(running) - self.regrtest.display_progress(self.test_index, text) - - def _process_result(self, item: QueueOutput) -> bool: - """Returns True if test runner must stop.""" - if item[0]: - # Thread got an exception - format_exc = item[1] - print_warning(f"regrtest worker thread failed: {format_exc}") - result = TestResult("", state=State.MULTIPROCESSING_ERROR) - self.regrtest.accumulate_result(result) - return True - - self.test_index += 1 - mp_result = item[1] - self.regrtest.accumulate_result(mp_result.result) - self.display_result(mp_result) - - if mp_result.worker_stdout: - print(mp_result.worker_stdout, flush=True) - - if must_stop(mp_result.result, self.ns): - return True - - return False - - def run_tests(self) -> None: - self.start_workers() - - self.test_index = 0 - try: - while True: - item = self._get_result() - if item is None: - break - - stop = self._process_result(item) - if stop: - break - except KeyboardInterrupt: - print() - self.regrtest.interrupted = True - finally: - if self.ns.timeout is not None: - faulthandler.cancel_dump_traceback_later() - - # Always ensure that all worker processes are no longer - # worker when we exit this function - self.pending.stop() - self.stop_workers() - - -def run_tests_multiprocess(regrtest: Regrtest) -> None: - MultiprocessTestRunner(regrtest).run_tests() - - -class EncodeTestResult(json.JSONEncoder): - """Encode a TestResult (sub)class object into a JSON dict.""" - - def default(self, o: Any) -> dict[str, Any]: - if isinstance(o, TestResult): - result = dataclasses.asdict(o) - result["__test_result__"] = o.__class__.__name__ - return result - - return super().default(o) - - -def decode_test_result(d: dict[str, Any]) -> TestResult | TestStats | dict[str, Any]: - """Decode a TestResult (sub)class object from a JSON dict.""" - - if "__test_result__" not in d: - return d - - d.pop('__test_result__') - if d['stats'] is not None: - d['stats'] = TestStats(**d['stats']) - return TestResult(**d) diff --git a/Lib/test/libregrtest/runtests.py b/Lib/test/libregrtest/runtests.py new file mode 100644 index 0000000..4da312d --- /dev/null +++ b/Lib/test/libregrtest/runtests.py @@ -0,0 +1,162 @@ +import contextlib +import dataclasses +import json +import os +import subprocess +from typing import Any + +from test import support + +from .utils import ( + StrPath, StrJSON, TestTuple, FilterTuple, FilterDict) + + +class JsonFileType: + UNIX_FD = "UNIX_FD" + WINDOWS_HANDLE = "WINDOWS_HANDLE" + STDOUT = "STDOUT" + + +@dataclasses.dataclass(slots=True, frozen=True) +class JsonFile: + # file type depends on file_type: + # - UNIX_FD: file descriptor (int) + # - WINDOWS_HANDLE: handle (int) + # - STDOUT: use process stdout (None) + file: int | None + file_type: str + + def configure_subprocess(self, popen_kwargs: dict) -> None: + match self.file_type: + case JsonFileType.UNIX_FD: + # Unix file descriptor + popen_kwargs['pass_fds'] = [self.file] + case JsonFileType.WINDOWS_HANDLE: + # Windows handle + startupinfo = subprocess.STARTUPINFO() + startupinfo.lpAttributeList = {"handle_list": [self.file]} + popen_kwargs['startupinfo'] = startupinfo + + @contextlib.contextmanager + def inherit_subprocess(self): + if self.file_type == JsonFileType.WINDOWS_HANDLE: + os.set_handle_inheritable(self.file, True) + try: + yield + finally: + os.set_handle_inheritable(self.file, False) + else: + yield + + def open(self, mode='r', *, encoding): + if self.file_type == JsonFileType.STDOUT: + raise ValueError("for STDOUT file type, just use sys.stdout") + + file = self.file + if self.file_type == JsonFileType.WINDOWS_HANDLE: + import msvcrt + # Create a file descriptor from the handle + file = msvcrt.open_osfhandle(file, os.O_WRONLY) + return open(file, mode, encoding=encoding) + + +@dataclasses.dataclass(slots=True, frozen=True) +class HuntRefleak: + warmups: int + runs: int + filename: StrPath + + +@dataclasses.dataclass(slots=True, frozen=True) +class RunTests: + tests: TestTuple + fail_fast: bool + fail_env_changed: bool + match_tests: FilterTuple | None + ignore_tests: FilterTuple | None + match_tests_dict: FilterDict | None + rerun: bool + forever: bool + pgo: bool + pgo_extended: bool + output_on_failure: bool + timeout: float | None + verbose: int + quiet: bool + hunt_refleak: HuntRefleak | None + test_dir: StrPath | None + use_junit: bool + memory_limit: str | None + gc_threshold: int | None + use_resources: tuple[str, ...] + python_cmd: tuple[str, ...] | None + randomize: bool + random_seed: int | None + json_file: JsonFile | None + + def copy(self, **override): + state = dataclasses.asdict(self) + state.update(override) + return RunTests(**state) + + def get_match_tests(self, test_name) -> FilterTuple | None: + if self.match_tests_dict is not None: + return self.match_tests_dict.get(test_name, None) + else: + return None + + def get_jobs(self): + # Number of run_single_test() calls needed to run all tests. + # None means that there is not bound limit (--forever option). + if self.forever: + return None + return len(self.tests) + + def iter_tests(self): + if self.forever: + while True: + yield from self.tests + else: + yield from self.tests + + def as_json(self) -> StrJSON: + return json.dumps(self, cls=_EncodeRunTests) + + @staticmethod + def from_json(worker_json: StrJSON) -> 'RunTests': + return json.loads(worker_json, object_hook=_decode_runtests) + + def json_file_use_stdout(self) -> bool: + # Use STDOUT in two cases: + # + # - If --python command line option is used; + # - On Emscripten and WASI. + # + # On other platforms, UNIX_FD or WINDOWS_HANDLE can be used. + return ( + bool(self.python_cmd) + or support.is_emscripten + or support.is_wasi + ) + + +class _EncodeRunTests(json.JSONEncoder): + def default(self, o: Any) -> dict[str, Any]: + if isinstance(o, RunTests): + result = dataclasses.asdict(o) + result["__runtests__"] = True + return result + else: + return super().default(o) + + +def _decode_runtests(data: dict[str, Any]) -> RunTests | dict[str, Any]: + if "__runtests__" in data: + data.pop('__runtests__') + if data['hunt_refleak']: + data['hunt_refleak'] = HuntRefleak(**data['hunt_refleak']) + if data['json_file']: + data['json_file'] = JsonFile(**data['json_file']) + return RunTests(**data) + else: + return data diff --git a/Lib/test/libregrtest/save_env.py b/Lib/test/libregrtest/save_env.py index 7e801a5..b2cc381 100644 --- a/Lib/test/libregrtest/save_env.py +++ b/Lib/test/libregrtest/save_env.py @@ -3,9 +3,11 @@ import locale import os import sys import threading + from test import support from test.support import os_helper -from test.libregrtest.utils import print_warning + +from .utils import print_warning class SkipTestEnvironment(Exception): @@ -34,7 +36,7 @@ class saved_test_environment: items is also printed. """ - def __init__(self, test_name, verbose=0, quiet=False, *, pgo=False): + def __init__(self, test_name, verbose, quiet, *, pgo): self.test_name = test_name self.verbose = verbose self.quiet = quiet @@ -161,11 +163,11 @@ class saved_test_environment: warnings.filters[:] = saved_filters[2] def get_asyncore_socket_map(self): - asyncore = sys.modules.get('asyncore') + asyncore = sys.modules.get('test.support.asyncore') # XXX Making a copy keeps objects alive until __exit__ gets called. return asyncore and asyncore.socket_map.copy() or {} def restore_asyncore_socket_map(self, saved_map): - asyncore = sys.modules.get('asyncore') + asyncore = sys.modules.get('test.support.asyncore') if asyncore is not None: asyncore.close_all(ignore_all=True) asyncore.socket_map.update(saved_map) @@ -257,8 +259,10 @@ class saved_test_environment: sysconfig._INSTALL_SCHEMES.update(saved[2]) def get_files(self): + # XXX: Maybe add an allow-list here? return sorted(fn + ('/' if os.path.isdir(fn) else '') - for fn in os.listdir()) + for fn in os.listdir() + if not fn.startswith(".hypothesis")) def restore_files(self, saved_value): fn = os_helper.TESTFN if fn not in saved_value and (fn + '/') not in saved_value: diff --git a/Lib/test/libregrtest/setup.py b/Lib/test/libregrtest/setup.py index ecd7fa3..793347f 100644 --- a/Lib/test/libregrtest/setup.py +++ b/Lib/test/libregrtest/setup.py @@ -1,24 +1,32 @@ -import atexit import faulthandler +import gc import os +import random import signal import sys import unittest from test import support from test.support.os_helper import TESTFN_UNDECODABLE, FS_NONASCII -try: - import gc -except ImportError: - gc = None -from test.libregrtest.utils import (setup_unraisable_hook, - setup_threading_excepthook) +from .runtests import RunTests +from .utils import ( + setup_unraisable_hook, setup_threading_excepthook, fix_umask, + adjust_rlimit_nofile) UNICODE_GUARD_ENV = "PYTHONREGRTEST_UNICODE_GUARD" -def setup_tests(ns): +def setup_test_dir(testdir: str | None) -> None: + if testdir: + # Prepend test directory to sys.path, so runtest() will be able + # to locate tests + sys.path.insert(0, os.path.abspath(testdir)) + + +def setup_process(): + fix_umask() + try: stderr_fd = sys.__stderr__.fileno() except (ValueError, AttributeError): @@ -40,14 +48,9 @@ def setup_tests(ns): for signum in signals: faulthandler.register(signum, chain=True, file=stderr_fd) - _adjust_resource_limits() - replace_stdout() - support.record_original_stdout(sys.stdout) + adjust_rlimit_nofile() - if ns.testdir: - # Prepend test directory to sys.path, so runtest() will be able - # to locate tests - sys.path.insert(0, os.path.abspath(ns.testdir)) + support.record_original_stdout(sys.stdout) # Some times __path__ and __file__ are not absolute (e.g. while running from # Lib/) and, if we change the CWD to run the tests in a temporary dir, some @@ -66,19 +69,6 @@ def setup_tests(ns): if getattr(module, '__file__', None): module.__file__ = os.path.abspath(module.__file__) - if ns.huntrleaks: - unittest.BaseTestSuite._cleanup = False - - if ns.memlimit is not None: - support.set_memlimit(ns.memlimit) - - if ns.threshold is not None: - gc.set_threshold(ns.threshold) - - support.suppress_msvcrt_asserts(ns.verbose and ns.verbose >= 2) - - support.use_resources = ns.use_resources - if hasattr(sys, 'addaudithook'): # Add an auditing hook for all tests to ensure PySys_Audit is tested def _test_audit_hook(name, args): @@ -88,7 +78,37 @@ def setup_tests(ns): setup_unraisable_hook() setup_threading_excepthook() - timeout = ns.timeout + # Ensure there's a non-ASCII character in env vars at all times to force + # tests consider this case. See BPO-44647 for details. + if TESTFN_UNDECODABLE and os.supports_bytes_environ: + os.environb.setdefault(UNICODE_GUARD_ENV.encode(), TESTFN_UNDECODABLE) + elif FS_NONASCII: + os.environ.setdefault(UNICODE_GUARD_ENV, FS_NONASCII) + + +def setup_tests(runtests: RunTests): + support.verbose = runtests.verbose + support.failfast = runtests.fail_fast + support.PGO = runtests.pgo + support.PGO_EXTENDED = runtests.pgo_extended + + support.set_match_tests(runtests.match_tests, runtests.ignore_tests) + + if runtests.use_junit: + support.junit_xml_list = [] + from test.support.testresult import RegressionTestResult + RegressionTestResult.USE_XML = True + else: + support.junit_xml_list = None + + if runtests.memory_limit is not None: + support.set_memlimit(runtests.memory_limit) + + support.suppress_msvcrt_asserts(runtests.verbose >= 2) + + support.use_resources = runtests.use_resources + + timeout = runtests.timeout if timeout is not None: # For a slow buildbot worker, increase SHORT_TIMEOUT and LONG_TIMEOUT support.LOOPBACK_TIMEOUT = max(support.LOOPBACK_TIMEOUT, timeout / 120) @@ -102,61 +122,10 @@ def setup_tests(ns): support.SHORT_TIMEOUT = min(support.SHORT_TIMEOUT, timeout) support.LONG_TIMEOUT = min(support.LONG_TIMEOUT, timeout) - if ns.xmlpath: - from test.support.testresult import RegressionTestResult - RegressionTestResult.USE_XML = True - - # Ensure there's a non-ASCII character in env vars at all times to force - # tests consider this case. See BPO-44647 for details. - if TESTFN_UNDECODABLE and os.supports_bytes_environ: - os.environb.setdefault(UNICODE_GUARD_ENV.encode(), TESTFN_UNDECODABLE) - elif FS_NONASCII: - os.environ.setdefault(UNICODE_GUARD_ENV, FS_NONASCII) - - -def replace_stdout(): - """Set stdout encoder error handler to backslashreplace (as stderr error - handler) to avoid UnicodeEncodeError when printing a traceback""" - stdout = sys.stdout - try: - fd = stdout.fileno() - except ValueError: - # On IDLE, sys.stdout has no file descriptor and is not a TextIOWrapper - # object. Leaving sys.stdout unchanged. - # - # Catch ValueError to catch io.UnsupportedOperation on TextIOBase - # and ValueError on a closed stream. - return - - sys.stdout = open(fd, 'w', - encoding=stdout.encoding, - errors="backslashreplace", - closefd=False, - newline='\n') - - def restore_stdout(): - sys.stdout.close() - sys.stdout = stdout - atexit.register(restore_stdout) + if runtests.hunt_refleak: + unittest.BaseTestSuite._cleanup = False + if runtests.gc_threshold is not None: + gc.set_threshold(runtests.gc_threshold) -def _adjust_resource_limits(): - """Adjust the system resource limits (ulimit) if needed.""" - try: - import resource - from resource import RLIMIT_NOFILE, RLIM_INFINITY - except ImportError: - return - fd_limit, max_fds = resource.getrlimit(RLIMIT_NOFILE) - # On macOS the default fd limit is sometimes too low (256) for our - # test suite to succeed. Raise it to something more reasonable. - # 1024 is a common Linux default. - desired_fds = 1024 - if fd_limit < desired_fds and fd_limit < max_fds: - new_fd_limit = min(desired_fds, max_fds) - try: - resource.setrlimit(RLIMIT_NOFILE, (new_fd_limit, max_fds)) - print(f"Raised RLIMIT_NOFILE: {fd_limit} -> {new_fd_limit}") - except (ValueError, OSError) as err: - print(f"Unable to raise RLIMIT_NOFILE from {fd_limit} to " - f"{new_fd_limit}: {err}.") + random.seed(runtests.random_seed) diff --git a/Lib/test/libregrtest/single.py b/Lib/test/libregrtest/single.py new file mode 100644 index 0000000..0304f85 --- /dev/null +++ b/Lib/test/libregrtest/single.py @@ -0,0 +1,278 @@ +import doctest +import faulthandler +import gc +import importlib +import io +import sys +import time +import traceback +import unittest + +from test import support +from test.support import TestStats +from test.support import threading_helper + +from .result import State, TestResult +from .runtests import RunTests +from .save_env import saved_test_environment +from .setup import setup_tests +from .utils import ( + TestName, + clear_caches, remove_testfn, abs_module_name, print_warning) + + +# Minimum duration of a test to display its duration or to mention that +# the test is running in background +PROGRESS_MIN_TIME = 30.0 # seconds + + +def run_unittest(test_mod): + loader = unittest.TestLoader() + tests = loader.loadTestsFromModule(test_mod) + for error in loader.errors: + print(error, file=sys.stderr) + if loader.errors: + raise Exception("errors while loading tests") + return support.run_unittest(tests) + + +def regrtest_runner(result: TestResult, test_func, runtests: RunTests) -> None: + # Run test_func(), collect statistics, and detect reference and memory + # leaks. + if runtests.hunt_refleak: + from .refleak import runtest_refleak + refleak, test_result = runtest_refleak(result.test_name, test_func, + runtests.hunt_refleak, + runtests.quiet) + else: + test_result = test_func() + refleak = False + + if refleak: + result.state = State.REFLEAK + + stats: TestStats | None + + match test_result: + case TestStats(): + stats = test_result + case unittest.TestResult(): + stats = TestStats.from_unittest(test_result) + case doctest.TestResults(): + stats = TestStats.from_doctest(test_result) + case None: + print_warning(f"{result.test_name} test runner returned None: {test_func}") + stats = None + case _: + print_warning(f"Unknown test result type: {type(test_result)}") + stats = None + + result.stats = stats + + +# Storage of uncollectable GC objects (gc.garbage) +GC_GARBAGE = [] + + +def _load_run_test(result: TestResult, runtests: RunTests) -> None: + # Load the test module and run the tests. + test_name = result.test_name + module_name = abs_module_name(test_name, runtests.test_dir) + + # Remove the module from sys.module to reload it if it was already imported + sys.modules.pop(module_name, None) + + test_mod = importlib.import_module(module_name) + + if hasattr(test_mod, "test_main"): + # https://github.com/python/cpython/issues/89392 + raise Exception(f"Module {test_name} defines test_main() which " + f"is no longer supported by regrtest") + def test_func(): + return run_unittest(test_mod) + + try: + regrtest_runner(result, test_func, runtests) + finally: + # First kill any dangling references to open files etc. + # This can also issue some ResourceWarnings which would otherwise get + # triggered during the following test run, and possibly produce + # failures. + support.gc_collect() + + remove_testfn(test_name, runtests.verbose) + + if gc.garbage: + support.environment_altered = True + print_warning(f"{test_name} created {len(gc.garbage)} " + f"uncollectable object(s)") + + # move the uncollectable objects somewhere, + # so we don't see them again + GC_GARBAGE.extend(gc.garbage) + gc.garbage.clear() + + support.reap_children() + + +def _runtest_env_changed_exc(result: TestResult, runtests: RunTests, + display_failure: bool = True) -> None: + # Handle exceptions, detect environment changes. + + # Reset the environment_altered flag to detect if a test altered + # the environment + support.environment_altered = False + + pgo = runtests.pgo + if pgo: + display_failure = False + quiet = runtests.quiet + + test_name = result.test_name + try: + clear_caches() + support.gc_collect() + + with saved_test_environment(test_name, + runtests.verbose, quiet, pgo=pgo): + _load_run_test(result, runtests) + except support.ResourceDenied as exc: + if not quiet and not pgo: + print(f"{test_name} skipped -- {exc}", flush=True) + result.state = State.RESOURCE_DENIED + return + except unittest.SkipTest as exc: + if not quiet and not pgo: + print(f"{test_name} skipped -- {exc}", flush=True) + result.state = State.SKIPPED + return + except support.TestFailedWithDetails as exc: + msg = f"test {test_name} failed" + if display_failure: + msg = f"{msg} -- {exc}" + print(msg, file=sys.stderr, flush=True) + result.state = State.FAILED + result.errors = exc.errors + result.failures = exc.failures + result.stats = exc.stats + return + except support.TestFailed as exc: + msg = f"test {test_name} failed" + if display_failure: + msg = f"{msg} -- {exc}" + print(msg, file=sys.stderr, flush=True) + result.state = State.FAILED + result.stats = exc.stats + return + except support.TestDidNotRun: + result.state = State.DID_NOT_RUN + return + except KeyboardInterrupt: + print() + result.state = State.INTERRUPTED + return + except: + if not pgo: + msg = traceback.format_exc() + print(f"test {test_name} crashed -- {msg}", + file=sys.stderr, flush=True) + result.state = State.UNCAUGHT_EXC + return + + if support.environment_altered: + result.set_env_changed() + # Don't override the state if it was already set (REFLEAK or ENV_CHANGED) + if result.state is None: + result.state = State.PASSED + + +def _runtest(result: TestResult, runtests: RunTests) -> None: + # Capture stdout and stderr, set faulthandler timeout, + # and create JUnit XML report. + verbose = runtests.verbose + output_on_failure = runtests.output_on_failure + timeout = runtests.timeout + + use_timeout = ( + timeout is not None and threading_helper.can_start_thread + ) + if use_timeout: + faulthandler.dump_traceback_later(timeout, exit=True) + + try: + setup_tests(runtests) + + if output_on_failure: + support.verbose = True + + stream = io.StringIO() + orig_stdout = sys.stdout + orig_stderr = sys.stderr + print_warning = support.print_warning + orig_print_warnings_stderr = print_warning.orig_stderr + + output = None + try: + sys.stdout = stream + sys.stderr = stream + # print_warning() writes into the temporary stream to preserve + # messages order. If support.environment_altered becomes true, + # warnings will be written to sys.stderr below. + print_warning.orig_stderr = stream + + _runtest_env_changed_exc(result, runtests, display_failure=False) + # Ignore output if the test passed successfully + if result.state != State.PASSED: + output = stream.getvalue() + finally: + sys.stdout = orig_stdout + sys.stderr = orig_stderr + print_warning.orig_stderr = orig_print_warnings_stderr + + if output is not None: + sys.stderr.write(output) + sys.stderr.flush() + else: + # Tell tests to be moderately quiet + support.verbose = verbose + _runtest_env_changed_exc(result, runtests, + display_failure=not verbose) + + xml_list = support.junit_xml_list + if xml_list: + import xml.etree.ElementTree as ET + result.xml_data = [ET.tostring(x).decode('us-ascii') + for x in xml_list] + finally: + if use_timeout: + faulthandler.cancel_dump_traceback_later() + support.junit_xml_list = None + + +def run_single_test(test_name: TestName, runtests: RunTests) -> TestResult: + """Run a single test. + + test_name -- the name of the test + + Returns a TestResult. + + If runtests.use_junit, xml_data is a list containing each generated + testsuite element. + """ + start_time = time.perf_counter() + result = TestResult(test_name) + pgo = runtests.pgo + try: + _runtest(result, runtests) + except: + if not pgo: + msg = traceback.format_exc() + print(f"test {test_name} crashed -- {msg}", + file=sys.stderr, flush=True) + result.state = State.UNCAUGHT_EXC + + sys.stdout.flush() + sys.stderr.flush() + + result.duration = time.perf_counter() - start_time + return result diff --git a/Lib/test/libregrtest/utils.py b/Lib/test/libregrtest/utils.py index 49f3a23..b5bbe0e 100644 --- a/Lib/test/libregrtest/utils.py +++ b/Lib/test/libregrtest/utils.py @@ -1,9 +1,59 @@ +import contextlib +import faulthandler +import locale import math import os.path +import platform +import random +import shlex +import signal +import subprocess import sys import sysconfig +import tempfile import textwrap +from collections.abc import Callable + from test import support +from test.support import os_helper +from test.support import threading_helper + + +# All temporary files and temporary directories created by libregrtest should +# use TMP_PREFIX so cleanup_temp_dir() can remove them all. +TMP_PREFIX = 'test_python_' +WORK_DIR_PREFIX = TMP_PREFIX +WORKER_WORK_DIR_PREFIX = WORK_DIR_PREFIX + 'worker_' + +# bpo-38203: Maximum delay in seconds to exit Python (call Py_Finalize()). +# Used to protect against threading._shutdown() hang. +# Must be smaller than buildbot "1200 seconds without output" limit. +EXIT_TIMEOUT = 120.0 + + +ALL_RESOURCES = ('audio', 'curses', 'largefile', 'network', + 'decimal', 'cpu', 'subprocess', 'urlfetch', 'gui', 'walltime') + +# Other resources excluded from --use=all: +# +# - extralagefile (ex: test_zipfile64): really too slow to be enabled +# "by default" +# - tzdata: while needed to validate fully test_datetime, it makes +# test_datetime too slow (15-20 min on some buildbots) and so is disabled by +# default (see bpo-30822). +RESOURCE_NAMES = ALL_RESOURCES + ('extralargefile', 'tzdata') + + +# Types for types hints +StrPath = str +TestName = str +StrJSON = str +TestTuple = tuple[TestName, ...] +TestList = list[TestName] +# --match and --ignore options: list of patterns +# ('*' joker character can be used) +FilterTuple = tuple[TestName, ...] +FilterDict = dict[TestName, FilterTuple] def format_duration(seconds): @@ -31,7 +81,7 @@ def format_duration(seconds): return ' '.join(parts) -def removepy(names): +def strip_py_suffix(names: list[str] | None) -> None: if not names: return for idx, name in enumerate(names): @@ -40,11 +90,20 @@ def removepy(names): names[idx] = basename +def plural(n, singular, plural=None): + if n == 1: + return singular + elif plural is not None: + return plural + else: + return singular + 's' + + def count(n, word): if n == 1: - return "%d %s" % (n, word) + return f"{n} {word}" else: - return "%d %ss" % (n, word) + return f"{n} {word}s" def printlist(x, width=70, indent=4, file=None): @@ -125,15 +184,6 @@ def clear_caches(): if stream is not None: stream.flush() - # Clear assorted module caches. - # Don't worry about resetting the cache if the module is not loaded - try: - distutils_dir_util = sys.modules['distutils.dir_util'] - except KeyError: - pass - else: - distutils_dir_util._path_created.clear() - try: re = sys.modules['re'] except KeyError: @@ -212,6 +262,13 @@ def clear_caches(): for f in typing._cleanups: f() + try: + fractions = sys.modules['fractions'] + except KeyError: + pass + else: + fractions._hash_algorithm.cache_clear() + def get_build_info(): # Get most important configure and build options as a list of strings. @@ -292,3 +349,331 @@ def get_build_info(): build.append("dtrace") return build + + +def get_temp_dir(tmp_dir: StrPath | None = None) -> StrPath: + if tmp_dir: + tmp_dir = os.path.expanduser(tmp_dir) + else: + # When tests are run from the Python build directory, it is best practice + # to keep the test files in a subfolder. This eases the cleanup of leftover + # files using the "make distclean" command. + if sysconfig.is_python_build(): + if not support.is_wasi: + tmp_dir = sysconfig.get_config_var('abs_builddir') + if tmp_dir is None: + tmp_dir = sysconfig.get_config_var('abs_srcdir') + if not tmp_dir: + # gh-74470: On Windows, only srcdir is available. Using + # abs_builddir mostly matters on UNIX when building + # Python out of the source tree, especially when the + # source tree is read only. + tmp_dir = sysconfig.get_config_var('srcdir') + tmp_dir = os.path.join(tmp_dir, 'build') + else: + # WASI platform + tmp_dir = sysconfig.get_config_var('projectbase') + tmp_dir = os.path.join(tmp_dir, 'build') + + # When get_temp_dir() is called in a worker process, + # get_temp_dir() path is different than in the parent process + # which is not a WASI process. So the parent does not create + # the same "tmp_dir" than the test worker process. + os.makedirs(tmp_dir, exist_ok=True) + else: + tmp_dir = tempfile.gettempdir() + + return os.path.abspath(tmp_dir) + + +def fix_umask(): + if support.is_emscripten: + # Emscripten has default umask 0o777, which breaks some tests. + # see https://github.com/emscripten-core/emscripten/issues/17269 + old_mask = os.umask(0) + if old_mask == 0o777: + os.umask(0o027) + else: + os.umask(old_mask) + + +def get_work_dir(parent_dir: StrPath, worker: bool = False) -> StrPath: + # Define a writable temp dir that will be used as cwd while running + # the tests. The name of the dir includes the pid to allow parallel + # testing (see the -j option). + # Emscripten and WASI have stubbed getpid(), Emscripten has only + # milisecond clock resolution. Use randint() instead. + if support.is_emscripten or support.is_wasi: + nounce = random.randint(0, 1_000_000) + else: + nounce = os.getpid() + + if worker: + work_dir = WORK_DIR_PREFIX + str(nounce) + else: + work_dir = WORKER_WORK_DIR_PREFIX + str(nounce) + work_dir += os_helper.FS_NONASCII + work_dir = os.path.join(parent_dir, work_dir) + return work_dir + + +@contextlib.contextmanager +def exit_timeout(): + try: + yield + except SystemExit as exc: + # bpo-38203: Python can hang at exit in Py_Finalize(), especially + # on threading._shutdown() call: put a timeout + if threading_helper.can_start_thread: + faulthandler.dump_traceback_later(EXIT_TIMEOUT, exit=True) + sys.exit(exc.code) + + +def remove_testfn(test_name: TestName, verbose: int) -> None: + # Try to clean up os_helper.TESTFN if left behind. + # + # While tests shouldn't leave any files or directories behind, when a test + # fails that can be tedious for it to arrange. The consequences can be + # especially nasty on Windows, since if a test leaves a file open, it + # cannot be deleted by name (while there's nothing we can do about that + # here either, we can display the name of the offending test, which is a + # real help). + name = os_helper.TESTFN + if not os.path.exists(name): + return + + nuker: Callable[[str], None] + if os.path.isdir(name): + import shutil + kind, nuker = "directory", shutil.rmtree + elif os.path.isfile(name): + kind, nuker = "file", os.unlink + else: + raise RuntimeError(f"os.path says {name!r} exists but is neither " + f"directory nor file") + + if verbose: + print_warning(f"{test_name} left behind {kind} {name!r}") + support.environment_altered = True + + try: + import stat + # fix possible permissions problems that might prevent cleanup + os.chmod(name, stat.S_IRWXU | stat.S_IRWXG | stat.S_IRWXO) + nuker(name) + except Exception as exc: + print_warning(f"{test_name} left behind {kind} {name!r} " + f"and it couldn't be removed: {exc}") + + +def abs_module_name(test_name: TestName, test_dir: StrPath | None) -> TestName: + if test_name.startswith('test.') or test_dir: + return test_name + else: + # Import it from the test package + return 'test.' + test_name + + +# gh-90681: When rerunning tests, we might need to rerun the whole +# class or module suite if some its life-cycle hooks fail. +# Test level hooks are not affected. +_TEST_LIFECYCLE_HOOKS = frozenset(( + 'setUpClass', 'tearDownClass', + 'setUpModule', 'tearDownModule', +)) + +def normalize_test_name(test_full_name, *, is_error=False): + short_name = test_full_name.split(" ")[0] + if is_error and short_name in _TEST_LIFECYCLE_HOOKS: + if test_full_name.startswith(('setUpModule (', 'tearDownModule (')): + # if setUpModule() or tearDownModule() failed, don't filter + # tests with the test file name, don't use use filters. + return None + + # This means that we have a failure in a life-cycle hook, + # we need to rerun the whole module or class suite. + # Basically the error looks like this: + # ERROR: setUpClass (test.test_reg_ex.RegTest) + # or + # ERROR: setUpModule (test.test_reg_ex) + # So, we need to parse the class / module name. + lpar = test_full_name.index('(') + rpar = test_full_name.index(')') + return test_full_name[lpar + 1: rpar].split('.')[-1] + return short_name + + +def adjust_rlimit_nofile(): + """ + On macOS the default fd limit (RLIMIT_NOFILE) is sometimes too low (256) + for our test suite to succeed. Raise it to something more reasonable. 1024 + is a common Linux default. + """ + try: + import resource + except ImportError: + return + + fd_limit, max_fds = resource.getrlimit(resource.RLIMIT_NOFILE) + + desired_fds = 1024 + + if fd_limit < desired_fds and fd_limit < max_fds: + new_fd_limit = min(desired_fds, max_fds) + try: + resource.setrlimit(resource.RLIMIT_NOFILE, + (new_fd_limit, max_fds)) + print(f"Raised RLIMIT_NOFILE: {fd_limit} -> {new_fd_limit}") + except (ValueError, OSError) as err: + print_warning(f"Unable to raise RLIMIT_NOFILE from {fd_limit} to " + f"{new_fd_limit}: {err}.") + + +def get_host_runner(): + if (hostrunner := os.environ.get("_PYTHON_HOSTRUNNER")) is None: + hostrunner = sysconfig.get_config_var("HOSTRUNNER") + return hostrunner + + +def is_cross_compiled(): + return ('_PYTHON_HOST_PLATFORM' in os.environ) + + +def format_resources(use_resources: tuple[str, ...]): + use_resources = set(use_resources) + all_resources = set(ALL_RESOURCES) + + # Express resources relative to "all" + relative_all = ['all'] + for name in sorted(all_resources - use_resources): + relative_all.append(f'-{name}') + for name in sorted(use_resources - all_resources): + relative_all.append(f'{name}') + all_text = ','.join(relative_all) + all_text = f"resources: {all_text}" + + # List of enabled resources + text = ','.join(sorted(use_resources)) + text = f"resources ({len(use_resources)}): {text}" + + # Pick the shortest string (prefer relative to all if lengths are equal) + if len(all_text) <= len(text): + return all_text + else: + return text + + +def process_cpu_count(): + if hasattr(os, 'sched_getaffinity'): + return len(os.sched_getaffinity(0)) + else: + return os.cpu_count() + + +def display_header(use_resources: tuple[str, ...], + python_cmd: tuple[str, ...] | None): + # Print basic platform information + print("==", platform.python_implementation(), *sys.version.split()) + print("==", platform.platform(aliased=True), + "%s-endian" % sys.byteorder) + print("== Python build:", ' '.join(get_build_info())) + print("== cwd:", os.getcwd()) + + cpu_count = os.cpu_count() + if cpu_count: + affinity = process_cpu_count() + if affinity and affinity != cpu_count: + cpu_count = f"{affinity} (process) / {cpu_count} (system)" + print("== CPU count:", cpu_count) + print("== encodings: locale=%s FS=%s" + % (locale.getencoding(), sys.getfilesystemencoding())) + + if use_resources: + text = format_resources(use_resources) + print(f"== {text}") + else: + print("== resources: all test resources are disabled, " + "use -u option to unskip tests") + + cross_compile = is_cross_compiled() + if cross_compile: + print("== cross compiled: Yes") + if python_cmd: + cmd = shlex.join(python_cmd) + print(f"== host python: {cmd}") + + get_cmd = [*python_cmd, '-m', 'platform'] + proc = subprocess.run( + get_cmd, + stdout=subprocess.PIPE, + text=True, + cwd=os_helper.SAVEDCWD) + stdout = proc.stdout.replace('\n', ' ').strip() + if stdout: + print(f"== host platform: {stdout}") + elif proc.returncode: + print(f"== host platform: ") + else: + hostrunner = get_host_runner() + if hostrunner: + print(f"== host runner: {hostrunner}") + + # This makes it easier to remember what to set in your local + # environment when trying to reproduce a sanitizer failure. + asan = support.check_sanitizer(address=True) + msan = support.check_sanitizer(memory=True) + ubsan = support.check_sanitizer(ub=True) + sanitizers = [] + if asan: + sanitizers.append("address") + if msan: + sanitizers.append("memory") + if ubsan: + sanitizers.append("undefined behavior") + if sanitizers: + print(f"== sanitizers: {', '.join(sanitizers)}") + for sanitizer, env_var in ( + (asan, "ASAN_OPTIONS"), + (msan, "MSAN_OPTIONS"), + (ubsan, "UBSAN_OPTIONS"), + ): + options= os.environ.get(env_var) + if sanitizer and options is not None: + print(f"== {env_var}={options!r}") + + print(flush=True) + + +def cleanup_temp_dir(tmp_dir: StrPath): + import glob + + path = os.path.join(glob.escape(tmp_dir), TMP_PREFIX + '*') + print("Cleanup %s directory" % tmp_dir) + for name in glob.glob(path): + if os.path.isdir(name): + print("Remove directory: %s" % name) + os_helper.rmtree(name) + else: + print("Remove file: %s" % name) + os_helper.unlink(name) + +WINDOWS_STATUS = { + 0xC0000005: "STATUS_ACCESS_VIOLATION", + 0xC00000FD: "STATUS_STACK_OVERFLOW", + 0xC000013A: "STATUS_CONTROL_C_EXIT", +} + +def get_signal_name(exitcode): + if exitcode < 0: + signum = -exitcode + try: + return signal.Signals(signum).name + except ValueError: + pass + + try: + return WINDOWS_STATUS[exitcode] + except KeyError: + pass + + return None diff --git a/Lib/test/libregrtest/worker.py b/Lib/test/libregrtest/worker.py new file mode 100644 index 0000000..a9c8be0 --- /dev/null +++ b/Lib/test/libregrtest/worker.py @@ -0,0 +1,116 @@ +import subprocess +import sys +import os +from typing import Any, NoReturn + +from test import support +from test.support import os_helper + +from .setup import setup_process, setup_test_dir +from .runtests import RunTests, JsonFile, JsonFileType +from .single import run_single_test +from .utils import ( + StrPath, StrJSON, FilterTuple, + get_temp_dir, get_work_dir, exit_timeout) + + +USE_PROCESS_GROUP = (hasattr(os, "setsid") and hasattr(os, "killpg")) + + +def create_worker_process(runtests: RunTests, output_fd: int, + tmp_dir: StrPath | None = None) -> subprocess.Popen: + python_cmd = runtests.python_cmd + worker_json = runtests.as_json() + + python_opts = support.args_from_interpreter_flags() + if python_cmd is not None: + executable = python_cmd + # Remove -E option, since --python=COMMAND can set PYTHON environment + # variables, such as PYTHONPATH, in the worker process. + python_opts = [opt for opt in python_opts if opt != "-E"] + else: + executable = (sys.executable,) + cmd = [*executable, *python_opts, + '-u', # Unbuffered stdout and stderr + '-m', 'test.libregrtest.worker', + worker_json] + + env = dict(os.environ) + if tmp_dir is not None: + env['TMPDIR'] = tmp_dir + env['TEMP'] = tmp_dir + env['TMP'] = tmp_dir + + # Running the child from the same working directory as regrtest's original + # invocation ensures that TEMPDIR for the child is the same when + # sysconfig.is_python_build() is true. See issue 15300. + # + # Emscripten and WASI Python must start in the Python source code directory + # to get 'python.js' or 'python.wasm' file. Then worker_process() changes + # to a temporary directory created to run tests. + work_dir = os_helper.SAVEDCWD + + kwargs: dict[str, Any] = dict( + env=env, + stdout=output_fd, + # bpo-45410: Write stderr into stdout to keep messages order + stderr=output_fd, + text=True, + close_fds=True, + cwd=work_dir, + ) + if USE_PROCESS_GROUP: + kwargs['start_new_session'] = True + + # Pass json_file to the worker process + json_file = runtests.json_file + json_file.configure_subprocess(kwargs) + + with json_file.inherit_subprocess(): + return subprocess.Popen(cmd, **kwargs) + + +def worker_process(worker_json: StrJSON) -> NoReturn: + runtests = RunTests.from_json(worker_json) + test_name = runtests.tests[0] + match_tests: FilterTuple | None = runtests.match_tests + json_file: JsonFile = runtests.json_file + + setup_test_dir(runtests.test_dir) + setup_process() + + if runtests.rerun: + if match_tests: + matching = "matching: " + ", ".join(match_tests) + print(f"Re-running {test_name} in verbose mode ({matching})", flush=True) + else: + print(f"Re-running {test_name} in verbose mode", flush=True) + + result = run_single_test(test_name, runtests) + + if json_file.file_type == JsonFileType.STDOUT: + print() + result.write_json_into(sys.stdout) + else: + with json_file.open('w', encoding='utf-8') as json_fp: + result.write_json_into(json_fp) + + sys.exit(0) + + +def main(): + if len(sys.argv) != 2: + print("usage: python -m test.libregrtest.worker JSON") + sys.exit(1) + worker_json = sys.argv[1] + + tmp_dir = get_temp_dir() + work_dir = get_work_dir(tmp_dir, worker=True) + + with exit_timeout(): + with os_helper.temp_cwd(work_dir, quiet=True): + worker_process(worker_json) + + +if __name__ == "__main__": + main() diff --git a/Lib/test/pythoninfo.py b/Lib/test/pythoninfo.py index 3dbcac2..4f3ebb1 100644 --- a/Lib/test/pythoninfo.py +++ b/Lib/test/pythoninfo.py @@ -1,18 +1,13 @@ """ Collect various information about Python to help debugging test failures. """ -from __future__ import print_function import errno import re import sys import traceback -import unittest import warnings -MS_WINDOWS = (sys.platform == 'win32') - - def normalize_text(text): if text is None: return None @@ -244,6 +239,7 @@ def collect_os(info_add): 'getresgid', 'getresuid', 'getuid', + 'process_cpu_count', 'uname', ): call_func(info_add, 'os.%s' % func, os, func) @@ -273,6 +269,7 @@ def collect_os(info_add): "ARCHFLAGS", "ARFLAGS", "AUDIODEV", + "BUILDPYTHON", "CC", "CFLAGS", "COLUMNS", @@ -317,7 +314,6 @@ def collect_os(info_add): "TEMP", "TERM", "TILE_LIBRARY", - "TIX_LIBRARY", "TMP", "TMPDIR", "TRAVIS", @@ -326,6 +322,7 @@ def collect_os(info_add): "VIRTUAL_ENV", "WAYLAND_DISPLAY", "WINDIR", + "_PYTHON_HOSTRUNNER", "_PYTHON_HOST_PLATFORM", "_PYTHON_PROJECT_BASE", "_PYTHON_SYSCONFIGDATA_NAME", @@ -341,7 +338,8 @@ def collect_os(info_add): for name, value in os.environ.items(): uname = name.upper() if (uname in ENV_VARS - # Copy PYTHON* and LC_* variables + # Copy PYTHON* variables like PYTHONPATH + # Copy LC_* variables like LC_ALL or uname.startswith(("PYTHON", "LC_")) # Visual Studio: VS140COMNTOOLS or (uname.startswith("VS") and uname.endswith("COMNTOOLS"))): @@ -494,13 +492,10 @@ def collect_datetime(info_add): def collect_sysconfig(info_add): - # On Windows, sysconfig is not reliable to get macros used - # to build Python - if MS_WINDOWS: - return - import sysconfig + info_add('sysconfig.is_python_build', sysconfig.is_python_build()) + for name in ( 'ABIFLAGS', 'ANDROID_API_LEVEL', @@ -509,6 +504,7 @@ def collect_sysconfig(info_add): 'CFLAGS', 'CFLAGSFORSHARED', 'CONFIG_ARGS', + 'HOSTRUNNER', 'HOST_GNU_TYPE', 'MACHDEP', 'MULTIARCH', @@ -634,7 +630,7 @@ def collect_sqlite(info_add): except ImportError: return - attributes = ('version', 'sqlite_version') + attributes = ('sqlite_version',) copy_attributes(info_add, sqlite3, 'sqlite3.%s', attributes) @@ -674,7 +670,29 @@ def collect_testcapi(info_add): except ImportError: return - call_func(info_add, 'pymem.allocator', _testcapi, 'pymem_getallocatorsname') + for name in ( + 'LONG_MAX', # always 32-bit on Windows, 64-bit on 64-bit Unix + 'PY_SSIZE_T_MAX', + 'Py_C_RECURSION_LIMIT', + 'SIZEOF_TIME_T', # 32-bit or 64-bit depending on the platform + 'SIZEOF_WCHAR_T', # 16-bit or 32-bit depending on the platform + ): + copy_attr(info_add, f'_testcapi.{name}', _testcapi, name) + + +def collect_testinternalcapi(info_add): + try: + import _testinternalcapi + except ImportError: + return + + call_func(info_add, 'pymem.allocator', _testinternalcapi, 'pymem_getallocatorsname') + + for name in ( + 'SIZEOF_PYGC_HEAD', + 'SIZEOF_PYOBJECT', + ): + copy_attr(info_add, f'_testinternalcapi.{name}', _testinternalcapi, name) def collect_resource(info_add): @@ -693,6 +711,7 @@ def collect_resource(info_add): def collect_test_socket(info_add): + import unittest try: from test import test_socket except (ImportError, unittest.SkipTest): @@ -704,15 +723,12 @@ def collect_test_socket(info_add): copy_attributes(info_add, test_socket, 'test_socket.%s', attributes) -def collect_test_support(info_add): +def collect_support(info_add): try: from test import support except ImportError: return - attributes = ('IPV6_ENABLED',) - copy_attributes(info_add, support, 'test_support.%s', attributes) - attributes = ( 'MS_WINDOWS', 'has_fork_support', @@ -726,17 +742,64 @@ def collect_test_support(info_add): ) copy_attributes(info_add, support, 'support.%s', attributes) - call_func(info_add, 'test_support._is_gui_available', support, '_is_gui_available') - call_func(info_add, 'test_support.python_is_optimized', support, 'python_is_optimized') + call_func(info_add, 'support._is_gui_available', support, '_is_gui_available') + call_func(info_add, 'support.python_is_optimized', support, 'python_is_optimized') - info_add('test_support.check_sanitizer(address=True)', + info_add('support.check_sanitizer(address=True)', support.check_sanitizer(address=True)) - info_add('test_support.check_sanitizer(memory=True)', + info_add('support.check_sanitizer(memory=True)', support.check_sanitizer(memory=True)) - info_add('test_support.check_sanitizer(ub=True)', + info_add('support.check_sanitizer(ub=True)', support.check_sanitizer(ub=True)) +def collect_support_os_helper(info_add): + try: + from test.support import os_helper + except ImportError: + return + + for name in ( + 'can_symlink', + 'can_xattr', + 'can_chmod', + 'can_dac_override', + ): + func = getattr(os_helper, name) + info_add(f'support_os_helper.{name}', func()) + + +def collect_support_socket_helper(info_add): + try: + from test.support import socket_helper + except ImportError: + return + + attributes = ( + 'IPV6_ENABLED', + 'has_gethostname', + ) + copy_attributes(info_add, socket_helper, 'support_socket_helper.%s', attributes) + + for name in ( + 'tcp_blackhole', + ): + func = getattr(socket_helper, name) + info_add(f'support_socket_helper.{name}', func()) + + +def collect_support_threading_helper(info_add): + try: + from test.support import threading_helper + except ImportError: + return + + attributes = ( + 'can_start_thread', + ) + copy_attributes(info_add, threading_helper, 'support_threading_helper.%s', attributes) + + def collect_cc(info_add): import subprocess import sysconfig @@ -891,6 +954,12 @@ def collect_fips(info_add): pass +def collect_tempfile(info_add): + import tempfile + + info_add('tempfile.gettempdir', tempfile.gettempdir()) + + def collect_libregrtest_utils(info_add): try: from test.libregrtest import utils @@ -933,6 +1002,8 @@ def collect_info(info): collect_sys, collect_sysconfig, collect_testcapi, + collect_testinternalcapi, + collect_tempfile, collect_time, collect_tkinter, collect_windows, @@ -941,7 +1012,10 @@ def collect_info(info): # Collecting from tests should be last as they have side effects. collect_test_socket, - collect_test_support, + collect_support, + collect_support_os_helper, + collect_support_socket_helper, + collect_support_threading_helper, ): try: collect_func(info_add) diff --git a/Lib/test/regrtest.py b/Lib/test/regrtest.py index 0ffb3ed..46a74fe 100755 --- a/Lib/test/regrtest.py +++ b/Lib/test/regrtest.py @@ -8,7 +8,7 @@ Run this script with -h or --help for documentation. import os import sys -from test.libregrtest import main +from test.libregrtest.main import main # Alias for backward compatibility (just in case) diff --git a/Lib/test/support/__init__.py b/Lib/test/support/__init__.py index f5307b6..9066f0c 100644 --- a/Lib/test/support/__init__.py +++ b/Lib/test/support/__init__.py @@ -431,6 +431,14 @@ def skip_if_sanitizer(reason=None, *, address=False, memory=False, ub=False): HAVE_ASAN_FORK_BUG = check_sanitizer(address=True) +def set_sanitizer_env_var(env, option): + for name in ('ASAN_OPTIONS', 'MSAN_OPTIONS', 'UBSAN_OPTIONS'): + if name in env: + env[name] += f':{option}' + else: + env[name] = option + + def system_must_validate_cert(f): """Skip the test on TLS certificate validation failures.""" @functools.wraps(f) @@ -892,27 +900,31 @@ _4G = 4 * _1G MAX_Py_ssize_t = sys.maxsize -def set_memlimit(limit): - global max_memuse - global real_max_memuse +def _parse_memlimit(limit: str) -> int: sizes = { 'k': 1024, 'm': _1M, 'g': _1G, 't': 1024*_1G, } - m = re.match(r'(\d+(\.\d+)?) (K|M|G|T)b?$', limit, + m = re.match(r'(\d+(?:\.\d+)?) (K|M|G|T)b?$', limit, re.IGNORECASE | re.VERBOSE) if m is None: - raise ValueError('Invalid memory limit %r' % (limit,)) - memlimit = int(float(m.group(1)) * sizes[m.group(3).lower()]) - real_max_memuse = memlimit - if memlimit > MAX_Py_ssize_t: - memlimit = MAX_Py_ssize_t + raise ValueError(f'Invalid memory limit: {limit!r}') + return int(float(m.group(1)) * sizes[m.group(2).lower()]) + +def set_memlimit(limit: str) -> None: + global max_memuse + global real_max_memuse + memlimit = _parse_memlimit(limit) if memlimit < _2G - 1: - raise ValueError('Memory limit %r too low to be useful' % (limit,)) + raise ValueError('Memory limit {limit!r} too low to be useful') + + real_max_memuse = memlimit + memlimit = min(memlimit, MAX_Py_ssize_t) max_memuse = memlimit + class _MemoryWatchdog: """An object which periodically watches the process' memory consumption and prints it out. @@ -1187,7 +1199,6 @@ def _is_full_match_test(pattern): def set_match_tests(accept_patterns=None, ignore_patterns=None): global _match_test_func, _accept_test_patterns, _ignore_test_patterns - if accept_patterns is None: accept_patterns = () if ignore_patterns is None: @@ -2139,31 +2150,26 @@ def wait_process(pid, *, exitcode, timeout=None): if timeout is None: timeout = LONG_TIMEOUT - t0 = time.monotonic() - sleep = 0.001 - max_sleep = 0.1 - while True: + + start_time = time.monotonic() + for _ in sleeping_retry(timeout, error=False): pid2, status = os.waitpid(pid, os.WNOHANG) if pid2 != 0: break - # process is still running - - dt = time.monotonic() - t0 - if dt > timeout: - try: - os.kill(pid, signal.SIGKILL) - os.waitpid(pid, 0) - except OSError: - # Ignore errors like ChildProcessError or PermissionError - pass - - raise AssertionError(f"process {pid} is still running " - f"after {dt:.1f} seconds") + # rety: the process is still running + else: + try: + os.kill(pid, signal.SIGKILL) + os.waitpid(pid, 0) + except OSError: + # Ignore errors like ChildProcessError or PermissionError + pass - sleep = min(sleep * 2, max_sleep) - time.sleep(sleep) + dt = time.monotonic() - start_time + raise AssertionError(f"process {pid} is still running " + f"after {dt:.1f} seconds") else: - # Windows implementation + # Windows implementation: don't support timeout :-( pid2, status = os.waitpid(pid, 0) exitcode2 = os.waitstatus_to_exitcode(status) @@ -2327,6 +2333,11 @@ def requires_venv_with_pip(): return unittest.skipUnless(ctypes, 'venv: pip requires ctypes') +# True if Python is built with the Py_DEBUG macro defined: if +# Python is built in debug mode (./configure --with-pydebug). +Py_DEBUG = hasattr(sys, 'gettotalrefcount') + + def busy_retry(timeout, err_msg=None, /, *, error=True): """ Run the loop body until "break" stops the loop. diff --git a/Lib/test/support/os_helper.py b/Lib/test/support/os_helper.py index f599cc7..f77cce8 100644 --- a/Lib/test/support/os_helper.py +++ b/Lib/test/support/os_helper.py @@ -567,7 +567,7 @@ def fs_is_case_insensitive(directory): class FakePath: - """Simple implementing of the path protocol. + """Simple implementation of the path protocol. """ def __init__(self, path): self.path = path diff --git a/Lib/test/support/testresult.py b/Lib/test/support/testresult.py index 2cd1366..c5c297f 100644 --- a/Lib/test/support/testresult.py +++ b/Lib/test/support/testresult.py @@ -8,6 +8,7 @@ import sys import time import traceback import unittest +from test import support class RegressionTestResult(unittest.TextTestResult): USE_XML = False @@ -109,6 +110,8 @@ class RegressionTestResult(unittest.TextTestResult): def addFailure(self, test, err): self._add_result(test, True, failure=self.__makeErrorDict(*err)) super().addFailure(test, err) + if support.failfast: + self.stop() def addSkip(self, test, reason): self._add_result(test, skipped=reason) diff --git a/Lib/test/support/threading_helper.py b/Lib/test/support/threading_helper.py index 26cbc6f..b9973c8 100644 --- a/Lib/test/support/threading_helper.py +++ b/Lib/test/support/threading_helper.py @@ -88,19 +88,17 @@ def wait_threads_exit(timeout=None): yield finally: start_time = time.monotonic() - deadline = start_time + timeout - while True: + for _ in support.sleeping_retry(timeout, error=False): + support.gc_collect() count = _thread._count() if count <= old_count: break - if time.monotonic() > deadline: - dt = time.monotonic() - start_time - msg = (f"wait_threads() failed to cleanup {count - old_count} " - f"threads after {dt:.1f} seconds " - f"(count: {count}, old count: {old_count})") - raise AssertionError(msg) - time.sleep(0.010) - support.gc_collect() + else: + dt = time.monotonic() - start_time + msg = (f"wait_threads() failed to cleanup {count - old_count} " + f"threads after {dt:.1f} seconds " + f"(count: {count}, old count: {old_count})") + raise AssertionError(msg) def join_thread(thread, timeout=None): diff --git a/Lib/test/test_faulthandler.py b/Lib/test/test_faulthandler.py index 50b40d3..2d19064 100644 --- a/Lib/test/test_faulthandler.py +++ b/Lib/test/test_faulthandler.py @@ -8,7 +8,6 @@ import subprocess import sys from test import support from test.support import os_helper, script_helper, is_android, MS_WINDOWS -from test.support import skip_if_sanitizer import tempfile import unittest from textwrap import dedent @@ -34,7 +33,7 @@ def expected_traceback(lineno1, lineno2, header, min_count=1): return '^' + regex + '$' def skip_segfault_on_android(test): - # Issue #32138: Raising SIGSEGV on Android may not cause a crash. + # gh-76319: Raising SIGSEGV on Android may not cause a crash. return unittest.skipIf(is_android, 'raising SIGSEGV on Android is unreliable')(test) @@ -62,8 +61,16 @@ class FaultHandlerTests(unittest.TestCase): pass_fds = [] if fd is not None: pass_fds.append(fd) + env = dict(os.environ) + + # Sanitizers must not handle SIGSEGV (ex: for test_enable_fd()) + option = 'handle_segv=0' + support.set_sanitizer_env_var(env, option) + with support.SuppressCrashReport(): - process = script_helper.spawn_python('-c', code, pass_fds=pass_fds) + process = script_helper.spawn_python('-c', code, + pass_fds=pass_fds, + env=env) with process: output, stderr = process.communicate() exitcode = process.wait() @@ -302,8 +309,6 @@ class FaultHandlerTests(unittest.TestCase): 3, 'Segmentation fault') - @skip_if_sanitizer(memory=True, ub=True, reason="sanitizer " - "builds change crashing process output.") @skip_segfault_on_android def test_enable_file(self): with temporary_filename() as filename: @@ -319,8 +324,6 @@ class FaultHandlerTests(unittest.TestCase): @unittest.skipIf(sys.platform == "win32", "subprocess doesn't support pass_fds on Windows") - @skip_if_sanitizer(memory=True, ub=True, reason="sanitizer " - "builds change crashing process output.") @skip_segfault_on_android def test_enable_fd(self): with tempfile.TemporaryFile('wb+') as fp: diff --git a/Lib/test/test_regrtest.py b/Lib/test/test_regrtest.py index 88c59b7..45573a2 100644 --- a/Lib/test/test_regrtest.py +++ b/Lib/test/test_regrtest.py @@ -5,28 +5,33 @@ Note: test_regrtest cannot be run twice in parallel. """ import contextlib +import dataclasses import glob import io import locale import os.path import platform +import random import re +import shlex +import signal import subprocess import sys import sysconfig import tempfile import textwrap -import time import unittest -from test import libregrtest from test import support from test.support import os_helper, TestStats -from test.libregrtest import utils, setup +from test.libregrtest import cmdline +from test.libregrtest import main +from test.libregrtest import setup +from test.libregrtest import utils +from test.libregrtest.utils import normalize_test_name if not support.has_subprocess_support: raise unittest.SkipTest("test module requires subprocess") -Py_DEBUG = hasattr(sys, 'gettotalrefcount') ROOT_DIR = os.path.join(os.path.dirname(__file__), '..', '..') ROOT_DIR = os.path.abspath(os.path.normpath(ROOT_DIR)) LOG_PREFIX = r'[0-9]+:[0-9]+:[0-9]+ (?:load avg: [0-9]+\.[0-9]{2} )?' @@ -34,6 +39,7 @@ LOG_PREFIX = r'[0-9]+:[0-9]+:[0-9]+ (?:load avg: [0-9]+\.[0-9]{2} )?' EXITCODE_BAD_TEST = 2 EXITCODE_ENV_CHANGED = 3 EXITCODE_NO_TESTS_RAN = 4 +EXITCODE_RERUN_FAIL = 5 EXITCODE_INTERRUPTED = 130 TEST_INTERRUPTED = textwrap.dedent(""" @@ -51,9 +57,13 @@ class ParseArgsTestCase(unittest.TestCase): Test regrtest's argument parsing, function _parse_args(). """ + @staticmethod + def parse_args(args): + return cmdline._parse_args(args) + def checkError(self, args, msg): with support.captured_stderr() as err, self.assertRaises(SystemExit): - libregrtest._parse_args(args) + self.parse_args(args) self.assertIn(msg, err.getvalue()) def test_help(self): @@ -61,83 +71,93 @@ class ParseArgsTestCase(unittest.TestCase): with self.subTest(opt=opt): with support.captured_stdout() as out, \ self.assertRaises(SystemExit): - libregrtest._parse_args([opt]) + self.parse_args([opt]) self.assertIn('Run Python regression tests.', out.getvalue()) def test_timeout(self): - ns = libregrtest._parse_args(['--timeout', '4.2']) + ns = self.parse_args(['--timeout', '4.2']) self.assertEqual(ns.timeout, 4.2) + + # negative, zero and empty string are treated as "no timeout" + for value in ('-1', '0', ''): + with self.subTest(value=value): + ns = self.parse_args([f'--timeout={value}']) + self.assertEqual(ns.timeout, None) + self.checkError(['--timeout'], 'expected one argument') - self.checkError(['--timeout', 'foo'], 'invalid float value') + self.checkError(['--timeout', 'foo'], 'invalid timeout value:') def test_wait(self): - ns = libregrtest._parse_args(['--wait']) + ns = self.parse_args(['--wait']) self.assertTrue(ns.wait) - def test_worker_args(self): - ns = libregrtest._parse_args(['--worker-args', '[[], {}]']) - self.assertEqual(ns.worker_args, '[[], {}]') - self.checkError(['--worker-args'], 'expected one argument') - def test_start(self): for opt in '-S', '--start': with self.subTest(opt=opt): - ns = libregrtest._parse_args([opt, 'foo']) + ns = self.parse_args([opt, 'foo']) self.assertEqual(ns.start, 'foo') self.checkError([opt], 'expected one argument') def test_verbose(self): - ns = libregrtest._parse_args(['-v']) + ns = self.parse_args(['-v']) self.assertEqual(ns.verbose, 1) - ns = libregrtest._parse_args(['-vvv']) + ns = self.parse_args(['-vvv']) self.assertEqual(ns.verbose, 3) - ns = libregrtest._parse_args(['--verbose']) + ns = self.parse_args(['--verbose']) self.assertEqual(ns.verbose, 1) - ns = libregrtest._parse_args(['--verbose'] * 3) + ns = self.parse_args(['--verbose'] * 3) self.assertEqual(ns.verbose, 3) - ns = libregrtest._parse_args([]) + ns = self.parse_args([]) self.assertEqual(ns.verbose, 0) - def test_verbose2(self): - for opt in '-w', '--verbose2': + def test_rerun(self): + for opt in '-w', '--rerun', '--verbose2': with self.subTest(opt=opt): - ns = libregrtest._parse_args([opt]) - self.assertTrue(ns.verbose2) + ns = self.parse_args([opt]) + self.assertTrue(ns.rerun) def test_verbose3(self): for opt in '-W', '--verbose3': with self.subTest(opt=opt): - ns = libregrtest._parse_args([opt]) + ns = self.parse_args([opt]) self.assertTrue(ns.verbose3) def test_quiet(self): for opt in '-q', '--quiet': with self.subTest(opt=opt): - ns = libregrtest._parse_args([opt]) + ns = self.parse_args([opt]) self.assertTrue(ns.quiet) self.assertEqual(ns.verbose, 0) def test_slowest(self): for opt in '-o', '--slowest': with self.subTest(opt=opt): - ns = libregrtest._parse_args([opt]) + ns = self.parse_args([opt]) self.assertTrue(ns.print_slow) def test_header(self): - ns = libregrtest._parse_args(['--header']) + ns = self.parse_args(['--header']) self.assertTrue(ns.header) - ns = libregrtest._parse_args(['--verbose']) + ns = self.parse_args(['--verbose']) self.assertTrue(ns.header) def test_randomize(self): for opt in '-r', '--randomize': with self.subTest(opt=opt): - ns = libregrtest._parse_args([opt]) + ns = self.parse_args([opt]) self.assertTrue(ns.randomize) + with os_helper.EnvironmentVarGuard() as env: + env['SOURCE_DATE_EPOCH'] = '1' + + ns = self.parse_args(['--randomize']) + regrtest = main.Regrtest(ns) + self.assertFalse(regrtest.randomize) + self.assertIsNone(regrtest.random_seed) + def test_randseed(self): - ns = libregrtest._parse_args(['--randseed', '12345']) + ns = self.parse_args(['--randseed', '12345']) self.assertEqual(ns.random_seed, 12345) self.assertTrue(ns.randomize) self.checkError(['--randseed'], 'expected one argument') @@ -146,7 +166,7 @@ class ParseArgsTestCase(unittest.TestCase): def test_fromfile(self): for opt in '-f', '--fromfile': with self.subTest(opt=opt): - ns = libregrtest._parse_args([opt, 'foo']) + ns = self.parse_args([opt, 'foo']) self.assertEqual(ns.fromfile, 'foo') self.checkError([opt], 'expected one argument') self.checkError([opt, 'foo', '-s'], "don't go together") @@ -154,20 +174,20 @@ class ParseArgsTestCase(unittest.TestCase): def test_exclude(self): for opt in '-x', '--exclude': with self.subTest(opt=opt): - ns = libregrtest._parse_args([opt]) + ns = self.parse_args([opt]) self.assertTrue(ns.exclude) def test_single(self): for opt in '-s', '--single': with self.subTest(opt=opt): - ns = libregrtest._parse_args([opt]) + ns = self.parse_args([opt]) self.assertTrue(ns.single) self.checkError([opt, '-f', 'foo'], "don't go together") def test_ignore(self): for opt in '-i', '--ignore': with self.subTest(opt=opt): - ns = libregrtest._parse_args([opt, 'pattern']) + ns = self.parse_args([opt, 'pattern']) self.assertEqual(ns.ignore_tests, ['pattern']) self.checkError([opt], 'expected one argument') @@ -177,7 +197,7 @@ class ParseArgsTestCase(unittest.TestCase): print('matchfile2', file=fp) filename = os.path.abspath(os_helper.TESTFN) - ns = libregrtest._parse_args(['-m', 'match', + ns = self.parse_args(['-m', 'match', '--ignorefile', filename]) self.assertEqual(ns.ignore_tests, ['matchfile1', 'matchfile2']) @@ -185,11 +205,11 @@ class ParseArgsTestCase(unittest.TestCase): def test_match(self): for opt in '-m', '--match': with self.subTest(opt=opt): - ns = libregrtest._parse_args([opt, 'pattern']) + ns = self.parse_args([opt, 'pattern']) self.assertEqual(ns.match_tests, ['pattern']) self.checkError([opt], 'expected one argument') - ns = libregrtest._parse_args(['-m', 'pattern1', + ns = self.parse_args(['-m', 'pattern1', '-m', 'pattern2']) self.assertEqual(ns.match_tests, ['pattern1', 'pattern2']) @@ -199,7 +219,7 @@ class ParseArgsTestCase(unittest.TestCase): print('matchfile2', file=fp) filename = os.path.abspath(os_helper.TESTFN) - ns = libregrtest._parse_args(['-m', 'match', + ns = self.parse_args(['-m', 'match', '--matchfile', filename]) self.assertEqual(ns.match_tests, ['match', 'matchfile1', 'matchfile2']) @@ -207,65 +227,65 @@ class ParseArgsTestCase(unittest.TestCase): def test_failfast(self): for opt in '-G', '--failfast': with self.subTest(opt=opt): - ns = libregrtest._parse_args([opt, '-v']) + ns = self.parse_args([opt, '-v']) self.assertTrue(ns.failfast) - ns = libregrtest._parse_args([opt, '-W']) + ns = self.parse_args([opt, '-W']) self.assertTrue(ns.failfast) self.checkError([opt], '-G/--failfast needs either -v or -W') def test_use(self): for opt in '-u', '--use': with self.subTest(opt=opt): - ns = libregrtest._parse_args([opt, 'gui,network']) + ns = self.parse_args([opt, 'gui,network']) self.assertEqual(ns.use_resources, ['gui', 'network']) - ns = libregrtest._parse_args([opt, 'gui,none,network']) + ns = self.parse_args([opt, 'gui,none,network']) self.assertEqual(ns.use_resources, ['network']) - expected = list(libregrtest.ALL_RESOURCES) + expected = list(cmdline.ALL_RESOURCES) expected.remove('gui') - ns = libregrtest._parse_args([opt, 'all,-gui']) + ns = self.parse_args([opt, 'all,-gui']) self.assertEqual(ns.use_resources, expected) self.checkError([opt], 'expected one argument') self.checkError([opt, 'foo'], 'invalid resource') # all + a resource not part of "all" - ns = libregrtest._parse_args([opt, 'all,tzdata']) + ns = self.parse_args([opt, 'all,tzdata']) self.assertEqual(ns.use_resources, - list(libregrtest.ALL_RESOURCES) + ['tzdata']) + list(cmdline.ALL_RESOURCES) + ['tzdata']) # test another resource which is not part of "all" - ns = libregrtest._parse_args([opt, 'extralargefile']) + ns = self.parse_args([opt, 'extralargefile']) self.assertEqual(ns.use_resources, ['extralargefile']) def test_memlimit(self): for opt in '-M', '--memlimit': with self.subTest(opt=opt): - ns = libregrtest._parse_args([opt, '4G']) + ns = self.parse_args([opt, '4G']) self.assertEqual(ns.memlimit, '4G') self.checkError([opt], 'expected one argument') def test_testdir(self): - ns = libregrtest._parse_args(['--testdir', 'foo']) + ns = self.parse_args(['--testdir', 'foo']) self.assertEqual(ns.testdir, os.path.join(os_helper.SAVEDCWD, 'foo')) self.checkError(['--testdir'], 'expected one argument') def test_runleaks(self): for opt in '-L', '--runleaks': with self.subTest(opt=opt): - ns = libregrtest._parse_args([opt]) + ns = self.parse_args([opt]) self.assertTrue(ns.runleaks) def test_huntrleaks(self): for opt in '-R', '--huntrleaks': with self.subTest(opt=opt): - ns = libregrtest._parse_args([opt, ':']) + ns = self.parse_args([opt, ':']) self.assertEqual(ns.huntrleaks, (5, 4, 'reflog.txt')) - ns = libregrtest._parse_args([opt, '6:']) + ns = self.parse_args([opt, '6:']) self.assertEqual(ns.huntrleaks, (6, 4, 'reflog.txt')) - ns = libregrtest._parse_args([opt, ':3']) + ns = self.parse_args([opt, ':3']) self.assertEqual(ns.huntrleaks, (5, 3, 'reflog.txt')) - ns = libregrtest._parse_args([opt, '6:3:leaks.log']) + ns = self.parse_args([opt, '6:3:leaks.log']) self.assertEqual(ns.huntrleaks, (6, 3, 'leaks.log')) self.checkError([opt], 'expected one argument') self.checkError([opt, '6'], @@ -276,7 +296,7 @@ class ParseArgsTestCase(unittest.TestCase): def test_multiprocess(self): for opt in '-j', '--multiprocess': with self.subTest(opt=opt): - ns = libregrtest._parse_args([opt, '2']) + ns = self.parse_args([opt, '2']) self.assertEqual(ns.use_mp, 2) self.checkError([opt], 'expected one argument') self.checkError([opt, 'foo'], 'invalid int value') @@ -286,13 +306,13 @@ class ParseArgsTestCase(unittest.TestCase): def test_coverage(self): for opt in '-T', '--coverage': with self.subTest(opt=opt): - ns = libregrtest._parse_args([opt]) + ns = self.parse_args([opt]) self.assertTrue(ns.trace) def test_coverdir(self): for opt in '-D', '--coverdir': with self.subTest(opt=opt): - ns = libregrtest._parse_args([opt, 'foo']) + ns = self.parse_args([opt, 'foo']) self.assertEqual(ns.coverdir, os.path.join(os_helper.SAVEDCWD, 'foo')) self.checkError([opt], 'expected one argument') @@ -300,13 +320,13 @@ class ParseArgsTestCase(unittest.TestCase): def test_nocoverdir(self): for opt in '-N', '--nocoverdir': with self.subTest(opt=opt): - ns = libregrtest._parse_args([opt]) + ns = self.parse_args([opt]) self.assertIsNone(ns.coverdir) def test_threshold(self): for opt in '-t', '--threshold': with self.subTest(opt=opt): - ns = libregrtest._parse_args([opt, '1000']) + ns = self.parse_args([opt, '1000']) self.assertEqual(ns.threshold, 1000) self.checkError([opt], 'expected one argument') self.checkError([opt, 'foo'], 'invalid int value') @@ -315,7 +335,7 @@ class ParseArgsTestCase(unittest.TestCase): for opt in '-n', '--nowindows': with self.subTest(opt=opt): with contextlib.redirect_stderr(io.StringIO()) as stderr: - ns = libregrtest._parse_args([opt]) + ns = self.parse_args([opt]) self.assertTrue(ns.nowindows) err = stderr.getvalue() self.assertIn('the --nowindows (-n) option is deprecated', err) @@ -323,39 +343,39 @@ class ParseArgsTestCase(unittest.TestCase): def test_forever(self): for opt in '-F', '--forever': with self.subTest(opt=opt): - ns = libregrtest._parse_args([opt]) + ns = self.parse_args([opt]) self.assertTrue(ns.forever) def test_unrecognized_argument(self): self.checkError(['--xxx'], 'usage:') def test_long_option__partial(self): - ns = libregrtest._parse_args(['--qui']) + ns = self.parse_args(['--qui']) self.assertTrue(ns.quiet) self.assertEqual(ns.verbose, 0) def test_two_options(self): - ns = libregrtest._parse_args(['--quiet', '--exclude']) + ns = self.parse_args(['--quiet', '--exclude']) self.assertTrue(ns.quiet) self.assertEqual(ns.verbose, 0) self.assertTrue(ns.exclude) def test_option_with_empty_string_value(self): - ns = libregrtest._parse_args(['--start', '']) + ns = self.parse_args(['--start', '']) self.assertEqual(ns.start, '') def test_arg(self): - ns = libregrtest._parse_args(['foo']) + ns = self.parse_args(['foo']) self.assertEqual(ns.args, ['foo']) def test_option_and_arg(self): - ns = libregrtest._parse_args(['--quiet', 'foo']) + ns = self.parse_args(['--quiet', 'foo']) self.assertTrue(ns.quiet) self.assertEqual(ns.verbose, 0) self.assertEqual(ns.args, ['foo']) def test_arg_option_arg(self): - ns = libregrtest._parse_args(['test_unaryop', '-v', 'test_binop']) + ns = self.parse_args(['test_unaryop', '-v', 'test_binop']) self.assertEqual(ns.verbose, 1) self.assertEqual(ns.args, ['test_unaryop', 'test_binop']) @@ -363,6 +383,62 @@ class ParseArgsTestCase(unittest.TestCase): self.checkError(['--unknown-option'], 'unrecognized arguments: --unknown-option') + def check_ci_mode(self, args, use_resources, rerun=True): + ns = cmdline._parse_args(args) + + # Check Regrtest attributes which are more reliable than Namespace + # which has an unclear API + regrtest = main.Regrtest(ns) + self.assertEqual(regrtest.num_workers, -1) + self.assertEqual(regrtest.want_rerun, rerun) + self.assertTrue(regrtest.randomize) + self.assertIsInstance(regrtest.random_seed, int) + self.assertTrue(regrtest.fail_env_changed) + self.assertTrue(regrtest.fail_rerun) + self.assertTrue(regrtest.print_slowest) + self.assertTrue(regrtest.output_on_failure) + self.assertEqual(sorted(regrtest.use_resources), sorted(use_resources)) + return regrtest + + def test_fast_ci(self): + args = ['--fast-ci'] + use_resources = sorted(cmdline.ALL_RESOURCES) + use_resources.remove('cpu') + regrtest = self.check_ci_mode(args, use_resources) + self.assertEqual(regrtest.timeout, 10 * 60) + + def test_fast_ci_python_cmd(self): + args = ['--fast-ci', '--python', 'python -X dev'] + use_resources = sorted(cmdline.ALL_RESOURCES) + use_resources.remove('cpu') + regrtest = self.check_ci_mode(args, use_resources, rerun=False) + self.assertEqual(regrtest.timeout, 10 * 60) + self.assertEqual(regrtest.python_cmd, ('python', '-X', 'dev')) + + def test_fast_ci_resource(self): + # it should be possible to override resources + args = ['--fast-ci', '-u', 'network'] + use_resources = ['network'] + self.check_ci_mode(args, use_resources) + + def test_slow_ci(self): + args = ['--slow-ci'] + use_resources = sorted(cmdline.ALL_RESOURCES) + regrtest = self.check_ci_mode(args, use_resources) + self.assertEqual(regrtest.timeout, 20 * 60) + + def test_dont_add_python_opts(self): + args = ['--dont-add-python-opts'] + ns = cmdline._parse_args(args) + self.assertFalse(ns._add_python_opts) + + +@dataclasses.dataclass(slots=True) +class Rerun: + name: str + match: str | None + success: bool + class BaseTestCase(unittest.TestCase): TEST_UNIQUE_ID = 1 @@ -411,10 +487,12 @@ class BaseTestCase(unittest.TestCase): self.fail("%r not found in %r" % (regex, output)) return match - def check_line(self, output, regex, full=False): + def check_line(self, output, pattern, full=False, regex=True): + if not regex: + pattern = re.escape(pattern) if full: - regex += '\n' - regex = re.compile(r'^' + regex, re.MULTILINE) + pattern += '\n' + regex = re.compile(r'^' + pattern, re.MULTILINE) self.assertRegex(output, regex) def parse_executed_tests(self, output): @@ -423,13 +501,14 @@ class BaseTestCase(unittest.TestCase): parser = re.finditer(regex, output, re.MULTILINE) return list(match.group(1) for match in parser) - def check_executed_tests(self, output, tests, skipped=(), failed=(), + def check_executed_tests(self, output, tests, *, stats, + skipped=(), failed=(), env_changed=(), omitted=(), - rerun={}, run_no_tests=(), + rerun=None, run_no_tests=(), resource_denied=(), - randomize=False, interrupted=False, + randomize=False, parallel=False, interrupted=False, fail_env_changed=False, - *, stats): + forever=False, filtered=False): if isinstance(tests, str): tests = [tests] if isinstance(skipped, str): @@ -446,12 +525,23 @@ class BaseTestCase(unittest.TestCase): run_no_tests = [run_no_tests] if isinstance(stats, int): stats = TestStats(stats) + if parallel: + randomize = True + + rerun_failed = [] + if rerun is not None and not env_changed: + failed = [rerun.name] + if not rerun.success: + rerun_failed.append(rerun.name) executed = self.parse_executed_tests(output) + total_tests = list(tests) + if rerun is not None: + total_tests.append(rerun.name) if randomize: - self.assertEqual(set(executed), set(tests), output) + self.assertEqual(set(executed), set(total_tests), output) else: - self.assertEqual(executed, tests, output) + self.assertEqual(executed, total_tests, output) def plural(count): return 's' if count != 1 else '' @@ -467,12 +557,17 @@ class BaseTestCase(unittest.TestCase): regex = list_regex('%s test%s skipped', skipped) self.check_line(output, regex) + if resource_denied: + regex = list_regex(r'%s test%s skipped \(resource denied\)', resource_denied) + self.check_line(output, regex) + if failed: regex = list_regex('%s test%s failed', failed) self.check_line(output, regex) if env_changed: - regex = list_regex('%s test%s altered the execution environment', + regex = list_regex(r'%s test%s altered the execution environment ' + r'\(env changed\)', env_changed) self.check_line(output, regex) @@ -480,32 +575,36 @@ class BaseTestCase(unittest.TestCase): regex = list_regex('%s test%s omitted', omitted) self.check_line(output, regex) - if rerun: - regex = list_regex('%s re-run test%s', rerun.keys()) + if rerun is not None: + regex = list_regex('%s re-run test%s', [rerun.name]) self.check_line(output, regex) - regex = LOG_PREFIX + r"Re-running failed tests in verbose mode" + regex = LOG_PREFIX + r"Re-running 1 failed tests in verbose mode" + self.check_line(output, regex) + regex = fr"Re-running {rerun.name} in verbose mode" + if rerun.match: + regex = fr"{regex} \(matching: {rerun.match}\)" self.check_line(output, regex) - for name, match in rerun.items(): - regex = LOG_PREFIX + f"Re-running {name} in verbose mode \\(matching: {match}\\)" - self.check_line(output, regex) if run_no_tests: regex = list_regex('%s test%s run no tests', run_no_tests) self.check_line(output, regex) - good = (len(tests) - len(skipped) - len(failed) + good = (len(tests) - len(skipped) - len(resource_denied) - len(failed) - len(omitted) - len(env_changed) - len(run_no_tests)) if good: - regex = r'%s test%s OK\.$' % (good, plural(good)) - if not skipped and not failed and good > 1: + regex = r'%s test%s OK\.' % (good, plural(good)) + if not skipped and not failed and (rerun is None or rerun.success) and good > 1: regex = 'All %s' % regex - self.check_line(output, regex) + self.check_line(output, regex, full=True) if interrupted: self.check_line(output, 'Test suite interrupted by signal SIGINT.') # Total tests - parts = [f'run={stats.tests_run:,}'] + text = f'run={stats.tests_run:,}' + if filtered: + text = fr'{text} \(filtered\)' + parts = [text] if stats.failures: parts.append(f'failures={stats.failures:,}') if stats.skipped: @@ -514,44 +613,57 @@ class BaseTestCase(unittest.TestCase): self.check_line(output, line, full=True) # Total test files - report = [f'success={good}'] - if failed: - report.append(f'failed={len(failed)}') - if env_changed: - report.append(f'env_changed={len(env_changed)}') - if skipped: - report.append(f'skipped={len(skipped)}') - if resource_denied: - report.append(f'resource_denied={len(resource_denied)}') - if rerun: - report.append(f'rerun={len(rerun)}') - if run_no_tests: - report.append(f'run_no_tests={len(run_no_tests)}') + run = len(total_tests) - len(resource_denied) + if rerun is not None: + total_failed = len(rerun_failed) + total_rerun = 1 + else: + total_failed = len(failed) + total_rerun = 0 + if interrupted: + run = 0 + text = f'run={run}' + if not forever: + text = f'{text}/{len(tests)}' + if filtered: + text = fr'{text} \(filtered\)' + report = [text] + for name, ntest in ( + ('failed', total_failed), + ('env_changed', len(env_changed)), + ('skipped', len(skipped)), + ('resource_denied', len(resource_denied)), + ('rerun', total_rerun), + ('run_no_tests', len(run_no_tests)), + ): + if ntest: + report.append(f'{name}={ntest}') line = fr'Total test files: {" ".join(report)}' self.check_line(output, line, full=True) # Result - result = [] + state = [] if failed: - result.append('FAILURE') + state.append('FAILURE') elif fail_env_changed and env_changed: - result.append('ENV CHANGED') + state.append('ENV CHANGED') if interrupted: - result.append('INTERRUPTED') - if not any((good, result, failed, interrupted, skipped, + state.append('INTERRUPTED') + if not any((good, failed, interrupted, skipped, env_changed, fail_env_changed)): - result.append("NO TESTS RAN") - elif not result: - result.append('SUCCESS') - result = ', '.join(result) - if rerun: - result = 'FAILURE then %s' % result - self.check_line(output, f'Result: {result}', full=True) + state.append("NO TESTS RAN") + elif not state: + state.append('SUCCESS') + state = ', '.join(state) + if rerun is not None: + new_state = 'SUCCESS' if rerun.success else 'FAILURE' + state = f'{state} then {new_state}' + self.check_line(output, f'Result: {state}', full=True) def parse_random_seed(self, output): match = self.regex_search(r'Using random seed ([0-9]+)', output) randseed = int(match.group(1)) - self.assertTrue(0 <= randseed <= 10000000, randseed) + self.assertTrue(0 <= randseed, randseed) return randseed def run_command(self, args, input=None, exitcode=0, **kw): @@ -560,18 +672,18 @@ class BaseTestCase(unittest.TestCase): if 'stderr' not in kw: kw['stderr'] = subprocess.STDOUT proc = subprocess.run(args, - universal_newlines=True, + text=True, input=input, stdout=subprocess.PIPE, **kw) if proc.returncode != exitcode: - msg = ("Command %s failed with exit code %s\n" + msg = ("Command %s failed with exit code %s, but exit code %s expected!\n" "\n" "stdout:\n" "---\n" "%s\n" "---\n" - % (str(args), proc.returncode, proc.stdout)) + % (str(args), proc.returncode, exitcode, proc.stdout)) if proc.stderr: msg += ("\n" "stderr:\n" @@ -583,7 +695,11 @@ class BaseTestCase(unittest.TestCase): return proc def run_python(self, args, **kw): - args = [sys.executable, '-X', 'faulthandler', '-I', *args] + extraargs = [] + if 'uops' in sys._xoptions: + # Pass -X uops along + extraargs.extend(['-X', 'uops']) + args = [sys.executable, *extraargs, '-X', 'faulthandler', '-I', *args] proc = self.run_command(args, **kw) return proc.stdout @@ -638,8 +754,8 @@ class ProgramsTestCase(BaseTestCase): self.check_executed_tests(output, self.tests, randomize=True, stats=len(self.tests)) - def run_tests(self, args): - output = self.run_python(args) + def run_tests(self, args, env=None): + output = self.run_python(args, env=env) self.check_output(output) def test_script_regrtest(self): @@ -680,14 +796,6 @@ class ProgramsTestCase(BaseTestCase): args = [*self.python_args, script, *self.regrtest_args, *self.tests] self.run_tests(args) - @unittest.skipUnless(sysconfig.is_python_build(), - 'run_tests.py script is not installed') - def test_tools_script_run_tests(self): - # Tools/scripts/run_tests.py - script = os.path.join(ROOT_DIR, 'Tools', 'scripts', 'run_tests.py') - args = [script, *self.regrtest_args, *self.tests] - self.run_tests(args) - def run_batch(self, *args): proc = self.run_command(args) self.check_output(proc.stdout) @@ -705,7 +813,7 @@ class ProgramsTestCase(BaseTestCase): test_args.append('-arm32') # 32-bit ARM build elif platform.architecture()[0] == '64bit': test_args.append('-x64') # 64-bit build - if not Py_DEBUG: + if not support.Py_DEBUG: test_args.append('+d') # Release build, use python.exe self.run_batch(script, *test_args, *self.tests) @@ -722,7 +830,7 @@ class ProgramsTestCase(BaseTestCase): rt_args.append('-arm32') # 32-bit ARM build elif platform.architecture()[0] == '64bit': rt_args.append('-x64') # 64-bit build - if Py_DEBUG: + if support.Py_DEBUG: rt_args.append('-d') # Debug build, use python_d.exe self.run_batch(script, *rt_args, *self.regrtest_args, *self.tests) @@ -736,6 +844,40 @@ class ArgsTestCase(BaseTestCase): cmdargs = ['-m', 'test', '--testdir=%s' % self.tmptestdir, *testargs] return self.run_python(cmdargs, **kw) + def test_success(self): + code = textwrap.dedent(""" + import unittest + + class PassingTests(unittest.TestCase): + def test_test1(self): + pass + + def test_test2(self): + pass + + def test_test3(self): + pass + """) + tests = [self.create_test(f'ok{i}', code=code) for i in range(1, 6)] + + output = self.run_tests(*tests) + self.check_executed_tests(output, tests, + stats=3 * len(tests)) + + def test_skip(self): + code = textwrap.dedent(""" + import unittest + raise unittest.SkipTest("nope") + """) + test_ok = self.create_test('ok') + test_skip = self.create_test('skip', code=code) + tests = [test_ok, test_skip] + + output = self.run_tests(*tests) + self.check_executed_tests(output, tests, + skipped=[test_skip], + stats=1) + def test_failing_test(self): # test a failing test code = textwrap.dedent(""" @@ -775,14 +917,12 @@ class ArgsTestCase(BaseTestCase): # -u audio: 1 resource enabled output = self.run_tests('-uaudio', *test_names) self.check_executed_tests(output, test_names, - skipped=tests['network'], resource_denied=tests['network'], stats=1) # no option: 0 resources enabled - output = self.run_tests(*test_names) + output = self.run_tests(*test_names, exitcode=EXITCODE_NO_TESTS_RAN) self.check_executed_tests(output, test_names, - skipped=test_names, resource_denied=test_names, stats=0) @@ -810,6 +950,10 @@ class ArgsTestCase(BaseTestCase): test_random2 = int(match.group(1)) self.assertEqual(test_random2, test_random) + # check that random.seed is used by default + output = self.run_tests(test, exitcode=EXITCODE_NO_TESTS_RAN) + self.assertIsInstance(self.parse_random_seed(output), int) + def test_fromfile(self): # test --fromfile tests = [self.create_test() for index in range(5)] @@ -928,16 +1072,32 @@ class ArgsTestCase(BaseTestCase): builtins.__dict__['RUN'] = 1 """) test = self.create_test('forever', code=code) + + # --forever output = self.run_tests('--forever', test, exitcode=EXITCODE_BAD_TEST) self.check_executed_tests(output, [test]*3, failed=test, - stats=TestStats(1, 1)) - - def check_leak(self, code, what): + stats=TestStats(3, 1), + forever=True) + + # --forever --rerun + output = self.run_tests('--forever', '--rerun', test, exitcode=0) + self.check_executed_tests(output, [test]*3, + rerun=Rerun(test, + match='test_run', + success=True), + stats=TestStats(4, 1), + forever=True) + + def check_leak(self, code, what, *, run_workers=False): test = self.create_test('huntrleaks', code=code) filename = 'reflog.txt' self.addCleanup(os_helper.unlink, filename) - output = self.run_tests('--huntrleaks', '3:3:', test, + cmd = ['--huntrleaks', '3:3:'] + if run_workers: + cmd.append('-j1') + cmd.append(test) + output = self.run_tests(*cmd, exitcode=EXITCODE_BAD_TEST, stderr=subprocess.STDOUT) self.check_executed_tests(output, [test], failed=test, stats=1) @@ -952,8 +1112,8 @@ class ArgsTestCase(BaseTestCase): reflog = fp.read() self.assertIn(line2, reflog) - @unittest.skipUnless(Py_DEBUG, 'need a debug build') - def test_huntrleaks(self): + @unittest.skipUnless(support.Py_DEBUG, 'need a debug build') + def check_huntrleaks(self, *, run_workers: bool): # test --huntrleaks code = textwrap.dedent(""" import unittest @@ -964,9 +1124,15 @@ class ArgsTestCase(BaseTestCase): def test_leak(self): GLOBAL_LIST.append(object()) """) - self.check_leak(code, 'references') + self.check_leak(code, 'references', run_workers=run_workers) - @unittest.skipUnless(Py_DEBUG, 'need a debug build') + def test_huntrleaks(self): + self.check_huntrleaks(run_workers=False) + + def test_huntrleaks_mp(self): + self.check_huntrleaks(run_workers=True) + + @unittest.skipUnless(support.Py_DEBUG, 'need a debug build') def test_huntrleaks_fd_leak(self): # test --huntrleaks for file descriptor leak code = textwrap.dedent(""" @@ -1022,7 +1188,7 @@ class ArgsTestCase(BaseTestCase): tests = [crash_test] output = self.run_tests("-j2", *tests, exitcode=EXITCODE_BAD_TEST) self.check_executed_tests(output, tests, failed=crash_test, - randomize=True, stats=0) + parallel=True, stats=0) def parse_methods(self, output): regex = re.compile("^(test[^ ]+).*ok$", flags=re.MULTILINE) @@ -1042,8 +1208,6 @@ class ArgsTestCase(BaseTestCase): def test_method4(self): pass """) - all_methods = ['test_method1', 'test_method2', - 'test_method3', 'test_method4'] testname = self.create_test(code=code) # only run a subset @@ -1126,6 +1290,15 @@ class ArgsTestCase(BaseTestCase): self.check_executed_tests(output, [testname], env_changed=testname, fail_env_changed=True, stats=1) + # rerun + output = self.run_tests("--rerun", testname) + self.check_executed_tests(output, [testname], + env_changed=testname, + rerun=Rerun(testname, + match=None, + success=True), + stats=2) + def test_rerun_fail(self): # FAILURE then FAILURE code = textwrap.dedent(""" @@ -1141,33 +1314,55 @@ class ArgsTestCase(BaseTestCase): """) testname = self.create_test(code=code) - output = self.run_tests("-w", testname, exitcode=EXITCODE_BAD_TEST) + output = self.run_tests("--rerun", testname, exitcode=EXITCODE_BAD_TEST) self.check_executed_tests(output, [testname], - failed=testname, - rerun={testname: "test_fail_always"}, - stats=TestStats(1, 1)) + rerun=Rerun(testname, + "test_fail_always", + success=False), + stats=TestStats(3, 2)) def test_rerun_success(self): # FAILURE then SUCCESS - code = textwrap.dedent(""" - import builtins + marker_filename = os.path.abspath("regrtest_marker_filename") + self.addCleanup(os_helper.unlink, marker_filename) + self.assertFalse(os.path.exists(marker_filename)) + + code = textwrap.dedent(f""" + import os.path import unittest + marker_filename = {marker_filename!r} + class Tests(unittest.TestCase): def test_succeed(self): return def test_fail_once(self): - if not hasattr(builtins, '_test_failed'): - builtins._test_failed = True + if not os.path.exists(marker_filename): + open(marker_filename, "w").close() self.fail("bug") """) testname = self.create_test(code=code) - output = self.run_tests("-w", testname, exitcode=0) + # FAILURE then SUCCESS => exit code 0 + output = self.run_tests("--rerun", testname, exitcode=0) self.check_executed_tests(output, [testname], - rerun={testname: "test_fail_once"}, - stats=1) + rerun=Rerun(testname, + match="test_fail_once", + success=True), + stats=TestStats(3, 1)) + os_helper.unlink(marker_filename) + + # with --fail-rerun, exit code EXITCODE_RERUN_FAIL + # on "FAILURE then SUCCESS" state. + output = self.run_tests("--rerun", "--fail-rerun", testname, + exitcode=EXITCODE_RERUN_FAIL) + self.check_executed_tests(output, [testname], + rerun=Rerun(testname, + match="test_fail_once", + success=True), + stats=TestStats(3, 1)) + os_helper.unlink(marker_filename) def test_rerun_setup_class_hook_failure(self): # FAILURE then FAILURE @@ -1184,10 +1379,12 @@ class ArgsTestCase(BaseTestCase): """) testname = self.create_test(code=code) - output = self.run_tests("-w", testname, exitcode=EXITCODE_BAD_TEST) + output = self.run_tests("--rerun", testname, exitcode=EXITCODE_BAD_TEST) self.check_executed_tests(output, testname, failed=[testname], - rerun={testname: "ExampleTests"}, + rerun=Rerun(testname, + match="ExampleTests", + success=False), stats=0) def test_rerun_teardown_class_hook_failure(self): @@ -1205,11 +1402,13 @@ class ArgsTestCase(BaseTestCase): """) testname = self.create_test(code=code) - output = self.run_tests("-w", testname, exitcode=EXITCODE_BAD_TEST) + output = self.run_tests("--rerun", testname, exitcode=EXITCODE_BAD_TEST) self.check_executed_tests(output, testname, failed=[testname], - rerun={testname: "ExampleTests"}, - stats=1) + rerun=Rerun(testname, + match="ExampleTests", + success=False), + stats=2) def test_rerun_setup_module_hook_failure(self): # FAILURE then FAILURE @@ -1225,10 +1424,12 @@ class ArgsTestCase(BaseTestCase): """) testname = self.create_test(code=code) - output = self.run_tests("-w", testname, exitcode=EXITCODE_BAD_TEST) + output = self.run_tests("--rerun", testname, exitcode=EXITCODE_BAD_TEST) self.check_executed_tests(output, testname, failed=[testname], - rerun={testname: testname}, + rerun=Rerun(testname, + match=None, + success=False), stats=0) def test_rerun_teardown_module_hook_failure(self): @@ -1245,11 +1446,13 @@ class ArgsTestCase(BaseTestCase): """) testname = self.create_test(code=code) - output = self.run_tests("-w", testname, exitcode=EXITCODE_BAD_TEST) - self.check_executed_tests(output, testname, + output = self.run_tests("--rerun", testname, exitcode=EXITCODE_BAD_TEST) + self.check_executed_tests(output, [testname], failed=[testname], - rerun={testname: testname}, - stats=1) + rerun=Rerun(testname, + match=None, + success=False), + stats=2) def test_rerun_setup_hook_failure(self): # FAILURE then FAILURE @@ -1265,11 +1468,13 @@ class ArgsTestCase(BaseTestCase): """) testname = self.create_test(code=code) - output = self.run_tests("-w", testname, exitcode=EXITCODE_BAD_TEST) + output = self.run_tests("--rerun", testname, exitcode=EXITCODE_BAD_TEST) self.check_executed_tests(output, testname, failed=[testname], - rerun={testname: "test_success"}, - stats=1) + rerun=Rerun(testname, + match="test_success", + success=False), + stats=2) def test_rerun_teardown_hook_failure(self): # FAILURE then FAILURE @@ -1285,11 +1490,13 @@ class ArgsTestCase(BaseTestCase): """) testname = self.create_test(code=code) - output = self.run_tests("-w", testname, exitcode=EXITCODE_BAD_TEST) + output = self.run_tests("--rerun", testname, exitcode=EXITCODE_BAD_TEST) self.check_executed_tests(output, testname, failed=[testname], - rerun={testname: "test_success"}, - stats=1) + rerun=Rerun(testname, + match="test_success", + success=False), + stats=2) def test_rerun_async_setup_hook_failure(self): # FAILURE then FAILURE @@ -1305,11 +1512,12 @@ class ArgsTestCase(BaseTestCase): """) testname = self.create_test(code=code) - output = self.run_tests("-w", testname, exitcode=EXITCODE_BAD_TEST) + output = self.run_tests("--rerun", testname, exitcode=EXITCODE_BAD_TEST) self.check_executed_tests(output, testname, - failed=[testname], - rerun={testname: "test_success"}, - stats=1) + rerun=Rerun(testname, + match="test_success", + success=False), + stats=2) def test_rerun_async_teardown_hook_failure(self): # FAILURE then FAILURE @@ -1325,11 +1533,13 @@ class ArgsTestCase(BaseTestCase): """) testname = self.create_test(code=code) - output = self.run_tests("-w", testname, exitcode=EXITCODE_BAD_TEST) + output = self.run_tests("--rerun", testname, exitcode=EXITCODE_BAD_TEST) self.check_executed_tests(output, testname, failed=[testname], - rerun={testname: "test_success"}, - stats=1) + rerun=Rerun(testname, + match="test_success", + success=False), + stats=2) def test_no_tests_ran(self): code = textwrap.dedent(""" @@ -1345,7 +1555,7 @@ class ArgsTestCase(BaseTestCase): exitcode=EXITCODE_NO_TESTS_RAN) self.check_executed_tests(output, [testname], run_no_tests=testname, - stats=0) + stats=0, filtered=True) def test_no_tests_ran_skip(self): code = textwrap.dedent(""" @@ -1376,7 +1586,7 @@ class ArgsTestCase(BaseTestCase): exitcode=EXITCODE_NO_TESTS_RAN) self.check_executed_tests(output, [testname, testname2], run_no_tests=[testname, testname2], - stats=0) + stats=0, filtered=True) def test_no_test_ran_some_test_exist_some_not(self): code = textwrap.dedent(""" @@ -1400,7 +1610,7 @@ class ArgsTestCase(BaseTestCase): "-m", "test_other_bug", exitcode=0) self.check_executed_tests(output, [testname, testname2], run_no_tests=[testname], - stats=1) + stats=1, filtered=True) @support.cpython_only def test_uncollectable(self): @@ -1611,16 +1821,15 @@ class ArgsTestCase(BaseTestCase): self.check_executed_tests(output, testnames, env_changed=testnames, fail_env_changed=True, - randomize=True, + parallel=True, stats=len(testnames)) for testname in testnames: self.assertIn(f"Warning -- {testname} leaked temporary " f"files (1): mytmpfile", output) - def test_mp_decode_error(self): - # gh-101634: If a worker stdout cannot be decoded, report a failed test - # and a non-zero exit code. + def test_worker_decode_error(self): + # gh-109425: Use "backslashreplace" error handler to decode stdout. if sys.platform == 'win32': encoding = locale.getencoding() else: @@ -1628,34 +1837,46 @@ class ArgsTestCase(BaseTestCase): if encoding is None: encoding = sys.__stdout__.encoding if encoding is None: - self.skipTest(f"cannot get regrtest worker encoding") - - nonascii = b"byte:\xa0\xa9\xff\n" + self.skipTest("cannot get regrtest worker encoding") + + nonascii = bytes(ch for ch in range(128, 256)) + corrupted_output = b"nonascii:%s\n" % (nonascii,) + # gh-108989: On Windows, assertion errors are written in UTF-16: when + # decoded each letter is follow by a NUL character. + assertion_failed = 'Assertion failed: tstate_is_alive(tstate)\n' + corrupted_output += assertion_failed.encode('utf-16-le') try: - nonascii.decode(encoding) + corrupted_output.decode(encoding) except UnicodeDecodeError: pass else: - self.skipTest(f"{encoding} can decode non-ASCII bytes {nonascii!a}") + self.skipTest(f"{encoding} can decode non-ASCII bytes") + + expected_line = corrupted_output.decode(encoding, 'backslashreplace') code = textwrap.dedent(fr""" import sys + import unittest + + class Tests(unittest.TestCase): + def test_pass(self): + pass + # bytes which cannot be decoded from UTF-8 - nonascii = {nonascii!a} - sys.stdout.buffer.write(nonascii) + corrupted_output = {corrupted_output!a} + sys.stdout.buffer.write(corrupted_output) sys.stdout.buffer.flush() """) testname = self.create_test(code=code) - output = self.run_tests("--fail-env-changed", "-v", "-j1", testname, - exitcode=EXITCODE_BAD_TEST) + output = self.run_tests("--fail-env-changed", "-v", "-j1", testname) self.check_executed_tests(output, [testname], - failed=[testname], - randomize=True, - stats=0) + parallel=True, + stats=1) + self.check_line(output, expected_line, regex=False) def test_doctest(self): - code = textwrap.dedent(fr''' + code = textwrap.dedent(r''' import doctest import sys from test import support @@ -1690,9 +1911,169 @@ class ArgsTestCase(BaseTestCase): exitcode=EXITCODE_BAD_TEST) self.check_executed_tests(output, [testname], failed=[testname], - randomize=True, + parallel=True, stats=TestStats(1, 1, 0)) + def _check_random_seed(self, run_workers: bool): + # gh-109276: When -r/--randomize is used, random.seed() is called + # with the same random seed before running each test file. + code = textwrap.dedent(r''' + import random + import unittest + + class RandomSeedTest(unittest.TestCase): + def test_randint(self): + numbers = [random.randint(0, 1000) for _ in range(10)] + print(f"Random numbers: {numbers}") + ''') + tests = [self.create_test(name=f'test_random{i}', code=code) + for i in range(1, 3+1)] + + random_seed = 856_656_202 + cmd = ["--randomize", f"--randseed={random_seed}"] + if run_workers: + # run as many worker processes than the number of tests + cmd.append(f'-j{len(tests)}') + cmd.extend(tests) + output = self.run_tests(*cmd) + + random.seed(random_seed) + # Make the assumption that nothing consume entropy between libregrest + # setup_tests() which calls random.seed() and RandomSeedTest calling + # random.randint(). + numbers = [random.randint(0, 1000) for _ in range(10)] + expected = f"Random numbers: {numbers}" + + regex = r'^Random numbers: .*$' + matches = re.findall(regex, output, flags=re.MULTILINE) + self.assertEqual(matches, [expected] * len(tests)) + + def test_random_seed(self): + self._check_random_seed(run_workers=False) + + def test_random_seed_workers(self): + self._check_random_seed(run_workers=True) + + def test_python_command(self): + code = textwrap.dedent(r""" + import sys + import unittest + + class WorkerTests(unittest.TestCase): + def test_dev_mode(self): + self.assertTrue(sys.flags.dev_mode) + """) + tests = [self.create_test(code=code) for _ in range(3)] + + # Custom Python command: "python -X dev" + python_cmd = [sys.executable, '-X', 'dev'] + # test.libregrtest.cmdline uses shlex.split() to parse the Python + # command line string + python_cmd = shlex.join(python_cmd) + + output = self.run_tests("--python", python_cmd, "-j0", *tests) + self.check_executed_tests(output, tests, + stats=len(tests), parallel=True) + + def check_add_python_opts(self, option): + # --fast-ci and --slow-ci add "-u -W default -bb -E" options to Python + code = textwrap.dedent(r""" + import sys + import unittest + from test import support + try: + from _testinternalcapi import get_config + except ImportError: + get_config = None + + # WASI/WASM buildbots don't use -E option + use_environment = (support.is_emscripten or support.is_wasi) + + class WorkerTests(unittest.TestCase): + @unittest.skipUnless(get_config is None, 'need get_config()') + def test_config(self): + config = get_config()['config'] + # -u option + self.assertEqual(config['buffered_stdio'], 0) + # -W default option + self.assertTrue(config['warnoptions'], ['default']) + # -bb option + self.assertTrue(config['bytes_warning'], 2) + # -E option + self.assertTrue(config['use_environment'], use_environment) + + def test_python_opts(self): + # -u option + self.assertTrue(sys.__stdout__.write_through) + self.assertTrue(sys.__stderr__.write_through) + + # -W default option + self.assertTrue(sys.warnoptions, ['default']) + + # -bb option + self.assertEqual(sys.flags.bytes_warning, 2) + + # -E option + self.assertEqual(not sys.flags.ignore_environment, + use_environment) + """) + testname = self.create_test(code=code) + + # Use directly subprocess to control the exact command line + cmd = [sys.executable, + "-m", "test", option, + f'--testdir={self.tmptestdir}', + testname] + proc = subprocess.run(cmd, + stdout=subprocess.PIPE, + stderr=subprocess.STDOUT, + text=True) + self.assertEqual(proc.returncode, 0, proc) + + def test_add_python_opts(self): + for opt in ("--fast-ci", "--slow-ci"): + with self.subTest(opt=opt): + self.check_add_python_opts(opt) + + # gh-76319: Raising SIGSEGV on Android may not cause a crash. + @unittest.skipIf(support.is_android, + 'raising SIGSEGV on Android is unreliable') + def test_worker_output_on_failure(self): + try: + from faulthandler import _sigsegv + except ImportError: + self.skipTest("need faulthandler._sigsegv") + + code = textwrap.dedent(r""" + import faulthandler + import unittest + from test import support + + class CrashTests(unittest.TestCase): + def test_crash(self): + print("just before crash!", flush=True) + + with support.SuppressCrashReport(): + faulthandler._sigsegv(True) + """) + testname = self.create_test(code=code) + + # Sanitizers must not handle SIGSEGV (ex: for test_enable_fd()) + env = dict(os.environ) + option = 'handle_segv=0' + support.set_sanitizer_env_var(env, option) + + output = self.run_tests("-j1", testname, + exitcode=EXITCODE_BAD_TEST, + env=env) + self.check_executed_tests(output, testname, + failed=[testname], + stats=0, parallel=True) + if not support.MS_WINDOWS: + exitcode = -int(signal.SIGSEGV) + self.assertIn(f"Exit code {exitcode} (SIGSEGV)", output) + self.check_line(output, "just before crash!", full=True, regex=False) + class TestUtils(unittest.TestCase): def test_format_duration(self): @@ -1717,6 +2098,46 @@ class TestUtils(unittest.TestCase): self.assertEqual(utils.format_duration(3 * 3600 + 1), '3 hour 1 sec') + def test_normalize_test_name(self): + normalize = normalize_test_name + self.assertEqual(normalize('test_access (test.test_os.FileTests.test_access)'), + 'test_access') + self.assertEqual(normalize('setUpClass (test.test_os.ChownFileTests)', is_error=True), + 'ChownFileTests') + self.assertEqual(normalize('test_success (test.test_bug.ExampleTests.test_success)', is_error=True), + 'test_success') + self.assertIsNone(normalize('setUpModule (test.test_x)', is_error=True)) + self.assertIsNone(normalize('tearDownModule (test.test_module)', is_error=True)) + + def test_get_signal_name(self): + for exitcode, expected in ( + (-int(signal.SIGINT), 'SIGINT'), + (-int(signal.SIGSEGV), 'SIGSEGV'), + (3221225477, "STATUS_ACCESS_VIOLATION"), + (0xC00000FD, "STATUS_STACK_OVERFLOW"), + ): + self.assertEqual(utils.get_signal_name(exitcode), expected, exitcode) + + def test_format_resources(self): + format_resources = utils.format_resources + ALL_RESOURCES = utils.ALL_RESOURCES + self.assertEqual( + format_resources(("network",)), + 'resources (1): network') + self.assertEqual( + format_resources(("audio", "decimal", "network")), + 'resources (3): audio,decimal,network') + self.assertEqual( + format_resources(ALL_RESOURCES), + 'resources: all') + self.assertEqual( + format_resources(tuple(name for name in ALL_RESOURCES + if name != "cpu")), + 'resources: all,-cpu') + self.assertEqual( + format_resources((*ALL_RESOURCES, "tzdata")), + 'resources: all,tzdata') + if __name__ == '__main__': unittest.main() diff --git a/Lib/test/test_support.py b/Lib/test/test_support.py index fec96be..01ceeec 100644 --- a/Lib/test/test_support.py +++ b/Lib/test/test_support.py @@ -31,7 +31,7 @@ class TestSupport(unittest.TestCase): "test.support.warnings_helper", like=".*used in test_support.*" ) cls._test_support_token = support.ignore_deprecations_from( - "test.test_support", like=".*You should NOT be seeing this.*" + __name__, like=".*You should NOT be seeing this.*" ) assert len(warnings.filters) == orig_filter_len + 2 @@ -775,7 +775,45 @@ class TestSupport(unittest.TestCase): else: self.fail("RecursionError was not raised") - #self.assertEqual(available, 2) + def test_parse_memlimit(self): + parse = support._parse_memlimit + KiB = 1024 + MiB = KiB * 1024 + GiB = MiB * 1024 + TiB = GiB * 1024 + self.assertEqual(parse('0k'), 0) + self.assertEqual(parse('3k'), 3 * KiB) + self.assertEqual(parse('2.4m'), int(2.4 * MiB)) + self.assertEqual(parse('4g'), int(4 * GiB)) + self.assertEqual(parse('1t'), TiB) + + for limit in ('', '3', '3.5.10k', '10x'): + with self.subTest(limit=limit): + with self.assertRaises(ValueError): + parse(limit) + + def test_set_memlimit(self): + _4GiB = 4 * 1024 ** 3 + TiB = 1024 ** 4 + old_max_memuse = support.max_memuse + old_real_max_memuse = support.real_max_memuse + try: + if sys.maxsize > 2**32: + support.set_memlimit('4g') + self.assertEqual(support.max_memuse, _4GiB) + self.assertEqual(support.real_max_memuse, _4GiB) + + big = 2**100 // TiB + support.set_memlimit(f'{big}t') + self.assertEqual(support.max_memuse, sys.maxsize) + self.assertEqual(support.real_max_memuse, big * TiB) + else: + support.set_memlimit('4g') + self.assertEqual(support.max_memuse, sys.maxsize) + self.assertEqual(support.real_max_memuse, _4GiB) + finally: + support.max_memuse = old_max_memuse + support.real_max_memuse = old_real_max_memuse def test_copy_python_src_ignore(self): # Get source directory @@ -824,7 +862,6 @@ class TestSupport(unittest.TestCase): # EnvironmentVarGuard # transient_internet # run_with_locale - # set_memlimit # bigmemtest # precisionbigmemtest # bigaddrspacetest -- cgit v0.12