author    Victor Stinner <vstinner@python.org>    2023-10-12 21:45:36 (GMT)
committer GitHub <noreply@github.com>             2023-10-12 21:45:36 (GMT)
commit    26748ed4f61520c59af15547792d1e73144a4314 (patch)
tree      d7e86888379c3ade062978fcc9070ef62fb02d45 /Lib/test/libregrtest
parent    e16922f07010a620298d7fb8abfae5c578d5d337 (diff)
gh-110756: Sync regrtest with main branch (#110758) (#110781)
Copy files from main to this branch:

* Lib/test/libregrtest/*.py
* Lib/test/__init__.py
* Lib/test/__main__.py
* Lib/test/autotest.py
* Lib/test/pythoninfo.py
* Lib/test/regrtest.py
* Lib/test/test_regrtest.py

Copy also changes from:

* Lib/test/support/__init__.py
* Lib/test/support/os_helper.py
* Lib/test/support/testresult.py
* Lib/test/support/threading_helper.py
* Lib/test/test_support.py

Do not modify scripts running tests such as Makefile.pre.in,
.github/workflows/build.yml or Tools/scripts/run_tests.py: do not use
--fast-ci and --slow-ci in this change.

Changes:

* SPLITTESTDIRS: don't include test_inspect.
* Add utils.process_cpu_count() using len(os.sched_getaffinity(0)).
* test_regrtest doesn't use @support.without_optimizer which doesn't exist in Python 3.11.
* Add support.set_sanitizer_env_var().
* Update test_faulthandler to use support.set_sanitizer_env_var().
* @support.without_optimizer doesn't exist in 3.11.
* Add support.Py_DEBUG.
* regrtest.refleak: 3.11 doesn't have sys.getunicodeinternedsize.
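The commit message mentions adding utils.process_cpu_count() based on len(os.sched_getaffinity(0)), but the utils.py hunk is not included in the diff below. The following is only a minimal sketch of what such a helper could look like, assuming a fallback to os.cpu_count() on platforms that lack sched_getaffinity():

    import os

    def process_cpu_count():
        # Number of CPUs usable by the current process. sched_getaffinity()
        # is not available on all platforms (e.g. Windows, macOS), so fall
        # back to os.cpu_count() there (assumed fallback, not shown in this diff).
        if hasattr(os, 'sched_getaffinity'):
            return len(os.sched_getaffinity(0))
        return os.cpu_count()

Respecting the process affinity mask matters on CI workers where the runner is pinned to a subset of the machine's cores; os.cpu_count() alone would over-report the usable CPUs.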
Diffstat (limited to 'Lib/test/libregrtest')
-rw-r--r--  Lib/test/libregrtest/__init__.py        2
-rw-r--r--  Lib/test/libregrtest/cmdline.py       125
-rw-r--r--  Lib/test/libregrtest/findtests.py     105
-rw-r--r--  Lib/test/libregrtest/logger.py         86
-rw-r--r--  Lib/test/libregrtest/main.py         1199
-rw-r--r--  Lib/test/libregrtest/pgo.py             8
-rw-r--r--  Lib/test/libregrtest/refleak.py        35
-rw-r--r--  Lib/test/libregrtest/result.py        190
-rw-r--r--  Lib/test/libregrtest/results.py       261
-rw-r--r--  Lib/test/libregrtest/run_workers.py   607
-rw-r--r--  Lib/test/libregrtest/runtest.py       479
-rw-r--r--  Lib/test/libregrtest/runtest_mp.py    564
-rw-r--r--  Lib/test/libregrtest/runtests.py      162
-rw-r--r--  Lib/test/libregrtest/save_env.py       14
-rw-r--r--  Lib/test/libregrtest/setup.py         139
-rw-r--r--  Lib/test/libregrtest/single.py        278
-rw-r--r--  Lib/test/libregrtest/utils.py         409
-rw-r--r--  Lib/test/libregrtest/worker.py        116
18 files changed, 2884 insertions(+), 1895 deletions(-)
diff --git a/Lib/test/libregrtest/__init__.py b/Lib/test/libregrtest/__init__.py
index 5e8dba5..e69de29 100644
--- a/Lib/test/libregrtest/__init__.py
+++ b/Lib/test/libregrtest/__init__.py
@@ -1,2 +0,0 @@
-from test.libregrtest.cmdline import _parse_args, RESOURCE_NAMES, ALL_RESOURCES
-from test.libregrtest.main import main
diff --git a/Lib/test/libregrtest/cmdline.py b/Lib/test/libregrtest/cmdline.py
index 40ae8fc..dd4cd33 100644
--- a/Lib/test/libregrtest/cmdline.py
+++ b/Lib/test/libregrtest/cmdline.py
@@ -1,8 +1,9 @@
import argparse
-import os
+import os.path
import shlex
import sys
from test.support import os_helper
+from .utils import ALL_RESOURCES, RESOURCE_NAMES
USAGE = """\
@@ -27,8 +28,10 @@ EPILOG = """\
Additional option details:
-r randomizes test execution order. You can use --randseed=int to provide an
-int seed value for the randomizer; this is useful for reproducing troublesome
-test orders.
+int seed value for the randomizer. The randseed value will be used
+to set seeds for all random usages in tests
+(including randomizing the tests order if -r is set).
+By default we always set random seed, but do not randomize test order.
-s On the first invocation of regrtest using -s, the first test file found
or the first test file given on the command line is run, and the name of
@@ -130,25 +133,17 @@ Pattern examples:
"""
-ALL_RESOURCES = ('audio', 'curses', 'largefile', 'network',
- 'decimal', 'cpu', 'subprocess', 'urlfetch', 'gui', 'walltime')
-
-# Other resources excluded from --use=all:
-#
-# - extralagefile (ex: test_zipfile64): really too slow to be enabled
-# "by default"
-# - tzdata: while needed to validate fully test_datetime, it makes
-# test_datetime too slow (15-20 min on some buildbots) and so is disabled by
-# default (see bpo-30822).
-RESOURCE_NAMES = ALL_RESOURCES + ('extralargefile', 'tzdata')
-
-
class Namespace(argparse.Namespace):
def __init__(self, **kwargs) -> None:
+ self.ci = False
self.testdir = None
self.verbose = 0
self.quiet = False
self.exclude = False
+ self.cleanup = False
+ self.wait = False
+ self.list_cases = False
+ self.list_tests = False
self.single = False
self.randomize = False
self.fromfile = None
@@ -157,8 +152,8 @@ class Namespace(argparse.Namespace):
self.trace = False
self.coverdir = 'coverage'
self.runleaks = False
- self.huntrleaks = False
- self.verbose2 = False
+ self.huntrleaks: tuple[int, int, str] | None = None
+ self.rerun = False
self.verbose3 = False
self.print_slow = False
self.random_seed = None
@@ -170,6 +165,14 @@ class Namespace(argparse.Namespace):
self.ignore_tests = None
self.pgo = False
self.pgo_extended = False
+ self.worker_json = None
+ self.start = None
+ self.timeout = None
+ self.memlimit = None
+ self.threshold = None
+ self.fail_rerun = False
+ self.tempdir = None
+ self._add_python_opts = True
super().__init__(**kwargs)
@@ -198,25 +201,35 @@ def _create_parser():
# We add help explicitly to control what argument group it renders under.
group.add_argument('-h', '--help', action='help',
help='show this help message and exit')
- group.add_argument('--timeout', metavar='TIMEOUT', type=float,
+ group.add_argument('--fast-ci', action='store_true',
+ help='Fast Continuous Integration (CI) mode used by '
+ 'GitHub Actions')
+ group.add_argument('--slow-ci', action='store_true',
+ help='Slow Continuous Integration (CI) mode used by '
+ 'buildbot workers')
+ group.add_argument('--timeout', metavar='TIMEOUT',
help='dump the traceback and exit if a test takes '
'more than TIMEOUT seconds; disabled if TIMEOUT '
'is negative or equals to zero')
group.add_argument('--wait', action='store_true',
help='wait for user input, e.g., allow a debugger '
'to be attached')
- group.add_argument('--worker-args', metavar='ARGS')
group.add_argument('-S', '--start', metavar='START',
help='the name of the test at which to start.' +
more_details)
group.add_argument('-p', '--python', metavar='PYTHON',
help='Command to run Python test subprocesses with.')
+ group.add_argument('--randseed', metavar='SEED',
+ dest='random_seed', type=int,
+ help='pass a global random seed')
group = parser.add_argument_group('Verbosity')
group.add_argument('-v', '--verbose', action='count',
help='run tests in verbose mode with output to stdout')
- group.add_argument('-w', '--verbose2', action='store_true',
+ group.add_argument('-w', '--rerun', action='store_true',
help='re-run failed tests in verbose mode')
+ group.add_argument('--verbose2', action='store_true', dest='rerun',
+ help='deprecated alias to --rerun')
group.add_argument('-W', '--verbose3', action='store_true',
help='display test output on failure')
group.add_argument('-q', '--quiet', action='store_true',
@@ -229,10 +242,6 @@ def _create_parser():
group = parser.add_argument_group('Selecting tests')
group.add_argument('-r', '--randomize', action='store_true',
help='randomize test execution order.' + more_details)
- group.add_argument('--randseed', metavar='SEED',
- dest='random_seed', type=int,
- help='pass a random seed to reproduce a previous '
- 'random run')
group.add_argument('-f', '--fromfile', metavar='FILE',
help='read names of tests to run from a file.' +
more_details)
@@ -311,6 +320,9 @@ def _create_parser():
group.add_argument('--fail-env-changed', action='store_true',
help='if a test file alters the environment, mark '
'the test as failed')
+ group.add_argument('--fail-rerun', action='store_true',
+ help='if a test failed and then passed when re-run, '
+ 'mark the tests as failed')
group.add_argument('--junit-xml', dest='xmlpath', metavar='FILENAME',
help='writes JUnit-style XML results to the specified '
@@ -319,6 +331,9 @@ def _create_parser():
help='override the working directory for the test run')
group.add_argument('--cleanup', action='store_true',
help='remove old test_python_* directories')
+ group.add_argument('--dont-add-python-opts', dest='_add_python_opts',
+ action='store_false',
+ help="internal option, don't use it")
return parser
@@ -369,7 +384,50 @@ def _parse_args(args, **kwargs):
for arg in ns.args:
if arg.startswith('-'):
parser.error("unrecognized arguments: %s" % arg)
- sys.exit(1)
+
+ if ns.timeout is not None:
+ # Support "--timeout=" (no value) so Makefile.pre.pre TESTTIMEOUT
+ # can be used by "make buildbottest" and "make test".
+ if ns.timeout != "":
+ try:
+ ns.timeout = float(ns.timeout)
+ except ValueError:
+ parser.error(f"invalid timeout value: {ns.timeout!r}")
+ else:
+ ns.timeout = None
+
+ # Continuous Integration (CI): common options for fast/slow CI modes
+ if ns.slow_ci or ns.fast_ci:
+ # Similar to options:
+ #
+ # -j0 --randomize --fail-env-changed --fail-rerun --rerun
+ # --slowest --verbose3
+ if ns.use_mp is None:
+ ns.use_mp = 0
+ ns.randomize = True
+ ns.fail_env_changed = True
+ ns.fail_rerun = True
+ if ns.python is None:
+ ns.rerun = True
+ ns.print_slow = True
+ ns.verbose3 = True
+ else:
+ ns._add_python_opts = False
+
+ # When both --slow-ci and --fast-ci options are present,
+ # --slow-ci has the priority
+ if ns.slow_ci:
+ # Similar to: -u "all" --timeout=1200
+ if not ns.use:
+ ns.use = [['all']]
+ if ns.timeout is None:
+ ns.timeout = 1200 # 20 minutes
+ elif ns.fast_ci:
+ # Similar to: -u "all,-cpu" --timeout=600
+ if not ns.use:
+ ns.use = [['all', '-cpu']]
+ if ns.timeout is None:
+ ns.timeout = 600 # 10 minutes
if ns.single and ns.fromfile:
parser.error("-s and -f don't go together!")
@@ -382,7 +440,7 @@ def _parse_args(args, **kwargs):
ns.python = shlex.split(ns.python)
if ns.failfast and not (ns.verbose or ns.verbose3):
parser.error("-G/--failfast needs either -v or -W")
- if ns.pgo and (ns.verbose or ns.verbose2 or ns.verbose3):
+ if ns.pgo and (ns.verbose or ns.rerun or ns.verbose3):
parser.error("--pgo/-v don't go together!")
if ns.pgo_extended:
ns.pgo = True # pgo_extended implies pgo
@@ -396,10 +454,6 @@ def _parse_args(args, **kwargs):
if ns.timeout is not None:
if ns.timeout <= 0:
ns.timeout = None
- if ns.use_mp is not None:
- if ns.use_mp <= 0:
- # Use all cores + extras for tests that like to sleep
- ns.use_mp = 2 + (os.cpu_count() or 1)
if ns.use:
for a in ns.use:
for r in a:
@@ -443,4 +497,13 @@ def _parse_args(args, **kwargs):
# --forever implies --failfast
ns.failfast = True
+ if ns.huntrleaks:
+ warmup, repetitions, _ = ns.huntrleaks
+ if warmup < 1 or repetitions < 1:
+ msg = ("Invalid values for the --huntrleaks/-R parameters. The "
+ "number of warmups and repetitions must be at least 1 "
+ "each (1:1).")
+ print(msg, file=sys.stderr, flush=True)
+ sys.exit(2)
+
return ns
diff --git a/Lib/test/libregrtest/findtests.py b/Lib/test/libregrtest/findtests.py
new file mode 100644
index 0000000..96cc3e0
--- /dev/null
+++ b/Lib/test/libregrtest/findtests.py
@@ -0,0 +1,105 @@
+import os
+import sys
+import unittest
+
+from test import support
+
+from .utils import (
+ StrPath, TestName, TestTuple, TestList, FilterTuple,
+ abs_module_name, count, printlist)
+
+
+# If these test directories are encountered recurse into them and treat each
+# "test_*.py" file or each sub-directory as a separate test module. This can
+# increase parallelism.
+#
+# Beware this can't generally be done for any directory with sub-tests as the
+# __init__.py may do things which alter what tests are to be run.
+SPLITTESTDIRS: set[TestName] = {
+ "test_asyncio",
+ "test_concurrent_futures",
+ "test_future_stmt",
+ "test_gdb",
+ "test_multiprocessing_fork",
+ "test_multiprocessing_forkserver",
+ "test_multiprocessing_spawn",
+}
+
+
+def findtestdir(path: StrPath | None = None) -> StrPath:
+ return path or os.path.dirname(os.path.dirname(__file__)) or os.curdir
+
+
+def findtests(*, testdir: StrPath | None = None, exclude=(),
+ split_test_dirs: set[TestName] = SPLITTESTDIRS,
+ base_mod: str = "") -> TestList:
+ """Return a list of all applicable test modules."""
+ testdir = findtestdir(testdir)
+ tests = []
+ for name in os.listdir(testdir):
+ mod, ext = os.path.splitext(name)
+ if (not mod.startswith("test_")) or (mod in exclude):
+ continue
+ if base_mod:
+ fullname = f"{base_mod}.{mod}"
+ else:
+ fullname = mod
+ if fullname in split_test_dirs:
+ subdir = os.path.join(testdir, mod)
+ if not base_mod:
+ fullname = f"test.{mod}"
+ tests.extend(findtests(testdir=subdir, exclude=exclude,
+ split_test_dirs=split_test_dirs,
+ base_mod=fullname))
+ elif ext in (".py", ""):
+ tests.append(fullname)
+ return sorted(tests)
+
+
+def split_test_packages(tests, *, testdir: StrPath | None = None, exclude=(),
+ split_test_dirs=SPLITTESTDIRS):
+ testdir = findtestdir(testdir)
+ splitted = []
+ for name in tests:
+ if name in split_test_dirs:
+ subdir = os.path.join(testdir, name)
+ splitted.extend(findtests(testdir=subdir, exclude=exclude,
+ split_test_dirs=split_test_dirs,
+ base_mod=name))
+ else:
+ splitted.append(name)
+ return splitted
+
+
+def _list_cases(suite):
+ for test in suite:
+ if isinstance(test, unittest.loader._FailedTest):
+ continue
+ if isinstance(test, unittest.TestSuite):
+ _list_cases(test)
+ elif isinstance(test, unittest.TestCase):
+ if support.match_test(test):
+ print(test.id())
+
+def list_cases(tests: TestTuple, *,
+ match_tests: FilterTuple | None = None,
+ ignore_tests: FilterTuple | None = None,
+ test_dir: StrPath | None = None):
+ support.verbose = False
+ support.set_match_tests(match_tests, ignore_tests)
+
+ skipped = []
+ for test_name in tests:
+ module_name = abs_module_name(test_name, test_dir)
+ try:
+ suite = unittest.defaultTestLoader.loadTestsFromName(module_name)
+ _list_cases(suite)
+ except unittest.SkipTest:
+ skipped.append(test_name)
+
+ if skipped:
+ sys.stdout.flush()
+ stderr = sys.stderr
+ print(file=stderr)
+ print(count(len(skipped), "test"), "skipped:", file=stderr)
+ printlist(skipped, file=stderr)
diff --git a/Lib/test/libregrtest/logger.py b/Lib/test/libregrtest/logger.py
new file mode 100644
index 0000000..a125706
--- /dev/null
+++ b/Lib/test/libregrtest/logger.py
@@ -0,0 +1,86 @@
+import os
+import time
+
+from test.support import MS_WINDOWS
+from .results import TestResults
+from .runtests import RunTests
+from .utils import print_warning
+
+if MS_WINDOWS:
+ from .win_utils import WindowsLoadTracker
+
+
+class Logger:
+ def __init__(self, results: TestResults, quiet: bool, pgo: bool):
+ self.start_time = time.perf_counter()
+ self.test_count_text = ''
+ self.test_count_width = 3
+ self.win_load_tracker: WindowsLoadTracker | None = None
+ self._results: TestResults = results
+ self._quiet: bool = quiet
+ self._pgo: bool = pgo
+
+ def log(self, line: str = '') -> None:
+ empty = not line
+
+ # add the system load prefix: "load avg: 1.80 "
+ load_avg = self.get_load_avg()
+ if load_avg is not None:
+ line = f"load avg: {load_avg:.2f} {line}"
+
+ # add the timestamp prefix: "0:01:05 "
+ log_time = time.perf_counter() - self.start_time
+
+ mins, secs = divmod(int(log_time), 60)
+ hours, mins = divmod(mins, 60)
+ formatted_log_time = "%d:%02d:%02d" % (hours, mins, secs)
+
+ line = f"{formatted_log_time} {line}"
+ if empty:
+ line = line[:-1]
+
+ print(line, flush=True)
+
+ def get_load_avg(self) -> float | None:
+ if hasattr(os, 'getloadavg'):
+ return os.getloadavg()[0]
+ if self.win_load_tracker is not None:
+ return self.win_load_tracker.getloadavg()
+ return None
+
+ def display_progress(self, test_index: int, text: str) -> None:
+ if self._quiet:
+ return
+ results = self._results
+
+ # "[ 51/405/1] test_tcl passed"
+ line = f"{test_index:{self.test_count_width}}{self.test_count_text}"
+ fails = len(results.bad) + len(results.env_changed)
+ if fails and not self._pgo:
+ line = f"{line}/{fails}"
+ self.log(f"[{line}] {text}")
+
+ def set_tests(self, runtests: RunTests) -> None:
+ if runtests.forever:
+ self.test_count_text = ''
+ self.test_count_width = 3
+ else:
+ self.test_count_text = '/{}'.format(len(runtests.tests))
+ self.test_count_width = len(self.test_count_text) - 1
+
+ def start_load_tracker(self) -> None:
+ if not MS_WINDOWS:
+ return
+
+ try:
+ self.win_load_tracker = WindowsLoadTracker()
+ except PermissionError as error:
+ # Standard accounts may not have access to the performance
+ # counters.
+ print_warning(f'Failed to create WindowsLoadTracker: {error}')
+
+ def stop_load_tracker(self) -> None:
+ if self.win_load_tracker is None:
+ return
+ self.win_load_tracker.close()
+ self.win_load_tracker = None
diff --git a/Lib/test/libregrtest/main.py b/Lib/test/libregrtest/main.py
index 7192244..fe35df0 100644
--- a/Lib/test/libregrtest/main.py
+++ b/Lib/test/libregrtest/main.py
@@ -1,45 +1,30 @@
-import faulthandler
-import locale
import os
-import platform
import random
import re
+import shlex
import sys
import sysconfig
-import tempfile
import time
-import unittest
-from test.libregrtest.cmdline import _parse_args
-from test.libregrtest.runtest import (
- findtests, split_test_packages, runtest, get_abs_module,
- PROGRESS_MIN_TIME, State)
-from test.libregrtest.setup import setup_tests
-from test.libregrtest.pgo import setup_pgo_tests
-from test.libregrtest.utils import (removepy, count, format_duration,
- printlist, get_build_info)
-from test import support
-from test.support import TestStats
-from test.support import os_helper
-from test.support import threading_helper
-
-
-# bpo-38203: Maximum delay in seconds to exit Python (call Py_Finalize()).
-# Used to protect against threading._shutdown() hang.
-# Must be smaller than buildbot "1200 seconds without output" limit.
-EXIT_TIMEOUT = 120.0
-# gh-90681: When rerunning tests, we might need to rerun the whole
-# class or module suite if some its life-cycle hooks fail.
-# Test level hooks are not affected.
-_TEST_LIFECYCLE_HOOKS = frozenset((
- 'setUpClass', 'tearDownClass',
- 'setUpModule', 'tearDownModule',
-))
-
-EXITCODE_BAD_TEST = 2
-EXITCODE_INTERRUPTED = 130
-EXITCODE_ENV_CHANGED = 3
-EXITCODE_NO_TESTS_RAN = 4
+from test import support
+from test.support import os_helper, MS_WINDOWS
+
+from .cmdline import _parse_args, Namespace
+from .findtests import findtests, split_test_packages, list_cases
+from .logger import Logger
+from .pgo import setup_pgo_tests
+from .result import State
+from .results import TestResults, EXITCODE_INTERRUPTED
+from .runtests import RunTests, HuntRefleak
+from .setup import setup_process, setup_test_dir
+from .single import run_single_test, PROGRESS_MIN_TIME
+from .utils import (
+ StrPath, StrJSON, TestName, TestList, TestTuple, FilterTuple,
+ strip_py_suffix, count, format_duration,
+ printlist, get_temp_dir, get_work_dir, exit_timeout,
+ display_header, cleanup_temp_dir, print_warning,
+ is_cross_compiled, get_host_runner, process_cpu_count,
+ EXIT_TIMEOUT)
class Regrtest:
@@ -65,266 +50,212 @@ class Regrtest:
directly to set the values that would normally be set by flags
on the command line.
"""
- def __init__(self):
- # Namespace of command line options
- self.ns = None
+ def __init__(self, ns: Namespace, _add_python_opts: bool = False):
+ # Log verbosity
+ self.verbose: int = int(ns.verbose)
+ self.quiet: bool = ns.quiet
+ self.pgo: bool = ns.pgo
+ self.pgo_extended: bool = ns.pgo_extended
+
+ # Test results
+ self.results: TestResults = TestResults()
+ self.first_state: str | None = None
+
+ # Logger
+ self.logger = Logger(self.results, self.quiet, self.pgo)
+
+ # Actions
+ self.want_header: bool = ns.header
+ self.want_list_tests: bool = ns.list_tests
+ self.want_list_cases: bool = ns.list_cases
+ self.want_wait: bool = ns.wait
+ self.want_cleanup: bool = ns.cleanup
+ self.want_rerun: bool = ns.rerun
+ self.want_run_leaks: bool = ns.runleaks
+
+ self.ci_mode: bool = (ns.fast_ci or ns.slow_ci)
+ self.want_add_python_opts: bool = (_add_python_opts
+ and ns._add_python_opts)
+
+ # Select tests
+ if ns.match_tests:
+ self.match_tests: FilterTuple | None = tuple(ns.match_tests)
+ else:
+ self.match_tests = None
+ if ns.ignore_tests:
+ self.ignore_tests: FilterTuple | None = tuple(ns.ignore_tests)
+ else:
+ self.ignore_tests = None
+ self.exclude: bool = ns.exclude
+ self.fromfile: StrPath | None = ns.fromfile
+ self.starting_test: TestName | None = ns.start
+ self.cmdline_args: TestList = ns.args
+
+ # Workers
+ if ns.use_mp is None:
+ num_workers = 0 # run sequentially
+ elif ns.use_mp <= 0:
+ num_workers = -1 # use the number of CPUs
+ else:
+ num_workers = ns.use_mp
+ self.num_workers: int = num_workers
+ self.worker_json: StrJSON | None = ns.worker_json
+
+ # Options to run tests
+ self.fail_fast: bool = ns.failfast
+ self.fail_env_changed: bool = ns.fail_env_changed
+ self.fail_rerun: bool = ns.fail_rerun
+ self.forever: bool = ns.forever
+ self.output_on_failure: bool = ns.verbose3
+ self.timeout: float | None = ns.timeout
+ if ns.huntrleaks:
+ warmups, runs, filename = ns.huntrleaks
+ filename = os.path.abspath(filename)
+ self.hunt_refleak: HuntRefleak | None = HuntRefleak(warmups, runs, filename)
+ else:
+ self.hunt_refleak = None
+ self.test_dir: StrPath | None = ns.testdir
+ self.junit_filename: StrPath | None = ns.xmlpath
+ self.memory_limit: str | None = ns.memlimit
+ self.gc_threshold: int | None = ns.threshold
+ self.use_resources: tuple[str, ...] = tuple(ns.use_resources)
+ if ns.python:
+ self.python_cmd: tuple[str, ...] | None = tuple(ns.python)
+ else:
+ self.python_cmd = None
+ self.coverage: bool = ns.trace
+ self.coverage_dir: StrPath | None = ns.coverdir
+ self.tmp_dir: StrPath | None = ns.tempdir
+
+ # Randomize
+ self.randomize: bool = ns.randomize
+ self.random_seed: int | None = (
+ ns.random_seed
+ if ns.random_seed is not None
+ else random.getrandbits(32)
+ )
+ if 'SOURCE_DATE_EPOCH' in os.environ:
+ self.randomize = False
+ self.random_seed = None
# tests
- self.tests = []
- self.selected = []
-
- # test results
- self.good = []
- self.bad = []
- self.skipped = []
- self.resource_denied = []
- self.environment_changed = []
- self.run_no_tests = []
- self.need_rerun = []
- self.rerun = []
- self.first_result = None
- self.interrupted = False
- self.stats_dict: dict[str, TestStats] = {}
-
- # used by --slow
- self.test_times = []
-
- # used by --coverage, trace.Trace instance
- self.tracer = None
+ self.first_runtests: RunTests | None = None
+
+ # used by --slowest
+ self.print_slowest: bool = ns.print_slow
# used to display the progress bar "[ 3/100]"
self.start_time = time.perf_counter()
- self.test_count = ''
- self.test_count_width = 1
# used by --single
- self.next_single_test = None
- self.next_single_filename = None
-
- # used by --junit-xml
- self.testsuite_xml = None
-
- # misc
- self.win_load_tracker = None
- self.tmp_dir = None
- self.worker_test_name = None
-
- def get_executed(self):
- return (set(self.good) | set(self.bad) | set(self.skipped)
- | set(self.resource_denied) | set(self.environment_changed)
- | set(self.run_no_tests))
-
- def accumulate_result(self, result, rerun=False):
- test_name = result.test_name
-
- if result.has_meaningful_duration() and not rerun:
- self.test_times.append((result.duration, test_name))
-
- match result.state:
- case State.PASSED:
- self.good.append(test_name)
- case State.ENV_CHANGED:
- self.environment_changed.append(test_name)
- case State.SKIPPED:
- self.skipped.append(test_name)
- case State.RESOURCE_DENIED:
- self.skipped.append(test_name)
- self.resource_denied.append(test_name)
- case State.INTERRUPTED:
- self.interrupted = True
- case State.DID_NOT_RUN:
- self.run_no_tests.append(test_name)
- case _:
- if result.is_failed(self.ns.fail_env_changed):
- if not rerun:
- self.bad.append(test_name)
- self.need_rerun.append(result)
- else:
- raise ValueError(f"invalid test state: {state!r}")
-
- if result.stats is not None:
- self.stats_dict[result.test_name] = result.stats
-
- if rerun and not(result.is_failed(False) or result.state == State.INTERRUPTED):
- self.bad.remove(test_name)
-
- xml_data = result.xml_data
- if xml_data:
- import xml.etree.ElementTree as ET
- for e in xml_data:
- try:
- self.testsuite_xml.append(ET.fromstring(e))
- except ET.ParseError:
- print(xml_data, file=sys.__stderr__)
- raise
+ self.single_test_run: bool = ns.single
+ self.next_single_test: TestName | None = None
+ self.next_single_filename: StrPath | None = None
def log(self, line=''):
- empty = not line
-
- # add the system load prefix: "load avg: 1.80 "
- load_avg = self.getloadavg()
- if load_avg is not None:
- line = f"load avg: {load_avg:.2f} {line}"
-
- # add the timestamp prefix: "0:01:05 "
- test_time = time.perf_counter() - self.start_time
-
- mins, secs = divmod(int(test_time), 60)
- hours, mins = divmod(mins, 60)
- test_time = "%d:%02d:%02d" % (hours, mins, secs)
-
- line = f"{test_time} {line}"
- if empty:
- line = line[:-1]
-
- print(line, flush=True)
-
- def display_progress(self, test_index, text):
- if self.ns.quiet:
- return
-
- # "[ 51/405/1] test_tcl passed"
- line = f"{test_index:{self.test_count_width}}{self.test_count}"
- fails = len(self.bad) + len(self.environment_changed)
- if fails and not self.ns.pgo:
- line = f"{line}/{fails}"
- self.log(f"[{line}] {text}")
-
- def parse_args(self, kwargs):
- ns = _parse_args(sys.argv[1:], **kwargs)
-
- if ns.xmlpath:
- support.junit_xml_list = self.testsuite_xml = []
-
- worker_args = ns.worker_args
- if worker_args is not None:
- from test.libregrtest.runtest_mp import parse_worker_args
- ns, test_name = parse_worker_args(ns.worker_args)
- ns.worker_args = worker_args
- self.worker_test_name = test_name
-
- # Strip .py extensions.
- removepy(ns.args)
+ self.logger.log(line)
- if ns.huntrleaks:
- warmup, repetitions, _ = ns.huntrleaks
- if warmup < 1 or repetitions < 1:
- msg = ("Invalid values for the --huntrleaks/-R parameters. The "
- "number of warmups and repetitions must be at least 1 "
- "each (1:1).")
- print(msg, file=sys.stderr, flush=True)
- sys.exit(2)
-
- if ns.tempdir:
- ns.tempdir = os.path.expanduser(ns.tempdir)
-
- self.ns = ns
-
- def find_tests(self, tests):
- self.tests = tests
-
- if self.ns.single:
+ def find_tests(self, tests: TestList | None = None) -> tuple[TestTuple, TestList | None]:
+ if self.single_test_run:
self.next_single_filename = os.path.join(self.tmp_dir, 'pynexttest')
try:
with open(self.next_single_filename, 'r') as fp:
next_test = fp.read().strip()
- self.tests = [next_test]
+ tests = [next_test]
except OSError:
pass
- if self.ns.fromfile:
- self.tests = []
+ if self.fromfile:
+ tests = []
# regex to match 'test_builtin' in line:
# '0:00:00 [ 4/400] test_builtin -- test_dict took 1 sec'
regex = re.compile(r'\btest_[a-zA-Z0-9_]+\b')
- with open(os.path.join(os_helper.SAVEDCWD, self.ns.fromfile)) as fp:
+ with open(os.path.join(os_helper.SAVEDCWD, self.fromfile)) as fp:
for line in fp:
line = line.split('#', 1)[0]
line = line.strip()
match = regex.search(line)
if match is not None:
- self.tests.append(match.group())
+ tests.append(match.group())
- removepy(self.tests)
+ strip_py_suffix(tests)
- if self.ns.pgo:
+ if self.pgo:
# add default PGO tests if no tests are specified
- setup_pgo_tests(self.ns)
+ setup_pgo_tests(self.cmdline_args, self.pgo_extended)
- exclude = set()
- if self.ns.exclude:
- for arg in self.ns.args:
- exclude.add(arg)
- self.ns.args = []
+ exclude_tests = set()
+ if self.exclude:
+ for arg in self.cmdline_args:
+ exclude_tests.add(arg)
+ self.cmdline_args = []
- alltests = findtests(testdir=self.ns.testdir, exclude=exclude)
+ alltests = findtests(testdir=self.test_dir,
+ exclude=exclude_tests)
- if not self.ns.fromfile:
- self.selected = self.tests or self.ns.args
- if self.selected:
- self.selected = split_test_packages(self.selected)
+ if not self.fromfile:
+ selected = tests or self.cmdline_args
+ if selected:
+ selected = split_test_packages(selected)
else:
- self.selected = alltests
+ selected = alltests
else:
- self.selected = self.tests
+ selected = tests
- if self.ns.single:
- self.selected = self.selected[:1]
+ if self.single_test_run:
+ selected = selected[:1]
try:
- pos = alltests.index(self.selected[0])
+ pos = alltests.index(selected[0])
self.next_single_test = alltests[pos + 1]
except IndexError:
pass
# Remove all the selected tests that precede start if it's set.
- if self.ns.start:
+ if self.starting_test:
try:
- del self.selected[:self.selected.index(self.ns.start)]
+ del selected[:selected.index(self.starting_test)]
except ValueError:
- print("Couldn't find starting test (%s), using all tests"
- % self.ns.start, file=sys.stderr)
-
- if self.ns.randomize:
- if self.ns.random_seed is None:
- self.ns.random_seed = random.randrange(10000000)
- random.seed(self.ns.random_seed)
- random.shuffle(self.selected)
+ print(f"Cannot find starting test: {self.starting_test}")
+ sys.exit(1)
- def list_tests(self):
- for name in self.selected:
- print(name)
-
- def _list_cases(self, suite):
- for test in suite:
- if isinstance(test, unittest.loader._FailedTest):
- continue
- if isinstance(test, unittest.TestSuite):
- self._list_cases(test)
- elif isinstance(test, unittest.TestCase):
- if support.match_test(test):
- print(test.id())
-
- def list_cases(self):
- support.verbose = False
- support.set_match_tests(self.ns.match_tests, self.ns.ignore_tests)
-
- for test_name in self.selected:
- abstest = get_abs_module(self.ns, test_name)
- try:
- suite = unittest.defaultTestLoader.loadTestsFromName(abstest)
- self._list_cases(suite)
- except unittest.SkipTest:
- self.skipped.append(test_name)
+ random.seed(self.random_seed)
+ if self.randomize:
+ random.shuffle(selected)
- if self.skipped:
- print(file=sys.stderr)
- print(count(len(self.skipped), "test"), "skipped:", file=sys.stderr)
- printlist(self.skipped, file=sys.stderr)
+ return (tuple(selected), tests)
- def rerun_failed_tests(self):
- self.log()
+ @staticmethod
+ def list_tests(tests: TestTuple):
+ for name in tests:
+ print(name)
- if self.ns.python:
+ def _rerun_failed_tests(self, runtests: RunTests):
+ # Configure the runner to re-run tests
+ if self.num_workers == 0:
+ # Always run tests in fresh processes to have more deterministic
+ # initial state. Don't re-run tests in parallel but limit to a
+ # single worker process to have side effects (on the system load
+ # and timings) between tests.
+ self.num_workers = 1
+
+ tests, match_tests_dict = self.results.prepare_rerun()
+
+ # Re-run failed tests
+ self.log(f"Re-running {len(tests)} failed tests in verbose mode in subprocesses")
+ runtests = runtests.copy(
+ tests=tests,
+ rerun=True,
+ verbose=True,
+ forever=False,
+ fail_fast=False,
+ match_tests_dict=match_tests_dict,
+ output_on_failure=False)
+ self.logger.set_tests(runtests)
+ self._run_tests_mp(runtests, self.num_workers)
+ return runtests
+
+ def rerun_failed_tests(self, runtests: RunTests):
+ if self.python_cmd:
# Temp patch for https://github.com/python/cpython/issues/94052
self.log(
"Re-running failed tests is not supported with --python "
@@ -332,160 +263,81 @@ class Regrtest:
)
return
- self.ns.verbose = True
- self.ns.failfast = False
- self.ns.verbose3 = False
-
- self.first_result = self.get_tests_result()
-
- self.log("Re-running failed tests in verbose mode")
- rerun_list = list(self.need_rerun)
- self.need_rerun.clear()
- for result in rerun_list:
- test_name = result.test_name
- self.rerun.append(test_name)
-
- errors = result.errors or []
- failures = result.failures or []
- error_names = [
- self.normalize_test_name(test_full_name, is_error=True)
- for (test_full_name, *_) in errors]
- failure_names = [
- self.normalize_test_name(test_full_name)
- for (test_full_name, *_) in failures]
- self.ns.verbose = True
- orig_match_tests = self.ns.match_tests
- if errors or failures:
- if self.ns.match_tests is None:
- self.ns.match_tests = []
- self.ns.match_tests.extend(error_names)
- self.ns.match_tests.extend(failure_names)
- matching = "matching: " + ", ".join(self.ns.match_tests)
- self.log(f"Re-running {test_name} in verbose mode ({matching})")
- else:
- self.log(f"Re-running {test_name} in verbose mode")
- result = runtest(self.ns, test_name)
- self.ns.match_tests = orig_match_tests
+ self.first_state = self.get_state()
- self.accumulate_result(result, rerun=True)
+ print()
+ rerun_runtests = self._rerun_failed_tests(runtests)
- if result.state == State.INTERRUPTED:
- break
+ if self.results.bad:
+ print(count(len(self.results.bad), 'test'), "failed again:")
+ printlist(self.results.bad)
+
+ self.display_result(rerun_runtests)
- if self.bad:
- print(count(len(self.bad), 'test'), "failed again:")
- printlist(self.bad)
-
- self.display_result()
-
- def normalize_test_name(self, test_full_name, *, is_error=False):
- short_name = test_full_name.split(" ")[0]
- if is_error and short_name in _TEST_LIFECYCLE_HOOKS:
- # This means that we have a failure in a life-cycle hook,
- # we need to rerun the whole module or class suite.
- # Basically the error looks like this:
- # ERROR: setUpClass (test.test_reg_ex.RegTest)
- # or
- # ERROR: setUpModule (test.test_reg_ex)
- # So, we need to parse the class / module name.
- lpar = test_full_name.index('(')
- rpar = test_full_name.index(')')
- return test_full_name[lpar + 1: rpar].split('.')[-1]
- return short_name
-
- def display_result(self):
+ def display_result(self, runtests):
# If running the test suite for PGO then no one cares about results.
- if self.ns.pgo:
+ if runtests.pgo:
return
+ state = self.get_state()
print()
- print("== Tests result: %s ==" % self.get_tests_result())
-
- if self.interrupted:
- print("Test suite interrupted by signal SIGINT.")
-
- omitted = set(self.selected) - self.get_executed()
- if omitted:
- print()
- print(count(len(omitted), "test"), "omitted:")
- printlist(omitted)
-
- if self.good and not self.ns.quiet:
- print()
- if (not self.bad
- and not self.skipped
- and not self.interrupted
- and len(self.good) > 1):
- print("All", end=' ')
- print(count(len(self.good), "test"), "OK.")
-
- if self.ns.print_slow:
- self.test_times.sort(reverse=True)
- print()
- print("10 slowest tests:")
- for test_time, test in self.test_times[:10]:
- print("- %s: %s" % (test, format_duration(test_time)))
-
- if self.bad:
- print()
- print(count(len(self.bad), "test"), "failed:")
- printlist(self.bad)
-
- if self.environment_changed:
- print()
- print("{} altered the execution environment:".format(
- count(len(self.environment_changed), "test")))
- printlist(self.environment_changed)
-
- if self.skipped and not self.ns.quiet:
- print()
- print(count(len(self.skipped), "test"), "skipped:")
- printlist(self.skipped)
-
- if self.rerun:
- print()
- print("%s:" % count(len(self.rerun), "re-run test"))
- printlist(self.rerun)
-
- if self.run_no_tests:
- print()
- print(count(len(self.run_no_tests), "test"), "run no tests:")
- printlist(self.run_no_tests)
-
- def run_tests_sequential(self):
- if self.ns.trace:
+ print(f"== Tests result: {state} ==")
+
+ self.results.display_result(runtests.tests,
+ self.quiet, self.print_slowest)
+
+ def run_test(self, test_name: TestName, runtests: RunTests, tracer):
+ if tracer is not None:
+ # If we're tracing code coverage, then we don't exit with status
+ # if on a false return value from main.
+ cmd = ('result = run_single_test(test_name, runtests)')
+ namespace = dict(locals())
+ tracer.runctx(cmd, globals=globals(), locals=namespace)
+ result = namespace['result']
+ else:
+ result = run_single_test(test_name, runtests)
+
+ self.results.accumulate_result(result, runtests)
+
+ return result
+
+ def run_tests_sequentially(self, runtests):
+ if self.coverage:
import trace
- self.tracer = trace.Trace(trace=False, count=True)
+ tracer = trace.Trace(trace=False, count=True)
+ else:
+ tracer = None
save_modules = sys.modules.keys()
- msg = "Run tests sequentially"
- if self.ns.timeout:
- msg += " (timeout: %s)" % format_duration(self.ns.timeout)
+ jobs = runtests.get_jobs()
+ if jobs is not None:
+ tests = count(jobs, 'test')
+ else:
+ tests = 'tests'
+ msg = f"Run {tests} sequentially"
+ if runtests.timeout:
+ msg += " (timeout: %s)" % format_duration(runtests.timeout)
self.log(msg)
previous_test = None
- for test_index, test_name in enumerate(self.tests, 1):
+ tests_iter = runtests.iter_tests()
+ for test_index, test_name in enumerate(tests_iter, 1):
start_time = time.perf_counter()
text = test_name
if previous_test:
text = '%s -- %s' % (text, previous_test)
- self.display_progress(test_index, text)
-
- if self.tracer:
- # If we're tracing code coverage, then we don't exit with status
- # if on a false return value from main.
- cmd = ('result = runtest(self.ns, test_name); '
- 'self.accumulate_result(result)')
- ns = dict(locals())
- self.tracer.runctx(cmd, globals=globals(), locals=ns)
- result = ns['result']
- else:
- result = runtest(self.ns, test_name)
- self.accumulate_result(result)
+ self.logger.display_progress(test_index, text)
+
+ result = self.run_test(test_name, runtests, tracer)
- if result.state == State.INTERRUPTED:
+ # Unload the newly imported modules (best effort finalization)
+ for module in sys.modules.keys():
+ if module not in save_modules and module.startswith("test."):
+ support.unload(module)
+
+ if result.must_stop(self.fail_fast, self.fail_env_changed):
break
previous_test = str(result)
@@ -496,140 +348,22 @@ class Regrtest:
# be quiet: say nothing if the test passed shortly
previous_test = None
- # Unload the newly imported modules (best effort finalization)
- for module in sys.modules.keys():
- if module not in save_modules and module.startswith("test."):
- support.unload(module)
-
- if self.ns.failfast and result.is_failed(self.ns.fail_env_changed):
- break
-
if previous_test:
print(previous_test)
- def _test_forever(self, tests):
- while True:
- for test_name in tests:
- yield test_name
- if self.bad:
- return
- if self.ns.fail_env_changed and self.environment_changed:
- return
-
- def display_header(self):
- # Print basic platform information
- print("==", platform.python_implementation(), *sys.version.split())
- print("==", platform.platform(aliased=True),
- "%s-endian" % sys.byteorder)
- print("== Python build:", ' '.join(get_build_info()))
- print("== cwd:", os.getcwd())
- cpu_count = os.cpu_count()
- if cpu_count:
- print("== CPU count:", cpu_count)
- print("== encodings: locale=%s, FS=%s"
- % (locale.getencoding(), sys.getfilesystemencoding()))
- self.display_sanitizers()
-
- def display_sanitizers(self):
- # This makes it easier to remember what to set in your local
- # environment when trying to reproduce a sanitizer failure.
- asan = support.check_sanitizer(address=True)
- msan = support.check_sanitizer(memory=True)
- ubsan = support.check_sanitizer(ub=True)
- sanitizers = []
- if asan:
- sanitizers.append("address")
- if msan:
- sanitizers.append("memory")
- if ubsan:
- sanitizers.append("undefined behavior")
- if not sanitizers:
- return
-
- print(f"== sanitizers: {', '.join(sanitizers)}")
- for sanitizer, env_var in (
- (asan, "ASAN_OPTIONS"),
- (msan, "MSAN_OPTIONS"),
- (ubsan, "UBSAN_OPTIONS"),
- ):
- options= os.environ.get(env_var)
- if sanitizer and options is not None:
- print(f"== {env_var}={options!r}")
-
- def no_tests_run(self):
- return not any((self.good, self.bad, self.skipped, self.interrupted,
- self.environment_changed))
-
- def get_tests_result(self):
- result = []
- if self.bad:
- result.append("FAILURE")
- elif self.ns.fail_env_changed and self.environment_changed:
- result.append("ENV CHANGED")
- elif self.no_tests_run():
- result.append("NO TESTS RAN")
-
- if self.interrupted:
- result.append("INTERRUPTED")
-
- if not result:
- result.append("SUCCESS")
-
- result = ', '.join(result)
- if self.first_result:
- result = '%s then %s' % (self.first_result, result)
- return result
+ return tracer
- def run_tests(self):
- # For a partial run, we do not need to clutter the output.
- if (self.ns.header
- or not(self.ns.pgo or self.ns.quiet or self.ns.single
- or self.tests or self.ns.args)):
- self.display_header()
-
- if self.ns.huntrleaks:
- warmup, repetitions, _ = self.ns.huntrleaks
- if warmup < 3:
- msg = ("WARNING: Running tests with --huntrleaks/-R and less than "
- "3 warmup repetitions can give false positives!")
- print(msg, file=sys.stdout, flush=True)
-
- if self.ns.randomize:
- print("Using random seed", self.ns.random_seed)
-
- if self.ns.forever:
- self.tests = self._test_forever(list(self.selected))
- self.test_count = ''
- self.test_count_width = 3
- else:
- self.tests = iter(self.selected)
- self.test_count = '/{}'.format(len(self.selected))
- self.test_count_width = len(self.test_count) - 1
-
- if self.ns.use_mp:
- from test.libregrtest.runtest_mp import run_tests_multiprocess
- # If we're on windows and this is the parent runner (not a worker),
- # track the load average.
- if sys.platform == 'win32' and self.worker_test_name is None:
- from test.libregrtest.win_utils import WindowsLoadTracker
-
- try:
- self.win_load_tracker = WindowsLoadTracker()
- except PermissionError as error:
- # Standard accounts may not have access to the performance
- # counters.
- print(f'Failed to create WindowsLoadTracker: {error}')
+ def get_state(self):
+ state = self.results.get_state(self.fail_env_changed)
+ if self.first_state:
+ state = f'{self.first_state} then {state}'
+ return state
- try:
- run_tests_multiprocess(self)
- finally:
- if self.win_load_tracker is not None:
- self.win_load_tracker.close()
- self.win_load_tracker = None
- else:
- self.run_tests_sequential()
+ def _run_tests_mp(self, runtests: RunTests, num_workers: int) -> None:
+ from .run_workers import RunWorkers
+ RunWorkers(num_workers, runtests, self.logger, self.results).run()
- def finalize(self):
+ def finalize_tests(self, tracer):
if self.next_single_filename:
if self.next_single_test:
with open(self.next_single_filename, 'w') as fp:
@@ -637,232 +371,299 @@ class Regrtest:
else:
os.unlink(self.next_single_filename)
- if self.tracer:
- r = self.tracer.results()
- r.write_results(show_missing=True, summary=True,
- coverdir=self.ns.coverdir)
-
- print()
- self.display_summary()
+ if tracer is not None:
+ results = tracer.results()
+ results.write_results(show_missing=True, summary=True,
+ coverdir=self.coverage_dir)
- if self.ns.runleaks:
+ if self.want_run_leaks:
os.system("leaks %d" % os.getpid())
+ if self.junit_filename:
+ self.results.write_junit(self.junit_filename)
+
def display_summary(self):
- duration = time.perf_counter() - self.start_time
+ duration = time.perf_counter() - self.logger.start_time
+ filtered = bool(self.match_tests) or bool(self.ignore_tests)
# Total duration
+ print()
print("Total duration: %s" % format_duration(duration))
- # Total tests
- total = TestStats()
- for stats in self.stats_dict.values():
- total.accumulate(stats)
- stats = [f'run={total.tests_run:,}']
- if total.failures:
- stats.append(f'failures={total.failures:,}')
- if total.skipped:
- stats.append(f'skipped={total.skipped:,}')
- print(f"Total tests: {' '.join(stats)}")
-
- # Total test files
- report = [f'success={len(self.good)}']
- if self.bad:
- report.append(f'failed={len(self.bad)}')
- if self.environment_changed:
- report.append(f'env_changed={len(self.environment_changed)}')
- if self.skipped:
- report.append(f'skipped={len(self.skipped)}')
- if self.resource_denied:
- report.append(f'resource_denied={len(self.resource_denied)}')
- if self.rerun:
- report.append(f'rerun={len(self.rerun)}')
- if self.run_no_tests:
- report.append(f'run_no_tests={len(self.run_no_tests)}')
- print(f"Total test files: {' '.join(report)}")
+ self.results.display_summary(self.first_runtests, filtered)
# Result
- result = self.get_tests_result()
- print(f"Result: {result}")
+ state = self.get_state()
+ print(f"Result: {state}")
+
+ def create_run_tests(self, tests: TestTuple):
+ return RunTests(
+ tests,
+ fail_fast=self.fail_fast,
+ fail_env_changed=self.fail_env_changed,
+ match_tests=self.match_tests,
+ ignore_tests=self.ignore_tests,
+ match_tests_dict=None,
+ rerun=False,
+ forever=self.forever,
+ pgo=self.pgo,
+ pgo_extended=self.pgo_extended,
+ output_on_failure=self.output_on_failure,
+ timeout=self.timeout,
+ verbose=self.verbose,
+ quiet=self.quiet,
+ hunt_refleak=self.hunt_refleak,
+ test_dir=self.test_dir,
+ use_junit=(self.junit_filename is not None),
+ memory_limit=self.memory_limit,
+ gc_threshold=self.gc_threshold,
+ use_resources=self.use_resources,
+ python_cmd=self.python_cmd,
+ randomize=self.randomize,
+ random_seed=self.random_seed,
+ json_file=None,
+ )
+
+ def _run_tests(self, selected: TestTuple, tests: TestList | None) -> int:
+ if self.hunt_refleak and self.hunt_refleak.warmups < 3:
+ msg = ("WARNING: Running tests with --huntrleaks/-R and "
+ "less than 3 warmup repetitions can give false positives!")
+ print(msg, file=sys.stdout, flush=True)
+
+ if self.num_workers < 0:
+ # Use all CPUs + 2 extra worker processes for tests
+ # that like to sleep
+ self.num_workers = (process_cpu_count() or 1) + 2
- def save_xml_result(self):
- if not self.ns.xmlpath and not self.testsuite_xml:
- return
+ # For a partial run, we do not need to clutter the output.
+ if (self.want_header
+ or not(self.pgo or self.quiet or self.single_test_run
+ or tests or self.cmdline_args)):
+ display_header(self.use_resources, self.python_cmd)
- import xml.etree.ElementTree as ET
- root = ET.Element("testsuites")
-
- # Manually count the totals for the overall summary
- totals = {'tests': 0, 'errors': 0, 'failures': 0}
- for suite in self.testsuite_xml:
- root.append(suite)
- for k in totals:
- try:
- totals[k] += int(suite.get(k, 0))
- except ValueError:
- pass
-
- for k, v in totals.items():
- root.set(k, str(v))
-
- xmlpath = os.path.join(os_helper.SAVEDCWD, self.ns.xmlpath)
- with open(xmlpath, 'wb') as f:
- for s in ET.tostringlist(root):
- f.write(s)
-
- def fix_umask(self):
- if support.is_emscripten:
- # Emscripten has default umask 0o777, which breaks some tests.
- # see https://github.com/emscripten-core/emscripten/issues/17269
- old_mask = os.umask(0)
- if old_mask == 0o777:
- os.umask(0o027)
- else:
- os.umask(old_mask)
-
- def set_temp_dir(self):
- if self.ns.tempdir:
- self.tmp_dir = self.ns.tempdir
-
- if not self.tmp_dir:
- # When tests are run from the Python build directory, it is best practice
- # to keep the test files in a subfolder. This eases the cleanup of leftover
- # files using the "make distclean" command.
- if sysconfig.is_python_build():
- self.tmp_dir = sysconfig.get_config_var('abs_builddir')
- if self.tmp_dir is None:
- self.tmp_dir = sysconfig.get_config_var('abs_srcdir')
- if not self.tmp_dir:
- # gh-74470: On Windows, only srcdir is available. Using
- # abs_builddir mostly matters on UNIX when building
- # Python out of the source tree, especially when the
- # source tree is read only.
- self.tmp_dir = sysconfig.get_config_var('srcdir')
- self.tmp_dir = os.path.join(self.tmp_dir, 'build')
- else:
- self.tmp_dir = tempfile.gettempdir()
+ print("Using random seed", self.random_seed)
- self.tmp_dir = os.path.abspath(self.tmp_dir)
+ runtests = self.create_run_tests(selected)
+ self.first_runtests = runtests
+ self.logger.set_tests(runtests)
- def create_temp_dir(self):
- os.makedirs(self.tmp_dir, exist_ok=True)
+ setup_process()
- # Define a writable temp dir that will be used as cwd while running
- # the tests. The name of the dir includes the pid to allow parallel
- # testing (see the -j option).
- # Emscripten and WASI have stubbed getpid(), Emscripten has only
- # milisecond clock resolution. Use randint() instead.
- if sys.platform in {"emscripten", "wasi"}:
- nounce = random.randint(0, 1_000_000)
- else:
- nounce = os.getpid()
- if self.worker_test_name is not None:
- test_cwd = 'test_python_worker_{}'.format(nounce)
+ if self.hunt_refleak and not self.num_workers:
+ # gh-109739: WindowsLoadTracker thread interfers with refleak check
+ use_load_tracker = False
else:
- test_cwd = 'test_python_{}'.format(nounce)
- test_cwd += os_helper.FS_NONASCII
- test_cwd = os.path.join(self.tmp_dir, test_cwd)
- return test_cwd
-
- def cleanup(self):
- import glob
-
- path = os.path.join(glob.escape(self.tmp_dir), 'test_python_*')
- print("Cleanup %s directory" % self.tmp_dir)
- for name in glob.glob(path):
- if os.path.isdir(name):
- print("Remove directory: %s" % name)
- os_helper.rmtree(name)
+ # WindowsLoadTracker is only needed on Windows
+ use_load_tracker = MS_WINDOWS
+
+ if use_load_tracker:
+ self.logger.start_load_tracker()
+ try:
+ if self.num_workers:
+ self._run_tests_mp(runtests, self.num_workers)
+ tracer = None
else:
- print("Remove file: %s" % name)
- os_helper.unlink(name)
+ tracer = self.run_tests_sequentially(runtests)
- def main(self, tests=None, **kwargs):
- self.parse_args(kwargs)
+ self.display_result(runtests)
- self.set_temp_dir()
+ if self.want_rerun and self.results.need_rerun():
+ self.rerun_failed_tests(runtests)
+ finally:
+ if use_load_tracker:
+ self.logger.stop_load_tracker()
- self.fix_umask()
+ self.display_summary()
+ self.finalize_tests(tracer)
- if self.ns.cleanup:
- self.cleanup()
- sys.exit(0)
+ return self.results.get_exitcode(self.fail_env_changed,
+ self.fail_rerun)
- test_cwd = self.create_temp_dir()
+ def run_tests(self, selected: TestTuple, tests: TestList | None) -> int:
+ os.makedirs(self.tmp_dir, exist_ok=True)
+ work_dir = get_work_dir(self.tmp_dir)
- try:
- # Run the tests in a context manager that temporarily changes the CWD
- # to a temporary and writable directory. If it's not possible to
- # create or change the CWD, the original CWD will be used.
+ # Put a timeout on Python exit
+ with exit_timeout():
+ # Run the tests in a context manager that temporarily changes the
+ # CWD to a temporary and writable directory. If it's not possible
+ # to create or change the CWD, the original CWD will be used.
# The original CWD is available from os_helper.SAVEDCWD.
- with os_helper.temp_cwd(test_cwd, quiet=True):
- # When using multiprocessing, worker processes will use test_cwd
- # as their parent temporary directory. So when the main process
- # exit, it removes also subdirectories of worker processes.
- self.ns.tempdir = test_cwd
-
- self._main(tests, kwargs)
- except SystemExit as exc:
- # bpo-38203: Python can hang at exit in Py_Finalize(), especially
- # on threading._shutdown() call: put a timeout
- if threading_helper.can_start_thread:
- faulthandler.dump_traceback_later(EXIT_TIMEOUT, exit=True)
+ with os_helper.temp_cwd(work_dir, quiet=True):
+ # When using multiprocessing, worker processes will use
+ # work_dir as their parent temporary directory. So when the
+ # main process exit, it removes also subdirectories of worker
+ # processes.
+ return self._run_tests(selected, tests)
+
+ def _add_cross_compile_opts(self, regrtest_opts):
+ # WASM/WASI buildbot builders pass multiple PYTHON environment
+ # variables such as PYTHONPATH and _PYTHON_HOSTRUNNER.
+ keep_environ = bool(self.python_cmd)
+ environ = None
+
+ # Are we using cross-compilation?
+ cross_compile = is_cross_compiled()
+
+ # Get HOSTRUNNER
+ hostrunner = get_host_runner()
+
+ if cross_compile:
+ # emulate -E, but keep PYTHONPATH + cross compile env vars,
+ # so test executable can load correct sysconfigdata file.
+ keep = {
+ '_PYTHON_PROJECT_BASE',
+ '_PYTHON_HOST_PLATFORM',
+ '_PYTHON_SYSCONFIGDATA_NAME',
+ 'PYTHONPATH'
+ }
+ old_environ = os.environ
+ new_environ = {
+ name: value for name, value in os.environ.items()
+ if not name.startswith(('PYTHON', '_PYTHON')) or name in keep
+ }
+ # Only set environ if at least one variable was removed
+ if new_environ != old_environ:
+ environ = new_environ
+ keep_environ = True
+
+ if cross_compile and hostrunner:
+ if self.num_workers == 0:
+ # For now use only two cores for cross-compiled builds;
+ # hostrunner can be expensive.
+ regrtest_opts.extend(['-j', '2'])
+
+ # If HOSTRUNNER is set and -p/--python option is not given, then
+ # use hostrunner to execute python binary for tests.
+ if not self.python_cmd:
+ buildpython = sysconfig.get_config_var("BUILDPYTHON")
+ python_cmd = f"{hostrunner} {buildpython}"
+ regrtest_opts.extend(["--python", python_cmd])
+ keep_environ = True
+
+ return (environ, keep_environ)
+
+ def _add_ci_python_opts(self, python_opts, keep_environ):
+ # --fast-ci and --slow-ci add options to Python:
+ # "-u -W default -bb -E"
+
+ # Unbuffered stdout and stderr
+ if not sys.stdout.write_through:
+ python_opts.append('-u')
+
+ # Add warnings filter 'default'
+ if 'default' not in sys.warnoptions:
+ python_opts.extend(('-W', 'default'))
+
+ # Error on bytes/str comparison
+ if sys.flags.bytes_warning < 2:
+ python_opts.append('-bb')
+
+ if not keep_environ:
+ # Ignore PYTHON* environment variables
+ if not sys.flags.ignore_environment:
+ python_opts.append('-E')
+
+ def _execute_python(self, cmd, environ):
+ # Make sure that messages before execv() are logged
+ sys.stdout.flush()
+ sys.stderr.flush()
+
+ cmd_text = shlex.join(cmd)
+ try:
+ print(f"+ {cmd_text}", flush=True)
- sys.exit(exc.code)
+ if hasattr(os, 'execv') and not MS_WINDOWS:
+ os.execv(cmd[0], cmd)
+ # On success, execv() do no return.
+ # On error, it raises an OSError.
+ else:
+ import subprocess
+ with subprocess.Popen(cmd, env=environ) as proc:
+ try:
+ proc.wait()
+ except KeyboardInterrupt:
+ # There is no need to call proc.terminate(): on CTRL+C,
+ # SIGTERM is also sent to the child process.
+ try:
+ proc.wait(timeout=EXIT_TIMEOUT)
+ except subprocess.TimeoutExpired:
+ proc.kill()
+ proc.wait()
+ sys.exit(EXITCODE_INTERRUPTED)
+
+ sys.exit(proc.returncode)
+ except Exception as exc:
+ print_warning(f"Failed to change Python options: {exc!r}\n"
+ f"Command: {cmd_text}")
+ # continue executing main()
+
+ def _add_python_opts(self):
+ python_opts = []
+ regrtest_opts = []
+
+ environ, keep_environ = self._add_cross_compile_opts(regrtest_opts)
+ if self.ci_mode:
+ self._add_ci_python_opts(python_opts, keep_environ)
+
+ if (not python_opts) and (not regrtest_opts) and (environ is None):
+ # Nothing changed: nothing to do
+ return
- def getloadavg(self):
- if self.win_load_tracker is not None:
- return self.win_load_tracker.getloadavg()
+ # Create new command line
+ cmd = list(sys.orig_argv)
+ if python_opts:
+ cmd[1:1] = python_opts
+ if regrtest_opts:
+ cmd.extend(regrtest_opts)
+ cmd.append("--dont-add-python-opts")
- if hasattr(os, 'getloadavg'):
- return os.getloadavg()[0]
+ self._execute_python(cmd, environ)
- return None
+ def _init(self):
+ # Set sys.stdout encoder error handler to backslashreplace,
+ # similar to sys.stderr error handler, to avoid UnicodeEncodeError
+ # when printing a traceback or any other non-encodable character.
+ sys.stdout.reconfigure(errors="backslashreplace")
- def _main(self, tests, kwargs):
- if self.worker_test_name is not None:
- from test.libregrtest.runtest_mp import run_tests_worker
- run_tests_worker(self.ns, self.worker_test_name)
+ if self.junit_filename and not os.path.isabs(self.junit_filename):
+ self.junit_filename = os.path.abspath(self.junit_filename)
- if self.ns.wait:
- input("Press any key to continue...")
+ strip_py_suffix(self.cmdline_args)
- support.PGO = self.ns.pgo
- support.PGO_EXTENDED = self.ns.pgo_extended
+ self.tmp_dir = get_temp_dir(self.tmp_dir)
- setup_tests(self.ns)
+ def main(self, tests: TestList | None = None):
+ if self.want_add_python_opts:
+ self._add_python_opts()
- self.find_tests(tests)
+ self._init()
- if self.ns.list_tests:
- self.list_tests()
+ if self.want_cleanup:
+ cleanup_temp_dir(self.tmp_dir)
sys.exit(0)
- if self.ns.list_cases:
- self.list_cases()
- sys.exit(0)
-
- self.run_tests()
- self.display_result()
-
- if self.ns.verbose2 and self.bad:
- self.rerun_failed_tests()
-
- self.finalize()
+ if self.want_wait:
+ input("Press any key to continue...")
- self.save_xml_result()
+ setup_test_dir(self.test_dir)
+ selected, tests = self.find_tests(tests)
+
+ exitcode = 0
+ if self.want_list_tests:
+ self.list_tests(selected)
+ elif self.want_list_cases:
+ list_cases(selected,
+ match_tests=self.match_tests,
+ ignore_tests=self.ignore_tests,
+ test_dir=self.test_dir)
+ else:
+ exitcode = self.run_tests(selected, tests)
- if self.bad:
- sys.exit(EXITCODE_BAD_TEST)
- if self.interrupted:
- sys.exit(EXITCODE_INTERRUPTED)
- if self.ns.fail_env_changed and self.environment_changed:
- sys.exit(EXITCODE_ENV_CHANGED)
- if self.no_tests_run():
- sys.exit(EXITCODE_NO_TESTS_RAN)
- sys.exit(0)
+ sys.exit(exitcode)
-def main(tests=None, **kwargs):
+def main(tests=None, _add_python_opts=False, **kwargs):
"""Run the Python suite."""
- Regrtest().main(tests=tests, **kwargs)
+ ns = _parse_args(sys.argv[1:], **kwargs)
+ Regrtest(ns, _add_python_opts=_add_python_opts).main(tests=tests)
diff --git a/Lib/test/libregrtest/pgo.py b/Lib/test/libregrtest/pgo.py
index 42ce5fb..e3a6927 100644
--- a/Lib/test/libregrtest/pgo.py
+++ b/Lib/test/libregrtest/pgo.py
@@ -42,15 +42,15 @@ PGO_TESTS = [
'test_set',
'test_sqlite3',
'test_statistics',
+ 'test_str',
'test_struct',
'test_tabnanny',
'test_time',
- 'test_unicode',
'test_xml_etree',
'test_xml_etree_c',
]
-def setup_pgo_tests(ns):
- if not ns.args and not ns.pgo_extended:
+def setup_pgo_tests(cmdline_args, pgo_extended: bool):
+ if not cmdline_args and not pgo_extended:
# run default set of tests for PGO training
- ns.args = PGO_TESTS[:]
+ cmdline_args[:] = PGO_TESTS[:]
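The slice assignment above matters. A short sketch, using stand-in names rather than the real PGO_TESTS, of why cmdline_args[:] = PGO_TESTS[:] is used instead of rebinding the parameter:

    DEFAULT_TESTS = ['test_a', 'test_b']   # stand-in for PGO_TESTS

    def fill_default_tests(cmdline_args, pgo_extended):
        if not cmdline_args and not pgo_extended:
            # Slice assignment mutates the caller's list in place; a plain
            # "cmdline_args = ..." would only rebind the local name.
            cmdline_args[:] = DEFAULT_TESTS[:]

    args = []
    fill_default_tests(args, pgo_extended=False)
    print(args)   # ['test_a', 'test_b'] -- the caller sees the default set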
diff --git a/Lib/test/libregrtest/refleak.py b/Lib/test/libregrtest/refleak.py
index 58a1419..59f48bd 100644
--- a/Lib/test/libregrtest/refleak.py
+++ b/Lib/test/libregrtest/refleak.py
@@ -1,10 +1,13 @@
-import os
import sys
import warnings
from inspect import isabstract
+from typing import Any
+
from test import support
from test.support import os_helper
-from test.libregrtest.utils import clear_caches
+
+from .runtests import HuntRefleak
+from .utils import clear_caches
try:
from _abc import _get_dump
@@ -19,7 +22,9 @@ except ImportError:
cls._abc_negative_cache, cls._abc_negative_cache_version)
-def dash_R(ns, test_name, test_func):
+def runtest_refleak(test_name, test_func,
+ hunt_refleak: HuntRefleak,
+ quiet: bool):
"""Run a test multiple times, looking for reference leaks.
Returns:
@@ -41,6 +46,7 @@ def dash_R(ns, test_name, test_func):
fs = warnings.filters[:]
ps = copyreg.dispatch_table.copy()
pic = sys.path_importer_cache.copy()
+ zdc: dict[str, Any] | None
try:
import zipimport
except ImportError:
@@ -62,9 +68,10 @@ def dash_R(ns, test_name, test_func):
def get_pooled_int(value):
return int_pool.setdefault(value, value)
- nwarmup, ntracked, fname = ns.huntrleaks
- fname = os.path.join(os_helper.SAVEDCWD, fname)
- repcount = nwarmup + ntracked
+ warmups = hunt_refleak.warmups
+ runs = hunt_refleak.runs
+ filename = hunt_refleak.filename
+ repcount = warmups + runs
# Pre-allocate to ensure that the loop doesn't allocate anything new
rep_range = list(range(repcount))
@@ -73,12 +80,11 @@ def dash_R(ns, test_name, test_func):
fd_deltas = [0] * repcount
getallocatedblocks = sys.getallocatedblocks
gettotalrefcount = sys.gettotalrefcount
- _getquickenedcount = sys._getquickenedcount
fd_count = os_helper.fd_count
# initialize variables to make pyflakes quiet
rc_before = alloc_before = fd_before = 0
- if not ns.quiet:
+ if not quiet:
print("beginning", repcount, "repetitions", file=sys.stderr)
print(("1234567890"*(repcount//10 + 1))[:repcount], file=sys.stderr,
flush=True)
@@ -93,12 +99,12 @@ def dash_R(ns, test_name, test_func):
dash_R_cleanup(fs, ps, pic, zdc, abcs)
support.gc_collect()
- # Read memory statistics immediately after the garbage collection
- alloc_after = getallocatedblocks() - _getquickenedcount()
+ # Read memory statistics immediately after the garbage collection.
+ alloc_after = getallocatedblocks()
rc_after = gettotalrefcount()
fd_after = fd_count()
- if not ns.quiet:
+ if not quiet:
print('.', end='', file=sys.stderr, flush=True)
rc_deltas[i] = get_pooled_int(rc_after - rc_before)
@@ -109,7 +115,7 @@ def dash_R(ns, test_name, test_func):
rc_before = rc_after
fd_before = fd_after
- if not ns.quiet:
+ if not quiet:
print(file=sys.stderr)
# These checkers return False on success, True on failure
@@ -138,12 +144,12 @@ def dash_R(ns, test_name, test_func):
(fd_deltas, 'file descriptors', check_fd_deltas)
]:
# ignore warmup runs
- deltas = deltas[nwarmup:]
+ deltas = deltas[warmups:]
if checker(deltas):
msg = '%s leaked %s %s, sum=%s' % (
test_name, deltas, item_name, sum(deltas))
print(msg, file=sys.stderr, flush=True)
- with open(fname, "a", encoding="utf-8") as refrep:
+ with open(filename, "a", encoding="utf-8") as refrep:
print(msg, file=refrep)
refrep.flush()
failed = True
@@ -169,6 +175,7 @@ def dash_R_cleanup(fs, ps, pic, zdc, abcs):
zipimport._zip_directory_cache.update(zdc)
# Clear ABC registries, restoring previously saved ABC registries.
+ # ignore deprecation warning for collections.abc.ByteString
abs_classes = [getattr(collections.abc, a) for a in collections.abc.__all__]
abs_classes = filter(isabstract, abs_classes)
for abc in abs_classes:
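To illustrate the warmup/tracked-run split that runtest_refleak() relies on, here is a simplified sketch (not the actual checker): the first warmups repetitions are discarded so caches and interned objects can settle, and only the remaining runs deltas are inspected. sys.gettotalrefcount() exists only in debug builds, hence the fallback that keeps the sketch runnable on a release build.

    import sys

    def hunt_refleak(test_func, warmups=3, runs=6):
        get_rc = getattr(sys, 'gettotalrefcount', lambda: 0)  # debug builds only
        deltas = []
        rc_before = get_rc()
        for _ in range(warmups + runs):
            test_func()
            rc_after = get_rc()
            deltas.append(rc_after - rc_before)
            rc_before = rc_after
        deltas = deltas[warmups:]          # ignore warmup runs
        return any(deltas), deltas

    leaked, deltas = hunt_refleak(lambda: None)
    print(leaked, deltas)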
diff --git a/Lib/test/libregrtest/result.py b/Lib/test/libregrtest/result.py
new file mode 100644
index 0000000..d6b0d5a
--- /dev/null
+++ b/Lib/test/libregrtest/result.py
@@ -0,0 +1,190 @@
+import dataclasses
+import json
+from typing import Any
+
+from test.support import TestStats
+
+from .utils import (
+ StrJSON, TestName, FilterTuple,
+ format_duration, normalize_test_name, print_warning)
+
+
+# Avoid enum.Enum to reduce the number of imports when tests are run
+class State:
+ PASSED = "PASSED"
+ FAILED = "FAILED"
+ SKIPPED = "SKIPPED"
+ UNCAUGHT_EXC = "UNCAUGHT_EXC"
+ REFLEAK = "REFLEAK"
+ ENV_CHANGED = "ENV_CHANGED"
+ RESOURCE_DENIED = "RESOURCE_DENIED"
+ INTERRUPTED = "INTERRUPTED"
+ WORKER_FAILED = "WORKER_FAILED" # non-zero worker process exit code
+ WORKER_BUG = "WORKER_BUG" # exception when running a worker
+ DID_NOT_RUN = "DID_NOT_RUN"
+ TIMEOUT = "TIMEOUT"
+
+ @staticmethod
+ def is_failed(state):
+ return state in {
+ State.FAILED,
+ State.UNCAUGHT_EXC,
+ State.REFLEAK,
+ State.WORKER_FAILED,
+ State.WORKER_BUG,
+ State.TIMEOUT}
+
+ @staticmethod
+ def has_meaningful_duration(state):
+ # Consider that the duration is meaningless for these cases.
+ # For example, if a whole test file is skipped, its duration
+ # is unlikely to be the duration of executing its tests,
+ # but just the duration to execute code which skips the test.
+ return state not in {
+ State.SKIPPED,
+ State.RESOURCE_DENIED,
+ State.INTERRUPTED,
+ State.WORKER_FAILED,
+ State.WORKER_BUG,
+ State.DID_NOT_RUN}
+
+ @staticmethod
+ def must_stop(state):
+ return state in {
+ State.INTERRUPTED,
+ State.WORKER_BUG,
+ }
+
+
+@dataclasses.dataclass(slots=True)
+class TestResult:
+ test_name: TestName
+ state: str | None = None
+ # Test duration in seconds
+ duration: float | None = None
+ xml_data: list[str] | None = None
+ stats: TestStats | None = None
+
+ # errors and failures copied from support.TestFailedWithDetails
+ errors: list[tuple[str, str]] | None = None
+ failures: list[tuple[str, str]] | None = None
+
+ def is_failed(self, fail_env_changed: bool) -> bool:
+ if self.state == State.ENV_CHANGED:
+ return fail_env_changed
+ return State.is_failed(self.state)
+
+ def _format_failed(self):
+ if self.errors and self.failures:
+ le = len(self.errors)
+ lf = len(self.failures)
+ error_s = "error" + ("s" if le > 1 else "")
+ failure_s = "failure" + ("s" if lf > 1 else "")
+ return f"{self.test_name} failed ({le} {error_s}, {lf} {failure_s})"
+
+ if self.errors:
+ le = len(self.errors)
+ error_s = "error" + ("s" if le > 1 else "")
+ return f"{self.test_name} failed ({le} {error_s})"
+
+ if self.failures:
+ lf = len(self.failures)
+ failure_s = "failure" + ("s" if lf > 1 else "")
+ return f"{self.test_name} failed ({lf} {failure_s})"
+
+ return f"{self.test_name} failed"
+
+ def __str__(self) -> str:
+ match self.state:
+ case State.PASSED:
+ return f"{self.test_name} passed"
+ case State.FAILED:
+ return self._format_failed()
+ case State.SKIPPED:
+ return f"{self.test_name} skipped"
+ case State.UNCAUGHT_EXC:
+ return f"{self.test_name} failed (uncaught exception)"
+ case State.REFLEAK:
+ return f"{self.test_name} failed (reference leak)"
+ case State.ENV_CHANGED:
+ return f"{self.test_name} failed (env changed)"
+ case State.RESOURCE_DENIED:
+ return f"{self.test_name} skipped (resource denied)"
+ case State.INTERRUPTED:
+ return f"{self.test_name} interrupted"
+ case State.WORKER_FAILED:
+ return f"{self.test_name} worker non-zero exit code"
+ case State.WORKER_BUG:
+ return f"{self.test_name} worker bug"
+ case State.DID_NOT_RUN:
+ return f"{self.test_name} ran no tests"
+ case State.TIMEOUT:
+ return f"{self.test_name} timed out ({format_duration(self.duration)})"
+ case _:
+ raise ValueError(f"unknown result state: {self.state!r}")
+
+ def has_meaningful_duration(self):
+ return State.has_meaningful_duration(self.state)
+
+ def set_env_changed(self):
+ if self.state is None or self.state == State.PASSED:
+ self.state = State.ENV_CHANGED
+
+ def must_stop(self, fail_fast: bool, fail_env_changed: bool) -> bool:
+ if State.must_stop(self.state):
+ return True
+ if fail_fast and self.is_failed(fail_env_changed):
+ return True
+ return False
+
+ def get_rerun_match_tests(self) -> FilterTuple | None:
+ match_tests = []
+
+ errors = self.errors or []
+ failures = self.failures or []
+ for error_list, is_error in (
+ (errors, True),
+ (failures, False),
+ ):
+ for full_name, *_ in error_list:
+ match_name = normalize_test_name(full_name, is_error=is_error)
+ if match_name is None:
+ # 'setUpModule (test.test_sys)': don't filter tests
+ return None
+ if not match_name:
+ error_type = "ERROR" if is_error else "FAIL"
+ print_warning(f"rerun failed to parse {error_type} test name: "
+ f"{full_name!r}: don't filter tests")
+ return None
+ match_tests.append(match_name)
+
+ if not match_tests:
+ return None
+ return tuple(match_tests)
+
+ def write_json_into(self, file) -> None:
+ json.dump(self, file, cls=_EncodeTestResult)
+
+ @staticmethod
+ def from_json(worker_json: StrJSON) -> 'TestResult':
+ return json.loads(worker_json, object_hook=_decode_test_result)
+
+
+class _EncodeTestResult(json.JSONEncoder):
+ def default(self, o: Any) -> dict[str, Any]:
+ if isinstance(o, TestResult):
+ result = dataclasses.asdict(o)
+ result["__test_result__"] = o.__class__.__name__
+ return result
+ else:
+ return super().default(o)
+
+
+def _decode_test_result(data: dict[str, Any]) -> TestResult | dict[str, Any]:
+ if "__test_result__" in data:
+ data.pop('__test_result__')
+ if data['stats'] is not None:
+ data['stats'] = TestStats(**data['stats'])
+ return TestResult(**data)
+ else:
+ return data
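The round-trip pattern above (a JSONEncoder subclass that tags the dataclass with a marker key, paired with an object_hook that rebuilds it) can be exercised in isolation. This sketch uses a reduced Result class rather than the real TestResult:

    import dataclasses
    import json

    @dataclasses.dataclass
    class Result:
        name: str
        duration: float | None = None

    class _Encoder(json.JSONEncoder):
        def default(self, o):
            if isinstance(o, Result):
                data = dataclasses.asdict(o)
                data["__result__"] = True    # marker consumed by the decoder
                return data
            return super().default(o)

    def _decode(data):
        if data.pop("__result__", False):
            return Result(**data)
        return data

    text = json.dumps(Result("test_os", 1.5), cls=_Encoder)
    print(json.loads(text, object_hook=_decode))  # Result(name='test_os', duration=1.5)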
diff --git a/Lib/test/libregrtest/results.py b/Lib/test/libregrtest/results.py
new file mode 100644
index 0000000..3708078
--- /dev/null
+++ b/Lib/test/libregrtest/results.py
@@ -0,0 +1,261 @@
+import sys
+from test.support import TestStats
+
+from .runtests import RunTests
+from .result import State, TestResult
+from .utils import (
+ StrPath, TestName, TestTuple, TestList, FilterDict,
+ printlist, count, format_duration)
+
+
+# Python uses exit code 1 when an exception is not caught
+# argparse.ArgumentParser.error() uses exit code 2
+EXITCODE_BAD_TEST = 2
+EXITCODE_ENV_CHANGED = 3
+EXITCODE_NO_TESTS_RAN = 4
+EXITCODE_RERUN_FAIL = 5
+EXITCODE_INTERRUPTED = 130 # 128 + signal.SIGINT=2
+
+
+class TestResults:
+ def __init__(self):
+ self.bad: TestList = []
+ self.good: TestList = []
+ self.rerun_bad: TestList = []
+ self.skipped: TestList = []
+ self.resource_denied: TestList = []
+ self.env_changed: TestList = []
+ self.run_no_tests: TestList = []
+ self.rerun: TestList = []
+ self.rerun_results: list[TestResult] = []
+
+ self.interrupted: bool = False
+ self.worker_bug: bool = False
+ self.test_times: list[tuple[float, TestName]] = []
+ self.stats = TestStats()
+ # used by --junit-xml
+ self.testsuite_xml: list[str] = []
+
+ def is_all_good(self):
+ return (not self.bad
+ and not self.skipped
+ and not self.interrupted
+ and not self.worker_bug)
+
+ def get_executed(self):
+ return (set(self.good) | set(self.bad) | set(self.skipped)
+ | set(self.resource_denied) | set(self.env_changed)
+ | set(self.run_no_tests))
+
+ def no_tests_run(self):
+ return not any((self.good, self.bad, self.skipped, self.interrupted,
+ self.env_changed))
+
+ def get_state(self, fail_env_changed):
+ state = []
+ if self.bad:
+ state.append("FAILURE")
+ elif fail_env_changed and self.env_changed:
+ state.append("ENV CHANGED")
+ elif self.no_tests_run():
+ state.append("NO TESTS RAN")
+
+ if self.interrupted:
+ state.append("INTERRUPTED")
+ if self.worker_bug:
+ state.append("WORKER BUG")
+ if not state:
+ state.append("SUCCESS")
+
+ return ', '.join(state)
+
+ def get_exitcode(self, fail_env_changed, fail_rerun):
+ exitcode = 0
+ if self.bad:
+ exitcode = EXITCODE_BAD_TEST
+ elif self.interrupted:
+ exitcode = EXITCODE_INTERRUPTED
+ elif fail_env_changed and self.env_changed:
+ exitcode = EXITCODE_ENV_CHANGED
+ elif self.no_tests_run():
+ exitcode = EXITCODE_NO_TESTS_RAN
+ elif fail_rerun and self.rerun:
+ exitcode = EXITCODE_RERUN_FAIL
+ elif self.worker_bug:
+ exitcode = EXITCODE_BAD_TEST
+ return exitcode
+
+ def accumulate_result(self, result: TestResult, runtests: RunTests):
+ test_name = result.test_name
+ rerun = runtests.rerun
+ fail_env_changed = runtests.fail_env_changed
+
+ match result.state:
+ case State.PASSED:
+ self.good.append(test_name)
+ case State.ENV_CHANGED:
+ self.env_changed.append(test_name)
+ self.rerun_results.append(result)
+ case State.SKIPPED:
+ self.skipped.append(test_name)
+ case State.RESOURCE_DENIED:
+ self.resource_denied.append(test_name)
+ case State.INTERRUPTED:
+ self.interrupted = True
+ case State.DID_NOT_RUN:
+ self.run_no_tests.append(test_name)
+ case _:
+ if result.is_failed(fail_env_changed):
+ self.bad.append(test_name)
+ self.rerun_results.append(result)
+ else:
+ raise ValueError(f"invalid test state: {result.state!r}")
+
+ if result.state == State.WORKER_BUG:
+ self.worker_bug = True
+
+ if result.has_meaningful_duration() and not rerun:
+ self.test_times.append((result.duration, test_name))
+ if result.stats is not None:
+ self.stats.accumulate(result.stats)
+ if rerun:
+ self.rerun.append(test_name)
+
+ xml_data = result.xml_data
+ if xml_data:
+ self.add_junit(xml_data)
+
+ def need_rerun(self):
+ return bool(self.rerun_results)
+
+ def prepare_rerun(self) -> tuple[TestTuple, FilterDict]:
+ tests: TestList = []
+ match_tests_dict = {}
+ for result in self.rerun_results:
+ tests.append(result.test_name)
+
+ match_tests = result.get_rerun_match_tests()
+ # ignore empty match list
+ if match_tests:
+ match_tests_dict[result.test_name] = match_tests
+
+ # Clear previously failed tests
+ self.rerun_bad.extend(self.bad)
+ self.bad.clear()
+ self.env_changed.clear()
+ self.rerun_results.clear()
+
+ return (tuple(tests), match_tests_dict)
+
+ def add_junit(self, xml_data: list[str]):
+ import xml.etree.ElementTree as ET
+ for e in xml_data:
+ try:
+ self.testsuite_xml.append(ET.fromstring(e))
+ except ET.ParseError:
+ print(xml_data, file=sys.__stderr__)
+ raise
+
+ def write_junit(self, filename: StrPath):
+ if not self.testsuite_xml:
+ # Don't create empty XML file
+ return
+
+ import xml.etree.ElementTree as ET
+ root = ET.Element("testsuites")
+
+ # Manually count the totals for the overall summary
+ totals = {'tests': 0, 'errors': 0, 'failures': 0}
+ for suite in self.testsuite_xml:
+ root.append(suite)
+ for k in totals:
+ try:
+ totals[k] += int(suite.get(k, 0))
+ except ValueError:
+ pass
+
+ for k, v in totals.items():
+ root.set(k, str(v))
+
+ with open(filename, 'wb') as f:
+ for s in ET.tostringlist(root):
+ f.write(s)
+
+ def display_result(self, tests: TestTuple, quiet: bool, print_slowest: bool):
+ if print_slowest:
+ self.test_times.sort(reverse=True)
+ print()
+ print("10 slowest tests:")
+ for test_time, test in self.test_times[:10]:
+ print("- %s: %s" % (test, format_duration(test_time)))
+
+ all_tests = []
+ omitted = set(tests) - self.get_executed()
+
+ # less important
+ all_tests.append((omitted, "test", "{} omitted:"))
+ if not quiet:
+ all_tests.append((self.skipped, "test", "{} skipped:"))
+ all_tests.append((self.resource_denied, "test", "{} skipped (resource denied):"))
+ all_tests.append((self.run_no_tests, "test", "{} run no tests:"))
+
+ # more important
+ all_tests.append((self.env_changed, "test", "{} altered the execution environment (env changed):"))
+ all_tests.append((self.rerun, "re-run test", "{}:"))
+ all_tests.append((self.bad, "test", "{} failed:"))
+
+ for tests_list, count_text, title_format in all_tests:
+ if tests_list:
+ print()
+ count_text = count(len(tests_list), count_text)
+ print(title_format.format(count_text))
+ printlist(tests_list)
+
+ if self.good and not quiet:
+ print()
+ text = count(len(self.good), "test")
+ text = f"{text} OK."
+ if (self.is_all_good() and len(self.good) > 1):
+ text = f"All {text}"
+ print(text)
+
+ if self.interrupted:
+ print()
+ print("Test suite interrupted by signal SIGINT.")
+
+ def display_summary(self, first_runtests: RunTests, filtered: bool):
+ # Total tests
+ stats = self.stats
+ text = f'run={stats.tests_run:,}'
+ if filtered:
+ text = f"{text} (filtered)"
+ report = [text]
+ if stats.failures:
+ report.append(f'failures={stats.failures:,}')
+ if stats.skipped:
+ report.append(f'skipped={stats.skipped:,}')
+ print(f"Total tests: {' '.join(report)}")
+
+ # Total test files
+ all_tests = [self.good, self.bad, self.rerun,
+ self.skipped,
+ self.env_changed, self.run_no_tests]
+ run = sum(map(len, all_tests))
+ text = f'run={run}'
+ if not first_runtests.forever:
+ ntest = len(first_runtests.tests)
+ text = f"{text}/{ntest}"
+ if filtered:
+ text = f"{text} (filtered)"
+ report = [text]
+ for name, tests in (
+ ('failed', self.bad),
+ ('env_changed', self.env_changed),
+ ('skipped', self.skipped),
+ ('resource_denied', self.resource_denied),
+ ('rerun', self.rerun),
+ ('run_no_tests', self.run_no_tests),
+ ):
+ if tests:
+ report.append(f'{name}={len(tests)}')
+ print(f"Total test files: {' '.join(report)}")
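The aggregation performed by write_junit() can be reproduced standalone: each worker contributes a testsuite element, and the parent wraps them in one testsuites root whose tests/errors/failures attributes are the column sums. The two inline suites below are illustrative data only:

    import xml.etree.ElementTree as ET

    suites = [
        ET.fromstring('<testsuite tests="3" errors="0" failures="1"/>'),
        ET.fromstring('<testsuite tests="2" errors="1" failures="0"/>'),
    ]
    root = ET.Element("testsuites")
    totals = {'tests': 0, 'errors': 0, 'failures': 0}
    for suite in suites:
        root.append(suite)
        for key in totals:
            totals[key] += int(suite.get(key, 0))
    for key, value in totals.items():
        root.set(key, str(value))
    print(ET.tostring(root).decode())
    # <testsuites tests="5" errors="1" failures="1">...</testsuites>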
diff --git a/Lib/test/libregrtest/run_workers.py b/Lib/test/libregrtest/run_workers.py
new file mode 100644
index 0000000..16f8331
--- /dev/null
+++ b/Lib/test/libregrtest/run_workers.py
@@ -0,0 +1,607 @@
+import contextlib
+import dataclasses
+import faulthandler
+import os.path
+import queue
+import signal
+import subprocess
+import sys
+import tempfile
+import threading
+import time
+import traceback
+from typing import Literal, TextIO
+
+from test import support
+from test.support import os_helper, MS_WINDOWS
+
+from .logger import Logger
+from .result import TestResult, State
+from .results import TestResults
+from .runtests import RunTests, JsonFile, JsonFileType
+from .single import PROGRESS_MIN_TIME
+from .utils import (
+ StrPath, TestName,
+ format_duration, print_warning, count, plural, get_signal_name)
+from .worker import create_worker_process, USE_PROCESS_GROUP
+
+if MS_WINDOWS:
+ import locale
+ import msvcrt
+
+
+
+# Display the running tests if nothing happened last N seconds
+PROGRESS_UPDATE = 30.0 # seconds
+assert PROGRESS_UPDATE >= PROGRESS_MIN_TIME
+
+# Kill the main process after 5 minutes. It is supposed to write an update
+# every PROGRESS_UPDATE seconds. Tolerate 5 minutes for Python's slowest
+# buildbot workers.
+MAIN_PROCESS_TIMEOUT = 5 * 60.0
+assert MAIN_PROCESS_TIMEOUT >= PROGRESS_UPDATE
+
+# Time to wait until a worker completes: should be immediate
+WAIT_COMPLETED_TIMEOUT = 30.0 # seconds
+
+# Time to wait for a killed process (in seconds)
+WAIT_KILLED_TIMEOUT = 60.0
+
+
+# We do not use a generator so multiple threads can call next().
+class MultiprocessIterator:
+
+ """A thread-safe iterator over tests for multiprocess mode."""
+
+ def __init__(self, tests_iter):
+ self.lock = threading.Lock()
+ self.tests_iter = tests_iter
+
+ def __iter__(self):
+ return self
+
+ def __next__(self):
+ with self.lock:
+ if self.tests_iter is None:
+ raise StopIteration
+ return next(self.tests_iter)
+
+ def stop(self):
+ with self.lock:
+ self.tests_iter = None
+
+
+@dataclasses.dataclass(slots=True, frozen=True)
+class MultiprocessResult:
+ result: TestResult
+ # bpo-45410: stderr is written into stdout to keep messages order
+ worker_stdout: str | None = None
+ err_msg: str | None = None
+
+
+ExcStr = str
+QueueOutput = tuple[Literal[False], MultiprocessResult] | tuple[Literal[True], ExcStr]
+
+
+class ExitThread(Exception):
+ pass
+
+
+class WorkerError(Exception):
+ def __init__(self,
+ test_name: TestName,
+ err_msg: str | None,
+ stdout: str | None,
+ state: str):
+ result = TestResult(test_name, state=state)
+ self.mp_result = MultiprocessResult(result, stdout, err_msg)
+ super().__init__()
+
+
+class WorkerThread(threading.Thread):
+ def __init__(self, worker_id: int, runner: "RunWorkers") -> None:
+ super().__init__()
+ self.worker_id = worker_id
+ self.runtests = runner.runtests
+ self.pending = runner.pending
+ self.output = runner.output
+ self.timeout = runner.worker_timeout
+ self.log = runner.log
+ self.test_name: TestName | None = None
+ self.start_time: float | None = None
+ self._popen: subprocess.Popen[str] | None = None
+ self._killed = False
+ self._stopped = False
+
+ def __repr__(self) -> str:
+ info = [f'WorkerThread #{self.worker_id}']
+ if self.is_alive():
+ info.append("running")
+ else:
+ info.append('stopped')
+ test = self.test_name
+ if test:
+ info.append(f'test={test}')
+ popen = self._popen
+ if popen is not None:
+ dt = time.monotonic() - self.start_time
+ info.extend((f'pid={self._popen.pid}',
+ f'time={format_duration(dt)}'))
+ return '<%s>' % ' '.join(info)
+
+ def _kill(self) -> None:
+ popen = self._popen
+ if popen is None:
+ return
+
+ if self._killed:
+ return
+ self._killed = True
+
+ if USE_PROCESS_GROUP:
+ what = f"{self} process group"
+ else:
+ what = f"{self} process"
+
+ print(f"Kill {what}", file=sys.stderr, flush=True)
+ try:
+ if USE_PROCESS_GROUP:
+ os.killpg(popen.pid, signal.SIGKILL)
+ else:
+ popen.kill()
+ except ProcessLookupError:
+ # popen.kill(): the process completed, the WorkerThread thread
+ # read its exit status, but Popen.send_signal() read the returncode
+ # just before Popen.wait() set returncode.
+ pass
+ except OSError as exc:
+ print_warning(f"Failed to kill {what}: {exc!r}")
+
+ def stop(self) -> None:
+ # Method called from a different thread to stop this thread
+ self._stopped = True
+ self._kill()
+
+ def _run_process(self, runtests: RunTests, output_fd: int,
+ tmp_dir: StrPath | None = None) -> int | None:
+ popen = create_worker_process(runtests, output_fd, tmp_dir)
+ self._popen = popen
+ self._killed = False
+
+ try:
+ if self._stopped:
+ # If kill() has been called before self._popen is set,
+ # self._popen is still running. Call kill() again
+ # to ensure that the process is killed.
+ self._kill()
+ raise ExitThread
+
+ try:
+ # gh-94026: stdout+stderr are written to tempfile
+ retcode = popen.wait(timeout=self.timeout)
+ assert retcode is not None
+ return retcode
+ except subprocess.TimeoutExpired:
+ if self._stopped:
+ # kill() has been called: communicate() fails on reading
+ # closed stdout
+ raise ExitThread
+
+ # On timeout, kill the process
+ self._kill()
+
+ # None means TIMEOUT for the caller
+ retcode = None
+ # bpo-38207: Don't attempt to call communicate() again: on
+ # timeout it can hang until all child processes using stdout
+ # pipes complete.
+ except OSError:
+ if self._stopped:
+ # kill() has been called: communicate() fails
+ # on reading closed stdout
+ raise ExitThread
+ raise
+ except:
+ self._kill()
+ raise
+ finally:
+ self._wait_completed()
+ self._popen = None
+
+ def create_stdout(self, stack: contextlib.ExitStack) -> TextIO:
+ """Create stdout temporay file (file descriptor)."""
+
+ if MS_WINDOWS:
+ # gh-95027: When stdout is not a TTY, Python uses the ANSI code
+ # page for the sys.stdout encoding. If the main process runs in a
+ # terminal, sys.stdout uses WindowsConsoleIO with UTF-8 encoding.
+ encoding = locale.getencoding()
+ else:
+ encoding = sys.stdout.encoding
+
+ # gh-94026: Write stdout+stderr to a tempfile as a workaround for
+ # non-blocking pipes on Emscripten with NodeJS.
+ # gh-109425: Use "backslashreplace" error handler: log corrupted
+ # stdout+stderr, instead of failing with a UnicodeDecodeError and not
+ # logging stdout+stderr at all.
+ stdout_file = tempfile.TemporaryFile('w+',
+ encoding=encoding,
+ errors='backslashreplace')
+ stack.enter_context(stdout_file)
+ return stdout_file
+
+ def create_json_file(self, stack: contextlib.ExitStack) -> tuple[JsonFile, TextIO | None]:
+ """Create JSON file."""
+
+ json_file_use_stdout = self.runtests.json_file_use_stdout()
+ if json_file_use_stdout:
+ json_file = JsonFile(None, JsonFileType.STDOUT)
+ json_tmpfile = None
+ else:
+ json_tmpfile = tempfile.TemporaryFile('w+', encoding='utf8')
+ stack.enter_context(json_tmpfile)
+
+ json_fd = json_tmpfile.fileno()
+ if MS_WINDOWS:
+ json_handle = msvcrt.get_osfhandle(json_fd)
+ json_file = JsonFile(json_handle,
+ JsonFileType.WINDOWS_HANDLE)
+ else:
+ json_file = JsonFile(json_fd, JsonFileType.UNIX_FD)
+ return (json_file, json_tmpfile)
+
+ def create_worker_runtests(self, test_name: TestName, json_file: JsonFile) -> RunTests:
+ """Create the worker RunTests."""
+
+ tests = (test_name,)
+ if self.runtests.rerun:
+ match_tests = self.runtests.get_match_tests(test_name)
+ else:
+ match_tests = None
+
+ kwargs = {}
+ if match_tests:
+ kwargs['match_tests'] = match_tests
+ if self.runtests.output_on_failure:
+ kwargs['verbose'] = True
+ kwargs['output_on_failure'] = False
+ return self.runtests.copy(
+ tests=tests,
+ json_file=json_file,
+ **kwargs)
+
+ def run_tmp_files(self, worker_runtests: RunTests,
+ stdout_fd: int) -> tuple[int | None, list[StrPath]]:
+ # gh-93353: Check for leaked temporary files in the parent process,
+ # since the deletion of temporary files can happen late during
+ # Python finalization: too late for libregrtest.
+ if not support.is_wasi:
+ # Don't check for leaked temporary files and directories if Python is
+ # run on WASI. WASI doesn't pass environment variables like TMPDIR to
+ # worker processes.
+ tmp_dir = tempfile.mkdtemp(prefix="test_python_")
+ tmp_dir = os.path.abspath(tmp_dir)
+ try:
+ retcode = self._run_process(worker_runtests,
+ stdout_fd, tmp_dir)
+ finally:
+ tmp_files = os.listdir(tmp_dir)
+ os_helper.rmtree(tmp_dir)
+ else:
+ retcode = self._run_process(worker_runtests, stdout_fd)
+ tmp_files = []
+
+ return (retcode, tmp_files)
+
+ def read_stdout(self, stdout_file: TextIO) -> str:
+ stdout_file.seek(0)
+ try:
+ return stdout_file.read().strip()
+ except Exception as exc:
+ # gh-101634: Catch UnicodeDecodeError if stdout cannot be
+ # decoded from encoding
+ raise WorkerError(self.test_name,
+ f"Cannot read process stdout: {exc}",
+ stdout=None,
+ state=State.WORKER_BUG)
+
+ def read_json(self, json_file: JsonFile, json_tmpfile: TextIO | None,
+ stdout: str) -> tuple[TestResult, str]:
+ try:
+ if json_tmpfile is not None:
+ json_tmpfile.seek(0)
+ worker_json = json_tmpfile.read()
+ elif json_file.file_type == JsonFileType.STDOUT:
+ stdout, _, worker_json = stdout.rpartition("\n")
+ stdout = stdout.rstrip()
+ else:
+ with json_file.open(encoding='utf8') as json_fp:
+ worker_json = json_fp.read()
+ except Exception as exc:
+ # gh-101634: Catch UnicodeDecodeError if stdout cannot be
+ # decoded from encoding
+ err_msg = f"Failed to read worker process JSON: {exc}"
+ raise WorkerError(self.test_name, err_msg, stdout,
+ state=State.WORKER_BUG)
+
+ if not worker_json:
+ raise WorkerError(self.test_name, "empty JSON", stdout,
+ state=State.WORKER_BUG)
+
+ try:
+ result = TestResult.from_json(worker_json)
+ except Exception as exc:
+ # gh-101634: Catch UnicodeDecodeError if stdout cannot be
+ # decoded from encoding
+ err_msg = f"Failed to parse worker process JSON: {exc}"
+ raise WorkerError(self.test_name, err_msg, stdout,
+ state=State.WORKER_BUG)
+
+ return (result, stdout)
+
+ def _runtest(self, test_name: TestName) -> MultiprocessResult:
+ with contextlib.ExitStack() as stack:
+ stdout_file = self.create_stdout(stack)
+ json_file, json_tmpfile = self.create_json_file(stack)
+ worker_runtests = self.create_worker_runtests(test_name, json_file)
+
+ retcode, tmp_files = self.run_tmp_files(worker_runtests,
+ stdout_file.fileno())
+
+ stdout = self.read_stdout(stdout_file)
+
+ if retcode is None:
+ raise WorkerError(self.test_name, stdout=stdout,
+ err_msg=None,
+ state=State.TIMEOUT)
+ if retcode != 0:
+ name = get_signal_name(retcode)
+ if name:
+ retcode = f"{retcode} ({name})"
+ raise WorkerError(self.test_name, f"Exit code {retcode}", stdout,
+ state=State.WORKER_FAILED)
+
+ result, stdout = self.read_json(json_file, json_tmpfile, stdout)
+
+ if tmp_files:
+ msg = (f'\n\n'
+ f'Warning -- {test_name} leaked temporary files '
+ f'({len(tmp_files)}): {", ".join(sorted(tmp_files))}')
+ stdout += msg
+ result.set_env_changed()
+
+ return MultiprocessResult(result, stdout)
+
+ def run(self) -> None:
+ fail_fast = self.runtests.fail_fast
+ fail_env_changed = self.runtests.fail_env_changed
+ while not self._stopped:
+ try:
+ try:
+ test_name = next(self.pending)
+ except StopIteration:
+ break
+
+ self.start_time = time.monotonic()
+ self.test_name = test_name
+ try:
+ mp_result = self._runtest(test_name)
+ except WorkerError as exc:
+ mp_result = exc.mp_result
+ finally:
+ self.test_name = None
+ mp_result.result.duration = time.monotonic() - self.start_time
+ self.output.put((False, mp_result))
+
+ if mp_result.result.must_stop(fail_fast, fail_env_changed):
+ break
+ except ExitThread:
+ break
+ except BaseException:
+ self.output.put((True, traceback.format_exc()))
+ break
+
+ def _wait_completed(self) -> None:
+ popen = self._popen
+
+ try:
+ popen.wait(WAIT_COMPLETED_TIMEOUT)
+ except (subprocess.TimeoutExpired, OSError) as exc:
+ print_warning(f"Failed to wait for {self} completion "
+ f"(timeout={format_duration(WAIT_COMPLETED_TIMEOUT)}): "
+ f"{exc!r}")
+
+ def wait_stopped(self, start_time: float) -> None:
+ # bpo-38207: RunWorkers.stop_workers() called self.stop()
+ # which killed the process. Sometimes, killing the process from the
+ # main thread does not interrupt popen.communicate() in
+ # WorkerThread thread. This loop with a timeout is a workaround
+ # for that.
+ #
+ # Moreover, if this method fails to join the thread, it is likely
+ # that Python will hang at exit while calling threading._shutdown()
+ # which tries again to join the blocked thread. Regrtest.main()
+ # uses EXIT_TIMEOUT to workaround this second bug.
+ while True:
+ # Write a message every second
+ self.join(1.0)
+ if not self.is_alive():
+ break
+ dt = time.monotonic() - start_time
+ self.log(f"Waiting for {self} thread for {format_duration(dt)}")
+ if dt > WAIT_KILLED_TIMEOUT:
+ print_warning(f"Failed to join {self} in {format_duration(dt)}")
+ break
+
+
+def get_running(workers: list[WorkerThread]) -> str | None:
+ running: list[str] = []
+ for worker in workers:
+ test_name = worker.test_name
+ if not test_name:
+ continue
+ dt = time.monotonic() - worker.start_time
+ if dt >= PROGRESS_MIN_TIME:
+ text = f'{test_name} ({format_duration(dt)})'
+ running.append(text)
+ if not running:
+ return None
+ return f"running ({len(running)}): {', '.join(running)}"
+
+
+class RunWorkers:
+ def __init__(self, num_workers: int, runtests: RunTests,
+ logger: Logger, results: TestResults) -> None:
+ self.num_workers = num_workers
+ self.runtests = runtests
+ self.log = logger.log
+ self.display_progress = logger.display_progress
+ self.results: TestResults = results
+
+ self.output: queue.Queue[QueueOutput] = queue.Queue()
+ tests_iter = runtests.iter_tests()
+ self.pending = MultiprocessIterator(tests_iter)
+ self.timeout = runtests.timeout
+ if self.timeout is not None:
+ # Rely on faulthandler to kill a worker process. This timeout is
+ # used when faulthandler fails to kill a worker process. Give a maximum
+ # of 5 minutes to faulthandler to kill the worker.
+ self.worker_timeout: float | None = min(self.timeout * 1.5, self.timeout + 5 * 60)
+ else:
+ self.worker_timeout = None
+ self.workers: list[WorkerThread] | None = None
+
+ jobs = self.runtests.get_jobs()
+ if jobs is not None:
+ # Don't spawn more threads than the number of jobs:
+ # these worker threads would never get anything to do.
+ self.num_workers = min(self.num_workers, jobs)
+
+ def start_workers(self) -> None:
+ self.workers = [WorkerThread(index, self)
+ for index in range(1, self.num_workers + 1)]
+ jobs = self.runtests.get_jobs()
+ if jobs is not None:
+ tests = count(jobs, 'test')
+ else:
+ tests = 'tests'
+ nworkers = len(self.workers)
+ processes = plural(nworkers, "process", "processes")
+ msg = (f"Run {tests} in parallel using "
+ f"{nworkers} worker {processes}")
+ if self.timeout:
+ msg += (" (timeout: %s, worker timeout: %s)"
+ % (format_duration(self.timeout),
+ format_duration(self.worker_timeout)))
+ self.log(msg)
+ for worker in self.workers:
+ worker.start()
+
+ def stop_workers(self) -> None:
+ start_time = time.monotonic()
+ for worker in self.workers:
+ worker.stop()
+ for worker in self.workers:
+ worker.wait_stopped(start_time)
+
+ def _get_result(self) -> QueueOutput | None:
+ pgo = self.runtests.pgo
+ use_faulthandler = (self.timeout is not None)
+
+ # bpo-46205: check the status of workers every iteration to avoid
+ # waiting forever on an empty queue.
+ while any(worker.is_alive() for worker in self.workers):
+ if use_faulthandler:
+ faulthandler.dump_traceback_later(MAIN_PROCESS_TIMEOUT,
+ exit=True)
+
+ # wait for a thread
+ try:
+ return self.output.get(timeout=PROGRESS_UPDATE)
+ except queue.Empty:
+ pass
+
+ if not pgo:
+ # display progress
+ running = get_running(self.workers)
+ if running:
+ self.log(running)
+
+ # all worker threads are done: consume pending results
+ try:
+ return self.output.get(timeout=0)
+ except queue.Empty:
+ return None
+
+ def display_result(self, mp_result: MultiprocessResult) -> None:
+ result = mp_result.result
+ pgo = self.runtests.pgo
+
+ text = str(result)
+ if mp_result.err_msg:
+ # WORKER_BUG
+ text += ' (%s)' % mp_result.err_msg
+ elif (result.duration >= PROGRESS_MIN_TIME and not pgo):
+ text += ' (%s)' % format_duration(result.duration)
+ if not pgo:
+ running = get_running(self.workers)
+ if running:
+ text += f' -- {running}'
+ self.display_progress(self.test_index, text)
+
+ def _process_result(self, item: QueueOutput) -> TestResult:
+ """Returns True if test runner must stop."""
+ if item[0]:
+ # Thread got an exception
+ format_exc = item[1]
+ print_warning(f"regrtest worker thread failed: {format_exc}")
+ result = TestResult("<regrtest worker>", state=State.WORKER_BUG)
+ self.results.accumulate_result(result, self.runtests)
+ return result
+
+ self.test_index += 1
+ mp_result = item[1]
+ result = mp_result.result
+ self.results.accumulate_result(result, self.runtests)
+ self.display_result(mp_result)
+
+ # Display worker stdout
+ if not self.runtests.output_on_failure:
+ show_stdout = True
+ else:
+ # --verbose3 ignores stdout on success
+ show_stdout = (result.state != State.PASSED)
+ if show_stdout:
+ stdout = mp_result.worker_stdout
+ if stdout:
+ print(stdout, flush=True)
+
+ return result
+
+ def run(self) -> None:
+ fail_fast = self.runtests.fail_fast
+ fail_env_changed = self.runtests.fail_env_changed
+
+ self.start_workers()
+
+ self.test_index = 0
+ try:
+ while True:
+ item = self._get_result()
+ if item is None:
+ break
+
+ result = self._process_result(item)
+ if result.must_stop(fail_fast, fail_env_changed):
+ break
+ except KeyboardInterrupt:
+ print()
+ self.results.interrupted = True
+ finally:
+ if self.timeout is not None:
+ faulthandler.cancel_dump_traceback_later()
+
+ # Always ensure that all worker processes are no longer
+ # running when we exit this function.
+ self.pending.stop()
+ self.stop_workers()
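A condensed sketch of the producer/consumer shape used by RunWorkers above: worker threads pull test names from a lock-protected iterator and push results onto a queue drained by the main thread. The real workers launch one subprocess per test; here the "test" is a stand-in that always passes.

    import queue
    import threading

    class SafeIterator:
        """Thread-safe wrapper so several workers can share one iterator."""
        def __init__(self, items):
            self._lock = threading.Lock()
            self._it = iter(items)
        def __iter__(self):
            return self
        def __next__(self):
            with self._lock:
                return next(self._it)

    def worker(pending, output):
        for name in pending:
            output.put((name, "PASSED"))   # real code runs the test in a subprocess

    pending = SafeIterator([f"test_{i}" for i in range(10)])
    output = queue.Queue()
    threads = [threading.Thread(target=worker, args=(pending, output))
               for _ in range(3)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    while not output.empty():
        print(output.get())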
diff --git a/Lib/test/libregrtest/runtest.py b/Lib/test/libregrtest/runtest.py
deleted file mode 100644
index f37093b..0000000
--- a/Lib/test/libregrtest/runtest.py
+++ /dev/null
@@ -1,479 +0,0 @@
-import dataclasses
-import doctest
-import faulthandler
-import functools
-import gc
-import importlib
-import io
-import os
-import sys
-import time
-import traceback
-import unittest
-
-from test import support
-from test.support import TestStats
-from test.support import os_helper
-from test.support import threading_helper
-from test.libregrtest.cmdline import Namespace
-from test.libregrtest.save_env import saved_test_environment
-from test.libregrtest.utils import clear_caches, format_duration, print_warning
-
-
-# Avoid enum.Enum to reduce the number of imports when tests are run
-class State:
- PASSED = "PASSED"
- FAILED = "FAILED"
- SKIPPED = "SKIPPED"
- UNCAUGHT_EXC = "UNCAUGHT_EXC"
- REFLEAK = "REFLEAK"
- ENV_CHANGED = "ENV_CHANGED"
- RESOURCE_DENIED = "RESOURCE_DENIED"
- INTERRUPTED = "INTERRUPTED"
- MULTIPROCESSING_ERROR = "MULTIPROCESSING_ERROR"
- DID_NOT_RUN = "DID_NOT_RUN"
- TIMEOUT = "TIMEOUT"
-
- @staticmethod
- def is_failed(state):
- return state in {
- State.FAILED,
- State.UNCAUGHT_EXC,
- State.REFLEAK,
- State.MULTIPROCESSING_ERROR,
- State.TIMEOUT}
-
- @staticmethod
- def has_meaningful_duration(state):
- # Consider that the duration is meaningless for these cases.
- # For example, if a whole test file is skipped, its duration
- # is unlikely to be the duration of executing its tests,
- # but just the duration to execute code which skips the test.
- return state not in {
- State.SKIPPED,
- State.RESOURCE_DENIED,
- State.INTERRUPTED,
- State.MULTIPROCESSING_ERROR,
- State.DID_NOT_RUN}
-
-
-@dataclasses.dataclass(slots=True)
-class TestResult:
- test_name: str
- state: str | None = None
- # Test duration in seconds
- duration: float | None = None
- xml_data: list[str] | None = None
- stats: TestStats | None = None
-
- # errors and failures copied from support.TestFailedWithDetails
- errors: list[tuple[str, str]] | None = None
- failures: list[tuple[str, str]] | None = None
-
- def is_failed(self, fail_env_changed: bool) -> bool:
- if self.state == State.ENV_CHANGED:
- return fail_env_changed
- return State.is_failed(self.state)
-
- def _format_failed(self):
- if self.errors and self.failures:
- le = len(self.errors)
- lf = len(self.failures)
- error_s = "error" + ("s" if le > 1 else "")
- failure_s = "failure" + ("s" if lf > 1 else "")
- return f"{self.test_name} failed ({le} {error_s}, {lf} {failure_s})"
-
- if self.errors:
- le = len(self.errors)
- error_s = "error" + ("s" if le > 1 else "")
- return f"{self.test_name} failed ({le} {error_s})"
-
- if self.failures:
- lf = len(self.failures)
- failure_s = "failure" + ("s" if lf > 1 else "")
- return f"{self.test_name} failed ({lf} {failure_s})"
-
- return f"{self.test_name} failed"
-
- def __str__(self) -> str:
- match self.state:
- case State.PASSED:
- return f"{self.test_name} passed"
- case State.FAILED:
- return self._format_failed()
- case State.SKIPPED:
- return f"{self.test_name} skipped"
- case State.UNCAUGHT_EXC:
- return f"{self.test_name} failed (uncaught exception)"
- case State.REFLEAK:
- return f"{self.test_name} failed (reference leak)"
- case State.ENV_CHANGED:
- return f"{self.test_name} failed (env changed)"
- case State.RESOURCE_DENIED:
- return f"{self.test_name} skipped (resource denied)"
- case State.INTERRUPTED:
- return f"{self.test_name} interrupted"
- case State.MULTIPROCESSING_ERROR:
- return f"{self.test_name} process crashed"
- case State.DID_NOT_RUN:
- return f"{self.test_name} ran no tests"
- case State.TIMEOUT:
- return f"{self.test_name} timed out ({format_duration(self.duration)})"
- case _:
- raise ValueError("unknown result state: {state!r}")
-
- def has_meaningful_duration(self):
- return State.has_meaningful_duration(self.state)
-
- def set_env_changed(self):
- if self.state is None or self.state == State.PASSED:
- self.state = State.ENV_CHANGED
-
-
-# Minimum duration of a test to display its duration or to mention that
-# the test is running in background
-PROGRESS_MIN_TIME = 30.0 # seconds
-
-#If these test directories are encountered recurse into them and treat each
-# test_ .py or dir as a separate test module. This can increase parallelism.
-# Beware this can't generally be done for any directory with sub-tests as the
-# __init__.py may do things which alter what tests are to be run.
-
-SPLITTESTDIRS = {
- "test_asyncio",
- "test_concurrent_futures",
- "test_future_stmt",
- "test_gdb",
- "test_multiprocessing_fork",
- "test_multiprocessing_forkserver",
- "test_multiprocessing_spawn",
-}
-
-# Storage of uncollectable objects
-FOUND_GARBAGE = []
-
-
-def findtestdir(path=None):
- return path or os.path.dirname(os.path.dirname(__file__)) or os.curdir
-
-
-def findtests(*, testdir=None, exclude=(),
- split_test_dirs=SPLITTESTDIRS, base_mod=""):
- """Return a list of all applicable test modules."""
- testdir = findtestdir(testdir)
- tests = []
- for name in os.listdir(testdir):
- mod, ext = os.path.splitext(name)
- if (not mod.startswith("test_")) or (mod in exclude):
- continue
- if mod in split_test_dirs:
- subdir = os.path.join(testdir, mod)
- mod = f"{base_mod or 'test'}.{mod}"
- tests.extend(findtests(testdir=subdir, exclude=exclude,
- split_test_dirs=split_test_dirs, base_mod=mod))
- elif ext in (".py", ""):
- tests.append(f"{base_mod}.{mod}" if base_mod else mod)
- return sorted(tests)
-
-
-def split_test_packages(tests, *, testdir=None, exclude=(),
- split_test_dirs=SPLITTESTDIRS):
- testdir = findtestdir(testdir)
- splitted = []
- for name in tests:
- if name in split_test_dirs:
- subdir = os.path.join(testdir, name)
- splitted.extend(findtests(testdir=subdir, exclude=exclude,
- split_test_dirs=split_test_dirs,
- base_mod=name))
- else:
- splitted.append(name)
- return splitted
-
-
-def get_abs_module(ns: Namespace, test_name: str) -> str:
- if test_name.startswith('test.') or ns.testdir:
- return test_name
- else:
- # Import it from the test package
- return 'test.' + test_name
-
-
-def _runtest_capture_output_timeout_junit(result: TestResult, ns: Namespace) -> None:
- # Capture stdout and stderr, set faulthandler timeout,
- # and create JUnit XML report.
-
- output_on_failure = ns.verbose3
-
- use_timeout = (
- ns.timeout is not None and threading_helper.can_start_thread
- )
- if use_timeout:
- faulthandler.dump_traceback_later(ns.timeout, exit=True)
-
- try:
- support.set_match_tests(ns.match_tests, ns.ignore_tests)
- support.junit_xml_list = xml_list = [] if ns.xmlpath else None
- if ns.failfast:
- support.failfast = True
-
- if output_on_failure:
- support.verbose = True
-
- stream = io.StringIO()
- orig_stdout = sys.stdout
- orig_stderr = sys.stderr
- print_warning = support.print_warning
- orig_print_warnings_stderr = print_warning.orig_stderr
-
- output = None
- try:
- sys.stdout = stream
- sys.stderr = stream
- # print_warning() writes into the temporary stream to preserve
- # messages order. If support.environment_altered becomes true,
- # warnings will be written to sys.stderr below.
- print_warning.orig_stderr = stream
-
- _runtest_env_changed_exc(result, ns, display_failure=False)
- # Ignore output if the test passed successfully
- if result.state != State.PASSED:
- output = stream.getvalue()
- finally:
- sys.stdout = orig_stdout
- sys.stderr = orig_stderr
- print_warning.orig_stderr = orig_print_warnings_stderr
-
- if output is not None:
- sys.stderr.write(output)
- sys.stderr.flush()
- else:
- # Tell tests to be moderately quiet
- support.verbose = ns.verbose
-
- _runtest_env_changed_exc(result, ns,
- display_failure=not ns.verbose)
-
- if xml_list:
- import xml.etree.ElementTree as ET
- result.xml_data = [ET.tostring(x).decode('us-ascii')
- for x in xml_list]
- finally:
- if use_timeout:
- faulthandler.cancel_dump_traceback_later()
- support.junit_xml_list = None
-
-
-def runtest(ns: Namespace, test_name: str) -> TestResult:
- """Run a single test.
-
- ns -- regrtest namespace of options
- test_name -- the name of the test
-
- Returns a TestResult.
-
- If ns.xmlpath is not None, xml_data is a list containing each
- generated testsuite element.
- """
- start_time = time.perf_counter()
- result = TestResult(test_name)
- try:
- _runtest_capture_output_timeout_junit(result, ns)
- except:
- if not ns.pgo:
- msg = traceback.format_exc()
- print(f"test {test_name} crashed -- {msg}",
- file=sys.stderr, flush=True)
- result.state = State.UNCAUGHT_EXC
- result.duration = time.perf_counter() - start_time
- return result
-
-
-def _test_module(the_module):
- loader = unittest.TestLoader()
- tests = loader.loadTestsFromModule(the_module)
- for error in loader.errors:
- print(error, file=sys.stderr)
- if loader.errors:
- raise Exception("errors while loading tests")
- return support.run_unittest(tests)
-
-
-def save_env(ns: Namespace, test_name: str):
- return saved_test_environment(test_name, ns.verbose, ns.quiet, pgo=ns.pgo)
-
-
-def regrtest_runner(result, test_func, ns) -> None:
- # Run test_func(), collect statistics, and detect reference and memory
- # leaks.
-
- if ns.huntrleaks:
- from test.libregrtest.refleak import dash_R
- refleak, test_result = dash_R(ns, result.test_name, test_func)
- else:
- test_result = test_func()
- refleak = False
-
- if refleak:
- result.state = State.REFLEAK
-
- match test_result:
- case TestStats():
- stats = test_result
- case unittest.TestResult():
- stats = TestStats.from_unittest(test_result)
- case doctest.TestResults():
- stats = TestStats.from_doctest(test_result)
- case None:
- print_warning(f"{result.test_name} test runner returned None: {test_func}")
- stats = None
- case _:
- print_warning(f"Unknown test result type: {type(test_result)}")
- stats = None
-
- result.stats = stats
-
-
-def _load_run_test(result: TestResult, ns: Namespace) -> None:
- # Load the test function, run the test function.
-
- abstest = get_abs_module(ns, result.test_name)
-
- # remove the module from sys.module to reload it if it was already imported
- try:
- del sys.modules[abstest]
- except KeyError:
- pass
-
- the_module = importlib.import_module(abstest)
-
- if hasattr(the_module, "test_main"):
- # https://github.com/python/cpython/issues/89392
- raise Exception(f"Module {result.test_name} defines test_main() which is no longer supported by regrtest")
- test_func = functools.partial(_test_module, the_module)
-
- try:
- with save_env(ns, result.test_name):
- regrtest_runner(result, test_func, ns)
- finally:
- # First kill any dangling references to open files etc.
- # This can also issue some ResourceWarnings which would otherwise get
- # triggered during the following test run, and possibly produce
- # failures.
- support.gc_collect()
-
- cleanup_test_droppings(result.test_name, ns.verbose)
-
- if gc.garbage:
- support.environment_altered = True
- print_warning(f"{result.test_name} created {len(gc.garbage)} "
- f"uncollectable object(s).")
-
- # move the uncollectable objects somewhere,
- # so we don't see them again
- FOUND_GARBAGE.extend(gc.garbage)
- gc.garbage.clear()
-
- support.reap_children()
-
-
-def _runtest_env_changed_exc(result: TestResult, ns: Namespace,
- display_failure: bool = True) -> None:
- # Detect environment changes, handle exceptions.
-
- # Reset the environment_altered flag to detect if a test altered
- # the environment
- support.environment_altered = False
-
- if ns.pgo:
- display_failure = False
-
- test_name = result.test_name
- try:
- clear_caches()
- support.gc_collect()
-
- with save_env(ns, test_name):
- _load_run_test(result, ns)
- except support.ResourceDenied as msg:
- if not ns.quiet and not ns.pgo:
- print(f"{test_name} skipped -- {msg}", flush=True)
- result.state = State.RESOURCE_DENIED
- return
- except unittest.SkipTest as msg:
- if not ns.quiet and not ns.pgo:
- print(f"{test_name} skipped -- {msg}", flush=True)
- result.state = State.SKIPPED
- return
- except support.TestFailedWithDetails as exc:
- msg = f"test {test_name} failed"
- if display_failure:
- msg = f"{msg} -- {exc}"
- print(msg, file=sys.stderr, flush=True)
- result.state = State.FAILED
- result.errors = exc.errors
- result.failures = exc.failures
- result.stats = exc.stats
- return
- except support.TestFailed as exc:
- msg = f"test {test_name} failed"
- if display_failure:
- msg = f"{msg} -- {exc}"
- print(msg, file=sys.stderr, flush=True)
- result.state = State.FAILED
- result.stats = exc.stats
- return
- except support.TestDidNotRun:
- result.state = State.DID_NOT_RUN
- return
- except KeyboardInterrupt:
- print()
- result.state = State.INTERRUPTED
- return
- except:
- if not ns.pgo:
- msg = traceback.format_exc()
- print(f"test {test_name} crashed -- {msg}",
- file=sys.stderr, flush=True)
- result.state = State.UNCAUGHT_EXC
- return
-
- if support.environment_altered:
- result.set_env_changed()
- # Don't override the state if it was already set (REFLEAK or ENV_CHANGED)
- if result.state is None:
- result.state = State.PASSED
-
-
-def cleanup_test_droppings(test_name: str, verbose: int) -> None:
- # Try to clean up junk commonly left behind. While tests shouldn't leave
- # any files or directories behind, when a test fails that can be tedious
- # for it to arrange. The consequences can be especially nasty on Windows,
- # since if a test leaves a file open, it cannot be deleted by name (while
- # there's nothing we can do about that here either, we can display the
- # name of the offending test, which is a real help).
- for name in (os_helper.TESTFN,):
- if not os.path.exists(name):
- continue
-
- if os.path.isdir(name):
- import shutil
- kind, nuker = "directory", shutil.rmtree
- elif os.path.isfile(name):
- kind, nuker = "file", os.unlink
- else:
- raise RuntimeError(f"os.path says {name!r} exists but is neither "
- f"directory nor file")
-
- if verbose:
- print_warning(f"{test_name} left behind {kind} {name!r}")
- support.environment_altered = True
-
- try:
- import stat
- # fix possible permissions problems that might prevent cleanup
- os.chmod(name, stat.S_IRWXU | stat.S_IRWXG | stat.S_IRWXO)
- nuker(name)
- except Exception as exc:
- print_warning(f"{test_name} left behind {kind} {name!r} "
- f"and it couldn't be removed: {exc}")
diff --git a/Lib/test/libregrtest/runtest_mp.py b/Lib/test/libregrtest/runtest_mp.py
deleted file mode 100644
index fb1f80b..0000000
--- a/Lib/test/libregrtest/runtest_mp.py
+++ /dev/null
@@ -1,564 +0,0 @@
-import dataclasses
-import faulthandler
-import json
-import os.path
-import queue
-import signal
-import subprocess
-import sys
-import tempfile
-import threading
-import time
-import traceback
-from typing import NamedTuple, NoReturn, Literal, Any, TextIO
-
-from test import support
-from test.support import os_helper
-from test.support import TestStats
-
-from test.libregrtest.cmdline import Namespace
-from test.libregrtest.main import Regrtest
-from test.libregrtest.runtest import (
- runtest, TestResult, State,
- PROGRESS_MIN_TIME)
-from test.libregrtest.setup import setup_tests
-from test.libregrtest.utils import format_duration, print_warning
-
-if sys.platform == 'win32':
- import locale
-
-
-# Display the running tests if nothing happened last N seconds
-PROGRESS_UPDATE = 30.0 # seconds
-assert PROGRESS_UPDATE >= PROGRESS_MIN_TIME
-
-# Kill the main process after 5 minutes. It is supposed to write an update
-# every PROGRESS_UPDATE seconds. Tolerate 5 minutes for Python slowest
-# buildbot workers.
-MAIN_PROCESS_TIMEOUT = 5 * 60.0
-assert MAIN_PROCESS_TIMEOUT >= PROGRESS_UPDATE
-
-# Time to wait until a worker completes: should be immediate
-JOIN_TIMEOUT = 30.0 # seconds
-
-USE_PROCESS_GROUP = (hasattr(os, "setsid") and hasattr(os, "killpg"))
-
-
-def must_stop(result: TestResult, ns: Namespace) -> bool:
- if result.state == State.INTERRUPTED:
- return True
- if ns.failfast and result.is_failed(ns.fail_env_changed):
- return True
- return False
-
-
-def parse_worker_args(worker_args) -> tuple[Namespace, str]:
- ns_dict, test_name = json.loads(worker_args)
- ns = Namespace(**ns_dict)
- return (ns, test_name)
-
-
-def run_test_in_subprocess(testname: str, ns: Namespace, tmp_dir: str, stdout_fh: TextIO) -> subprocess.Popen:
- ns_dict = vars(ns)
- worker_args = (ns_dict, testname)
- worker_args = json.dumps(worker_args)
- if ns.python is not None:
- executable = ns.python
- else:
- executable = [sys.executable]
- cmd = [*executable, *support.args_from_interpreter_flags(),
- '-u', # Unbuffered stdout and stderr
- '-m', 'test.regrtest',
- '--worker-args', worker_args]
-
- env = dict(os.environ)
- if tmp_dir is not None:
- env['TMPDIR'] = tmp_dir
- env['TEMP'] = tmp_dir
- env['TMP'] = tmp_dir
-
- # Running the child from the same working directory as regrtest's original
- # invocation ensures that TEMPDIR for the child is the same when
- # sysconfig.is_python_build() is true. See issue 15300.
- kw = dict(
- env=env,
- stdout=stdout_fh,
- # bpo-45410: Write stderr into stdout to keep messages order
- stderr=stdout_fh,
- text=True,
- close_fds=(os.name != 'nt'),
- cwd=os_helper.SAVEDCWD,
- )
- if USE_PROCESS_GROUP:
- kw['start_new_session'] = True
- return subprocess.Popen(cmd, **kw)
-
-
-def run_tests_worker(ns: Namespace, test_name: str) -> NoReturn:
- setup_tests(ns)
-
- result = runtest(ns, test_name)
-
- print() # Force a newline (just in case)
-
- # Serialize TestResult as dict in JSON
- print(json.dumps(result, cls=EncodeTestResult), flush=True)
- sys.exit(0)
-
-
-# We do not use a generator so multiple threads can call next().
-class MultiprocessIterator:
-
- """A thread-safe iterator over tests for multiprocess mode."""
-
- def __init__(self, tests_iter):
- self.lock = threading.Lock()
- self.tests_iter = tests_iter
-
- def __iter__(self):
- return self
-
- def __next__(self):
- with self.lock:
- if self.tests_iter is None:
- raise StopIteration
- return next(self.tests_iter)
-
- def stop(self):
- with self.lock:
- self.tests_iter = None
-
-
-class MultiprocessResult(NamedTuple):
- result: TestResult
- # bpo-45410: stderr is written into stdout to keep messages order
- worker_stdout: str | None = None
- err_msg: str | None = None
-
-
-ExcStr = str
-QueueOutput = tuple[Literal[False], MultiprocessResult] | tuple[Literal[True], ExcStr]
-
-
-class ExitThread(Exception):
- pass
-
-
-class TestWorkerProcess(threading.Thread):
- def __init__(self, worker_id: int, runner: "MultiprocessTestRunner") -> None:
- super().__init__()
- self.worker_id = worker_id
- self.pending = runner.pending
- self.output = runner.output
- self.ns = runner.ns
- self.timeout = runner.worker_timeout
- self.regrtest = runner.regrtest
- self.current_test_name = None
- self.start_time = None
- self._popen = None
- self._killed = False
- self._stopped = False
-
- def __repr__(self) -> str:
- info = [f'TestWorkerProcess #{self.worker_id}']
- if self.is_alive():
- info.append("running")
- else:
- info.append('stopped')
- test = self.current_test_name
- if test:
- info.append(f'test={test}')
- popen = self._popen
- if popen is not None:
- dt = time.monotonic() - self.start_time
- info.extend((f'pid={self._popen.pid}',
- f'time={format_duration(dt)}'))
- return '<%s>' % ' '.join(info)
-
- def _kill(self) -> None:
- popen = self._popen
- if popen is None:
- return
-
- if self._killed:
- return
- self._killed = True
-
- if USE_PROCESS_GROUP:
- what = f"{self} process group"
- else:
- what = f"{self}"
-
- print(f"Kill {what}", file=sys.stderr, flush=True)
- try:
- if USE_PROCESS_GROUP:
- os.killpg(popen.pid, signal.SIGKILL)
- else:
- popen.kill()
- except ProcessLookupError:
- # popen.kill(): the process completed, the TestWorkerProcess thread
- # read its exit status, but Popen.send_signal() read the returncode
- # just before Popen.wait() set returncode.
- pass
- except OSError as exc:
- print_warning(f"Failed to kill {what}: {exc!r}")
-
- def stop(self) -> None:
- # Method called from a different thread to stop this thread
- self._stopped = True
- self._kill()
-
- def mp_result_error(
- self,
- test_result: TestResult,
- stdout: str | None = None,
- err_msg=None
- ) -> MultiprocessResult:
- return MultiprocessResult(test_result, stdout, err_msg)
-
- def _run_process(self, test_name: str, tmp_dir: str, stdout_fh: TextIO) -> int:
- self.current_test_name = test_name
- try:
- popen = run_test_in_subprocess(test_name, self.ns, tmp_dir, stdout_fh)
-
- self._killed = False
- self._popen = popen
- except:
- self.current_test_name = None
- raise
-
- try:
- if self._stopped:
- # If kill() has been called before self._popen is set,
- # self._popen is still running. Call again kill()
- # to ensure that the process is killed.
- self._kill()
- raise ExitThread
-
- try:
- # gh-94026: stdout+stderr are written to tempfile
- retcode = popen.wait(timeout=self.timeout)
- assert retcode is not None
- return retcode
- except subprocess.TimeoutExpired:
- if self._stopped:
- # kill() has been called: communicate() fails on reading
- # closed stdout
- raise ExitThread
-
- # On timeout, kill the process
- self._kill()
-
- # None means TIMEOUT for the caller
- retcode = None
- # bpo-38207: Don't attempt to call communicate() again: on it
- # can hang until all child processes using stdout
- # pipes completes.
- except OSError:
- if self._stopped:
- # kill() has been called: communicate() fails
- # on reading closed stdout
- raise ExitThread
- raise
- except:
- self._kill()
- raise
- finally:
- self._wait_completed()
- self._popen = None
- self.current_test_name = None
-
- def _runtest(self, test_name: str) -> MultiprocessResult:
- if sys.platform == 'win32':
- # gh-95027: When stdout is not a TTY, Python uses the ANSI code
- # page for the sys.stdout encoding. If the main process runs in a
- # terminal, sys.stdout uses WindowsConsoleIO with UTF-8 encoding.
- encoding = locale.getencoding()
- else:
- encoding = sys.stdout.encoding
-
- # gh-94026: Write stdout+stderr to a tempfile as workaround for
- # non-blocking pipes on Emscripten with NodeJS.
- with tempfile.TemporaryFile('w+', encoding=encoding) as stdout_fh:
- # gh-93353: Check for leaked temporary files in the parent process,
- # since the deletion of temporary files can happen late during
- # Python finalization: too late for libregrtest.
- if not support.is_wasi:
- # Don't check for leaked temporary files and directories if Python is
-                # run on WASI. WASI doesn't pass environment variables like TMPDIR to
- # worker processes.
- tmp_dir = tempfile.mkdtemp(prefix="test_python_")
- tmp_dir = os.path.abspath(tmp_dir)
- try:
- retcode = self._run_process(test_name, tmp_dir, stdout_fh)
- finally:
- tmp_files = os.listdir(tmp_dir)
- os_helper.rmtree(tmp_dir)
- else:
- retcode = self._run_process(test_name, None, stdout_fh)
- tmp_files = ()
- stdout_fh.seek(0)
-
- try:
- stdout = stdout_fh.read().strip()
- except Exception as exc:
- # gh-101634: Catch UnicodeDecodeError if stdout cannot be
- # decoded from encoding
- err_msg = f"Cannot read process stdout: {exc}"
- result = TestResult(test_name, state=State.MULTIPROCESSING_ERROR)
- return self.mp_result_error(result, err_msg=err_msg)
-
- if retcode is None:
- result = TestResult(test_name, state=State.TIMEOUT)
- return self.mp_result_error(result, stdout)
-
- err_msg = None
- if retcode != 0:
- err_msg = "Exit code %s" % retcode
- else:
- stdout, _, worker_json = stdout.rpartition("\n")
- stdout = stdout.rstrip()
- if not worker_json:
- err_msg = "Failed to parse worker stdout"
- else:
- try:
- # deserialize run_tests_worker() output
- result = json.loads(worker_json,
- object_hook=decode_test_result)
- except Exception as exc:
- err_msg = "Failed to parse worker JSON: %s" % exc
-
- if err_msg:
- result = TestResult(test_name, state=State.MULTIPROCESSING_ERROR)
- return self.mp_result_error(result, stdout, err_msg)
-
- if tmp_files:
- msg = (f'\n\n'
- f'Warning -- {test_name} leaked temporary files '
- f'({len(tmp_files)}): {", ".join(sorted(tmp_files))}')
- stdout += msg
- result.set_env_changed()
-
- return MultiprocessResult(result, stdout)
-
- def run(self) -> None:
- while not self._stopped:
- try:
- try:
- test_name = next(self.pending)
- except StopIteration:
- break
-
- self.start_time = time.monotonic()
- mp_result = self._runtest(test_name)
- mp_result.result.duration = time.monotonic() - self.start_time
- self.output.put((False, mp_result))
-
- if must_stop(mp_result.result, self.ns):
- break
- except ExitThread:
- break
- except BaseException:
- self.output.put((True, traceback.format_exc()))
- break
-
- def _wait_completed(self) -> None:
- popen = self._popen
-
- try:
- popen.wait(JOIN_TIMEOUT)
- except (subprocess.TimeoutExpired, OSError) as exc:
- print_warning(f"Failed to wait for {self} completion "
- f"(timeout={format_duration(JOIN_TIMEOUT)}): "
- f"{exc!r}")
-
- def wait_stopped(self, start_time: float) -> None:
- # bpo-38207: MultiprocessTestRunner.stop_workers() called self.stop()
- # which killed the process. Sometimes, killing the process from the
- # main thread does not interrupt popen.communicate() in
- # TestWorkerProcess thread. This loop with a timeout is a workaround
- # for that.
- #
- # Moreover, if this method fails to join the thread, it is likely
- # that Python will hang at exit while calling threading._shutdown()
- # which tries again to join the blocked thread. Regrtest.main()
- # uses EXIT_TIMEOUT to workaround this second bug.
- while True:
- # Write a message every second
- self.join(1.0)
- if not self.is_alive():
- break
- dt = time.monotonic() - start_time
- self.regrtest.log(f"Waiting for {self} thread "
- f"for {format_duration(dt)}")
- if dt > JOIN_TIMEOUT:
- print_warning(f"Failed to join {self} in {format_duration(dt)}")
- break
-
-
-def get_running(workers: list[TestWorkerProcess]) -> list[TestWorkerProcess]:
- running = []
- for worker in workers:
- current_test_name = worker.current_test_name
- if not current_test_name:
- continue
- dt = time.monotonic() - worker.start_time
- if dt >= PROGRESS_MIN_TIME:
- text = '%s (%s)' % (current_test_name, format_duration(dt))
- running.append(text)
- return running
-
-
-class MultiprocessTestRunner:
- def __init__(self, regrtest: Regrtest) -> None:
- self.regrtest = regrtest
- self.log = self.regrtest.log
- self.ns = regrtest.ns
- self.output: queue.Queue[QueueOutput] = queue.Queue()
- self.pending = MultiprocessIterator(self.regrtest.tests)
- if self.ns.timeout is not None:
-            # Rely on faulthandler to kill a worker process. This timeout is
-            # used when faulthandler fails to kill a worker process. Give
-            # faulthandler a maximum of 5 minutes to kill the worker.
- self.worker_timeout = min(self.ns.timeout * 1.5,
- self.ns.timeout + 5 * 60)
- else:
- self.worker_timeout = None
- self.workers = None
-
- def start_workers(self) -> None:
- self.workers = [TestWorkerProcess(index, self)
- for index in range(1, self.ns.use_mp + 1)]
- msg = f"Run tests in parallel using {len(self.workers)} child processes"
- if self.ns.timeout:
- msg += (" (timeout: %s, worker timeout: %s)"
- % (format_duration(self.ns.timeout),
- format_duration(self.worker_timeout)))
- self.log(msg)
- for worker in self.workers:
- worker.start()
-
- def stop_workers(self) -> None:
- start_time = time.monotonic()
- for worker in self.workers:
- worker.stop()
- for worker in self.workers:
- worker.wait_stopped(start_time)
-
- def _get_result(self) -> QueueOutput | None:
- use_faulthandler = (self.ns.timeout is not None)
- timeout = PROGRESS_UPDATE
-
- # bpo-46205: check the status of workers every iteration to avoid
- # waiting forever on an empty queue.
- while any(worker.is_alive() for worker in self.workers):
- if use_faulthandler:
- faulthandler.dump_traceback_later(MAIN_PROCESS_TIMEOUT,
- exit=True)
-
- # wait for a thread
- try:
- return self.output.get(timeout=timeout)
- except queue.Empty:
- pass
-
- # display progress
- running = get_running(self.workers)
- if running and not self.ns.pgo:
- self.log('running: %s' % ', '.join(running))
-
- # all worker threads are done: consume pending results
- try:
- return self.output.get(timeout=0)
- except queue.Empty:
- return None
-
- def display_result(self, mp_result: MultiprocessResult) -> None:
- result = mp_result.result
-
- text = str(result)
- if mp_result.err_msg:
- # MULTIPROCESSING_ERROR
- text += ' (%s)' % mp_result.err_msg
- elif (result.duration >= PROGRESS_MIN_TIME and not self.ns.pgo):
- text += ' (%s)' % format_duration(result.duration)
- running = get_running(self.workers)
- if running and not self.ns.pgo:
- text += ' -- running: %s' % ', '.join(running)
- self.regrtest.display_progress(self.test_index, text)
-
- def _process_result(self, item: QueueOutput) -> bool:
- """Returns True if test runner must stop."""
- if item[0]:
- # Thread got an exception
- format_exc = item[1]
- print_warning(f"regrtest worker thread failed: {format_exc}")
- result = TestResult("<regrtest worker>", state=State.MULTIPROCESSING_ERROR)
- self.regrtest.accumulate_result(result)
- return True
-
- self.test_index += 1
- mp_result = item[1]
- self.regrtest.accumulate_result(mp_result.result)
- self.display_result(mp_result)
-
- if mp_result.worker_stdout:
- print(mp_result.worker_stdout, flush=True)
-
- if must_stop(mp_result.result, self.ns):
- return True
-
- return False
-
- def run_tests(self) -> None:
- self.start_workers()
-
- self.test_index = 0
- try:
- while True:
- item = self._get_result()
- if item is None:
- break
-
- stop = self._process_result(item)
- if stop:
- break
- except KeyboardInterrupt:
- print()
- self.regrtest.interrupted = True
- finally:
- if self.ns.timeout is not None:
- faulthandler.cancel_dump_traceback_later()
-
-            # Always ensure that all worker processes are no longer
-            # running when we exit this function
- self.pending.stop()
- self.stop_workers()
-
-
-def run_tests_multiprocess(regrtest: Regrtest) -> None:
- MultiprocessTestRunner(regrtest).run_tests()
-
-
-class EncodeTestResult(json.JSONEncoder):
- """Encode a TestResult (sub)class object into a JSON dict."""
-
- def default(self, o: Any) -> dict[str, Any]:
- if isinstance(o, TestResult):
- result = dataclasses.asdict(o)
- result["__test_result__"] = o.__class__.__name__
- return result
-
- return super().default(o)
-
-
-def decode_test_result(d: dict[str, Any]) -> TestResult | TestStats | dict[str, Any]:
- """Decode a TestResult (sub)class object from a JSON dict."""
-
- if "__test_result__" not in d:
- return d
-
- d.pop('__test_result__')
- if d['stats'] is not None:
- d['stats'] = TestStats(**d['stats'])
- return TestResult(**d)
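The removed EncodeTestResult/decode_test_result pair above is how the old runtest_mp.py shipped TestResult objects across the worker pipe: a type marker is injected into the JSON dict on encode and consumed again on decode. A minimal, self-contained sketch of the same round-trip pattern, using a hypothetical stand-in dataclass rather than the real TestResult:

    import dataclasses
    import json

    @dataclasses.dataclass
    class FakeResult:
        # Stand-in for TestResult, for illustration only.
        test_name: str
        state: str | None = None

    class _Encoder(json.JSONEncoder):
        def default(self, o):
            if dataclasses.is_dataclass(o):
                d = dataclasses.asdict(o)
                d["__test_result__"] = type(o).__name__   # type marker, as above
                return d
            return super().default(o)

    def _decode(d):
        if "__test_result__" not in d:
            return d
        d.pop("__test_result__")
        return FakeResult(**d)

    blob = json.dumps(FakeResult("test_os", "PASSED"), cls=_Encoder)
    assert json.loads(blob, object_hook=_decode) == FakeResult("test_os", "PASSED")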
diff --git a/Lib/test/libregrtest/runtests.py b/Lib/test/libregrtest/runtests.py
new file mode 100644
index 0000000..4da312d
--- /dev/null
+++ b/Lib/test/libregrtest/runtests.py
@@ -0,0 +1,162 @@
+import contextlib
+import dataclasses
+import json
+import os
+import subprocess
+from typing import Any
+
+from test import support
+
+from .utils import (
+ StrPath, StrJSON, TestTuple, FilterTuple, FilterDict)
+
+
+class JsonFileType:
+ UNIX_FD = "UNIX_FD"
+ WINDOWS_HANDLE = "WINDOWS_HANDLE"
+ STDOUT = "STDOUT"
+
+
+@dataclasses.dataclass(slots=True, frozen=True)
+class JsonFile:
+    # The meaning of "file" depends on file_type:
+ # - UNIX_FD: file descriptor (int)
+ # - WINDOWS_HANDLE: handle (int)
+ # - STDOUT: use process stdout (None)
+ file: int | None
+ file_type: str
+
+ def configure_subprocess(self, popen_kwargs: dict) -> None:
+ match self.file_type:
+ case JsonFileType.UNIX_FD:
+ # Unix file descriptor
+ popen_kwargs['pass_fds'] = [self.file]
+ case JsonFileType.WINDOWS_HANDLE:
+ # Windows handle
+ startupinfo = subprocess.STARTUPINFO()
+ startupinfo.lpAttributeList = {"handle_list": [self.file]}
+ popen_kwargs['startupinfo'] = startupinfo
+
+ @contextlib.contextmanager
+ def inherit_subprocess(self):
+ if self.file_type == JsonFileType.WINDOWS_HANDLE:
+ os.set_handle_inheritable(self.file, True)
+ try:
+ yield
+ finally:
+ os.set_handle_inheritable(self.file, False)
+ else:
+ yield
+
+ def open(self, mode='r', *, encoding):
+ if self.file_type == JsonFileType.STDOUT:
+ raise ValueError("for STDOUT file type, just use sys.stdout")
+
+ file = self.file
+ if self.file_type == JsonFileType.WINDOWS_HANDLE:
+ import msvcrt
+ # Create a file descriptor from the handle
+ file = msvcrt.open_osfhandle(file, os.O_WRONLY)
+ return open(file, mode, encoding=encoding)
+
+
+@dataclasses.dataclass(slots=True, frozen=True)
+class HuntRefleak:
+ warmups: int
+ runs: int
+ filename: StrPath
+
+
+@dataclasses.dataclass(slots=True, frozen=True)
+class RunTests:
+ tests: TestTuple
+ fail_fast: bool
+ fail_env_changed: bool
+ match_tests: FilterTuple | None
+ ignore_tests: FilterTuple | None
+ match_tests_dict: FilterDict | None
+ rerun: bool
+ forever: bool
+ pgo: bool
+ pgo_extended: bool
+ output_on_failure: bool
+ timeout: float | None
+ verbose: int
+ quiet: bool
+ hunt_refleak: HuntRefleak | None
+ test_dir: StrPath | None
+ use_junit: bool
+ memory_limit: str | None
+ gc_threshold: int | None
+ use_resources: tuple[str, ...]
+ python_cmd: tuple[str, ...] | None
+ randomize: bool
+ random_seed: int | None
+ json_file: JsonFile | None
+
+ def copy(self, **override):
+ state = dataclasses.asdict(self)
+ state.update(override)
+ return RunTests(**state)
+
+ def get_match_tests(self, test_name) -> FilterTuple | None:
+ if self.match_tests_dict is not None:
+ return self.match_tests_dict.get(test_name, None)
+ else:
+ return None
+
+ def get_jobs(self):
+ # Number of run_single_test() calls needed to run all tests.
+        # None means there is no limit (--forever option).
+ if self.forever:
+ return None
+ return len(self.tests)
+
+ def iter_tests(self):
+ if self.forever:
+ while True:
+ yield from self.tests
+ else:
+ yield from self.tests
+
+ def as_json(self) -> StrJSON:
+ return json.dumps(self, cls=_EncodeRunTests)
+
+ @staticmethod
+ def from_json(worker_json: StrJSON) -> 'RunTests':
+ return json.loads(worker_json, object_hook=_decode_runtests)
+
+ def json_file_use_stdout(self) -> bool:
+ # Use STDOUT in two cases:
+ #
+ # - If --python command line option is used;
+ # - On Emscripten and WASI.
+ #
+ # On other platforms, UNIX_FD or WINDOWS_HANDLE can be used.
+ return (
+ bool(self.python_cmd)
+ or support.is_emscripten
+ or support.is_wasi
+ )
+
+
+class _EncodeRunTests(json.JSONEncoder):
+ def default(self, o: Any) -> dict[str, Any]:
+ if isinstance(o, RunTests):
+ result = dataclasses.asdict(o)
+ result["__runtests__"] = True
+ return result
+ else:
+ return super().default(o)
+
+
+def _decode_runtests(data: dict[str, Any]) -> RunTests | dict[str, Any]:
+ if "__runtests__" in data:
+ data.pop('__runtests__')
+ if data['hunt_refleak']:
+ data['hunt_refleak'] = HuntRefleak(**data['hunt_refleak'])
+ if data['json_file']:
+ data['json_file'] = JsonFile(**data['json_file'])
+ return RunTests(**data)
+ else:
+ return data
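The copy()/as_json()/from_json() trio above is how the main process hands an immutable RunTests snapshot to its workers: a JSON dict with a "__runtests__" marker on the wire, rebuilt into the frozen dataclass on the other side. A reduced sketch of the same pattern, using a hypothetical two-field stand-in rather than the real field list:

    import dataclasses
    import json

    @dataclasses.dataclass(frozen=True, slots=True)
    class MiniRunTests:
        # Reduced stand-in for RunTests, for illustration only.
        tests: tuple[str, ...]
        forever: bool = False

        def copy(self, **override):
            state = dataclasses.asdict(self)
            state.update(override)
            return MiniRunTests(**state)

        def as_json(self) -> str:
            return json.dumps(dataclasses.asdict(self) | {"__runtests__": True})

        @staticmethod
        def from_json(text: str) -> "MiniRunTests":
            data = json.loads(text)
            data.pop("__runtests__")
            return MiniRunTests(tests=tuple(data["tests"]), forever=data["forever"])

    rt = MiniRunTests(tests=("test_os", "test_sys"))
    assert rt.copy(forever=True).forever
    assert MiniRunTests.from_json(rt.as_json()) == rt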
diff --git a/Lib/test/libregrtest/save_env.py b/Lib/test/libregrtest/save_env.py
index 7e801a5..b2cc381 100644
--- a/Lib/test/libregrtest/save_env.py
+++ b/Lib/test/libregrtest/save_env.py
@@ -3,9 +3,11 @@ import locale
import os
import sys
import threading
+
from test import support
from test.support import os_helper
-from test.libregrtest.utils import print_warning
+
+from .utils import print_warning
class SkipTestEnvironment(Exception):
@@ -34,7 +36,7 @@ class saved_test_environment:
items is also printed.
"""
- def __init__(self, test_name, verbose=0, quiet=False, *, pgo=False):
+ def __init__(self, test_name, verbose, quiet, *, pgo):
self.test_name = test_name
self.verbose = verbose
self.quiet = quiet
@@ -161,11 +163,11 @@ class saved_test_environment:
warnings.filters[:] = saved_filters[2]
def get_asyncore_socket_map(self):
- asyncore = sys.modules.get('asyncore')
+ asyncore = sys.modules.get('test.support.asyncore')
# XXX Making a copy keeps objects alive until __exit__ gets called.
return asyncore and asyncore.socket_map.copy() or {}
def restore_asyncore_socket_map(self, saved_map):
- asyncore = sys.modules.get('asyncore')
+ asyncore = sys.modules.get('test.support.asyncore')
if asyncore is not None:
asyncore.close_all(ignore_all=True)
asyncore.socket_map.update(saved_map)
@@ -257,8 +259,10 @@ class saved_test_environment:
sysconfig._INSTALL_SCHEMES.update(saved[2])
def get_files(self):
+ # XXX: Maybe add an allow-list here?
return sorted(fn + ('/' if os.path.isdir(fn) else '')
- for fn in os.listdir())
+ for fn in os.listdir()
+ if not fn.startswith(".hypothesis"))
def restore_files(self, saved_value):
fn = os_helper.TESTFN
if fn not in saved_value and (fn + '/') not in saved_value:
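Each get_X()/restore_X() pair touched in this hunk follows the same contract: snapshot a piece of process-global state on entry, then detect and undo any change on exit. A generic sketch of that contract (a simplified, hypothetical stand-in, not the real saved_test_environment class):

    import os

    class saved_state:
        # Simplified stand-in illustrating the get_*/restore_* contract.
        resources = ('cwd',)

        def get_cwd(self):
            return os.getcwd()

        def restore_cwd(self, saved):
            os.chdir(saved)

        def __enter__(self):
            self.saved = {name: getattr(self, 'get_' + name)()
                          for name in self.resources}
            return self

        def __exit__(self, *exc):
            for name in self.resources:
                if getattr(self, 'get_' + name)() != self.saved[name]:
                    print(f"Warning -- {name} was modified by the test")
                    getattr(self, 'restore_' + name)(self.saved[name])
            return False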
diff --git a/Lib/test/libregrtest/setup.py b/Lib/test/libregrtest/setup.py
index ecd7fa3..793347f 100644
--- a/Lib/test/libregrtest/setup.py
+++ b/Lib/test/libregrtest/setup.py
@@ -1,24 +1,32 @@
-import atexit
import faulthandler
+import gc
import os
+import random
import signal
import sys
import unittest
from test import support
from test.support.os_helper import TESTFN_UNDECODABLE, FS_NONASCII
-try:
- import gc
-except ImportError:
- gc = None
-from test.libregrtest.utils import (setup_unraisable_hook,
- setup_threading_excepthook)
+from .runtests import RunTests
+from .utils import (
+ setup_unraisable_hook, setup_threading_excepthook, fix_umask,
+ adjust_rlimit_nofile)
UNICODE_GUARD_ENV = "PYTHONREGRTEST_UNICODE_GUARD"
-def setup_tests(ns):
+def setup_test_dir(testdir: str | None) -> None:
+ if testdir:
+ # Prepend test directory to sys.path, so runtest() will be able
+ # to locate tests
+ sys.path.insert(0, os.path.abspath(testdir))
+
+
+def setup_process():
+ fix_umask()
+
try:
stderr_fd = sys.__stderr__.fileno()
except (ValueError, AttributeError):
@@ -40,14 +48,9 @@ def setup_tests(ns):
for signum in signals:
faulthandler.register(signum, chain=True, file=stderr_fd)
- _adjust_resource_limits()
- replace_stdout()
- support.record_original_stdout(sys.stdout)
+ adjust_rlimit_nofile()
- if ns.testdir:
- # Prepend test directory to sys.path, so runtest() will be able
- # to locate tests
- sys.path.insert(0, os.path.abspath(ns.testdir))
+ support.record_original_stdout(sys.stdout)
    # Sometimes __path__ and __file__ are not absolute (e.g. while running from
# Lib/) and, if we change the CWD to run the tests in a temporary dir, some
@@ -66,19 +69,6 @@ def setup_tests(ns):
if getattr(module, '__file__', None):
module.__file__ = os.path.abspath(module.__file__)
- if ns.huntrleaks:
- unittest.BaseTestSuite._cleanup = False
-
- if ns.memlimit is not None:
- support.set_memlimit(ns.memlimit)
-
- if ns.threshold is not None:
- gc.set_threshold(ns.threshold)
-
- support.suppress_msvcrt_asserts(ns.verbose and ns.verbose >= 2)
-
- support.use_resources = ns.use_resources
-
if hasattr(sys, 'addaudithook'):
# Add an auditing hook for all tests to ensure PySys_Audit is tested
def _test_audit_hook(name, args):
@@ -88,7 +78,37 @@ def setup_tests(ns):
setup_unraisable_hook()
setup_threading_excepthook()
- timeout = ns.timeout
+ # Ensure there's a non-ASCII character in env vars at all times to force
+    # tests to consider this case. See BPO-44647 for details.
+ if TESTFN_UNDECODABLE and os.supports_bytes_environ:
+ os.environb.setdefault(UNICODE_GUARD_ENV.encode(), TESTFN_UNDECODABLE)
+ elif FS_NONASCII:
+ os.environ.setdefault(UNICODE_GUARD_ENV, FS_NONASCII)
+
+
+def setup_tests(runtests: RunTests):
+ support.verbose = runtests.verbose
+ support.failfast = runtests.fail_fast
+ support.PGO = runtests.pgo
+ support.PGO_EXTENDED = runtests.pgo_extended
+
+ support.set_match_tests(runtests.match_tests, runtests.ignore_tests)
+
+ if runtests.use_junit:
+ support.junit_xml_list = []
+ from test.support.testresult import RegressionTestResult
+ RegressionTestResult.USE_XML = True
+ else:
+ support.junit_xml_list = None
+
+ if runtests.memory_limit is not None:
+ support.set_memlimit(runtests.memory_limit)
+
+ support.suppress_msvcrt_asserts(runtests.verbose >= 2)
+
+ support.use_resources = runtests.use_resources
+
+ timeout = runtests.timeout
if timeout is not None:
# For a slow buildbot worker, increase SHORT_TIMEOUT and LONG_TIMEOUT
support.LOOPBACK_TIMEOUT = max(support.LOOPBACK_TIMEOUT, timeout / 120)
@@ -102,61 +122,10 @@ def setup_tests(ns):
support.SHORT_TIMEOUT = min(support.SHORT_TIMEOUT, timeout)
support.LONG_TIMEOUT = min(support.LONG_TIMEOUT, timeout)
- if ns.xmlpath:
- from test.support.testresult import RegressionTestResult
- RegressionTestResult.USE_XML = True
-
- # Ensure there's a non-ASCII character in env vars at all times to force
-    # tests to consider this case. See BPO-44647 for details.
- if TESTFN_UNDECODABLE and os.supports_bytes_environ:
- os.environb.setdefault(UNICODE_GUARD_ENV.encode(), TESTFN_UNDECODABLE)
- elif FS_NONASCII:
- os.environ.setdefault(UNICODE_GUARD_ENV, FS_NONASCII)
-
-
-def replace_stdout():
- """Set stdout encoder error handler to backslashreplace (as stderr error
- handler) to avoid UnicodeEncodeError when printing a traceback"""
- stdout = sys.stdout
- try:
- fd = stdout.fileno()
- except ValueError:
- # On IDLE, sys.stdout has no file descriptor and is not a TextIOWrapper
- # object. Leaving sys.stdout unchanged.
- #
- # Catch ValueError to catch io.UnsupportedOperation on TextIOBase
- # and ValueError on a closed stream.
- return
-
- sys.stdout = open(fd, 'w',
- encoding=stdout.encoding,
- errors="backslashreplace",
- closefd=False,
- newline='\n')
-
- def restore_stdout():
- sys.stdout.close()
- sys.stdout = stdout
- atexit.register(restore_stdout)
+ if runtests.hunt_refleak:
+ unittest.BaseTestSuite._cleanup = False
+ if runtests.gc_threshold is not None:
+ gc.set_threshold(runtests.gc_threshold)
-def _adjust_resource_limits():
- """Adjust the system resource limits (ulimit) if needed."""
- try:
- import resource
- from resource import RLIMIT_NOFILE, RLIM_INFINITY
- except ImportError:
- return
- fd_limit, max_fds = resource.getrlimit(RLIMIT_NOFILE)
- # On macOS the default fd limit is sometimes too low (256) for our
- # test suite to succeed. Raise it to something more reasonable.
- # 1024 is a common Linux default.
- desired_fds = 1024
- if fd_limit < desired_fds and fd_limit < max_fds:
- new_fd_limit = min(desired_fds, max_fds)
- try:
- resource.setrlimit(RLIMIT_NOFILE, (new_fd_limit, max_fds))
- print(f"Raised RLIMIT_NOFILE: {fd_limit} -> {new_fd_limit}")
- except (ValueError, OSError) as err:
- print(f"Unable to raise RLIMIT_NOFILE from {fd_limit} to "
- f"{new_fd_limit}: {err}.")
+ random.seed(runtests.random_seed)
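The net effect of this hunk is to split the old setup_tests(ns) into per-process setup (setup_test_dir() plus setup_process()) and per-run configuration (setup_tests(runtests), now driven by a RunTests instance). The worker (see worker.py below) calls them in roughly this order; run_one() below is a hypothetical wrapper, used only as a simplified sketch:

    # Simplified call order, as used by a worker process; `runtests` is a
    # RunTests instance deserialized from JSON (see runtests.py above).
    from test.libregrtest.setup import setup_test_dir, setup_process
    from test.libregrtest.single import run_single_test

    def run_one(runtests, test_name):
        setup_test_dir(runtests.test_dir)   # make extra test directories importable
        setup_process()                     # faulthandler, signals, rlimits, umask, audit hook
        # setup_tests(runtests) itself is called later, inside run_single_test()
        return run_single_test(test_name, runtests)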
diff --git a/Lib/test/libregrtest/single.py b/Lib/test/libregrtest/single.py
new file mode 100644
index 0000000..0304f85
--- /dev/null
+++ b/Lib/test/libregrtest/single.py
@@ -0,0 +1,278 @@
+import doctest
+import faulthandler
+import gc
+import importlib
+import io
+import sys
+import time
+import traceback
+import unittest
+
+from test import support
+from test.support import TestStats
+from test.support import threading_helper
+
+from .result import State, TestResult
+from .runtests import RunTests
+from .save_env import saved_test_environment
+from .setup import setup_tests
+from .utils import (
+ TestName,
+ clear_caches, remove_testfn, abs_module_name, print_warning)
+
+
+# Minimum duration of a test to display its duration or to mention that
+# the test is running in background
+PROGRESS_MIN_TIME = 30.0 # seconds
+
+
+def run_unittest(test_mod):
+ loader = unittest.TestLoader()
+ tests = loader.loadTestsFromModule(test_mod)
+ for error in loader.errors:
+ print(error, file=sys.stderr)
+ if loader.errors:
+ raise Exception("errors while loading tests")
+ return support.run_unittest(tests)
+
+
+def regrtest_runner(result: TestResult, test_func, runtests: RunTests) -> None:
+ # Run test_func(), collect statistics, and detect reference and memory
+ # leaks.
+ if runtests.hunt_refleak:
+ from .refleak import runtest_refleak
+ refleak, test_result = runtest_refleak(result.test_name, test_func,
+ runtests.hunt_refleak,
+ runtests.quiet)
+ else:
+ test_result = test_func()
+ refleak = False
+
+ if refleak:
+ result.state = State.REFLEAK
+
+ stats: TestStats | None
+
+ match test_result:
+ case TestStats():
+ stats = test_result
+ case unittest.TestResult():
+ stats = TestStats.from_unittest(test_result)
+ case doctest.TestResults():
+ stats = TestStats.from_doctest(test_result)
+ case None:
+ print_warning(f"{result.test_name} test runner returned None: {test_func}")
+ stats = None
+ case _:
+ print_warning(f"Unknown test result type: {type(test_result)}")
+ stats = None
+
+ result.stats = stats
+
+
+# Storage of uncollectable GC objects (gc.garbage)
+GC_GARBAGE = []
+
+
+def _load_run_test(result: TestResult, runtests: RunTests) -> None:
+ # Load the test module and run the tests.
+ test_name = result.test_name
+ module_name = abs_module_name(test_name, runtests.test_dir)
+
+    # Remove the module from sys.modules to reload it if it was already imported
+ sys.modules.pop(module_name, None)
+
+ test_mod = importlib.import_module(module_name)
+
+ if hasattr(test_mod, "test_main"):
+ # https://github.com/python/cpython/issues/89392
+ raise Exception(f"Module {test_name} defines test_main() which "
+ f"is no longer supported by regrtest")
+ def test_func():
+ return run_unittest(test_mod)
+
+ try:
+ regrtest_runner(result, test_func, runtests)
+ finally:
+ # First kill any dangling references to open files etc.
+ # This can also issue some ResourceWarnings which would otherwise get
+ # triggered during the following test run, and possibly produce
+ # failures.
+ support.gc_collect()
+
+ remove_testfn(test_name, runtests.verbose)
+
+ if gc.garbage:
+ support.environment_altered = True
+ print_warning(f"{test_name} created {len(gc.garbage)} "
+ f"uncollectable object(s)")
+
+ # move the uncollectable objects somewhere,
+ # so we don't see them again
+ GC_GARBAGE.extend(gc.garbage)
+ gc.garbage.clear()
+
+ support.reap_children()
+
+
+def _runtest_env_changed_exc(result: TestResult, runtests: RunTests,
+ display_failure: bool = True) -> None:
+ # Handle exceptions, detect environment changes.
+
+ # Reset the environment_altered flag to detect if a test altered
+ # the environment
+ support.environment_altered = False
+
+ pgo = runtests.pgo
+ if pgo:
+ display_failure = False
+ quiet = runtests.quiet
+
+ test_name = result.test_name
+ try:
+ clear_caches()
+ support.gc_collect()
+
+ with saved_test_environment(test_name,
+ runtests.verbose, quiet, pgo=pgo):
+ _load_run_test(result, runtests)
+ except support.ResourceDenied as exc:
+ if not quiet and not pgo:
+ print(f"{test_name} skipped -- {exc}", flush=True)
+ result.state = State.RESOURCE_DENIED
+ return
+ except unittest.SkipTest as exc:
+ if not quiet and not pgo:
+ print(f"{test_name} skipped -- {exc}", flush=True)
+ result.state = State.SKIPPED
+ return
+ except support.TestFailedWithDetails as exc:
+ msg = f"test {test_name} failed"
+ if display_failure:
+ msg = f"{msg} -- {exc}"
+ print(msg, file=sys.stderr, flush=True)
+ result.state = State.FAILED
+ result.errors = exc.errors
+ result.failures = exc.failures
+ result.stats = exc.stats
+ return
+ except support.TestFailed as exc:
+ msg = f"test {test_name} failed"
+ if display_failure:
+ msg = f"{msg} -- {exc}"
+ print(msg, file=sys.stderr, flush=True)
+ result.state = State.FAILED
+ result.stats = exc.stats
+ return
+ except support.TestDidNotRun:
+ result.state = State.DID_NOT_RUN
+ return
+ except KeyboardInterrupt:
+ print()
+ result.state = State.INTERRUPTED
+ return
+ except:
+ if not pgo:
+ msg = traceback.format_exc()
+ print(f"test {test_name} crashed -- {msg}",
+ file=sys.stderr, flush=True)
+ result.state = State.UNCAUGHT_EXC
+ return
+
+ if support.environment_altered:
+ result.set_env_changed()
+ # Don't override the state if it was already set (REFLEAK or ENV_CHANGED)
+ if result.state is None:
+ result.state = State.PASSED
+
+
+def _runtest(result: TestResult, runtests: RunTests) -> None:
+ # Capture stdout and stderr, set faulthandler timeout,
+ # and create JUnit XML report.
+ verbose = runtests.verbose
+ output_on_failure = runtests.output_on_failure
+ timeout = runtests.timeout
+
+ use_timeout = (
+ timeout is not None and threading_helper.can_start_thread
+ )
+ if use_timeout:
+ faulthandler.dump_traceback_later(timeout, exit=True)
+
+ try:
+ setup_tests(runtests)
+
+ if output_on_failure:
+ support.verbose = True
+
+ stream = io.StringIO()
+ orig_stdout = sys.stdout
+ orig_stderr = sys.stderr
+ print_warning = support.print_warning
+ orig_print_warnings_stderr = print_warning.orig_stderr
+
+ output = None
+ try:
+ sys.stdout = stream
+ sys.stderr = stream
+ # print_warning() writes into the temporary stream to preserve
+ # messages order. If support.environment_altered becomes true,
+ # warnings will be written to sys.stderr below.
+ print_warning.orig_stderr = stream
+
+ _runtest_env_changed_exc(result, runtests, display_failure=False)
+ # Ignore output if the test passed successfully
+ if result.state != State.PASSED:
+ output = stream.getvalue()
+ finally:
+ sys.stdout = orig_stdout
+ sys.stderr = orig_stderr
+ print_warning.orig_stderr = orig_print_warnings_stderr
+
+ if output is not None:
+ sys.stderr.write(output)
+ sys.stderr.flush()
+ else:
+ # Tell tests to be moderately quiet
+ support.verbose = verbose
+ _runtest_env_changed_exc(result, runtests,
+ display_failure=not verbose)
+
+ xml_list = support.junit_xml_list
+ if xml_list:
+ import xml.etree.ElementTree as ET
+ result.xml_data = [ET.tostring(x).decode('us-ascii')
+ for x in xml_list]
+ finally:
+ if use_timeout:
+ faulthandler.cancel_dump_traceback_later()
+ support.junit_xml_list = None
+
+
+def run_single_test(test_name: TestName, runtests: RunTests) -> TestResult:
+ """Run a single test.
+
+ test_name -- the name of the test
+
+ Returns a TestResult.
+
+ If runtests.use_junit, xml_data is a list containing each generated
+ testsuite element.
+ """
+ start_time = time.perf_counter()
+ result = TestResult(test_name)
+ pgo = runtests.pgo
+ try:
+ _runtest(result, runtests)
+ except:
+ if not pgo:
+ msg = traceback.format_exc()
+ print(f"test {test_name} crashed -- {msg}",
+ file=sys.stderr, flush=True)
+ result.state = State.UNCAUGHT_EXC
+
+ sys.stdout.flush()
+ sys.stderr.flush()
+
+ result.duration = time.perf_counter() - start_time
+ return result
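run_unittest() above leans on unittest.TestLoader.errors so that import and collection failures abort the run loudly instead of being silently skipped. The same pattern works standalone; load_and_run() is a hypothetical name used only for this sketch:

    import sys
    import unittest

    def load_and_run(module):
        # Sketch: load tests from a module and fail loudly on loader errors.
        loader = unittest.TestLoader()
        suite = loader.loadTestsFromModule(module)
        for error in loader.errors:
            print(error, file=sys.stderr)
        if loader.errors:
            raise Exception("errors while loading tests")
        return unittest.TextTestRunner(verbosity=0).run(suite)

    if __name__ == "__main__":
        # Running against this very module simply yields an empty, passing suite.
        load_and_run(sys.modules[__name__])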
diff --git a/Lib/test/libregrtest/utils.py b/Lib/test/libregrtest/utils.py
index 49f3a23..b5bbe0e 100644
--- a/Lib/test/libregrtest/utils.py
+++ b/Lib/test/libregrtest/utils.py
@@ -1,9 +1,59 @@
+import contextlib
+import faulthandler
+import locale
import math
import os.path
+import platform
+import random
+import shlex
+import signal
+import subprocess
import sys
import sysconfig
+import tempfile
import textwrap
+from collections.abc import Callable
+
from test import support
+from test.support import os_helper
+from test.support import threading_helper
+
+
+# All temporary files and temporary directories created by libregrtest should
+# use TMP_PREFIX so cleanup_temp_dir() can remove them all.
+TMP_PREFIX = 'test_python_'
+WORK_DIR_PREFIX = TMP_PREFIX
+WORKER_WORK_DIR_PREFIX = WORK_DIR_PREFIX + 'worker_'
+
+# bpo-38203: Maximum delay in seconds to exit Python (call Py_Finalize()).
+# Used to protect against threading._shutdown() hang.
+# Must be smaller than buildbot "1200 seconds without output" limit.
+EXIT_TIMEOUT = 120.0
+
+
+ALL_RESOURCES = ('audio', 'curses', 'largefile', 'network',
+ 'decimal', 'cpu', 'subprocess', 'urlfetch', 'gui', 'walltime')
+
+# Other resources excluded from --use=all:
+#
+# - extralargefile (ex: test_zipfile64): really too slow to be enabled
+# "by default"
+# - tzdata: while needed to fully validate test_datetime, it makes
+# test_datetime too slow (15-20 min on some buildbots) and so is disabled by
+# default (see bpo-30822).
+RESOURCE_NAMES = ALL_RESOURCES + ('extralargefile', 'tzdata')
+
+
+# Types for type hints
+StrPath = str
+TestName = str
+StrJSON = str
+TestTuple = tuple[TestName, ...]
+TestList = list[TestName]
+# --match and --ignore options: list of patterns
+# ('*' wildcard character can be used)
+FilterTuple = tuple[TestName, ...]
+FilterDict = dict[TestName, FilterTuple]
def format_duration(seconds):
@@ -31,7 +81,7 @@ def format_duration(seconds):
return ' '.join(parts)
-def removepy(names):
+def strip_py_suffix(names: list[str] | None) -> None:
if not names:
return
for idx, name in enumerate(names):
@@ -40,11 +90,20 @@ def removepy(names):
names[idx] = basename
+def plural(n, singular, plural=None):
+ if n == 1:
+ return singular
+ elif plural is not None:
+ return plural
+ else:
+ return singular + 's'
+
+
def count(n, word):
if n == 1:
- return "%d %s" % (n, word)
+ return f"{n} {word}"
else:
- return "%d %ss" % (n, word)
+ return f"{n} {word}s"
def printlist(x, width=70, indent=4, file=None):
@@ -125,15 +184,6 @@ def clear_caches():
if stream is not None:
stream.flush()
- # Clear assorted module caches.
- # Don't worry about resetting the cache if the module is not loaded
- try:
- distutils_dir_util = sys.modules['distutils.dir_util']
- except KeyError:
- pass
- else:
- distutils_dir_util._path_created.clear()
-
try:
re = sys.modules['re']
except KeyError:
@@ -212,6 +262,13 @@ def clear_caches():
for f in typing._cleanups:
f()
+ try:
+ fractions = sys.modules['fractions']
+ except KeyError:
+ pass
+ else:
+ fractions._hash_algorithm.cache_clear()
+
def get_build_info():
# Get most important configure and build options as a list of strings.
@@ -292,3 +349,331 @@ def get_build_info():
build.append("dtrace")
return build
+
+
+def get_temp_dir(tmp_dir: StrPath | None = None) -> StrPath:
+ if tmp_dir:
+ tmp_dir = os.path.expanduser(tmp_dir)
+ else:
+ # When tests are run from the Python build directory, it is best practice
+ # to keep the test files in a subfolder. This eases the cleanup of leftover
+ # files using the "make distclean" command.
+ if sysconfig.is_python_build():
+ if not support.is_wasi:
+ tmp_dir = sysconfig.get_config_var('abs_builddir')
+ if tmp_dir is None:
+ tmp_dir = sysconfig.get_config_var('abs_srcdir')
+ if not tmp_dir:
+ # gh-74470: On Windows, only srcdir is available. Using
+ # abs_builddir mostly matters on UNIX when building
+ # Python out of the source tree, especially when the
+ # source tree is read only.
+ tmp_dir = sysconfig.get_config_var('srcdir')
+ tmp_dir = os.path.join(tmp_dir, 'build')
+ else:
+ # WASI platform
+ tmp_dir = sysconfig.get_config_var('projectbase')
+ tmp_dir = os.path.join(tmp_dir, 'build')
+
+            # When get_temp_dir() is called in a worker process, the path
+            # is different from the one in the parent process, which is not
+            # a WASI process. So the parent does not create the same
+            # "tmp_dir" as the test worker process.
+ os.makedirs(tmp_dir, exist_ok=True)
+ else:
+ tmp_dir = tempfile.gettempdir()
+
+ return os.path.abspath(tmp_dir)
+
+
+def fix_umask():
+ if support.is_emscripten:
+ # Emscripten has default umask 0o777, which breaks some tests.
+ # see https://github.com/emscripten-core/emscripten/issues/17269
+ old_mask = os.umask(0)
+ if old_mask == 0o777:
+ os.umask(0o027)
+ else:
+ os.umask(old_mask)
+
+
+def get_work_dir(parent_dir: StrPath, worker: bool = False) -> StrPath:
+ # Define a writable temp dir that will be used as cwd while running
+ # the tests. The name of the dir includes the pid to allow parallel
+ # testing (see the -j option).
+ # Emscripten and WASI have stubbed getpid(), Emscripten has only
+    # millisecond clock resolution. Use randint() instead.
+ if support.is_emscripten or support.is_wasi:
+ nounce = random.randint(0, 1_000_000)
+ else:
+ nounce = os.getpid()
+
+    if worker:
+        work_dir = WORKER_WORK_DIR_PREFIX + str(nounce)
+    else:
+        work_dir = WORK_DIR_PREFIX + str(nounce)
+ work_dir += os_helper.FS_NONASCII
+ work_dir = os.path.join(parent_dir, work_dir)
+ return work_dir
+
+
+@contextlib.contextmanager
+def exit_timeout():
+ try:
+ yield
+ except SystemExit as exc:
+ # bpo-38203: Python can hang at exit in Py_Finalize(), especially
+ # on threading._shutdown() call: put a timeout
+ if threading_helper.can_start_thread:
+ faulthandler.dump_traceback_later(EXIT_TIMEOUT, exit=True)
+ sys.exit(exc.code)
+
+
+def remove_testfn(test_name: TestName, verbose: int) -> None:
+ # Try to clean up os_helper.TESTFN if left behind.
+ #
+ # While tests shouldn't leave any files or directories behind, when a test
+    # fails it can be tedious for it to arrange that.  The consequences can be
+ # especially nasty on Windows, since if a test leaves a file open, it
+ # cannot be deleted by name (while there's nothing we can do about that
+ # here either, we can display the name of the offending test, which is a
+ # real help).
+ name = os_helper.TESTFN
+ if not os.path.exists(name):
+ return
+
+ nuker: Callable[[str], None]
+ if os.path.isdir(name):
+ import shutil
+ kind, nuker = "directory", shutil.rmtree
+ elif os.path.isfile(name):
+ kind, nuker = "file", os.unlink
+ else:
+ raise RuntimeError(f"os.path says {name!r} exists but is neither "
+ f"directory nor file")
+
+ if verbose:
+ print_warning(f"{test_name} left behind {kind} {name!r}")
+ support.environment_altered = True
+
+ try:
+ import stat
+ # fix possible permissions problems that might prevent cleanup
+ os.chmod(name, stat.S_IRWXU | stat.S_IRWXG | stat.S_IRWXO)
+ nuker(name)
+ except Exception as exc:
+ print_warning(f"{test_name} left behind {kind} {name!r} "
+ f"and it couldn't be removed: {exc}")
+
+
+def abs_module_name(test_name: TestName, test_dir: StrPath | None) -> TestName:
+ if test_name.startswith('test.') or test_dir:
+ return test_name
+ else:
+ # Import it from the test package
+ return 'test.' + test_name
+
+
+# gh-90681: When rerunning tests, we might need to rerun the whole
+# class or module suite if some of its life-cycle hooks fail.
+# Test level hooks are not affected.
+_TEST_LIFECYCLE_HOOKS = frozenset((
+ 'setUpClass', 'tearDownClass',
+ 'setUpModule', 'tearDownModule',
+))
+
+def normalize_test_name(test_full_name, *, is_error=False):
+ short_name = test_full_name.split(" ")[0]
+ if is_error and short_name in _TEST_LIFECYCLE_HOOKS:
+ if test_full_name.startswith(('setUpModule (', 'tearDownModule (')):
+ # if setUpModule() or tearDownModule() failed, don't filter
+            # tests by the test file name; don't use filters at all.
+ return None
+
+ # This means that we have a failure in a life-cycle hook,
+ # we need to rerun the whole module or class suite.
+ # Basically the error looks like this:
+ # ERROR: setUpClass (test.test_reg_ex.RegTest)
+ # or
+ # ERROR: setUpModule (test.test_reg_ex)
+ # So, we need to parse the class / module name.
+ lpar = test_full_name.index('(')
+ rpar = test_full_name.index(')')
+ return test_full_name[lpar + 1: rpar].split('.')[-1]
+ return short_name
+
+
+def adjust_rlimit_nofile():
+ """
+ On macOS the default fd limit (RLIMIT_NOFILE) is sometimes too low (256)
+ for our test suite to succeed. Raise it to something more reasonable. 1024
+ is a common Linux default.
+ """
+ try:
+ import resource
+ except ImportError:
+ return
+
+ fd_limit, max_fds = resource.getrlimit(resource.RLIMIT_NOFILE)
+
+ desired_fds = 1024
+
+ if fd_limit < desired_fds and fd_limit < max_fds:
+ new_fd_limit = min(desired_fds, max_fds)
+ try:
+ resource.setrlimit(resource.RLIMIT_NOFILE,
+ (new_fd_limit, max_fds))
+ print(f"Raised RLIMIT_NOFILE: {fd_limit} -> {new_fd_limit}")
+ except (ValueError, OSError) as err:
+ print_warning(f"Unable to raise RLIMIT_NOFILE from {fd_limit} to "
+ f"{new_fd_limit}: {err}.")
+
+
+def get_host_runner():
+ if (hostrunner := os.environ.get("_PYTHON_HOSTRUNNER")) is None:
+ hostrunner = sysconfig.get_config_var("HOSTRUNNER")
+ return hostrunner
+
+
+def is_cross_compiled():
+ return ('_PYTHON_HOST_PLATFORM' in os.environ)
+
+
+def format_resources(use_resources: tuple[str, ...]):
+ use_resources = set(use_resources)
+ all_resources = set(ALL_RESOURCES)
+
+ # Express resources relative to "all"
+ relative_all = ['all']
+ for name in sorted(all_resources - use_resources):
+ relative_all.append(f'-{name}')
+ for name in sorted(use_resources - all_resources):
+ relative_all.append(f'{name}')
+ all_text = ','.join(relative_all)
+ all_text = f"resources: {all_text}"
+
+ # List of enabled resources
+ text = ','.join(sorted(use_resources))
+ text = f"resources ({len(use_resources)}): {text}"
+
+ # Pick the shortest string (prefer relative to all if lengths are equal)
+ if len(all_text) <= len(text):
+ return all_text
+ else:
+ return text
+
+
+def process_cpu_count():
+ if hasattr(os, 'sched_getaffinity'):
+ return len(os.sched_getaffinity(0))
+ else:
+ return os.cpu_count()
+
+
+def display_header(use_resources: tuple[str, ...],
+ python_cmd: tuple[str, ...] | None):
+ # Print basic platform information
+ print("==", platform.python_implementation(), *sys.version.split())
+ print("==", platform.platform(aliased=True),
+ "%s-endian" % sys.byteorder)
+ print("== Python build:", ' '.join(get_build_info()))
+ print("== cwd:", os.getcwd())
+
+ cpu_count = os.cpu_count()
+ if cpu_count:
+ affinity = process_cpu_count()
+ if affinity and affinity != cpu_count:
+ cpu_count = f"{affinity} (process) / {cpu_count} (system)"
+ print("== CPU count:", cpu_count)
+ print("== encodings: locale=%s FS=%s"
+ % (locale.getencoding(), sys.getfilesystemencoding()))
+
+ if use_resources:
+ text = format_resources(use_resources)
+ print(f"== {text}")
+ else:
+ print("== resources: all test resources are disabled, "
+ "use -u option to unskip tests")
+
+ cross_compile = is_cross_compiled()
+ if cross_compile:
+ print("== cross compiled: Yes")
+ if python_cmd:
+ cmd = shlex.join(python_cmd)
+ print(f"== host python: {cmd}")
+
+ get_cmd = [*python_cmd, '-m', 'platform']
+ proc = subprocess.run(
+ get_cmd,
+ stdout=subprocess.PIPE,
+ text=True,
+ cwd=os_helper.SAVEDCWD)
+ stdout = proc.stdout.replace('\n', ' ').strip()
+ if stdout:
+ print(f"== host platform: {stdout}")
+ elif proc.returncode:
+ print(f"== host platform: <command failed with exit code {proc.returncode}>")
+ else:
+ hostrunner = get_host_runner()
+ if hostrunner:
+ print(f"== host runner: {hostrunner}")
+
+ # This makes it easier to remember what to set in your local
+ # environment when trying to reproduce a sanitizer failure.
+ asan = support.check_sanitizer(address=True)
+ msan = support.check_sanitizer(memory=True)
+ ubsan = support.check_sanitizer(ub=True)
+ sanitizers = []
+ if asan:
+ sanitizers.append("address")
+ if msan:
+ sanitizers.append("memory")
+ if ubsan:
+ sanitizers.append("undefined behavior")
+ if sanitizers:
+ print(f"== sanitizers: {', '.join(sanitizers)}")
+ for sanitizer, env_var in (
+ (asan, "ASAN_OPTIONS"),
+ (msan, "MSAN_OPTIONS"),
+ (ubsan, "UBSAN_OPTIONS"),
+ ):
+        options = os.environ.get(env_var)
+ if sanitizer and options is not None:
+ print(f"== {env_var}={options!r}")
+
+ print(flush=True)
+
+
+def cleanup_temp_dir(tmp_dir: StrPath):
+ import glob
+
+ path = os.path.join(glob.escape(tmp_dir), TMP_PREFIX + '*')
+ print("Cleanup %s directory" % tmp_dir)
+ for name in glob.glob(path):
+ if os.path.isdir(name):
+ print("Remove directory: %s" % name)
+ os_helper.rmtree(name)
+ else:
+ print("Remove file: %s" % name)
+ os_helper.unlink(name)
+
+WINDOWS_STATUS = {
+ 0xC0000005: "STATUS_ACCESS_VIOLATION",
+ 0xC00000FD: "STATUS_STACK_OVERFLOW",
+ 0xC000013A: "STATUS_CONTROL_C_EXIT",
+}
+
+def get_signal_name(exitcode):
+ if exitcode < 0:
+ signum = -exitcode
+ try:
+ return signal.Signals(signum).name
+ except ValueError:
+ pass
+
+ try:
+ return WINDOWS_STATUS[exitcode]
+ except KeyError:
+ pass
+
+ return None
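format_resources() above picks whichever rendering is shorter: the plain list of enabled resources, or a delta relative to "all"; get_signal_name() maps negative exit codes to signal names and known Windows NTSTATUS values. A short illustration (the expected outputs in the comments follow from the code above and the ALL_RESOURCES tuple defined in this file):

    from test.libregrtest.utils import format_resources, get_signal_name

    # Few resources enabled: the plain list is shorter.
    print(format_resources(('cpu', 'network')))
    # -> resources (2): cpu,network

    # Everything except one resource: the delta form is shorter.
    print(format_resources(('audio', 'curses', 'largefile', 'network', 'decimal',
                            'cpu', 'subprocess', 'urlfetch', 'gui')))
    # -> resources: all,-walltime

    print(get_signal_name(-11))          # -> SIGSEGV (on POSIX)
    print(get_signal_name(0xC0000005))   # -> STATUS_ACCESS_VIOLATION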
diff --git a/Lib/test/libregrtest/worker.py b/Lib/test/libregrtest/worker.py
new file mode 100644
index 0000000..a9c8be0
--- /dev/null
+++ b/Lib/test/libregrtest/worker.py
@@ -0,0 +1,116 @@
+import subprocess
+import sys
+import os
+from typing import Any, NoReturn
+
+from test import support
+from test.support import os_helper
+
+from .setup import setup_process, setup_test_dir
+from .runtests import RunTests, JsonFile, JsonFileType
+from .single import run_single_test
+from .utils import (
+ StrPath, StrJSON, FilterTuple,
+ get_temp_dir, get_work_dir, exit_timeout)
+
+
+USE_PROCESS_GROUP = (hasattr(os, "setsid") and hasattr(os, "killpg"))
+
+
+def create_worker_process(runtests: RunTests, output_fd: int,
+ tmp_dir: StrPath | None = None) -> subprocess.Popen:
+ python_cmd = runtests.python_cmd
+ worker_json = runtests.as_json()
+
+ python_opts = support.args_from_interpreter_flags()
+ if python_cmd is not None:
+ executable = python_cmd
+ # Remove -E option, since --python=COMMAND can set PYTHON environment
+ # variables, such as PYTHONPATH, in the worker process.
+ python_opts = [opt for opt in python_opts if opt != "-E"]
+ else:
+ executable = (sys.executable,)
+ cmd = [*executable, *python_opts,
+ '-u', # Unbuffered stdout and stderr
+ '-m', 'test.libregrtest.worker',
+ worker_json]
+
+ env = dict(os.environ)
+ if tmp_dir is not None:
+ env['TMPDIR'] = tmp_dir
+ env['TEMP'] = tmp_dir
+ env['TMP'] = tmp_dir
+
+ # Running the child from the same working directory as regrtest's original
+ # invocation ensures that TEMPDIR for the child is the same when
+ # sysconfig.is_python_build() is true. See issue 15300.
+ #
+ # Emscripten and WASI Python must start in the Python source code directory
+ # to get 'python.js' or 'python.wasm' file. Then worker_process() changes
+ # to a temporary directory created to run tests.
+ work_dir = os_helper.SAVEDCWD
+
+ kwargs: dict[str, Any] = dict(
+ env=env,
+ stdout=output_fd,
+ # bpo-45410: Write stderr into stdout to keep messages order
+ stderr=output_fd,
+ text=True,
+ close_fds=True,
+ cwd=work_dir,
+ )
+ if USE_PROCESS_GROUP:
+ kwargs['start_new_session'] = True
+
+ # Pass json_file to the worker process
+ json_file = runtests.json_file
+ json_file.configure_subprocess(kwargs)
+
+ with json_file.inherit_subprocess():
+ return subprocess.Popen(cmd, **kwargs)
+
+
+def worker_process(worker_json: StrJSON) -> NoReturn:
+ runtests = RunTests.from_json(worker_json)
+ test_name = runtests.tests[0]
+ match_tests: FilterTuple | None = runtests.match_tests
+ json_file: JsonFile = runtests.json_file
+
+ setup_test_dir(runtests.test_dir)
+ setup_process()
+
+ if runtests.rerun:
+ if match_tests:
+ matching = "matching: " + ", ".join(match_tests)
+ print(f"Re-running {test_name} in verbose mode ({matching})", flush=True)
+ else:
+ print(f"Re-running {test_name} in verbose mode", flush=True)
+
+ result = run_single_test(test_name, runtests)
+
+ if json_file.file_type == JsonFileType.STDOUT:
+ print()
+ result.write_json_into(sys.stdout)
+ else:
+ with json_file.open('w', encoding='utf-8') as json_fp:
+ result.write_json_into(json_fp)
+
+ sys.exit(0)
+
+
+def main():
+ if len(sys.argv) != 2:
+ print("usage: python -m test.libregrtest.worker JSON")
+ sys.exit(1)
+ worker_json = sys.argv[1]
+
+ tmp_dir = get_temp_dir()
+ work_dir = get_work_dir(tmp_dir, worker=True)
+
+ with exit_timeout():
+ with os_helper.temp_cwd(work_dir, quiet=True):
+ worker_process(worker_json)
+
+
+if __name__ == "__main__":
+ main()
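Putting both halves together: the parent serializes a RunTests object to JSON, spawns python -m test.libregrtest.worker <json>, and, unless the STDOUT file type is selected, reads the worker's TestResult back over an inherited file descriptor or handle. A hedged sketch of the parent side, assuming the caller builds a RunTests whose json_file wraps the write end of a pipe; spawn_worker_sketch() and make_runtests() are hypothetical names, and the real create_worker_process() additionally redirects stdout/stderr, sets TMPDIR and uses process groups:

    import os
    import subprocess
    import sys

    def spawn_worker_sketch(make_runtests):
        # `make_runtests(wfd)` is assumed to return a RunTests whose json_file
        # is JsonFile(wfd, JsonFileType.UNIX_FD); only the fd plumbing is shown.
        rfd, wfd = os.pipe()
        runtests = make_runtests(wfd)
        cmd = [sys.executable, '-u', '-m', 'test.libregrtest.worker',
               runtests.as_json()]
        proc = subprocess.Popen(cmd, pass_fds=[wfd])
        os.close(wfd)                        # the parent keeps only the read end
        with os.fdopen(rfd, encoding='utf-8') as fp:
            result_json = fp.read()          # the worker's TestResult as JSON
        proc.wait()
        return result_json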