summaryrefslogtreecommitdiffstats
path: root/Lib/test/libregrtest
diff options
context:
space:
mode:
authorVictor Stinner <vstinner@python.org>2023-09-03 21:37:15 (GMT)
committerGitHub <noreply@github.com>2023-09-03 21:37:15 (GMT)
commit31c2945f143c6b80c837fcf09a5cfb85fea9ea4c (patch)
treedf9d2918e92b8167db4bf66fe04c923d487e0977 /Lib/test/libregrtest
parentc2ec174d243da5d2607dbf06c4451d0093ac40ba (diff)
downloadcpython-31c2945f143c6b80c837fcf09a5cfb85fea9ea4c.zip
cpython-31c2945f143c6b80c837fcf09a5cfb85fea9ea4c.tar.gz
cpython-31c2945f143c6b80c837fcf09a5cfb85fea9ea4c.tar.bz2
gh-108834: regrtest reruns failed tests in subprocesses (#108839)
When using --rerun option, regrtest now re-runs failed tests in verbose mode in fresh worker processes to have more deterministic behavior. So it can write its final report even if a test killed a worker progress. Add --fail-rerun option to regrtest: exit with non-zero exit code if a test failed pass passed when re-run in verbose mode (in a fresh process). That's now more useful since tests can pass when re-run in a fresh worker progress, whereas they failed when run after other tests when tests are run sequentially. Rename --verbose2 option (-w) to --rerun. Keep --verbose2 as a deprecated alias. Changes: * Fix and enhance statistics in regrtest summary. Add "(filtered)" when --match and/or --ignore options are used. * Add RunTests class. * Add TestResult.get_rerun_match_tests() method * Rewrite code to serialize/deserialize worker arguments as JSON using a new WorkerJob class. * Fix stats when a test is run with --forever --rerun. * If failed test names cannot be parsed, log a warning and don't filter tests. * test_regrtest.test_rerun_success() now uses a marker file, since the test is re-run in a separated process. * Add tests on normalize_test_name() function. * Add test_success() and test_skip() tests to test_regrtest.
Diffstat (limited to 'Lib/test/libregrtest')
-rw-r--r--Lib/test/libregrtest/cmdline.py11
-rw-r--r--Lib/test/libregrtest/main.py548
-rw-r--r--Lib/test/libregrtest/runtest.py234
-rw-r--r--Lib/test/libregrtest/runtest_mp.py179
-rw-r--r--Lib/test/libregrtest/utils.py2
5 files changed, 585 insertions, 389 deletions
diff --git a/Lib/test/libregrtest/cmdline.py b/Lib/test/libregrtest/cmdline.py
index ebe5792..251fcac 100644
--- a/Lib/test/libregrtest/cmdline.py
+++ b/Lib/test/libregrtest/cmdline.py
@@ -156,7 +156,7 @@ class Namespace(argparse.Namespace):
self.coverdir = 'coverage'
self.runleaks = False
self.huntrleaks = False
- self.verbose2 = False
+ self.rerun = False
self.verbose3 = False
self.print_slow = False
self.random_seed = None
@@ -213,8 +213,10 @@ def _create_parser():
group = parser.add_argument_group('Verbosity')
group.add_argument('-v', '--verbose', action='count',
help='run tests in verbose mode with output to stdout')
- group.add_argument('-w', '--verbose2', action='store_true',
+ group.add_argument('-w', '--rerun', action='store_true',
help='re-run failed tests in verbose mode')
+ group.add_argument('--verbose2', action='store_true', dest='rerun',
+ help='deprecated alias to --rerun')
group.add_argument('-W', '--verbose3', action='store_true',
help='display test output on failure')
group.add_argument('-q', '--quiet', action='store_true',
@@ -309,6 +311,9 @@ def _create_parser():
group.add_argument('--fail-env-changed', action='store_true',
help='if a test file alters the environment, mark '
'the test as failed')
+ group.add_argument('--fail-rerun', action='store_true',
+ help='if a test failed and then passed when re-run, '
+ 'mark the tests as failed')
group.add_argument('--junit-xml', dest='xmlpath', metavar='FILENAME',
help='writes JUnit-style XML results to the specified '
@@ -380,7 +385,7 @@ def _parse_args(args, **kwargs):
ns.python = shlex.split(ns.python)
if ns.failfast and not (ns.verbose or ns.verbose3):
parser.error("-G/--failfast needs either -v or -W")
- if ns.pgo and (ns.verbose or ns.verbose2 or ns.verbose3):
+ if ns.pgo and (ns.verbose or ns.rerun or ns.verbose3):
parser.error("--pgo/-v don't go together!")
if ns.pgo_extended:
ns.pgo = True # pgo_extended implies pgo
diff --git a/Lib/test/libregrtest/main.py b/Lib/test/libregrtest/main.py
index 6e6423e..77a4090 100644
--- a/Lib/test/libregrtest/main.py
+++ b/Lib/test/libregrtest/main.py
@@ -11,11 +11,11 @@ import time
import unittest
from test.libregrtest.cmdline import _parse_args
from test.libregrtest.runtest import (
- findtests, split_test_packages, runtest, get_abs_module,
- PROGRESS_MIN_TIME, State)
+ findtests, split_test_packages, runtest, abs_module_name,
+ PROGRESS_MIN_TIME, State, MatchTestsDict, RunTests)
from test.libregrtest.setup import setup_tests
from test.libregrtest.pgo import setup_pgo_tests
-from test.libregrtest.utils import (removepy, count, format_duration,
+from test.libregrtest.utils import (strip_py_suffix, count, format_duration,
printlist, get_build_info)
from test import support
from test.support import TestStats
@@ -28,14 +28,6 @@ from test.support import threading_helper
# Must be smaller than buildbot "1200 seconds without output" limit.
EXIT_TIMEOUT = 120.0
-# gh-90681: When rerunning tests, we might need to rerun the whole
-# class or module suite if some its life-cycle hooks fail.
-# Test level hooks are not affected.
-_TEST_LIFECYCLE_HOOKS = frozenset((
- 'setUpClass', 'tearDownClass',
- 'setUpModule', 'tearDownModule',
-))
-
EXITCODE_BAD_TEST = 2
EXITCODE_INTERRUPTED = 130
EXITCODE_ENV_CHANGED = 3
@@ -72,19 +64,22 @@ class Regrtest:
# tests
self.tests = []
self.selected = []
+ self.all_runtests: list[RunTests] = []
# test results
- self.good = []
- self.bad = []
- self.skipped = []
- self.resource_denied = []
- self.environment_changed = []
- self.run_no_tests = []
- self.need_rerun = []
- self.rerun = []
- self.first_result = None
+ self.good: list[str] = []
+ self.bad: list[str] = []
+ self.rerun_bad: list[str] = []
+ self.skipped: list[str] = []
+ self.resource_denied: list[str] = []
+ self.environment_changed: list[str] = []
+ self.run_no_tests: list[str] = []
+ self.rerun: list[str] = []
+
+ self.need_rerun: list[TestResult] = []
+ self.first_state: str | None = None
self.interrupted = False
- self.stats_dict: dict[str, TestStats] = {}
+ self.total_stats = TestStats()
# used by --slow
self.test_times = []
@@ -94,7 +89,7 @@ class Regrtest:
# used to display the progress bar "[ 3/100]"
self.start_time = time.perf_counter()
- self.test_count = ''
+ self.test_count_text = ''
self.test_count_width = 1
# used by --single
@@ -107,7 +102,6 @@ class Regrtest:
# misc
self.win_load_tracker = None
self.tmp_dir = None
- self.worker_test_name = None
def get_executed(self):
return (set(self.good) | set(self.bad) | set(self.skipped)
@@ -115,11 +109,9 @@ class Regrtest:
| set(self.run_no_tests))
def accumulate_result(self, result, rerun=False):
+ fail_env_changed = self.ns.fail_env_changed
test_name = result.test_name
- if result.has_meaningful_duration() and not rerun:
- self.test_times.append((result.duration, test_name))
-
match result.state:
case State.PASSED:
self.good.append(test_name)
@@ -128,25 +120,24 @@ class Regrtest:
case State.SKIPPED:
self.skipped.append(test_name)
case State.RESOURCE_DENIED:
- self.skipped.append(test_name)
self.resource_denied.append(test_name)
case State.INTERRUPTED:
self.interrupted = True
case State.DID_NOT_RUN:
self.run_no_tests.append(test_name)
case _:
- if result.is_failed(self.ns.fail_env_changed):
- if not rerun:
- self.bad.append(test_name)
- self.need_rerun.append(result)
+ if result.is_failed(fail_env_changed):
+ self.bad.append(test_name)
+ self.need_rerun.append(result)
else:
- raise ValueError(f"invalid test state: {state!r}")
+ raise ValueError(f"invalid test state: {result.state!r}")
+ if result.has_meaningful_duration() and not rerun:
+ self.test_times.append((result.duration, test_name))
if result.stats is not None:
- self.stats_dict[result.test_name] = result.stats
-
- if rerun and not(result.is_failed(False) or result.state == State.INTERRUPTED):
- self.bad.remove(test_name)
+ self.total_stats.accumulate(result.stats)
+ if rerun:
+ self.rerun.append(test_name)
xml_data = result.xml_data
if xml_data:
@@ -180,13 +171,15 @@ class Regrtest:
print(line, flush=True)
def display_progress(self, test_index, text):
- if self.ns.quiet:
+ quiet = self.ns.quiet
+ pgo = self.ns.pgo
+ if quiet:
return
# "[ 51/405/1] test_tcl passed"
- line = f"{test_index:{self.test_count_width}}{self.test_count}"
+ line = f"{test_index:{self.test_count_width}}{self.test_count_text}"
fails = len(self.bad) + len(self.environment_changed)
- if fails and not self.ns.pgo:
+ if fails and not pgo:
line = f"{line}/{fails}"
self.log(f"[{line}] {text}")
@@ -196,15 +189,7 @@ class Regrtest:
if ns.xmlpath:
support.junit_xml_list = self.testsuite_xml = []
- worker_args = ns.worker_args
- if worker_args is not None:
- from test.libregrtest.runtest_mp import parse_worker_args
- ns, test_name = parse_worker_args(ns.worker_args)
- ns.worker_args = worker_args
- self.worker_test_name = test_name
-
- # Strip .py extensions.
- removepy(ns.args)
+ strip_py_suffix(ns.args)
if ns.huntrleaks:
warmup, repetitions, _ = ns.huntrleaks
@@ -221,9 +206,18 @@ class Regrtest:
self.ns = ns
def find_tests(self, tests):
+ ns = self.ns
+ single = ns.single
+ fromfile = ns.fromfile
+ pgo = ns.pgo
+ exclude = ns.exclude
+ test_dir = ns.testdir
+ starting_test = ns.start
+ randomize = ns.randomize
+
self.tests = tests
- if self.ns.single:
+ if single:
self.next_single_filename = os.path.join(self.tmp_dir, 'pynexttest')
try:
with open(self.next_single_filename, 'r') as fp:
@@ -232,12 +226,12 @@ class Regrtest:
except OSError:
pass
- if self.ns.fromfile:
+ if fromfile:
self.tests = []
# regex to match 'test_builtin' in line:
# '0:00:00 [ 4/400] test_builtin -- test_dict took 1 sec'
regex = re.compile(r'\btest_[a-zA-Z0-9_]+\b')
- with open(os.path.join(os_helper.SAVEDCWD, self.ns.fromfile)) as fp:
+ with open(os.path.join(os_helper.SAVEDCWD, fromfile)) as fp:
for line in fp:
line = line.split('#', 1)[0]
line = line.strip()
@@ -245,22 +239,22 @@ class Regrtest:
if match is not None:
self.tests.append(match.group())
- removepy(self.tests)
+ strip_py_suffix(self.tests)
- if self.ns.pgo:
+ if pgo:
# add default PGO tests if no tests are specified
- setup_pgo_tests(self.ns)
+ setup_pgo_tests(ns)
- exclude = set()
- if self.ns.exclude:
- for arg in self.ns.args:
- exclude.add(arg)
- self.ns.args = []
+ exclude_tests = set()
+ if exclude:
+ for arg in ns.args:
+ exclude_tests.add(arg)
+ ns.args = []
- alltests = findtests(testdir=self.ns.testdir, exclude=exclude)
+ alltests = findtests(testdir=test_dir, exclude=exclude_tests)
- if not self.ns.fromfile:
- self.selected = self.tests or self.ns.args
+ if not fromfile:
+ self.selected = self.tests or ns.args
if self.selected:
self.selected = split_test_packages(self.selected)
else:
@@ -268,7 +262,7 @@ class Regrtest:
else:
self.selected = self.tests
- if self.ns.single:
+ if single:
self.selected = self.selected[:1]
try:
pos = alltests.index(self.selected[0])
@@ -277,17 +271,17 @@ class Regrtest:
pass
# Remove all the selected tests that precede start if it's set.
- if self.ns.start:
+ if starting_test:
try:
- del self.selected[:self.selected.index(self.ns.start)]
+ del self.selected[:self.selected.index(starting_test)]
except ValueError:
- print("Couldn't find starting test (%s), using all tests"
- % self.ns.start, file=sys.stderr)
+ print(f"Cannot find starting test: {starting_test}")
+ sys.exit(1)
- if self.ns.randomize:
- if self.ns.random_seed is None:
- self.ns.random_seed = random.randrange(10000000)
- random.seed(self.ns.random_seed)
+ if randomize:
+ if ns.random_seed is None:
+ ns.random_seed = random.randrange(10000000)
+ random.seed(ns.random_seed)
random.shuffle(self.selected)
def list_tests(self):
@@ -305,25 +299,63 @@ class Regrtest:
print(test.id())
def list_cases(self):
+ ns = self.ns
+ test_dir = ns.testdir
support.verbose = False
- support.set_match_tests(self.ns.match_tests, self.ns.ignore_tests)
+ support.set_match_tests(ns.match_tests, ns.ignore_tests)
+ skipped = []
for test_name in self.selected:
- abstest = get_abs_module(self.ns, test_name)
+ module_name = abs_module_name(test_name, test_dir)
try:
- suite = unittest.defaultTestLoader.loadTestsFromName(abstest)
+ suite = unittest.defaultTestLoader.loadTestsFromName(module_name)
self._list_cases(suite)
except unittest.SkipTest:
- self.skipped.append(test_name)
+ skipped.append(test_name)
- if self.skipped:
- print(file=sys.stderr)
- print(count(len(self.skipped), "test"), "skipped:", file=sys.stderr)
- printlist(self.skipped, file=sys.stderr)
+ if skipped:
+ sys.stdout.flush()
+ stderr = sys.stderr
+ print(file=stderr)
+ print(count(len(skipped), "test"), "skipped:", file=stderr)
+ printlist(skipped, file=stderr)
- def rerun_failed_tests(self):
- self.log()
+ def get_rerun_match(self, rerun_list) -> MatchTestsDict:
+ rerun_match_tests = {}
+ for result in rerun_list:
+ match_tests = result.get_rerun_match_tests()
+ # ignore empty match list
+ if match_tests:
+ rerun_match_tests[result.test_name] = match_tests
+ return rerun_match_tests
+
+ def _rerun_failed_tests(self, need_rerun):
+ # Configure the runner to re-run tests
+ ns = self.ns
+ ns.verbose = True
+ ns.failfast = False
+ ns.verbose3 = False
+ ns.forever = False
+ if ns.use_mp is None:
+ ns.use_mp = 1
+
+ # Get tests to re-run
+ tests = [result.test_name for result in need_rerun]
+ match_tests = self.get_rerun_match(need_rerun)
+ self.set_tests(tests)
+
+ # Clear previously failed tests
+ self.rerun_bad.extend(self.bad)
+ self.bad.clear()
+ self.need_rerun.clear()
+
+ # Re-run failed tests
+ self.log(f"Re-running {len(tests)} failed tests in verbose mode in subprocesses")
+ runtests = RunTests(tests, match_tests=match_tests, rerun=True)
+ self.all_runtests.append(runtests)
+ self._run_tests_mp(runtests)
+ def rerun_failed_tests(self, need_rerun):
if self.ns.python:
# Temp patch for https://github.com/python/cpython/issues/94052
self.log(
@@ -332,45 +364,10 @@ class Regrtest:
)
return
- self.ns.verbose = True
- self.ns.failfast = False
- self.ns.verbose3 = False
-
- self.first_result = self.get_tests_result()
-
- self.log("Re-running failed tests in verbose mode")
- rerun_list = list(self.need_rerun)
- self.need_rerun.clear()
- for result in rerun_list:
- test_name = result.test_name
- self.rerun.append(test_name)
+ self.first_state = self.get_tests_state()
- errors = result.errors or []
- failures = result.failures or []
- error_names = [
- self.normalize_test_name(test_full_name, is_error=True)
- for (test_full_name, *_) in errors]
- failure_names = [
- self.normalize_test_name(test_full_name)
- for (test_full_name, *_) in failures]
- self.ns.verbose = True
- orig_match_tests = self.ns.match_tests
- if errors or failures:
- if self.ns.match_tests is None:
- self.ns.match_tests = []
- self.ns.match_tests.extend(error_names)
- self.ns.match_tests.extend(failure_names)
- matching = "matching: " + ", ".join(self.ns.match_tests)
- self.log(f"Re-running {test_name} in verbose mode ({matching})")
- else:
- self.log(f"Re-running {test_name} in verbose mode")
- result = runtest(self.ns, test_name)
- self.ns.match_tests = orig_match_tests
-
- self.accumulate_result(result, rerun=True)
-
- if result.state == State.INTERRUPTED:
- break
+ print()
+ self._rerun_failed_tests(need_rerun)
if self.bad:
print(count(len(self.bad), 'test'), "failed again:")
@@ -378,28 +375,17 @@ class Regrtest:
self.display_result()
- def normalize_test_name(self, test_full_name, *, is_error=False):
- short_name = test_full_name.split(" ")[0]
- if is_error and short_name in _TEST_LIFECYCLE_HOOKS:
- # This means that we have a failure in a life-cycle hook,
- # we need to rerun the whole module or class suite.
- # Basically the error looks like this:
- # ERROR: setUpClass (test.test_reg_ex.RegTest)
- # or
- # ERROR: setUpModule (test.test_reg_ex)
- # So, we need to parse the class / module name.
- lpar = test_full_name.index('(')
- rpar = test_full_name.index(')')
- return test_full_name[lpar + 1: rpar].split('.')[-1]
- return short_name
-
def display_result(self):
+ pgo = self.ns.pgo
+ quiet = self.ns.quiet
+ print_slow = self.ns.print_slow
+
# If running the test suite for PGO then no one cares about results.
- if self.ns.pgo:
+ if pgo:
return
print()
- print("== Tests result: %s ==" % self.get_tests_result())
+ print("== Tests result: %s ==" % self.get_tests_state())
if self.interrupted:
print("Test suite interrupted by signal SIGINT.")
@@ -410,7 +396,7 @@ class Regrtest:
print(count(len(omitted), "test"), "omitted:")
printlist(omitted)
- if self.good and not self.ns.quiet:
+ if self.good and not quiet:
print()
if (not self.bad
and not self.skipped
@@ -419,7 +405,7 @@ class Regrtest:
print("All", end=' ')
print(count(len(self.good), "test"), "OK.")
- if self.ns.print_slow:
+ if print_slow:
self.test_times.sort(reverse=True)
print()
print("10 slowest tests:")
@@ -437,11 +423,16 @@ class Regrtest:
count(len(self.environment_changed), "test")))
printlist(self.environment_changed)
- if self.skipped and not self.ns.quiet:
+ if self.skipped and not quiet:
print()
print(count(len(self.skipped), "test"), "skipped:")
printlist(self.skipped)
+ if self.resource_denied and not quiet:
+ print()
+ print(count(len(self.resource_denied), "test"), "skipped (resource denied):")
+ printlist(self.resource_denied)
+
if self.rerun:
print()
print("%s:" % count(len(self.rerun), "re-run test"))
@@ -452,40 +443,58 @@ class Regrtest:
print(count(len(self.run_no_tests), "test"), "run no tests:")
printlist(self.run_no_tests)
- def run_tests_sequential(self):
- if self.ns.trace:
+ def run_test(self, test_index, test_name, previous_test, save_modules):
+ text = test_name
+ if previous_test:
+ text = '%s -- %s' % (text, previous_test)
+ self.display_progress(test_index, text)
+
+ if self.tracer:
+ # If we're tracing code coverage, then we don't exit with status
+ # if on a false return value from main.
+ cmd = ('result = runtest(self.ns, test_name); '
+ 'self.accumulate_result(result)')
+ ns = dict(locals())
+ self.tracer.runctx(cmd, globals=globals(), locals=ns)
+ result = ns['result']
+ else:
+ result = runtest(self.ns, test_name)
+ self.accumulate_result(result)
+
+ # Unload the newly imported modules (best effort finalization)
+ for module in sys.modules.keys():
+ if module not in save_modules and module.startswith("test."):
+ support.unload(module)
+
+ return result
+
+ def run_tests_sequentially(self, runtests):
+ ns = self.ns
+ coverage = ns.trace
+ fail_fast = ns.failfast
+ fail_env_changed = ns.fail_env_changed
+ timeout = ns.timeout
+
+ if coverage:
import trace
self.tracer = trace.Trace(trace=False, count=True)
save_modules = sys.modules.keys()
msg = "Run tests sequentially"
- if self.ns.timeout:
- msg += " (timeout: %s)" % format_duration(self.ns.timeout)
+ if timeout:
+ msg += " (timeout: %s)" % format_duration(timeout)
self.log(msg)
previous_test = None
- for test_index, test_name in enumerate(self.tests, 1):
+ tests_iter = runtests.iter_tests()
+ for test_index, test_name in enumerate(tests_iter, 1):
start_time = time.perf_counter()
- text = test_name
- if previous_test:
- text = '%s -- %s' % (text, previous_test)
- self.display_progress(test_index, text)
-
- if self.tracer:
- # If we're tracing code coverage, then we don't exit with status
- # if on a false return value from main.
- cmd = ('result = runtest(self.ns, test_name); '
- 'self.accumulate_result(result)')
- ns = dict(locals())
- self.tracer.runctx(cmd, globals=globals(), locals=ns)
- result = ns['result']
- else:
- result = runtest(self.ns, test_name)
- self.accumulate_result(result)
+ result = self.run_test(test_index, test_name,
+ previous_test, save_modules)
- if result.state == State.INTERRUPTED:
+ if result.must_stop(fail_fast, fail_env_changed):
break
previous_test = str(result)
@@ -496,26 +505,9 @@ class Regrtest:
# be quiet: say nothing if the test passed shortly
previous_test = None
- # Unload the newly imported modules (best effort finalization)
- for module in sys.modules.keys():
- if module not in save_modules and module.startswith("test."):
- support.unload(module)
-
- if self.ns.failfast and result.is_failed(self.ns.fail_env_changed):
- break
-
if previous_test:
print(previous_test)
- def _test_forever(self, tests):
- while True:
- for test_name in tests:
- yield test_name
- if self.bad:
- return
- if self.ns.fail_env_changed and self.environment_changed:
- return
-
def display_header(self):
# Print basic platform information
print("==", platform.python_implementation(), *sys.version.split())
@@ -560,11 +552,13 @@ class Regrtest:
return not any((self.good, self.bad, self.skipped, self.interrupted,
self.environment_changed))
- def get_tests_result(self):
+ def get_tests_state(self):
+ fail_env_changed = self.ns.fail_env_changed
+
result = []
if self.bad:
result.append("FAILURE")
- elif self.ns.fail_env_changed and self.environment_changed:
+ elif fail_env_changed and self.environment_changed:
result.append("ENV CHANGED")
elif self.no_tests_run():
result.append("NO TESTS RAN")
@@ -576,10 +570,40 @@ class Regrtest:
result.append("SUCCESS")
result = ', '.join(result)
- if self.first_result:
- result = '%s then %s' % (self.first_result, result)
+ if self.first_state:
+ result = '%s then %s' % (self.first_state, result)
return result
+ def _run_tests_mp(self, runtests: RunTests) -> None:
+ from test.libregrtest.runtest_mp import run_tests_multiprocess
+ # If we're on windows and this is the parent runner (not a worker),
+ # track the load average.
+ if sys.platform == 'win32':
+ from test.libregrtest.win_utils import WindowsLoadTracker
+
+ try:
+ self.win_load_tracker = WindowsLoadTracker()
+ except PermissionError as error:
+ # Standard accounts may not have access to the performance
+ # counters.
+ print(f'Failed to create WindowsLoadTracker: {error}')
+
+ try:
+ run_tests_multiprocess(self, runtests)
+ finally:
+ if self.win_load_tracker is not None:
+ self.win_load_tracker.close()
+ self.win_load_tracker = None
+
+ def set_tests(self, tests):
+ self.tests = tests
+ if self.ns.forever:
+ self.test_count_text = ''
+ self.test_count_width = 3
+ else:
+ self.test_count_text = '/{}'.format(len(self.tests))
+ self.test_count_width = len(self.test_count_text) - 1
+
def run_tests(self):
# For a partial run, we do not need to clutter the output.
if (self.ns.header
@@ -597,37 +621,14 @@ class Regrtest:
if self.ns.randomize:
print("Using random seed", self.ns.random_seed)
- if self.ns.forever:
- self.tests = self._test_forever(list(self.selected))
- self.test_count = ''
- self.test_count_width = 3
- else:
- self.tests = iter(self.selected)
- self.test_count = '/{}'.format(len(self.selected))
- self.test_count_width = len(self.test_count) - 1
-
+ tests = self.selected
+ self.set_tests(tests)
+ runtests = RunTests(tests, forever=self.ns.forever)
+ self.all_runtests.append(runtests)
if self.ns.use_mp:
- from test.libregrtest.runtest_mp import run_tests_multiprocess
- # If we're on windows and this is the parent runner (not a worker),
- # track the load average.
- if sys.platform == 'win32' and self.worker_test_name is None:
- from test.libregrtest.win_utils import WindowsLoadTracker
-
- try:
- self.win_load_tracker = WindowsLoadTracker()
- except PermissionError as error:
- # Standard accounts may not have access to the performance
- # counters.
- print(f'Failed to create WindowsLoadTracker: {error}')
-
- try:
- run_tests_multiprocess(self)
- finally:
- if self.win_load_tracker is not None:
- self.win_load_tracker.close()
- self.win_load_tracker = None
+ self._run_tests_mp(runtests)
else:
- self.run_tests_sequential()
+ self.run_tests_sequentially(runtests)
def finalize(self):
if self.next_single_filename:
@@ -642,23 +643,29 @@ class Regrtest:
r.write_results(show_missing=True, summary=True,
coverdir=self.ns.coverdir)
- print()
- self.display_summary()
-
if self.ns.runleaks:
os.system("leaks %d" % os.getpid())
+ self.save_xml_result()
+
def display_summary(self):
duration = time.perf_counter() - self.start_time
+ first_runtests = self.all_runtests[0]
+ # the second runtests (re-run failed tests) disables forever,
+ # use the first runtests
+ forever = first_runtests.forever
+ filtered = bool(self.ns.match_tests) or bool(self.ns.ignore_tests)
# Total duration
+ print()
print("Total duration: %s" % format_duration(duration))
# Total tests
- total = TestStats()
- for stats in self.stats_dict.values():
- total.accumulate(stats)
- stats = [f'run={total.tests_run:,}']
+ total = self.total_stats
+ text = f'run={total.tests_run:,}'
+ if filtered:
+ text = f"{text} (filtered)"
+ stats = [text]
if total.failures:
stats.append(f'failures={total.failures:,}')
if total.skipped:
@@ -666,23 +673,31 @@ class Regrtest:
print(f"Total tests: {' '.join(stats)}")
# Total test files
- report = [f'success={len(self.good)}']
- if self.bad:
- report.append(f'failed={len(self.bad)}')
- if self.environment_changed:
- report.append(f'env_changed={len(self.environment_changed)}')
- if self.skipped:
- report.append(f'skipped={len(self.skipped)}')
- if self.resource_denied:
- report.append(f'resource_denied={len(self.resource_denied)}')
- if self.rerun:
- report.append(f'rerun={len(self.rerun)}')
- if self.run_no_tests:
- report.append(f'run_no_tests={len(self.run_no_tests)}')
+ all_tests = [self.good, self.bad, self.rerun,
+ self.skipped,
+ self.environment_changed, self.run_no_tests]
+ run = sum(map(len, all_tests))
+ text = f'run={run}'
+ if not forever:
+ ntest = len(first_runtests.tests)
+ text = f"{text}/{ntest}"
+ if filtered:
+ text = f"{text} (filtered)"
+ report = [text]
+ for name, tests in (
+ ('failed', self.bad),
+ ('env_changed', self.environment_changed),
+ ('skipped', self.skipped),
+ ('resource_denied', self.resource_denied),
+ ('rerun', self.rerun),
+ ('run_no_tests', self.run_no_tests),
+ ):
+ if tests:
+ report.append(f'{name}={len(tests)}')
print(f"Total test files: {' '.join(report)}")
# Result
- result = self.get_tests_result()
+ result = self.get_tests_state()
print(f"Result: {result}")
def save_xml_result(self):
@@ -742,6 +757,9 @@ class Regrtest:
self.tmp_dir = os.path.abspath(self.tmp_dir)
+ def is_worker(self):
+ return (self.ns.worker_args is not None)
+
def create_temp_dir(self):
os.makedirs(self.tmp_dir, exist_ok=True)
@@ -754,7 +772,8 @@ class Regrtest:
nounce = random.randint(0, 1_000_000)
else:
nounce = os.getpid()
- if self.worker_test_name is not None:
+
+ if self.is_worker():
test_cwd = 'test_python_worker_{}'.format(nounce)
else:
test_cwd = 'test_python_{}'.format(nounce)
@@ -817,48 +836,53 @@ class Regrtest:
return None
+ def get_exitcode(self):
+ exitcode = 0
+ if self.bad:
+ exitcode = EXITCODE_BAD_TEST
+ elif self.interrupted:
+ exitcode = EXITCODE_INTERRUPTED
+ elif self.ns.fail_env_changed and self.environment_changed:
+ exitcode = EXITCODE_ENV_CHANGED
+ elif self.no_tests_run():
+ exitcode = EXITCODE_NO_TESTS_RAN
+ elif self.rerun and self.ns.fail_rerun:
+ exitcode = EXITCODE_BAD_TEST
+ return exitcode
+
+ def action_run_tests(self):
+ self.run_tests()
+ self.display_result()
+
+ need_rerun = self.need_rerun
+ if self.ns.rerun and need_rerun:
+ self.rerun_failed_tests(need_rerun)
+
+ self.display_summary()
+ self.finalize()
+
def _main(self, tests, kwargs):
- if self.worker_test_name is not None:
+ if self.is_worker():
from test.libregrtest.runtest_mp import run_tests_worker
- run_tests_worker(self.ns, self.worker_test_name)
+ run_tests_worker(self.ns.worker_args)
+ return
if self.ns.wait:
input("Press any key to continue...")
- support.PGO = self.ns.pgo
- support.PGO_EXTENDED = self.ns.pgo_extended
-
setup_tests(self.ns)
-
self.find_tests(tests)
+ exitcode = 0
if self.ns.list_tests:
self.list_tests()
- sys.exit(0)
-
- if self.ns.list_cases:
+ elif self.ns.list_cases:
self.list_cases()
- sys.exit(0)
-
- self.run_tests()
- self.display_result()
-
- if self.ns.verbose2 and self.bad:
- self.rerun_failed_tests()
-
- self.finalize()
-
- self.save_xml_result()
+ else:
+ self.action_run_tests()
+ exitcode = self.get_exitcode()
- if self.bad:
- sys.exit(EXITCODE_BAD_TEST)
- if self.interrupted:
- sys.exit(EXITCODE_INTERRUPTED)
- if self.ns.fail_env_changed and self.environment_changed:
- sys.exit(EXITCODE_ENV_CHANGED)
- if self.no_tests_run():
- sys.exit(EXITCODE_NO_TESTS_RAN)
- sys.exit(0)
+ sys.exit(exitcode)
def main(tests=None, **kwargs):
diff --git a/Lib/test/libregrtest/runtest.py b/Lib/test/libregrtest/runtest.py
index 6fa6069..6e3fab1 100644
--- a/Lib/test/libregrtest/runtest.py
+++ b/Lib/test/libregrtest/runtest.py
@@ -1,7 +1,6 @@
import dataclasses
import doctest
import faulthandler
-import functools
import gc
import importlib
import io
@@ -20,6 +19,10 @@ from test.libregrtest.save_env import saved_test_environment
from test.libregrtest.utils import clear_caches, format_duration, print_warning
+MatchTests = list[str]
+MatchTestsDict = dict[str, MatchTests]
+
+
# Avoid enum.Enum to reduce the number of imports when tests are run
class State:
PASSED = "PASSED"
@@ -56,6 +59,41 @@ class State:
State.MULTIPROCESSING_ERROR,
State.DID_NOT_RUN}
+ @staticmethod
+ def must_stop(state):
+ return state in {
+ State.INTERRUPTED,
+ State.MULTIPROCESSING_ERROR}
+
+
+# gh-90681: When rerunning tests, we might need to rerun the whole
+# class or module suite if some its life-cycle hooks fail.
+# Test level hooks are not affected.
+_TEST_LIFECYCLE_HOOKS = frozenset((
+ 'setUpClass', 'tearDownClass',
+ 'setUpModule', 'tearDownModule',
+))
+
+def normalize_test_name(test_full_name, *, is_error=False):
+ short_name = test_full_name.split(" ")[0]
+ if is_error and short_name in _TEST_LIFECYCLE_HOOKS:
+ if test_full_name.startswith(('setUpModule (', 'tearDownModule (')):
+ # if setUpModule() or tearDownModule() failed, don't filter
+ # tests with the test file name, don't use use filters.
+ return None
+
+ # This means that we have a failure in a life-cycle hook,
+ # we need to rerun the whole module or class suite.
+ # Basically the error looks like this:
+ # ERROR: setUpClass (test.test_reg_ex.RegTest)
+ # or
+ # ERROR: setUpModule (test.test_reg_ex)
+ # So, we need to parse the class / module name.
+ lpar = test_full_name.index('(')
+ rpar = test_full_name.index(')')
+ return test_full_name[lpar + 1: rpar].split('.')[-1]
+ return short_name
+
@dataclasses.dataclass(slots=True)
class TestResult:
@@ -129,6 +167,58 @@ class TestResult:
if self.state is None or self.state == State.PASSED:
self.state = State.ENV_CHANGED
+ def must_stop(self, fail_fast: bool, fail_env_changed: bool) -> bool:
+ if State.must_stop(self.state):
+ return True
+ if fail_fast and self.is_failed(fail_env_changed):
+ return True
+ return False
+
+ def get_rerun_match_tests(self):
+ match_tests = []
+
+ errors = self.errors or []
+ failures = self.failures or []
+ for error_list, is_error in (
+ (errors, True),
+ (failures, False),
+ ):
+ for full_name, *_ in error_list:
+ match_name = normalize_test_name(full_name, is_error=is_error)
+ if match_name is None:
+ # 'setUpModule (test.test_sys)': don't filter tests
+ return None
+ if not match_name:
+ error_type = "ERROR" if is_error else "FAIL"
+ print_warning(f"rerun failed to parse {error_type} test name: "
+ f"{full_name!r}: don't filter tests")
+ return None
+ match_tests.append(match_name)
+
+ return match_tests
+
+
+@dataclasses.dataclass(slots=True, frozen=True)
+class RunTests:
+ tests: list[str]
+ match_tests: MatchTestsDict | None = None
+ rerun: bool = False
+ forever: bool = False
+
+ def get_match_tests(self, test_name) -> MatchTests | None:
+ if self.match_tests is not None:
+ return self.match_tests.get(test_name, None)
+ else:
+ return None
+
+ def iter_tests(self):
+ tests = tuple(self.tests)
+ if self.forever:
+ while True:
+ yield from tests
+ else:
+ yield from tests
+
# Minimum duration of a test to display its duration or to mention that
# the test is running in background
@@ -147,9 +237,6 @@ SPLITTESTDIRS = {
"test_multiprocessing_spawn",
}
-# Storage of uncollectable objects
-FOUND_GARBAGE = []
-
def findtestdir(path=None):
return path or os.path.dirname(os.path.dirname(__file__)) or os.curdir
@@ -189,31 +276,41 @@ def split_test_packages(tests, *, testdir=None, exclude=(),
return splitted
-def get_abs_module(ns: Namespace, test_name: str) -> str:
- if test_name.startswith('test.') or ns.testdir:
+def abs_module_name(test_name: str, test_dir: str | None) -> str:
+ if test_name.startswith('test.') or test_dir:
return test_name
else:
# Import it from the test package
return 'test.' + test_name
-def _runtest_capture_output_timeout_junit(result: TestResult, ns: Namespace) -> None:
+def setup_support(ns: Namespace):
+ support.PGO = ns.pgo
+ support.PGO_EXTENDED = ns.pgo_extended
+ support.set_match_tests(ns.match_tests, ns.ignore_tests)
+ support.failfast = ns.failfast
+ support.verbose = ns.verbose
+ if ns.xmlpath:
+ support.junit_xml_list = []
+ else:
+ support.junit_xml_list = None
+
+
+def _runtest(result: TestResult, ns: Namespace) -> None:
# Capture stdout and stderr, set faulthandler timeout,
# and create JUnit XML report.
-
+ verbose = ns.verbose
output_on_failure = ns.verbose3
+ timeout = ns.timeout
use_timeout = (
- ns.timeout is not None and threading_helper.can_start_thread
+ timeout is not None and threading_helper.can_start_thread
)
if use_timeout:
- faulthandler.dump_traceback_later(ns.timeout, exit=True)
+ faulthandler.dump_traceback_later(timeout, exit=True)
try:
- support.set_match_tests(ns.match_tests, ns.ignore_tests)
- support.junit_xml_list = xml_list = [] if ns.xmlpath else None
- if ns.failfast:
- support.failfast = True
+ setup_support(ns)
if output_on_failure:
support.verbose = True
@@ -247,11 +344,10 @@ def _runtest_capture_output_timeout_junit(result: TestResult, ns: Namespace) ->
sys.stderr.flush()
else:
# Tell tests to be moderately quiet
- support.verbose = ns.verbose
-
- _runtest_env_changed_exc(result, ns,
- display_failure=not ns.verbose)
+ support.verbose = verbose
+ _runtest_env_changed_exc(result, ns, display_failure=not verbose)
+ xml_list = support.junit_xml_list
if xml_list:
import xml.etree.ElementTree as ET
result.xml_data = [ET.tostring(x).decode('us-ascii')
@@ -276,7 +372,7 @@ def runtest(ns: Namespace, test_name: str) -> TestResult:
start_time = time.perf_counter()
result = TestResult(test_name)
try:
- _runtest_capture_output_timeout_junit(result, ns)
+ _runtest(result, ns)
except:
if not ns.pgo:
msg = traceback.format_exc()
@@ -287,9 +383,9 @@ def runtest(ns: Namespace, test_name: str) -> TestResult:
return result
-def _test_module(the_module):
+def run_unittest(test_mod):
loader = unittest.TestLoader()
- tests = loader.loadTestsFromModule(the_module)
+ tests = loader.loadTestsFromModule(test_mod)
for error in loader.errors:
print(error, file=sys.stderr)
if loader.errors:
@@ -304,7 +400,6 @@ def save_env(ns: Namespace, test_name: str):
def regrtest_runner(result, test_func, ns) -> None:
# Run test_func(), collect statistics, and detect reference and memory
# leaks.
-
if ns.huntrleaks:
from test.libregrtest.refleak import dash_R
refleak, test_result = dash_R(ns, result.test_name, test_func)
@@ -332,24 +427,27 @@ def regrtest_runner(result, test_func, ns) -> None:
result.stats = stats
+# Storage of uncollectable objects
+FOUND_GARBAGE = []
+
+
def _load_run_test(result: TestResult, ns: Namespace) -> None:
# Load the test function, run the test function.
+ module_name = abs_module_name(result.test_name, ns.testdir)
- abstest = get_abs_module(ns, result.test_name)
-
- # remove the module from sys.module to reload it if it was already imported
- try:
- del sys.modules[abstest]
- except KeyError:
- pass
+ # Remove the module from sys.module to reload it if it was already imported
+ sys.modules.pop(module_name, None)
- the_module = importlib.import_module(abstest)
+ test_mod = importlib.import_module(module_name)
# If the test has a test_main, that will run the appropriate
- # tests. If not, use normal unittest test loading.
- test_func = getattr(the_module, "test_main", None)
- if test_func is None:
- test_func = functools.partial(_test_module, the_module)
+ # tests. If not, use normal unittest test runner.
+ test_main = getattr(test_mod, "test_main", None)
+ if test_main is not None:
+ test_func = test_main
+ else:
+ def test_func():
+ return run_unittest(test_mod)
try:
with save_env(ns, result.test_name):
@@ -361,12 +459,12 @@ def _load_run_test(result: TestResult, ns: Namespace) -> None:
# failures.
support.gc_collect()
- cleanup_test_droppings(result.test_name, ns.verbose)
+ remove_testfn(result.test_name, ns.verbose)
if gc.garbage:
support.environment_altered = True
print_warning(f"{result.test_name} created {len(gc.garbage)} "
- f"uncollectable object(s).")
+ f"uncollectable object(s)")
# move the uncollectable objects somewhere,
# so we don't see them again
@@ -444,35 +542,37 @@ def _runtest_env_changed_exc(result: TestResult, ns: Namespace,
result.state = State.PASSED
-def cleanup_test_droppings(test_name: str, verbose: int) -> None:
- # Try to clean up junk commonly left behind. While tests shouldn't leave
- # any files or directories behind, when a test fails that can be tedious
- # for it to arrange. The consequences can be especially nasty on Windows,
- # since if a test leaves a file open, it cannot be deleted by name (while
- # there's nothing we can do about that here either, we can display the
- # name of the offending test, which is a real help).
- for name in (os_helper.TESTFN,):
- if not os.path.exists(name):
- continue
+def remove_testfn(test_name: str, verbose: int) -> None:
+ # Try to clean up os_helper.TESTFN if left behind.
+ #
+ # While tests shouldn't leave any files or directories behind, when a test
+ # fails that can be tedious for it to arrange. The consequences can be
+ # especially nasty on Windows, since if a test leaves a file open, it
+ # cannot be deleted by name (while there's nothing we can do about that
+ # here either, we can display the name of the offending test, which is a
+ # real help).
+ name = os_helper.TESTFN
+ if not os.path.exists(name):
+ return
- if os.path.isdir(name):
- import shutil
- kind, nuker = "directory", shutil.rmtree
- elif os.path.isfile(name):
- kind, nuker = "file", os.unlink
- else:
- raise RuntimeError(f"os.path says {name!r} exists but is neither "
- f"directory nor file")
-
- if verbose:
- print_warning(f"{test_name} left behind {kind} {name!r}")
- support.environment_altered = True
-
- try:
- import stat
- # fix possible permissions problems that might prevent cleanup
- os.chmod(name, stat.S_IRWXU | stat.S_IRWXG | stat.S_IRWXO)
- nuker(name)
- except Exception as exc:
- print_warning(f"{test_name} left behind {kind} {name!r} "
- f"and it couldn't be removed: {exc}")
+ if os.path.isdir(name):
+ import shutil
+ kind, nuker = "directory", shutil.rmtree
+ elif os.path.isfile(name):
+ kind, nuker = "file", os.unlink
+ else:
+ raise RuntimeError(f"os.path says {name!r} exists but is neither "
+ f"directory nor file")
+
+ if verbose:
+ print_warning(f"{test_name} left behind {kind} {name!r}")
+ support.environment_altered = True
+
+ try:
+ import stat
+ # fix possible permissions problems that might prevent cleanup
+ os.chmod(name, stat.S_IRWXU | stat.S_IRWXG | stat.S_IRWXO)
+ nuker(name)
+ except Exception as exc:
+ print_warning(f"{test_name} left behind {kind} {name!r} "
+ f"and it couldn't be removed: {exc}")
diff --git a/Lib/test/libregrtest/runtest_mp.py b/Lib/test/libregrtest/runtest_mp.py
index fb1f80b..6008955 100644
--- a/Lib/test/libregrtest/runtest_mp.py
+++ b/Lib/test/libregrtest/runtest_mp.py
@@ -19,8 +19,8 @@ from test.support import TestStats
from test.libregrtest.cmdline import Namespace
from test.libregrtest.main import Regrtest
from test.libregrtest.runtest import (
- runtest, TestResult, State,
- PROGRESS_MIN_TIME)
+ runtest, TestResult, State, PROGRESS_MIN_TIME,
+ MatchTests, RunTests)
from test.libregrtest.setup import setup_tests
from test.libregrtest.utils import format_duration, print_warning
@@ -44,26 +44,54 @@ JOIN_TIMEOUT = 30.0 # seconds
USE_PROCESS_GROUP = (hasattr(os, "setsid") and hasattr(os, "killpg"))
-def must_stop(result: TestResult, ns: Namespace) -> bool:
- if result.state == State.INTERRUPTED:
- return True
- if ns.failfast and result.is_failed(ns.fail_env_changed):
- return True
- return False
+@dataclasses.dataclass(slots=True)
+class WorkerJob:
+ test_name: str
+ namespace: Namespace
+ rerun: bool = False
+ match_tests: MatchTests | None = None
-def parse_worker_args(worker_args) -> tuple[Namespace, str]:
- ns_dict, test_name = json.loads(worker_args)
- ns = Namespace(**ns_dict)
- return (ns, test_name)
+class _EncodeWorkerJob(json.JSONEncoder):
+ def default(self, o: Any) -> dict[str, Any]:
+ match o:
+ case WorkerJob():
+ result = dataclasses.asdict(o)
+ result["__worker_job__"] = True
+ return result
+ case Namespace():
+ result = vars(o)
+ result["__namespace__"] = True
+ return result
+ case _:
+ return super().default(o)
+
+
+def _decode_worker_job(d: dict[str, Any]) -> WorkerJob | dict[str, Any]:
+ if "__worker_job__" in d:
+ d.pop('__worker_job__')
+ return WorkerJob(**d)
+ if "__namespace__" in d:
+ d.pop('__namespace__')
+ return Namespace(**d)
+ else:
+ return d
+
+
+def _parse_worker_args(worker_json: str) -> tuple[Namespace, str]:
+ return json.loads(worker_json,
+ object_hook=_decode_worker_job)
-def run_test_in_subprocess(testname: str, ns: Namespace, tmp_dir: str, stdout_fh: TextIO) -> subprocess.Popen:
- ns_dict = vars(ns)
- worker_args = (ns_dict, testname)
- worker_args = json.dumps(worker_args)
- if ns.python is not None:
- executable = ns.python
+def run_test_in_subprocess(worker_job: WorkerJob,
+ output_file: TextIO,
+ tmp_dir: str | None = None) -> subprocess.Popen:
+ ns = worker_job.namespace
+ python = ns.python
+ worker_args = json.dumps(worker_job, cls=_EncodeWorkerJob)
+
+ if python is not None:
+ executable = python
else:
executable = [sys.executable]
cmd = [*executable, *support.args_from_interpreter_flags(),
@@ -82,9 +110,9 @@ def run_test_in_subprocess(testname: str, ns: Namespace, tmp_dir: str, stdout_fh
# sysconfig.is_python_build() is true. See issue 15300.
kw = dict(
env=env,
- stdout=stdout_fh,
+ stdout=output_file,
# bpo-45410: Write stderr into stdout to keep messages order
- stderr=stdout_fh,
+ stderr=output_file,
text=True,
close_fds=(os.name != 'nt'),
cwd=os_helper.SAVEDCWD,
@@ -94,11 +122,27 @@ def run_test_in_subprocess(testname: str, ns: Namespace, tmp_dir: str, stdout_fh
return subprocess.Popen(cmd, **kw)
-def run_tests_worker(ns: Namespace, test_name: str) -> NoReturn:
+def run_tests_worker(worker_json: str) -> NoReturn:
+ worker_job = _parse_worker_args(worker_json)
+ ns = worker_job.namespace
+ test_name = worker_job.test_name
+ rerun = worker_job.rerun
+ match_tests = worker_job.match_tests
+
setup_tests(ns)
- result = runtest(ns, test_name)
+ if rerun:
+ if match_tests:
+ matching = "matching: " + ", ".join(match_tests)
+ print(f"Re-running {test_name} in verbose mode ({matching})", flush=True)
+ else:
+ print(f"Re-running {test_name} in verbose mode", flush=True)
+ ns.verbose = True
+ if match_tests is not None:
+ ns.match_tests = match_tests
+
+ result = runtest(ns, test_name)
print() # Force a newline (just in case)
# Serialize TestResult as dict in JSON
@@ -148,11 +192,13 @@ class TestWorkerProcess(threading.Thread):
def __init__(self, worker_id: int, runner: "MultiprocessTestRunner") -> None:
super().__init__()
self.worker_id = worker_id
+ self.runtests = runner.runtests
self.pending = runner.pending
self.output = runner.output
self.ns = runner.ns
self.timeout = runner.worker_timeout
self.regrtest = runner.regrtest
+ self.rerun = runner.rerun
self.current_test_name = None
self.start_time = None
self._popen = None
@@ -216,10 +262,11 @@ class TestWorkerProcess(threading.Thread):
) -> MultiprocessResult:
return MultiprocessResult(test_result, stdout, err_msg)
- def _run_process(self, test_name: str, tmp_dir: str, stdout_fh: TextIO) -> int:
- self.current_test_name = test_name
+ def _run_process(self, worker_job, output_file: TextIO,
+ tmp_dir: str | None = None) -> int:
+ self.current_test_name = worker_job.test_name
try:
- popen = run_test_in_subprocess(test_name, self.ns, tmp_dir, stdout_fh)
+ popen = run_test_in_subprocess(worker_job, output_file, tmp_dir)
self._killed = False
self._popen = popen
@@ -277,9 +324,15 @@ class TestWorkerProcess(threading.Thread):
else:
encoding = sys.stdout.encoding
+ match_tests = self.runtests.get_match_tests(test_name)
+
# gh-94026: Write stdout+stderr to a tempfile as workaround for
# non-blocking pipes on Emscripten with NodeJS.
- with tempfile.TemporaryFile('w+', encoding=encoding) as stdout_fh:
+ with tempfile.TemporaryFile('w+', encoding=encoding) as stdout_file:
+ worker_job = WorkerJob(test_name,
+ namespace=self.ns,
+ rerun=self.rerun,
+ match_tests=match_tests)
# gh-93353: Check for leaked temporary files in the parent process,
# since the deletion of temporary files can happen late during
# Python finalization: too late for libregrtest.
@@ -290,17 +343,17 @@ class TestWorkerProcess(threading.Thread):
tmp_dir = tempfile.mkdtemp(prefix="test_python_")
tmp_dir = os.path.abspath(tmp_dir)
try:
- retcode = self._run_process(test_name, tmp_dir, stdout_fh)
+ retcode = self._run_process(worker_job, stdout_file, tmp_dir)
finally:
tmp_files = os.listdir(tmp_dir)
os_helper.rmtree(tmp_dir)
else:
- retcode = self._run_process(test_name, None, stdout_fh)
+ retcode = self._run_process(worker_job, stdout_file)
tmp_files = ()
- stdout_fh.seek(0)
+ stdout_file.seek(0)
try:
- stdout = stdout_fh.read().strip()
+ stdout = stdout_file.read().strip()
except Exception as exc:
# gh-101634: Catch UnicodeDecodeError if stdout cannot be
# decoded from encoding
@@ -342,6 +395,8 @@ class TestWorkerProcess(threading.Thread):
return MultiprocessResult(result, stdout)
def run(self) -> None:
+ fail_fast = self.ns.failfast
+ fail_env_changed = self.ns.fail_env_changed
while not self._stopped:
try:
try:
@@ -354,7 +409,7 @@ class TestWorkerProcess(threading.Thread):
mp_result.result.duration = time.monotonic() - self.start_time
self.output.put((False, mp_result))
- if must_stop(mp_result.result, self.ns):
+ if mp_result.result.must_stop(fail_fast, fail_env_changed):
break
except ExitThread:
break
@@ -410,29 +465,36 @@ def get_running(workers: list[TestWorkerProcess]) -> list[TestWorkerProcess]:
class MultiprocessTestRunner:
- def __init__(self, regrtest: Regrtest) -> None:
+ def __init__(self, regrtest: Regrtest, runtests: RunTests) -> None:
+ ns = regrtest.ns
+ timeout = ns.timeout
+
self.regrtest = regrtest
+ self.runtests = runtests
+ self.rerun = runtests.rerun
self.log = self.regrtest.log
- self.ns = regrtest.ns
+ self.ns = ns
self.output: queue.Queue[QueueOutput] = queue.Queue()
- self.pending = MultiprocessIterator(self.regrtest.tests)
- if self.ns.timeout is not None:
+ tests_iter = runtests.iter_tests()
+ self.pending = MultiprocessIterator(tests_iter)
+ if timeout is not None:
# Rely on faulthandler to kill a worker process. This timouet is
# when faulthandler fails to kill a worker process. Give a maximum
# of 5 minutes to faulthandler to kill the worker.
- self.worker_timeout = min(self.ns.timeout * 1.5,
- self.ns.timeout + 5 * 60)
+ self.worker_timeout = min(timeout * 1.5, timeout + 5 * 60)
else:
self.worker_timeout = None
self.workers = None
def start_workers(self) -> None:
+ use_mp = self.ns.use_mp
+ timeout = self.ns.timeout
self.workers = [TestWorkerProcess(index, self)
- for index in range(1, self.ns.use_mp + 1)]
+ for index in range(1, use_mp + 1)]
msg = f"Run tests in parallel using {len(self.workers)} child processes"
- if self.ns.timeout:
+ if timeout:
msg += (" (timeout: %s, worker timeout: %s)"
- % (format_duration(self.ns.timeout),
+ % (format_duration(timeout),
format_duration(self.worker_timeout)))
self.log(msg)
for worker in self.workers:
@@ -446,6 +508,7 @@ class MultiprocessTestRunner:
worker.wait_stopped(start_time)
def _get_result(self) -> QueueOutput | None:
+ pgo = self.ns.pgo
use_faulthandler = (self.ns.timeout is not None)
timeout = PROGRESS_UPDATE
@@ -464,7 +527,7 @@ class MultiprocessTestRunner:
# display progress
running = get_running(self.workers)
- if running and not self.ns.pgo:
+ if running and not pgo:
self.log('running: %s' % ', '.join(running))
# all worker threads are done: consume pending results
@@ -475,42 +538,46 @@ class MultiprocessTestRunner:
def display_result(self, mp_result: MultiprocessResult) -> None:
result = mp_result.result
+ pgo = self.ns.pgo
text = str(result)
if mp_result.err_msg:
# MULTIPROCESSING_ERROR
text += ' (%s)' % mp_result.err_msg
- elif (result.duration >= PROGRESS_MIN_TIME and not self.ns.pgo):
+ elif (result.duration >= PROGRESS_MIN_TIME and not pgo):
text += ' (%s)' % format_duration(result.duration)
running = get_running(self.workers)
- if running and not self.ns.pgo:
+ if running and not pgo:
text += ' -- running: %s' % ', '.join(running)
self.regrtest.display_progress(self.test_index, text)
def _process_result(self, item: QueueOutput) -> bool:
"""Returns True if test runner must stop."""
+ rerun = self.runtests.rerun
if item[0]:
# Thread got an exception
format_exc = item[1]
print_warning(f"regrtest worker thread failed: {format_exc}")
result = TestResult("<regrtest worker>", state=State.MULTIPROCESSING_ERROR)
- self.regrtest.accumulate_result(result)
- return True
+ self.regrtest.accumulate_result(result, rerun=rerun)
+ return result
self.test_index += 1
mp_result = item[1]
- self.regrtest.accumulate_result(mp_result.result)
+ result = mp_result.result
+ self.regrtest.accumulate_result(result, rerun=rerun)
self.display_result(mp_result)
if mp_result.worker_stdout:
print(mp_result.worker_stdout, flush=True)
- if must_stop(mp_result.result, self.ns):
- return True
-
- return False
+ return result
def run_tests(self) -> None:
+ fail_fast = self.ns.failfast
+ fail_env_changed = self.ns.fail_env_changed
+ timeout = self.ns.timeout
+
self.start_workers()
self.test_index = 0
@@ -520,14 +587,14 @@ class MultiprocessTestRunner:
if item is None:
break
- stop = self._process_result(item)
- if stop:
+ result = self._process_result(item)
+ if result.must_stop(fail_fast, fail_env_changed):
break
except KeyboardInterrupt:
print()
self.regrtest.interrupted = True
finally:
- if self.ns.timeout is not None:
+ if timeout is not None:
faulthandler.cancel_dump_traceback_later()
# Always ensure that all worker processes are no longer
@@ -536,8 +603,8 @@ class MultiprocessTestRunner:
self.stop_workers()
-def run_tests_multiprocess(regrtest: Regrtest) -> None:
- MultiprocessTestRunner(regrtest).run_tests()
+def run_tests_multiprocess(regrtest: Regrtest, runtests: RunTests) -> None:
+ MultiprocessTestRunner(regrtest, runtests).run_tests()
class EncodeTestResult(json.JSONEncoder):
@@ -552,7 +619,7 @@ class EncodeTestResult(json.JSONEncoder):
return super().default(o)
-def decode_test_result(d: dict[str, Any]) -> TestResult | TestStats | dict[str, Any]:
+def decode_test_result(d: dict[str, Any]) -> TestResult | dict[str, Any]:
"""Decode a TestResult (sub)class object from a JSON dict."""
if "__test_result__" not in d:
diff --git a/Lib/test/libregrtest/utils.py b/Lib/test/libregrtest/utils.py
index 89a149e..9a60a3d 100644
--- a/Lib/test/libregrtest/utils.py
+++ b/Lib/test/libregrtest/utils.py
@@ -31,7 +31,7 @@ def format_duration(seconds):
return ' '.join(parts)
-def removepy(names):
+def strip_py_suffix(names: list[str]):
if not names:
return
for idx, name in enumerate(names):