summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorVictor Stinner <vstinner@python.org>2023-09-08 13:14:17 (GMT)
committerGitHub <noreply@github.com>2023-09-08 13:14:17 (GMT)
commit0855b2c8b69f02f86f837875acda148479e6a061 (patch)
tree4e42efd35d33d09170e4b5b02bc350db4672b758
parenteeaae92b496bf75e8c937af1d5fd277d71fb7334 (diff)
downloadcpython-0855b2c8b69f02f86f837875acda148479e6a061.zip
cpython-0855b2c8b69f02f86f837875acda148479e6a061.tar.gz
cpython-0855b2c8b69f02f86f837875acda148479e6a061.tar.bz2
[3.12] gh-108834: Sync libregrtest with the main branch (#108966)
* gh-108834: regrtest reruns failed tests in subprocesses (#108839) When using --rerun option, regrtest now re-runs failed tests in verbose mode in fresh worker processes to have more deterministic behavior. So it can write its final report even if a test killed a worker progress. Add --fail-rerun option to regrtest: exit with non-zero exit code if a test failed pass passed when re-run in verbose mode (in a fresh process). That's now more useful since tests can pass when re-run in a fresh worker progress, whereas they failed when run after other tests when tests are run sequentially. Rename --verbose2 option (-w) to --rerun. Keep --verbose2 as a deprecated alias. Changes: * Fix and enhance statistics in regrtest summary. Add "(filtered)" when --match and/or --ignore options are used. * Add RunTests class. * Add TestResult.get_rerun_match_tests() method * Rewrite code to serialize/deserialize worker arguments as JSON using a new WorkerJob class. * Fix stats when a test is run with --forever --rerun. * If failed test names cannot be parsed, log a warning and don't filter tests. * test_regrtest.test_rerun_success() now uses a marker file, since the test is re-run in a separated process. * Add tests on normalize_test_name() function. * Add test_success() and test_skip() tests to test_regrtest. (cherry picked from commit 31c2945f143c6b80c837fcf09a5cfb85fea9ea4c) * gh-108834: regrtest --fail-rerun exits with code 5 (#108896) When the --fail-rerun option is used and a test fails and then pass, regrtest now uses exit code 5 ("rerun) instead of 2 ("bad test"). (cherry picked from commit 1170d5a292b46f754cd29c245a040f1602f70301) * gh-108416: Mark slow but not CPU bound test methods with requires_resource('walltime') (GH-108480) (cherry picked from commit 1e0d62793a84001e92f1c80b511d3a212b435acc) * Manually sync Lib/test/libregrtest/ from main --------- Co-authored-by: Serhiy Storchaka <storchaka@gmail.com>
-rw-r--r--Lib/test/_test_multiprocessing.py3
-rwxr-xr-xLib/test/bisect_cmd.py7
-rw-r--r--Lib/test/libregrtest/cmdline.py15
-rw-r--r--Lib/test/libregrtest/main.py592
-rw-r--r--Lib/test/libregrtest/runtest.py228
-rw-r--r--Lib/test/libregrtest/runtest_mp.py179
-rw-r--r--Lib/test/libregrtest/utils.py2
-rw-r--r--Lib/test/support/__init__.py1
-rw-r--r--Lib/test/support/testresult.py3
-rw-r--r--Lib/test/test_concurrent_futures/executor.py1
-rw-r--r--Lib/test/test_concurrent_futures/test_wait.py3
-rw-r--r--Lib/test/test_eintr.py1
-rw-r--r--Lib/test/test_faulthandler.py1
-rw-r--r--Lib/test/test_httplib.py1
-rw-r--r--Lib/test/test_imaplib.py5
-rw-r--r--Lib/test/test_io.py6
-rw-r--r--Lib/test/test_logging.py1
-rw-r--r--Lib/test/test_poll.py3
-rw-r--r--Lib/test/test_regrtest.py306
-rw-r--r--Lib/test/test_signal.py1
-rw-r--r--Lib/test/test_smtpnet.py1
-rw-r--r--Lib/test/test_ssl.py2
-rw-r--r--Lib/test/test_subprocess.py2
-rw-r--r--Lib/test/test_urllib2net.py5
-rw-r--r--Lib/test/test_urllibnet.py2
-rw-r--r--Lib/test/test_xmlrpc.py9
-rw-r--r--Misc/NEWS.d/next/Tests/2023-09-03-02-01-55.gh-issue-108834.iAwXzj.rst6
-rw-r--r--Misc/NEWS.d/next/Tests/2023-09-03-06-17-12.gh-issue-108834.fjV-CJ.rst2
-rw-r--r--Misc/NEWS.d/next/Tests/2023-09-03-20-15-49.gh-issue-108834.Osvmhf.rst3
29 files changed, 892 insertions, 499 deletions
diff --git a/Lib/test/_test_multiprocessing.py b/Lib/test/_test_multiprocessing.py
index 2e65653..044bfc9 100644
--- a/Lib/test/_test_multiprocessing.py
+++ b/Lib/test/_test_multiprocessing.py
@@ -675,6 +675,7 @@ class _TestProcess(BaseTestCase):
close_queue(q)
+ @support.requires_resource('walltime')
def test_many_processes(self):
if self.TYPE == 'threads':
self.skipTest('test not appropriate for {}'.format(self.TYPE))
@@ -4953,6 +4954,7 @@ class TestWait(unittest.TestCase):
def test_wait_socket_slow(self):
self.test_wait_socket(True)
+ @support.requires_resource('walltime')
def test_wait_timeout(self):
from multiprocessing.connection import wait
@@ -4981,6 +4983,7 @@ class TestWait(unittest.TestCase):
sem.release()
time.sleep(period)
+ @support.requires_resource('walltime')
def test_wait_integer(self):
from multiprocessing.connection import wait
diff --git a/Lib/test/bisect_cmd.py b/Lib/test/bisect_cmd.py
index 0bdd7a4..5cb804b 100755
--- a/Lib/test/bisect_cmd.py
+++ b/Lib/test/bisect_cmd.py
@@ -109,9 +109,10 @@ def parse_args():
def main():
args = parse_args()
- if '-w' in args.test_args or '--verbose2' in args.test_args:
- print("WARNING: -w/--verbose2 option should not be used to bisect!")
- print()
+ for opt in ('-w', '--rerun', '--verbose2'):
+ if opt in args.test_args:
+ print(f"WARNING: {opt} option should not be used to bisect!")
+ print()
if args.input:
with open(args.input) as fp:
diff --git a/Lib/test/libregrtest/cmdline.py b/Lib/test/libregrtest/cmdline.py
index ebe5792..d1a590d 100644
--- a/Lib/test/libregrtest/cmdline.py
+++ b/Lib/test/libregrtest/cmdline.py
@@ -107,6 +107,8 @@ resources to test. Currently only the following are defined:
cpu - Used for certain CPU-heavy tests.
+ walltime - Long running but not CPU-bound tests.
+
subprocess Run all tests for the subprocess module.
urlfetch - It is okay to download files required on testing.
@@ -129,7 +131,7 @@ Pattern examples:
ALL_RESOURCES = ('audio', 'curses', 'largefile', 'network',
- 'decimal', 'cpu', 'subprocess', 'urlfetch', 'gui')
+ 'decimal', 'cpu', 'subprocess', 'urlfetch', 'gui', 'walltime')
# Other resources excluded from --use=all:
#
@@ -156,7 +158,7 @@ class Namespace(argparse.Namespace):
self.coverdir = 'coverage'
self.runleaks = False
self.huntrleaks = False
- self.verbose2 = False
+ self.rerun = False
self.verbose3 = False
self.print_slow = False
self.random_seed = None
@@ -213,8 +215,10 @@ def _create_parser():
group = parser.add_argument_group('Verbosity')
group.add_argument('-v', '--verbose', action='count',
help='run tests in verbose mode with output to stdout')
- group.add_argument('-w', '--verbose2', action='store_true',
+ group.add_argument('-w', '--rerun', action='store_true',
help='re-run failed tests in verbose mode')
+ group.add_argument('--verbose2', action='store_true', dest='rerun',
+ help='deprecated alias to --rerun')
group.add_argument('-W', '--verbose3', action='store_true',
help='display test output on failure')
group.add_argument('-q', '--quiet', action='store_true',
@@ -309,6 +313,9 @@ def _create_parser():
group.add_argument('--fail-env-changed', action='store_true',
help='if a test file alters the environment, mark '
'the test as failed')
+ group.add_argument('--fail-rerun', action='store_true',
+ help='if a test failed and then passed when re-run, '
+ 'mark the tests as failed')
group.add_argument('--junit-xml', dest='xmlpath', metavar='FILENAME',
help='writes JUnit-style XML results to the specified '
@@ -380,7 +387,7 @@ def _parse_args(args, **kwargs):
ns.python = shlex.split(ns.python)
if ns.failfast and not (ns.verbose or ns.verbose3):
parser.error("-G/--failfast needs either -v or -W")
- if ns.pgo and (ns.verbose or ns.verbose2 or ns.verbose3):
+ if ns.pgo and (ns.verbose or ns.rerun or ns.verbose3):
parser.error("--pgo/-v don't go together!")
if ns.pgo_extended:
ns.pgo = True # pgo_extended implies pgo
diff --git a/Lib/test/libregrtest/main.py b/Lib/test/libregrtest/main.py
index a357bd9..ab03647 100644
--- a/Lib/test/libregrtest/main.py
+++ b/Lib/test/libregrtest/main.py
@@ -11,11 +11,11 @@ import time
import unittest
from test.libregrtest.cmdline import _parse_args
from test.libregrtest.runtest import (
- findtests, split_test_packages, runtest, get_abs_module,
- PROGRESS_MIN_TIME, State)
+ findtests, split_test_packages, runtest, abs_module_name,
+ PROGRESS_MIN_TIME, State, MatchTestsDict, RunTests)
from test.libregrtest.setup import setup_tests
from test.libregrtest.pgo import setup_pgo_tests
-from test.libregrtest.utils import (removepy, count, format_duration,
+from test.libregrtest.utils import (strip_py_suffix, count, format_duration,
printlist, get_build_info)
from test import support
from test.support import TestStats
@@ -28,18 +28,11 @@ from test.support import threading_helper
# Must be smaller than buildbot "1200 seconds without output" limit.
EXIT_TIMEOUT = 120.0
-# gh-90681: When rerunning tests, we might need to rerun the whole
-# class or module suite if some its life-cycle hooks fail.
-# Test level hooks are not affected.
-_TEST_LIFECYCLE_HOOKS = frozenset((
- 'setUpClass', 'tearDownClass',
- 'setUpModule', 'tearDownModule',
-))
-
EXITCODE_BAD_TEST = 2
-EXITCODE_INTERRUPTED = 130
EXITCODE_ENV_CHANGED = 3
EXITCODE_NO_TESTS_RAN = 4
+EXITCODE_RERUN_FAIL = 5
+EXITCODE_INTERRUPTED = 130
class Regrtest:
@@ -72,19 +65,22 @@ class Regrtest:
# tests
self.tests = []
self.selected = []
+ self.all_runtests: list[RunTests] = []
# test results
- self.good = []
- self.bad = []
- self.skipped = []
- self.resource_denied = []
- self.environment_changed = []
- self.run_no_tests = []
- self.need_rerun = []
- self.rerun = []
- self.first_result = None
+ self.good: list[str] = []
+ self.bad: list[str] = []
+ self.rerun_bad: list[str] = []
+ self.skipped: list[str] = []
+ self.resource_denied: list[str] = []
+ self.environment_changed: list[str] = []
+ self.run_no_tests: list[str] = []
+ self.rerun: list[str] = []
+
+ self.need_rerun: list[TestResult] = []
+ self.first_state: str | None = None
self.interrupted = False
- self.stats_dict: dict[str, TestStats] = {}
+ self.total_stats = TestStats()
# used by --slow
self.test_times = []
@@ -94,7 +90,7 @@ class Regrtest:
# used to display the progress bar "[ 3/100]"
self.start_time = time.perf_counter()
- self.test_count = ''
+ self.test_count_text = ''
self.test_count_width = 1
# used by --single
@@ -107,7 +103,6 @@ class Regrtest:
# misc
self.win_load_tracker = None
self.tmp_dir = None
- self.worker_test_name = None
def get_executed(self):
return (set(self.good) | set(self.bad) | set(self.skipped)
@@ -115,11 +110,9 @@ class Regrtest:
| set(self.run_no_tests))
def accumulate_result(self, result, rerun=False):
+ fail_env_changed = self.ns.fail_env_changed
test_name = result.test_name
- if result.has_meaningful_duration() and not rerun:
- self.test_times.append((result.duration, test_name))
-
match result.state:
case State.PASSED:
self.good.append(test_name)
@@ -128,25 +121,24 @@ class Regrtest:
case State.SKIPPED:
self.skipped.append(test_name)
case State.RESOURCE_DENIED:
- self.skipped.append(test_name)
self.resource_denied.append(test_name)
case State.INTERRUPTED:
self.interrupted = True
case State.DID_NOT_RUN:
self.run_no_tests.append(test_name)
case _:
- if result.is_failed(self.ns.fail_env_changed):
- if not rerun:
- self.bad.append(test_name)
- self.need_rerun.append(result)
+ if result.is_failed(fail_env_changed):
+ self.bad.append(test_name)
+ self.need_rerun.append(result)
else:
- raise ValueError(f"invalid test state: {state!r}")
+ raise ValueError(f"invalid test state: {result.state!r}")
+ if result.has_meaningful_duration() and not rerun:
+ self.test_times.append((result.duration, test_name))
if result.stats is not None:
- self.stats_dict[result.test_name] = result.stats
-
- if rerun and not(result.is_failed(False) or result.state == State.INTERRUPTED):
- self.bad.remove(test_name)
+ self.total_stats.accumulate(result.stats)
+ if rerun:
+ self.rerun.append(test_name)
xml_data = result.xml_data
if xml_data:
@@ -180,13 +172,15 @@ class Regrtest:
print(line, flush=True)
def display_progress(self, test_index, text):
- if self.ns.quiet:
+ quiet = self.ns.quiet
+ pgo = self.ns.pgo
+ if quiet:
return
# "[ 51/405/1] test_tcl passed"
- line = f"{test_index:{self.test_count_width}}{self.test_count}"
+ line = f"{test_index:{self.test_count_width}}{self.test_count_text}"
fails = len(self.bad) + len(self.environment_changed)
- if fails and not self.ns.pgo:
+ if fails and not pgo:
line = f"{line}/{fails}"
self.log(f"[{line}] {text}")
@@ -196,15 +190,7 @@ class Regrtest:
if ns.xmlpath:
support.junit_xml_list = self.testsuite_xml = []
- worker_args = ns.worker_args
- if worker_args is not None:
- from test.libregrtest.runtest_mp import parse_worker_args
- ns, test_name = parse_worker_args(ns.worker_args)
- ns.worker_args = worker_args
- self.worker_test_name = test_name
-
- # Strip .py extensions.
- removepy(ns.args)
+ strip_py_suffix(ns.args)
if ns.huntrleaks:
warmup, repetitions, _ = ns.huntrleaks
@@ -221,9 +207,18 @@ class Regrtest:
self.ns = ns
def find_tests(self, tests):
+ ns = self.ns
+ single = ns.single
+ fromfile = ns.fromfile
+ pgo = ns.pgo
+ exclude = ns.exclude
+ test_dir = ns.testdir
+ starting_test = ns.start
+ randomize = ns.randomize
+
self.tests = tests
- if self.ns.single:
+ if single:
self.next_single_filename = os.path.join(self.tmp_dir, 'pynexttest')
try:
with open(self.next_single_filename, 'r') as fp:
@@ -232,12 +227,12 @@ class Regrtest:
except OSError:
pass
- if self.ns.fromfile:
+ if fromfile:
self.tests = []
# regex to match 'test_builtin' in line:
# '0:00:00 [ 4/400] test_builtin -- test_dict took 1 sec'
regex = re.compile(r'\btest_[a-zA-Z0-9_]+\b')
- with open(os.path.join(os_helper.SAVEDCWD, self.ns.fromfile)) as fp:
+ with open(os.path.join(os_helper.SAVEDCWD, fromfile)) as fp:
for line in fp:
line = line.split('#', 1)[0]
line = line.strip()
@@ -245,22 +240,22 @@ class Regrtest:
if match is not None:
self.tests.append(match.group())
- removepy(self.tests)
+ strip_py_suffix(self.tests)
- if self.ns.pgo:
+ if pgo:
# add default PGO tests if no tests are specified
- setup_pgo_tests(self.ns)
+ setup_pgo_tests(ns)
- exclude = set()
- if self.ns.exclude:
- for arg in self.ns.args:
- exclude.add(arg)
- self.ns.args = []
+ exclude_tests = set()
+ if exclude:
+ for arg in ns.args:
+ exclude_tests.add(arg)
+ ns.args = []
- alltests = findtests(testdir=self.ns.testdir, exclude=exclude)
+ alltests = findtests(testdir=test_dir, exclude=exclude_tests)
- if not self.ns.fromfile:
- self.selected = self.tests or self.ns.args
+ if not fromfile:
+ self.selected = self.tests or ns.args
if self.selected:
self.selected = split_test_packages(self.selected)
else:
@@ -268,7 +263,7 @@ class Regrtest:
else:
self.selected = self.tests
- if self.ns.single:
+ if single:
self.selected = self.selected[:1]
try:
pos = alltests.index(self.selected[0])
@@ -277,17 +272,17 @@ class Regrtest:
pass
# Remove all the selected tests that precede start if it's set.
- if self.ns.start:
+ if starting_test:
try:
- del self.selected[:self.selected.index(self.ns.start)]
+ del self.selected[:self.selected.index(starting_test)]
except ValueError:
- print("Couldn't find starting test (%s), using all tests"
- % self.ns.start, file=sys.stderr)
+ print(f"Cannot find starting test: {starting_test}")
+ sys.exit(1)
- if self.ns.randomize:
- if self.ns.random_seed is None:
- self.ns.random_seed = random.randrange(10000000)
- random.seed(self.ns.random_seed)
+ if randomize:
+ if ns.random_seed is None:
+ ns.random_seed = random.randrange(10000000)
+ random.seed(ns.random_seed)
random.shuffle(self.selected)
def list_tests(self):
@@ -305,25 +300,63 @@ class Regrtest:
print(test.id())
def list_cases(self):
+ ns = self.ns
+ test_dir = ns.testdir
support.verbose = False
- support.set_match_tests(self.ns.match_tests, self.ns.ignore_tests)
+ support.set_match_tests(ns.match_tests, ns.ignore_tests)
+ skipped = []
for test_name in self.selected:
- abstest = get_abs_module(self.ns, test_name)
+ module_name = abs_module_name(test_name, test_dir)
try:
- suite = unittest.defaultTestLoader.loadTestsFromName(abstest)
+ suite = unittest.defaultTestLoader.loadTestsFromName(module_name)
self._list_cases(suite)
except unittest.SkipTest:
- self.skipped.append(test_name)
+ skipped.append(test_name)
- if self.skipped:
- print(file=sys.stderr)
- print(count(len(self.skipped), "test"), "skipped:", file=sys.stderr)
- printlist(self.skipped, file=sys.stderr)
+ if skipped:
+ sys.stdout.flush()
+ stderr = sys.stderr
+ print(file=stderr)
+ print(count(len(skipped), "test"), "skipped:", file=stderr)
+ printlist(skipped, file=stderr)
- def rerun_failed_tests(self):
- self.log()
+ def get_rerun_match(self, rerun_list) -> MatchTestsDict:
+ rerun_match_tests = {}
+ for result in rerun_list:
+ match_tests = result.get_rerun_match_tests()
+ # ignore empty match list
+ if match_tests:
+ rerun_match_tests[result.test_name] = match_tests
+ return rerun_match_tests
+
+ def _rerun_failed_tests(self, need_rerun):
+ # Configure the runner to re-run tests
+ ns = self.ns
+ ns.verbose = True
+ ns.failfast = False
+ ns.verbose3 = False
+ ns.forever = False
+ if ns.use_mp is None:
+ ns.use_mp = 1
+
+ # Get tests to re-run
+ tests = [result.test_name for result in need_rerun]
+ match_tests = self.get_rerun_match(need_rerun)
+ self.set_tests(tests)
+
+ # Clear previously failed tests
+ self.rerun_bad.extend(self.bad)
+ self.bad.clear()
+ self.need_rerun.clear()
+ # Re-run failed tests
+ self.log(f"Re-running {len(tests)} failed tests in verbose mode in subprocesses")
+ runtests = RunTests(tests, match_tests=match_tests, rerun=True)
+ self.all_runtests.append(runtests)
+ self._run_tests_mp(runtests)
+
+ def rerun_failed_tests(self, need_rerun):
if self.ns.python:
# Temp patch for https://github.com/python/cpython/issues/94052
self.log(
@@ -332,45 +365,10 @@ class Regrtest:
)
return
- self.ns.verbose = True
- self.ns.failfast = False
- self.ns.verbose3 = False
-
- self.first_result = self.get_tests_result()
-
- self.log("Re-running failed tests in verbose mode")
- rerun_list = list(self.need_rerun)
- self.need_rerun.clear()
- for result in rerun_list:
- test_name = result.test_name
- self.rerun.append(test_name)
-
- errors = result.errors or []
- failures = result.failures or []
- error_names = [
- self.normalize_test_name(test_full_name, is_error=True)
- for (test_full_name, *_) in errors]
- failure_names = [
- self.normalize_test_name(test_full_name)
- for (test_full_name, *_) in failures]
- self.ns.verbose = True
- orig_match_tests = self.ns.match_tests
- if errors or failures:
- if self.ns.match_tests is None:
- self.ns.match_tests = []
- self.ns.match_tests.extend(error_names)
- self.ns.match_tests.extend(failure_names)
- matching = "matching: " + ", ".join(self.ns.match_tests)
- self.log(f"Re-running {test_name} in verbose mode ({matching})")
- else:
- self.log(f"Re-running {test_name} in verbose mode")
- result = runtest(self.ns, test_name)
- self.ns.match_tests = orig_match_tests
+ self.first_state = self.get_tests_state()
- self.accumulate_result(result, rerun=True)
-
- if result.state == State.INTERRUPTED:
- break
+ print()
+ self._rerun_failed_tests(need_rerun)
if self.bad:
print(count(len(self.bad), 'test'), "failed again:")
@@ -378,28 +376,17 @@ class Regrtest:
self.display_result()
- def normalize_test_name(self, test_full_name, *, is_error=False):
- short_name = test_full_name.split(" ")[0]
- if is_error and short_name in _TEST_LIFECYCLE_HOOKS:
- # This means that we have a failure in a life-cycle hook,
- # we need to rerun the whole module or class suite.
- # Basically the error looks like this:
- # ERROR: setUpClass (test.test_reg_ex.RegTest)
- # or
- # ERROR: setUpModule (test.test_reg_ex)
- # So, we need to parse the class / module name.
- lpar = test_full_name.index('(')
- rpar = test_full_name.index(')')
- return test_full_name[lpar + 1: rpar].split('.')[-1]
- return short_name
-
def display_result(self):
+ pgo = self.ns.pgo
+ quiet = self.ns.quiet
+ print_slow = self.ns.print_slow
+
# If running the test suite for PGO then no one cares about results.
- if self.ns.pgo:
+ if pgo:
return
print()
- print("== Tests result: %s ==" % self.get_tests_result())
+ print("== Tests result: %s ==" % self.get_tests_state())
if self.interrupted:
print("Test suite interrupted by signal SIGINT.")
@@ -410,7 +397,7 @@ class Regrtest:
print(count(len(omitted), "test"), "omitted:")
printlist(omitted)
- if self.good and not self.ns.quiet:
+ if self.good and not quiet:
print()
if (not self.bad
and not self.skipped
@@ -419,7 +406,7 @@ class Regrtest:
print("All", end=' ')
print(count(len(self.good), "test"), "OK.")
- if self.ns.print_slow:
+ if print_slow:
self.test_times.sort(reverse=True)
print()
print("10 slowest tests:")
@@ -437,11 +424,16 @@ class Regrtest:
count(len(self.environment_changed), "test")))
printlist(self.environment_changed)
- if self.skipped and not self.ns.quiet:
+ if self.skipped and not quiet:
print()
print(count(len(self.skipped), "test"), "skipped:")
printlist(self.skipped)
+ if self.resource_denied and not quiet:
+ print()
+ print(count(len(self.resource_denied), "test"), "skipped (resource denied):")
+ printlist(self.resource_denied)
+
if self.rerun:
print()
print("%s:" % count(len(self.rerun), "re-run test"))
@@ -452,40 +444,58 @@ class Regrtest:
print(count(len(self.run_no_tests), "test"), "run no tests:")
printlist(self.run_no_tests)
- def run_tests_sequential(self):
- if self.ns.trace:
+ def run_test(self, test_index, test_name, previous_test, save_modules):
+ text = test_name
+ if previous_test:
+ text = '%s -- %s' % (text, previous_test)
+ self.display_progress(test_index, text)
+
+ if self.tracer:
+ # If we're tracing code coverage, then we don't exit with status
+ # if on a false return value from main.
+ cmd = ('result = runtest(self.ns, test_name); '
+ 'self.accumulate_result(result)')
+ ns = dict(locals())
+ self.tracer.runctx(cmd, globals=globals(), locals=ns)
+ result = ns['result']
+ else:
+ result = runtest(self.ns, test_name)
+ self.accumulate_result(result)
+
+ # Unload the newly imported modules (best effort finalization)
+ for module in sys.modules.keys():
+ if module not in save_modules and module.startswith("test."):
+ support.unload(module)
+
+ return result
+
+ def run_tests_sequentially(self, runtests):
+ ns = self.ns
+ coverage = ns.trace
+ fail_fast = ns.failfast
+ fail_env_changed = ns.fail_env_changed
+ timeout = ns.timeout
+
+ if coverage:
import trace
self.tracer = trace.Trace(trace=False, count=True)
save_modules = sys.modules.keys()
msg = "Run tests sequentially"
- if self.ns.timeout:
- msg += " (timeout: %s)" % format_duration(self.ns.timeout)
+ if timeout:
+ msg += " (timeout: %s)" % format_duration(timeout)
self.log(msg)
previous_test = None
- for test_index, test_name in enumerate(self.tests, 1):
+ tests_iter = runtests.iter_tests()
+ for test_index, test_name in enumerate(tests_iter, 1):
start_time = time.perf_counter()
- text = test_name
- if previous_test:
- text = '%s -- %s' % (text, previous_test)
- self.display_progress(test_index, text)
-
- if self.tracer:
- # If we're tracing code coverage, then we don't exit with status
- # if on a false return value from main.
- cmd = ('result = runtest(self.ns, test_name); '
- 'self.accumulate_result(result)')
- ns = dict(locals())
- self.tracer.runctx(cmd, globals=globals(), locals=ns)
- result = ns['result']
- else:
- result = runtest(self.ns, test_name)
- self.accumulate_result(result)
+ result = self.run_test(test_index, test_name,
+ previous_test, save_modules)
- if result.state == State.INTERRUPTED:
+ if result.must_stop(fail_fast, fail_env_changed):
break
previous_test = str(result)
@@ -496,26 +506,9 @@ class Regrtest:
# be quiet: say nothing if the test passed shortly
previous_test = None
- # Unload the newly imported modules (best effort finalization)
- for module in sys.modules.keys():
- if module not in save_modules and module.startswith("test."):
- support.unload(module)
-
- if self.ns.failfast and result.is_failed(self.ns.fail_env_changed):
- break
-
if previous_test:
print(previous_test)
- def _test_forever(self, tests):
- while True:
- for test_name in tests:
- yield test_name
- if self.bad:
- return
- if self.ns.fail_env_changed and self.environment_changed:
- return
-
def display_header(self):
# Print basic platform information
print("==", platform.python_implementation(), *sys.version.split())
@@ -528,36 +521,45 @@ class Regrtest:
print("== CPU count:", cpu_count)
print("== encodings: locale=%s, FS=%s"
% (locale.getencoding(), sys.getfilesystemencoding()))
+ self.display_sanitizers()
+
+ def display_sanitizers(self):
+ # This makes it easier to remember what to set in your local
+ # environment when trying to reproduce a sanitizer failure.
asan = support.check_sanitizer(address=True)
msan = support.check_sanitizer(memory=True)
ubsan = support.check_sanitizer(ub=True)
- # This makes it easier to remember what to set in your local
- # environment when trying to reproduce a sanitizer failure.
- if asan or msan or ubsan:
- names = [n for n in (asan and "address",
- msan and "memory",
- ubsan and "undefined behavior")
- if n]
- print(f"== sanitizers: {', '.join(names)}")
- a_opts = os.environ.get("ASAN_OPTIONS")
- if asan and a_opts is not None:
- print(f"== ASAN_OPTIONS={a_opts}")
- m_opts = os.environ.get("ASAN_OPTIONS")
- if msan and m_opts is not None:
- print(f"== MSAN_OPTIONS={m_opts}")
- ub_opts = os.environ.get("UBSAN_OPTIONS")
- if ubsan and ub_opts is not None:
- print(f"== UBSAN_OPTIONS={ub_opts}")
+ sanitizers = []
+ if asan:
+ sanitizers.append("address")
+ if msan:
+ sanitizers.append("memory")
+ if ubsan:
+ sanitizers.append("undefined behavior")
+ if not sanitizers:
+ return
+
+ print(f"== sanitizers: {', '.join(sanitizers)}")
+ for sanitizer, env_var in (
+ (asan, "ASAN_OPTIONS"),
+ (msan, "MSAN_OPTIONS"),
+ (ubsan, "UBSAN_OPTIONS"),
+ ):
+ options= os.environ.get(env_var)
+ if sanitizer and options is not None:
+ print(f"== {env_var}={options!r}")
def no_tests_run(self):
return not any((self.good, self.bad, self.skipped, self.interrupted,
self.environment_changed))
- def get_tests_result(self):
+ def get_tests_state(self):
+ fail_env_changed = self.ns.fail_env_changed
+
result = []
if self.bad:
result.append("FAILURE")
- elif self.ns.fail_env_changed and self.environment_changed:
+ elif fail_env_changed and self.environment_changed:
result.append("ENV CHANGED")
elif self.no_tests_run():
result.append("NO TESTS RAN")
@@ -569,10 +571,40 @@ class Regrtest:
result.append("SUCCESS")
result = ', '.join(result)
- if self.first_result:
- result = '%s then %s' % (self.first_result, result)
+ if self.first_state:
+ result = '%s then %s' % (self.first_state, result)
return result
+ def _run_tests_mp(self, runtests: RunTests) -> None:
+ from test.libregrtest.runtest_mp import run_tests_multiprocess
+ # If we're on windows and this is the parent runner (not a worker),
+ # track the load average.
+ if sys.platform == 'win32':
+ from test.libregrtest.win_utils import WindowsLoadTracker
+
+ try:
+ self.win_load_tracker = WindowsLoadTracker()
+ except PermissionError as error:
+ # Standard accounts may not have access to the performance
+ # counters.
+ print(f'Failed to create WindowsLoadTracker: {error}')
+
+ try:
+ run_tests_multiprocess(self, runtests)
+ finally:
+ if self.win_load_tracker is not None:
+ self.win_load_tracker.close()
+ self.win_load_tracker = None
+
+ def set_tests(self, tests):
+ self.tests = tests
+ if self.ns.forever:
+ self.test_count_text = ''
+ self.test_count_width = 3
+ else:
+ self.test_count_text = '/{}'.format(len(self.tests))
+ self.test_count_width = len(self.test_count_text) - 1
+
def run_tests(self):
# For a partial run, we do not need to clutter the output.
if (self.ns.header
@@ -590,37 +622,14 @@ class Regrtest:
if self.ns.randomize:
print("Using random seed", self.ns.random_seed)
- if self.ns.forever:
- self.tests = self._test_forever(list(self.selected))
- self.test_count = ''
- self.test_count_width = 3
- else:
- self.tests = iter(self.selected)
- self.test_count = '/{}'.format(len(self.selected))
- self.test_count_width = len(self.test_count) - 1
-
+ tests = self.selected
+ self.set_tests(tests)
+ runtests = RunTests(tests, forever=self.ns.forever)
+ self.all_runtests.append(runtests)
if self.ns.use_mp:
- from test.libregrtest.runtest_mp import run_tests_multiprocess
- # If we're on windows and this is the parent runner (not a worker),
- # track the load average.
- if sys.platform == 'win32' and self.worker_test_name is None:
- from test.libregrtest.win_utils import WindowsLoadTracker
-
- try:
- self.win_load_tracker = WindowsLoadTracker()
- except PermissionError as error:
- # Standard accounts may not have access to the performance
- # counters.
- print(f'Failed to create WindowsLoadTracker: {error}')
-
- try:
- run_tests_multiprocess(self)
- finally:
- if self.win_load_tracker is not None:
- self.win_load_tracker.close()
- self.win_load_tracker = None
+ self._run_tests_mp(runtests)
else:
- self.run_tests_sequential()
+ self.run_tests_sequentially(runtests)
def finalize(self):
if self.next_single_filename:
@@ -635,23 +644,29 @@ class Regrtest:
r.write_results(show_missing=True, summary=True,
coverdir=self.ns.coverdir)
- print()
- self.display_summary()
-
if self.ns.runleaks:
os.system("leaks %d" % os.getpid())
+ self.save_xml_result()
+
def display_summary(self):
duration = time.perf_counter() - self.start_time
+ first_runtests = self.all_runtests[0]
+ # the second runtests (re-run failed tests) disables forever,
+ # use the first runtests
+ forever = first_runtests.forever
+ filtered = bool(self.ns.match_tests) or bool(self.ns.ignore_tests)
# Total duration
+ print()
print("Total duration: %s" % format_duration(duration))
# Total tests
- total = TestStats()
- for stats in self.stats_dict.values():
- total.accumulate(stats)
- stats = [f'run={total.tests_run:,}']
+ total = self.total_stats
+ text = f'run={total.tests_run:,}'
+ if filtered:
+ text = f"{text} (filtered)"
+ stats = [text]
if total.failures:
stats.append(f'failures={total.failures:,}')
if total.skipped:
@@ -659,23 +674,31 @@ class Regrtest:
print(f"Total tests: {' '.join(stats)}")
# Total test files
- report = [f'success={len(self.good)}']
- if self.bad:
- report.append(f'failed={len(self.bad)}')
- if self.environment_changed:
- report.append(f'env_changed={len(self.environment_changed)}')
- if self.skipped:
- report.append(f'skipped={len(self.skipped)}')
- if self.resource_denied:
- report.append(f'resource_denied={len(self.resource_denied)}')
- if self.rerun:
- report.append(f'rerun={len(self.rerun)}')
- if self.run_no_tests:
- report.append(f'run_no_tests={len(self.run_no_tests)}')
+ all_tests = [self.good, self.bad, self.rerun,
+ self.skipped,
+ self.environment_changed, self.run_no_tests]
+ run = sum(map(len, all_tests))
+ text = f'run={run}'
+ if not forever:
+ ntest = len(first_runtests.tests)
+ text = f"{text}/{ntest}"
+ if filtered:
+ text = f"{text} (filtered)"
+ report = [text]
+ for name, tests in (
+ ('failed', self.bad),
+ ('env_changed', self.environment_changed),
+ ('skipped', self.skipped),
+ ('resource_denied', self.resource_denied),
+ ('rerun', self.rerun),
+ ('run_no_tests', self.run_no_tests),
+ ):
+ if tests:
+ report.append(f'{name}={len(tests)}')
print(f"Total test files: {' '.join(report)}")
# Result
- result = self.get_tests_result()
+ result = self.get_tests_state()
print(f"Result: {result}")
def save_xml_result(self):
@@ -735,6 +758,9 @@ class Regrtest:
self.tmp_dir = os.path.abspath(self.tmp_dir)
+ def is_worker(self):
+ return (self.ns.worker_args is not None)
+
def create_temp_dir(self):
os.makedirs(self.tmp_dir, exist_ok=True)
@@ -747,7 +773,8 @@ class Regrtest:
nounce = random.randint(0, 1_000_000)
else:
nounce = os.getpid()
- if self.worker_test_name is not None:
+
+ if self.is_worker():
test_cwd = 'test_python_worker_{}'.format(nounce)
else:
test_cwd = 'test_python_{}'.format(nounce)
@@ -810,48 +837,53 @@ class Regrtest:
return None
+ def get_exitcode(self):
+ exitcode = 0
+ if self.bad:
+ exitcode = EXITCODE_BAD_TEST
+ elif self.interrupted:
+ exitcode = EXITCODE_INTERRUPTED
+ elif self.ns.fail_env_changed and self.environment_changed:
+ exitcode = EXITCODE_ENV_CHANGED
+ elif self.no_tests_run():
+ exitcode = EXITCODE_NO_TESTS_RAN
+ elif self.rerun and self.ns.fail_rerun:
+ exitcode = EXITCODE_RERUN_FAIL
+ return exitcode
+
+ def action_run_tests(self):
+ self.run_tests()
+ self.display_result()
+
+ need_rerun = self.need_rerun
+ if self.ns.rerun and need_rerun:
+ self.rerun_failed_tests(need_rerun)
+
+ self.display_summary()
+ self.finalize()
+
def _main(self, tests, kwargs):
- if self.worker_test_name is not None:
+ if self.is_worker():
from test.libregrtest.runtest_mp import run_tests_worker
- run_tests_worker(self.ns, self.worker_test_name)
+ run_tests_worker(self.ns.worker_args)
+ return
if self.ns.wait:
input("Press any key to continue...")
- support.PGO = self.ns.pgo
- support.PGO_EXTENDED = self.ns.pgo_extended
-
setup_tests(self.ns)
-
self.find_tests(tests)
+ exitcode = 0
if self.ns.list_tests:
self.list_tests()
- sys.exit(0)
-
- if self.ns.list_cases:
+ elif self.ns.list_cases:
self.list_cases()
- sys.exit(0)
-
- self.run_tests()
- self.display_result()
-
- if self.ns.verbose2 and self.bad:
- self.rerun_failed_tests()
-
- self.finalize()
-
- self.save_xml_result()
+ else:
+ self.action_run_tests()
+ exitcode = self.get_exitcode()
- if self.bad:
- sys.exit(EXITCODE_BAD_TEST)
- if self.interrupted:
- sys.exit(EXITCODE_INTERRUPTED)
- if self.ns.fail_env_changed and self.environment_changed:
- sys.exit(EXITCODE_ENV_CHANGED)
- if self.no_tests_run():
- sys.exit(EXITCODE_NO_TESTS_RAN)
- sys.exit(0)
+ sys.exit(exitcode)
def main(tests=None, **kwargs):
diff --git a/Lib/test/libregrtest/runtest.py b/Lib/test/libregrtest/runtest.py
index 2c30269..16ae041 100644
--- a/Lib/test/libregrtest/runtest.py
+++ b/Lib/test/libregrtest/runtest.py
@@ -1,7 +1,6 @@
import dataclasses
import doctest
import faulthandler
-import functools
import gc
import importlib
import io
@@ -20,6 +19,10 @@ from test.libregrtest.save_env import saved_test_environment
from test.libregrtest.utils import clear_caches, format_duration, print_warning
+MatchTests = list[str]
+MatchTestsDict = dict[str, MatchTests]
+
+
# Avoid enum.Enum to reduce the number of imports when tests are run
class State:
PASSED = "PASSED"
@@ -56,6 +59,41 @@ class State:
State.MULTIPROCESSING_ERROR,
State.DID_NOT_RUN}
+ @staticmethod
+ def must_stop(state):
+ return state in {
+ State.INTERRUPTED,
+ State.MULTIPROCESSING_ERROR}
+
+
+# gh-90681: When rerunning tests, we might need to rerun the whole
+# class or module suite if some its life-cycle hooks fail.
+# Test level hooks are not affected.
+_TEST_LIFECYCLE_HOOKS = frozenset((
+ 'setUpClass', 'tearDownClass',
+ 'setUpModule', 'tearDownModule',
+))
+
+def normalize_test_name(test_full_name, *, is_error=False):
+ short_name = test_full_name.split(" ")[0]
+ if is_error and short_name in _TEST_LIFECYCLE_HOOKS:
+ if test_full_name.startswith(('setUpModule (', 'tearDownModule (')):
+ # if setUpModule() or tearDownModule() failed, don't filter
+ # tests with the test file name, don't use use filters.
+ return None
+
+ # This means that we have a failure in a life-cycle hook,
+ # we need to rerun the whole module or class suite.
+ # Basically the error looks like this:
+ # ERROR: setUpClass (test.test_reg_ex.RegTest)
+ # or
+ # ERROR: setUpModule (test.test_reg_ex)
+ # So, we need to parse the class / module name.
+ lpar = test_full_name.index('(')
+ rpar = test_full_name.index(')')
+ return test_full_name[lpar + 1: rpar].split('.')[-1]
+ return short_name
+
@dataclasses.dataclass(slots=True)
class TestResult:
@@ -129,6 +167,58 @@ class TestResult:
if self.state is None or self.state == State.PASSED:
self.state = State.ENV_CHANGED
+ def must_stop(self, fail_fast: bool, fail_env_changed: bool) -> bool:
+ if State.must_stop(self.state):
+ return True
+ if fail_fast and self.is_failed(fail_env_changed):
+ return True
+ return False
+
+ def get_rerun_match_tests(self):
+ match_tests = []
+
+ errors = self.errors or []
+ failures = self.failures or []
+ for error_list, is_error in (
+ (errors, True),
+ (failures, False),
+ ):
+ for full_name, *_ in error_list:
+ match_name = normalize_test_name(full_name, is_error=is_error)
+ if match_name is None:
+ # 'setUpModule (test.test_sys)': don't filter tests
+ return None
+ if not match_name:
+ error_type = "ERROR" if is_error else "FAIL"
+ print_warning(f"rerun failed to parse {error_type} test name: "
+ f"{full_name!r}: don't filter tests")
+ return None
+ match_tests.append(match_name)
+
+ return match_tests
+
+
+@dataclasses.dataclass(slots=True, frozen=True)
+class RunTests:
+ tests: list[str]
+ match_tests: MatchTestsDict | None = None
+ rerun: bool = False
+ forever: bool = False
+
+ def get_match_tests(self, test_name) -> MatchTests | None:
+ if self.match_tests is not None:
+ return self.match_tests.get(test_name, None)
+ else:
+ return None
+
+ def iter_tests(self):
+ tests = tuple(self.tests)
+ if self.forever:
+ while True:
+ yield from tests
+ else:
+ yield from tests
+
# Minimum duration of a test to display its duration or to mention that
# the test is running in background
@@ -147,9 +237,6 @@ SPLITTESTDIRS = {
"test_multiprocessing_spawn",
}
-# Storage of uncollectable objects
-FOUND_GARBAGE = []
-
def findtestdir(path=None):
return path or os.path.dirname(os.path.dirname(__file__)) or os.curdir
@@ -189,31 +276,41 @@ def split_test_packages(tests, *, testdir=None, exclude=(),
return splitted
-def get_abs_module(ns: Namespace, test_name: str) -> str:
- if test_name.startswith('test.') or ns.testdir:
+def abs_module_name(test_name: str, test_dir: str | None) -> str:
+ if test_name.startswith('test.') or test_dir:
return test_name
else:
# Import it from the test package
return 'test.' + test_name
-def _runtest_capture_output_timeout_junit(result: TestResult, ns: Namespace) -> None:
+def setup_support(ns: Namespace):
+ support.PGO = ns.pgo
+ support.PGO_EXTENDED = ns.pgo_extended
+ support.set_match_tests(ns.match_tests, ns.ignore_tests)
+ support.failfast = ns.failfast
+ support.verbose = ns.verbose
+ if ns.xmlpath:
+ support.junit_xml_list = []
+ else:
+ support.junit_xml_list = None
+
+
+def _runtest(result: TestResult, ns: Namespace) -> None:
# Capture stdout and stderr, set faulthandler timeout,
# and create JUnit XML report.
-
+ verbose = ns.verbose
output_on_failure = ns.verbose3
+ timeout = ns.timeout
use_timeout = (
- ns.timeout is not None and threading_helper.can_start_thread
+ timeout is not None and threading_helper.can_start_thread
)
if use_timeout:
- faulthandler.dump_traceback_later(ns.timeout, exit=True)
+ faulthandler.dump_traceback_later(timeout, exit=True)
try:
- support.set_match_tests(ns.match_tests, ns.ignore_tests)
- support.junit_xml_list = xml_list = [] if ns.xmlpath else None
- if ns.failfast:
- support.failfast = True
+ setup_support(ns)
if output_on_failure:
support.verbose = True
@@ -247,11 +344,10 @@ def _runtest_capture_output_timeout_junit(result: TestResult, ns: Namespace) ->
sys.stderr.flush()
else:
# Tell tests to be moderately quiet
- support.verbose = ns.verbose
-
- _runtest_env_changed_exc(result, ns,
- display_failure=not ns.verbose)
+ support.verbose = verbose
+ _runtest_env_changed_exc(result, ns, display_failure=not verbose)
+ xml_list = support.junit_xml_list
if xml_list:
import xml.etree.ElementTree as ET
result.xml_data = [ET.tostring(x).decode('us-ascii')
@@ -276,7 +372,7 @@ def runtest(ns: Namespace, test_name: str) -> TestResult:
start_time = time.perf_counter()
result = TestResult(test_name)
try:
- _runtest_capture_output_timeout_junit(result, ns)
+ _runtest(result, ns)
except:
if not ns.pgo:
msg = traceback.format_exc()
@@ -287,9 +383,9 @@ def runtest(ns: Namespace, test_name: str) -> TestResult:
return result
-def _test_module(the_module):
+def run_unittest(test_mod):
loader = unittest.TestLoader()
- tests = loader.loadTestsFromModule(the_module)
+ tests = loader.loadTestsFromModule(test_mod)
for error in loader.errors:
print(error, file=sys.stderr)
if loader.errors:
@@ -304,7 +400,6 @@ def save_env(ns: Namespace, test_name: str):
def regrtest_runner(result, test_func, ns) -> None:
# Run test_func(), collect statistics, and detect reference and memory
# leaks.
-
if ns.huntrleaks:
from test.libregrtest.refleak import dash_R
refleak, test_result = dash_R(ns, result.test_name, test_func)
@@ -332,23 +427,24 @@ def regrtest_runner(result, test_func, ns) -> None:
result.stats = stats
+# Storage of uncollectable objects
+FOUND_GARBAGE = []
+
+
def _load_run_test(result: TestResult, ns: Namespace) -> None:
# Load the test function, run the test function.
+ module_name = abs_module_name(result.test_name, ns.testdir)
- abstest = get_abs_module(ns, result.test_name)
-
- # remove the module from sys.module to reload it if it was already imported
- try:
- del sys.modules[abstest]
- except KeyError:
- pass
+ # Remove the module from sys.module to reload it if it was already imported
+ sys.modules.pop(module_name, None)
- the_module = importlib.import_module(abstest)
+ test_mod = importlib.import_module(module_name)
- if hasattr(the_module, "test_main"):
+ if hasattr(test_mod, "test_main"):
# https://github.com/python/cpython/issues/89392
raise Exception(f"Module {result.test_name} defines test_main() which is no longer supported by regrtest")
- test_func = functools.partial(_test_module, the_module)
+ def test_func():
+ return run_unittest(test_mod)
try:
with save_env(ns, result.test_name):
@@ -360,12 +456,12 @@ def _load_run_test(result: TestResult, ns: Namespace) -> None:
# failures.
support.gc_collect()
- cleanup_test_droppings(result.test_name, ns.verbose)
+ remove_testfn(result.test_name, ns.verbose)
if gc.garbage:
support.environment_altered = True
print_warning(f"{result.test_name} created {len(gc.garbage)} "
- f"uncollectable object(s).")
+ f"uncollectable object(s)")
# move the uncollectable objects somewhere,
# so we don't see them again
@@ -443,35 +539,37 @@ def _runtest_env_changed_exc(result: TestResult, ns: Namespace,
result.state = State.PASSED
-def cleanup_test_droppings(test_name: str, verbose: int) -> None:
- # Try to clean up junk commonly left behind. While tests shouldn't leave
- # any files or directories behind, when a test fails that can be tedious
- # for it to arrange. The consequences can be especially nasty on Windows,
- # since if a test leaves a file open, it cannot be deleted by name (while
- # there's nothing we can do about that here either, we can display the
- # name of the offending test, which is a real help).
- for name in (os_helper.TESTFN,):
- if not os.path.exists(name):
- continue
+def remove_testfn(test_name: str, verbose: int) -> None:
+ # Try to clean up os_helper.TESTFN if left behind.
+ #
+ # While tests shouldn't leave any files or directories behind, when a test
+ # fails that can be tedious for it to arrange. The consequences can be
+ # especially nasty on Windows, since if a test leaves a file open, it
+ # cannot be deleted by name (while there's nothing we can do about that
+ # here either, we can display the name of the offending test, which is a
+ # real help).
+ name = os_helper.TESTFN
+ if not os.path.exists(name):
+ return
- if os.path.isdir(name):
- import shutil
- kind, nuker = "directory", shutil.rmtree
- elif os.path.isfile(name):
- kind, nuker = "file", os.unlink
- else:
- raise RuntimeError(f"os.path says {name!r} exists but is neither "
- f"directory nor file")
-
- if verbose:
- print_warning(f"{test_name} left behind {kind} {name!r}")
- support.environment_altered = True
-
- try:
- import stat
- # fix possible permissions problems that might prevent cleanup
- os.chmod(name, stat.S_IRWXU | stat.S_IRWXG | stat.S_IRWXO)
- nuker(name)
- except Exception as exc:
- print_warning(f"{test_name} left behind {kind} {name!r} "
- f"and it couldn't be removed: {exc}")
+ if os.path.isdir(name):
+ import shutil
+ kind, nuker = "directory", shutil.rmtree
+ elif os.path.isfile(name):
+ kind, nuker = "file", os.unlink
+ else:
+ raise RuntimeError(f"os.path says {name!r} exists but is neither "
+ f"directory nor file")
+
+ if verbose:
+ print_warning(f"{test_name} left behind {kind} {name!r}")
+ support.environment_altered = True
+
+ try:
+ import stat
+ # fix possible permissions problems that might prevent cleanup
+ os.chmod(name, stat.S_IRWXU | stat.S_IRWXG | stat.S_IRWXO)
+ nuker(name)
+ except Exception as exc:
+ print_warning(f"{test_name} left behind {kind} {name!r} "
+ f"and it couldn't be removed: {exc}")
diff --git a/Lib/test/libregrtest/runtest_mp.py b/Lib/test/libregrtest/runtest_mp.py
index fb1f80b..6008955 100644
--- a/Lib/test/libregrtest/runtest_mp.py
+++ b/Lib/test/libregrtest/runtest_mp.py
@@ -19,8 +19,8 @@ from test.support import TestStats
from test.libregrtest.cmdline import Namespace
from test.libregrtest.main import Regrtest
from test.libregrtest.runtest import (
- runtest, TestResult, State,
- PROGRESS_MIN_TIME)
+ runtest, TestResult, State, PROGRESS_MIN_TIME,
+ MatchTests, RunTests)
from test.libregrtest.setup import setup_tests
from test.libregrtest.utils import format_duration, print_warning
@@ -44,26 +44,54 @@ JOIN_TIMEOUT = 30.0 # seconds
USE_PROCESS_GROUP = (hasattr(os, "setsid") and hasattr(os, "killpg"))
-def must_stop(result: TestResult, ns: Namespace) -> bool:
- if result.state == State.INTERRUPTED:
- return True
- if ns.failfast and result.is_failed(ns.fail_env_changed):
- return True
- return False
+@dataclasses.dataclass(slots=True)
+class WorkerJob:
+ test_name: str
+ namespace: Namespace
+ rerun: bool = False
+ match_tests: MatchTests | None = None
-def parse_worker_args(worker_args) -> tuple[Namespace, str]:
- ns_dict, test_name = json.loads(worker_args)
- ns = Namespace(**ns_dict)
- return (ns, test_name)
+class _EncodeWorkerJob(json.JSONEncoder):
+ def default(self, o: Any) -> dict[str, Any]:
+ match o:
+ case WorkerJob():
+ result = dataclasses.asdict(o)
+ result["__worker_job__"] = True
+ return result
+ case Namespace():
+ result = vars(o)
+ result["__namespace__"] = True
+ return result
+ case _:
+ return super().default(o)
+
+
+def _decode_worker_job(d: dict[str, Any]) -> WorkerJob | dict[str, Any]:
+ if "__worker_job__" in d:
+ d.pop('__worker_job__')
+ return WorkerJob(**d)
+ if "__namespace__" in d:
+ d.pop('__namespace__')
+ return Namespace(**d)
+ else:
+ return d
+
+
+def _parse_worker_args(worker_json: str) -> tuple[Namespace, str]:
+ return json.loads(worker_json,
+ object_hook=_decode_worker_job)
-def run_test_in_subprocess(testname: str, ns: Namespace, tmp_dir: str, stdout_fh: TextIO) -> subprocess.Popen:
- ns_dict = vars(ns)
- worker_args = (ns_dict, testname)
- worker_args = json.dumps(worker_args)
- if ns.python is not None:
- executable = ns.python
+def run_test_in_subprocess(worker_job: WorkerJob,
+ output_file: TextIO,
+ tmp_dir: str | None = None) -> subprocess.Popen:
+ ns = worker_job.namespace
+ python = ns.python
+ worker_args = json.dumps(worker_job, cls=_EncodeWorkerJob)
+
+ if python is not None:
+ executable = python
else:
executable = [sys.executable]
cmd = [*executable, *support.args_from_interpreter_flags(),
@@ -82,9 +110,9 @@ def run_test_in_subprocess(testname: str, ns: Namespace, tmp_dir: str, stdout_fh
# sysconfig.is_python_build() is true. See issue 15300.
kw = dict(
env=env,
- stdout=stdout_fh,
+ stdout=output_file,
# bpo-45410: Write stderr into stdout to keep messages order
- stderr=stdout_fh,
+ stderr=output_file,
text=True,
close_fds=(os.name != 'nt'),
cwd=os_helper.SAVEDCWD,
@@ -94,11 +122,27 @@ def run_test_in_subprocess(testname: str, ns: Namespace, tmp_dir: str, stdout_fh
return subprocess.Popen(cmd, **kw)
-def run_tests_worker(ns: Namespace, test_name: str) -> NoReturn:
+def run_tests_worker(worker_json: str) -> NoReturn:
+ worker_job = _parse_worker_args(worker_json)
+ ns = worker_job.namespace
+ test_name = worker_job.test_name
+ rerun = worker_job.rerun
+ match_tests = worker_job.match_tests
+
setup_tests(ns)
- result = runtest(ns, test_name)
+ if rerun:
+ if match_tests:
+ matching = "matching: " + ", ".join(match_tests)
+ print(f"Re-running {test_name} in verbose mode ({matching})", flush=True)
+ else:
+ print(f"Re-running {test_name} in verbose mode", flush=True)
+ ns.verbose = True
+ if match_tests is not None:
+ ns.match_tests = match_tests
+
+ result = runtest(ns, test_name)
print() # Force a newline (just in case)
# Serialize TestResult as dict in JSON
@@ -148,11 +192,13 @@ class TestWorkerProcess(threading.Thread):
def __init__(self, worker_id: int, runner: "MultiprocessTestRunner") -> None:
super().__init__()
self.worker_id = worker_id
+ self.runtests = runner.runtests
self.pending = runner.pending
self.output = runner.output
self.ns = runner.ns
self.timeout = runner.worker_timeout
self.regrtest = runner.regrtest
+ self.rerun = runner.rerun
self.current_test_name = None
self.start_time = None
self._popen = None
@@ -216,10 +262,11 @@ class TestWorkerProcess(threading.Thread):
) -> MultiprocessResult:
return MultiprocessResult(test_result, stdout, err_msg)
- def _run_process(self, test_name: str, tmp_dir: str, stdout_fh: TextIO) -> int:
- self.current_test_name = test_name
+ def _run_process(self, worker_job, output_file: TextIO,
+ tmp_dir: str | None = None) -> int:
+ self.current_test_name = worker_job.test_name
try:
- popen = run_test_in_subprocess(test_name, self.ns, tmp_dir, stdout_fh)
+ popen = run_test_in_subprocess(worker_job, output_file, tmp_dir)
self._killed = False
self._popen = popen
@@ -277,9 +324,15 @@ class TestWorkerProcess(threading.Thread):
else:
encoding = sys.stdout.encoding
+ match_tests = self.runtests.get_match_tests(test_name)
+
# gh-94026: Write stdout+stderr to a tempfile as workaround for
# non-blocking pipes on Emscripten with NodeJS.
- with tempfile.TemporaryFile('w+', encoding=encoding) as stdout_fh:
+ with tempfile.TemporaryFile('w+', encoding=encoding) as stdout_file:
+ worker_job = WorkerJob(test_name,
+ namespace=self.ns,
+ rerun=self.rerun,
+ match_tests=match_tests)
# gh-93353: Check for leaked temporary files in the parent process,
# since the deletion of temporary files can happen late during
# Python finalization: too late for libregrtest.
@@ -290,17 +343,17 @@ class TestWorkerProcess(threading.Thread):
tmp_dir = tempfile.mkdtemp(prefix="test_python_")
tmp_dir = os.path.abspath(tmp_dir)
try:
- retcode = self._run_process(test_name, tmp_dir, stdout_fh)
+ retcode = self._run_process(worker_job, stdout_file, tmp_dir)
finally:
tmp_files = os.listdir(tmp_dir)
os_helper.rmtree(tmp_dir)
else:
- retcode = self._run_process(test_name, None, stdout_fh)
+ retcode = self._run_process(worker_job, stdout_file)
tmp_files = ()
- stdout_fh.seek(0)
+ stdout_file.seek(0)
try:
- stdout = stdout_fh.read().strip()
+ stdout = stdout_file.read().strip()
except Exception as exc:
# gh-101634: Catch UnicodeDecodeError if stdout cannot be
# decoded from encoding
@@ -342,6 +395,8 @@ class TestWorkerProcess(threading.Thread):
return MultiprocessResult(result, stdout)
def run(self) -> None:
+ fail_fast = self.ns.failfast
+ fail_env_changed = self.ns.fail_env_changed
while not self._stopped:
try:
try:
@@ -354,7 +409,7 @@ class TestWorkerProcess(threading.Thread):
mp_result.result.duration = time.monotonic() - self.start_time
self.output.put((False, mp_result))
- if must_stop(mp_result.result, self.ns):
+ if mp_result.result.must_stop(fail_fast, fail_env_changed):
break
except ExitThread:
break
@@ -410,29 +465,36 @@ def get_running(workers: list[TestWorkerProcess]) -> list[TestWorkerProcess]:
class MultiprocessTestRunner:
- def __init__(self, regrtest: Regrtest) -> None:
+ def __init__(self, regrtest: Regrtest, runtests: RunTests) -> None:
+ ns = regrtest.ns
+ timeout = ns.timeout
+
self.regrtest = regrtest
+ self.runtests = runtests
+ self.rerun = runtests.rerun
self.log = self.regrtest.log
- self.ns = regrtest.ns
+ self.ns = ns
self.output: queue.Queue[QueueOutput] = queue.Queue()
- self.pending = MultiprocessIterator(self.regrtest.tests)
- if self.ns.timeout is not None:
+ tests_iter = runtests.iter_tests()
+ self.pending = MultiprocessIterator(tests_iter)
+ if timeout is not None:
# Rely on faulthandler to kill a worker process. This timouet is
# when faulthandler fails to kill a worker process. Give a maximum
# of 5 minutes to faulthandler to kill the worker.
- self.worker_timeout = min(self.ns.timeout * 1.5,
- self.ns.timeout + 5 * 60)
+ self.worker_timeout = min(timeout * 1.5, timeout + 5 * 60)
else:
self.worker_timeout = None
self.workers = None
def start_workers(self) -> None:
+ use_mp = self.ns.use_mp
+ timeout = self.ns.timeout
self.workers = [TestWorkerProcess(index, self)
- for index in range(1, self.ns.use_mp + 1)]
+ for index in range(1, use_mp + 1)]
msg = f"Run tests in parallel using {len(self.workers)} child processes"
- if self.ns.timeout:
+ if timeout:
msg += (" (timeout: %s, worker timeout: %s)"
- % (format_duration(self.ns.timeout),
+ % (format_duration(timeout),
format_duration(self.worker_timeout)))
self.log(msg)
for worker in self.workers:
@@ -446,6 +508,7 @@ class MultiprocessTestRunner:
worker.wait_stopped(start_time)
def _get_result(self) -> QueueOutput | None:
+ pgo = self.ns.pgo
use_faulthandler = (self.ns.timeout is not None)
timeout = PROGRESS_UPDATE
@@ -464,7 +527,7 @@ class MultiprocessTestRunner:
# display progress
running = get_running(self.workers)
- if running and not self.ns.pgo:
+ if running and not pgo:
self.log('running: %s' % ', '.join(running))
# all worker threads are done: consume pending results
@@ -475,42 +538,46 @@ class MultiprocessTestRunner:
def display_result(self, mp_result: MultiprocessResult) -> None:
result = mp_result.result
+ pgo = self.ns.pgo
text = str(result)
if mp_result.err_msg:
# MULTIPROCESSING_ERROR
text += ' (%s)' % mp_result.err_msg
- elif (result.duration >= PROGRESS_MIN_TIME and not self.ns.pgo):
+ elif (result.duration >= PROGRESS_MIN_TIME and not pgo):
text += ' (%s)' % format_duration(result.duration)
running = get_running(self.workers)
- if running and not self.ns.pgo:
+ if running and not pgo:
text += ' -- running: %s' % ', '.join(running)
self.regrtest.display_progress(self.test_index, text)
def _process_result(self, item: QueueOutput) -> bool:
"""Returns True if test runner must stop."""
+ rerun = self.runtests.rerun
if item[0]:
# Thread got an exception
format_exc = item[1]
print_warning(f"regrtest worker thread failed: {format_exc}")
result = TestResult("<regrtest worker>", state=State.MULTIPROCESSING_ERROR)
- self.regrtest.accumulate_result(result)
- return True
+ self.regrtest.accumulate_result(result, rerun=rerun)
+ return result
self.test_index += 1
mp_result = item[1]
- self.regrtest.accumulate_result(mp_result.result)
+ result = mp_result.result
+ self.regrtest.accumulate_result(result, rerun=rerun)
self.display_result(mp_result)
if mp_result.worker_stdout:
print(mp_result.worker_stdout, flush=True)
- if must_stop(mp_result.result, self.ns):
- return True
-
- return False
+ return result
def run_tests(self) -> None:
+ fail_fast = self.ns.failfast
+ fail_env_changed = self.ns.fail_env_changed
+ timeout = self.ns.timeout
+
self.start_workers()
self.test_index = 0
@@ -520,14 +587,14 @@ class MultiprocessTestRunner:
if item is None:
break
- stop = self._process_result(item)
- if stop:
+ result = self._process_result(item)
+ if result.must_stop(fail_fast, fail_env_changed):
break
except KeyboardInterrupt:
print()
self.regrtest.interrupted = True
finally:
- if self.ns.timeout is not None:
+ if timeout is not None:
faulthandler.cancel_dump_traceback_later()
# Always ensure that all worker processes are no longer
@@ -536,8 +603,8 @@ class MultiprocessTestRunner:
self.stop_workers()
-def run_tests_multiprocess(regrtest: Regrtest) -> None:
- MultiprocessTestRunner(regrtest).run_tests()
+def run_tests_multiprocess(regrtest: Regrtest, runtests: RunTests) -> None:
+ MultiprocessTestRunner(regrtest, runtests).run_tests()
class EncodeTestResult(json.JSONEncoder):
@@ -552,7 +619,7 @@ class EncodeTestResult(json.JSONEncoder):
return super().default(o)
-def decode_test_result(d: dict[str, Any]) -> TestResult | TestStats | dict[str, Any]:
+def decode_test_result(d: dict[str, Any]) -> TestResult | dict[str, Any]:
"""Decode a TestResult (sub)class object from a JSON dict."""
if "__test_result__" not in d:
diff --git a/Lib/test/libregrtest/utils.py b/Lib/test/libregrtest/utils.py
index fd46819..5e16b1a 100644
--- a/Lib/test/libregrtest/utils.py
+++ b/Lib/test/libregrtest/utils.py
@@ -31,7 +31,7 @@ def format_duration(seconds):
return ' '.join(parts)
-def removepy(names):
+def strip_py_suffix(names: list[str]):
if not names:
return
for idx, name in enumerate(names):
diff --git a/Lib/test/support/__init__.py b/Lib/test/support/__init__.py
index 37d0fde..878b6fc 100644
--- a/Lib/test/support/__init__.py
+++ b/Lib/test/support/__init__.py
@@ -1191,7 +1191,6 @@ def _is_full_match_test(pattern):
def set_match_tests(accept_patterns=None, ignore_patterns=None):
global _match_test_func, _accept_test_patterns, _ignore_test_patterns
-
if accept_patterns is None:
accept_patterns = ()
if ignore_patterns is None:
diff --git a/Lib/test/support/testresult.py b/Lib/test/support/testresult.py
index 14474be..de23fdd 100644
--- a/Lib/test/support/testresult.py
+++ b/Lib/test/support/testresult.py
@@ -8,6 +8,7 @@ import sys
import time
import traceback
import unittest
+from test import support
class RegressionTestResult(unittest.TextTestResult):
USE_XML = False
@@ -112,6 +113,8 @@ class RegressionTestResult(unittest.TextTestResult):
def addFailure(self, test, err):
self._add_result(test, True, failure=self.__makeErrorDict(*err))
super().addFailure(test, err)
+ if support.failfast:
+ self.stop()
def addSkip(self, test, reason):
self._add_result(test, skipped=reason)
diff --git a/Lib/test/test_concurrent_futures/executor.py b/Lib/test/test_concurrent_futures/executor.py
index 36278bd..1e7d434 100644
--- a/Lib/test/test_concurrent_futures/executor.py
+++ b/Lib/test/test_concurrent_futures/executor.py
@@ -53,6 +53,7 @@ class ExecutorTest:
self.assertEqual(i.__next__(), (0, 1))
self.assertRaises(ZeroDivisionError, i.__next__)
+ @support.requires_resource('walltime')
def test_map_timeout(self):
results = []
try:
diff --git a/Lib/test/test_concurrent_futures/test_wait.py b/Lib/test/test_concurrent_futures/test_wait.py
index e4bea8b..3f64ca1 100644
--- a/Lib/test/test_concurrent_futures/test_wait.py
+++ b/Lib/test/test_concurrent_futures/test_wait.py
@@ -3,6 +3,7 @@ import threading
import time
import unittest
from concurrent import futures
+from test import support
from .util import (
CANCELLED_FUTURE, CANCELLED_AND_NOTIFIED_FUTURE, EXCEPTION_FUTURE,
@@ -53,6 +54,7 @@ class WaitTests:
finished)
self.assertEqual(set([future1]), pending)
+ @support.requires_resource('walltime')
def test_first_exception(self):
future1 = self.executor.submit(mul, 2, 21)
future2 = self.executor.submit(sleep_and_raise, 1.5)
@@ -110,6 +112,7 @@ class WaitTests:
future2]), finished)
self.assertEqual(set(), pending)
+ @support.requires_resource('walltime')
def test_timeout(self):
future1 = self.executor.submit(mul, 6, 7)
future2 = self.executor.submit(time.sleep, 6)
diff --git a/Lib/test/test_eintr.py b/Lib/test/test_eintr.py
index 5281478..49b15f1 100644
--- a/Lib/test/test_eintr.py
+++ b/Lib/test/test_eintr.py
@@ -9,6 +9,7 @@ from test.support import script_helper
class EINTRTests(unittest.TestCase):
@unittest.skipUnless(hasattr(signal, "setitimer"), "requires setitimer()")
+ @support.requires_resource('walltime')
def test_all(self):
# Run the tester in a sub-process, to make sure there is only one
# thread (for reliable signal delivery).
diff --git a/Lib/test/test_faulthandler.py b/Lib/test/test_faulthandler.py
index 2e97de5..cfc7ce5 100644
--- a/Lib/test/test_faulthandler.py
+++ b/Lib/test/test_faulthandler.py
@@ -676,6 +676,7 @@ class FaultHandlerTests(unittest.TestCase):
with tempfile.TemporaryFile('wb+') as fp:
self.check_dump_traceback_later(fd=fp.fileno())
+ @support.requires_resource('walltime')
def test_dump_traceback_later_twice(self):
self.check_dump_traceback_later(loops=2)
diff --git a/Lib/test/test_httplib.py b/Lib/test/test_httplib.py
index fe8105e..676725c 100644
--- a/Lib/test/test_httplib.py
+++ b/Lib/test/test_httplib.py
@@ -1954,6 +1954,7 @@ class HTTPSTest(TestCase):
h.close()
self.assertIn('nginx', server_string)
+ @support.requires_resource('walltime')
def test_networked_bad_cert(self):
# We feed a "CA" cert that is unrelated to the server's cert
import ssl
diff --git a/Lib/test/test_imaplib.py b/Lib/test/test_imaplib.py
index cfa1c10..4b38355 100644
--- a/Lib/test/test_imaplib.py
+++ b/Lib/test/test_imaplib.py
@@ -10,7 +10,7 @@ import calendar
import threading
import socket
-from test.support import verbose, run_with_tz, run_with_locale, cpython_only
+from test.support import verbose, run_with_tz, run_with_locale, cpython_only, requires_resource
from test.support import hashlib_helper
from test.support import threading_helper
import unittest
@@ -457,6 +457,7 @@ class NewIMAPTestsMixin():
with self.imap_class(*server.server_address):
pass
+ @requires_resource('walltime')
def test_imaplib_timeout_test(self):
_, server = self._setup(SimpleIMAPHandler)
addr = server.server_address[1]
@@ -550,6 +551,7 @@ class NewIMAPSSLTests(NewIMAPTestsMixin, unittest.TestCase):
imap_class = IMAP4_SSL
server_class = SecureTCPServer
+ @requires_resource('walltime')
def test_ssl_raises(self):
ssl_context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
self.assertEqual(ssl_context.verify_mode, ssl.CERT_REQUIRED)
@@ -564,6 +566,7 @@ class NewIMAPSSLTests(NewIMAPTestsMixin, unittest.TestCase):
ssl_context=ssl_context)
client.shutdown()
+ @requires_resource('walltime')
def test_ssl_verified(self):
ssl_context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
ssl_context.load_verify_locations(CAFILE)
diff --git a/Lib/test/test_io.py b/Lib/test/test_io.py
index fc56c1d..e032325 100644
--- a/Lib/test/test_io.py
+++ b/Lib/test/test_io.py
@@ -4478,10 +4478,12 @@ class CMiscIOTest(MiscIOTest):
self.assertFalse(err.strip('.!'))
@threading_helper.requires_working_threading()
+ @support.requires_resource('walltime')
def test_daemon_threads_shutdown_stdout_deadlock(self):
self.check_daemon_threads_shutdown_deadlock('stdout')
@threading_helper.requires_working_threading()
+ @support.requires_resource('walltime')
def test_daemon_threads_shutdown_stderr_deadlock(self):
self.check_daemon_threads_shutdown_deadlock('stderr')
@@ -4655,11 +4657,13 @@ class SignalsTest(unittest.TestCase):
os.close(r)
@requires_alarm
+ @support.requires_resource('walltime')
def test_interrupted_read_retry_buffered(self):
self.check_interrupted_read_retry(lambda x: x.decode('latin1'),
mode="rb")
@requires_alarm
+ @support.requires_resource('walltime')
def test_interrupted_read_retry_text(self):
self.check_interrupted_read_retry(lambda x: x,
mode="r", encoding="latin1")
@@ -4733,10 +4737,12 @@ class SignalsTest(unittest.TestCase):
raise
@requires_alarm
+ @support.requires_resource('walltime')
def test_interrupted_write_retry_buffered(self):
self.check_interrupted_write_retry(b"x", mode="wb")
@requires_alarm
+ @support.requires_resource('walltime')
def test_interrupted_write_retry_text(self):
self.check_interrupted_write_retry("x", mode="w", encoding="latin1")
diff --git a/Lib/test/test_logging.py b/Lib/test/test_logging.py
index def976f..b7f4c6e 100644
--- a/Lib/test/test_logging.py
+++ b/Lib/test/test_logging.py
@@ -680,6 +680,7 @@ class HandlerTest(BaseTest):
support.is_emscripten, "Emscripten cannot fstat unlinked files."
)
@threading_helper.requires_working_threading()
+ @support.requires_resource('walltime')
def test_race(self):
# Issue #14632 refers.
def remove_loop(fname, tries):
diff --git a/Lib/test/test_poll.py b/Lib/test/test_poll.py
index 02165a0..1847ae9 100644
--- a/Lib/test/test_poll.py
+++ b/Lib/test/test_poll.py
@@ -8,7 +8,7 @@ import threading
import time
import unittest
from test.support import (
- cpython_only, requires_subprocess, requires_working_socket
+ cpython_only, requires_subprocess, requires_working_socket, requires_resource
)
from test.support import threading_helper
from test.support.os_helper import TESTFN
@@ -124,6 +124,7 @@ class PollTests(unittest.TestCase):
# select(), modified to use poll() instead.
@requires_subprocess()
+ @requires_resource('walltime')
def test_poll2(self):
cmd = 'for i in 0 1 2 3 4 5 6 7 8 9; do echo testing...; sleep 1; done'
proc = subprocess.Popen(cmd, shell=True, stdout=subprocess.PIPE,
diff --git a/Lib/test/test_regrtest.py b/Lib/test/test_regrtest.py
index 68f7136..a72c052 100644
--- a/Lib/test/test_regrtest.py
+++ b/Lib/test/test_regrtest.py
@@ -5,6 +5,7 @@ Note: test_regrtest cannot be run twice in parallel.
"""
import contextlib
+import dataclasses
import glob
import io
import locale
@@ -21,6 +22,7 @@ from test import libregrtest
from test import support
from test.support import os_helper, TestStats
from test.libregrtest import utils, setup
+from test.libregrtest.runtest import normalize_test_name
if not support.has_subprocess_support:
raise unittest.SkipTest("test module requires subprocess")
@@ -32,6 +34,7 @@ LOG_PREFIX = r'[0-9]+:[0-9]+:[0-9]+ (?:load avg: [0-9]+\.[0-9]{2} )?'
EXITCODE_BAD_TEST = 2
EXITCODE_ENV_CHANGED = 3
EXITCODE_NO_TESTS_RAN = 4
+EXITCODE_RERUN_FAIL = 5
EXITCODE_INTERRUPTED = 130
TEST_INTERRUPTED = textwrap.dedent("""
@@ -96,11 +99,11 @@ class ParseArgsTestCase(unittest.TestCase):
ns = libregrtest._parse_args([])
self.assertEqual(ns.verbose, 0)
- def test_verbose2(self):
- for opt in '-w', '--verbose2':
+ def test_rerun(self):
+ for opt in '-w', '--rerun', '--verbose2':
with self.subTest(opt=opt):
ns = libregrtest._parse_args([opt])
- self.assertTrue(ns.verbose2)
+ self.assertTrue(ns.rerun)
def test_verbose3(self):
for opt in '-W', '--verbose3':
@@ -362,6 +365,13 @@ class ParseArgsTestCase(unittest.TestCase):
'unrecognized arguments: --unknown-option')
+@dataclasses.dataclass(slots=True)
+class Rerun:
+ name: str
+ match: str | None
+ success: bool
+
+
class BaseTestCase(unittest.TestCase):
TEST_UNIQUE_ID = 1
TESTNAME_PREFIX = 'test_regrtest_'
@@ -423,11 +433,11 @@ class BaseTestCase(unittest.TestCase):
def check_executed_tests(self, output, tests, skipped=(), failed=(),
env_changed=(), omitted=(),
- rerun={}, run_no_tests=(),
+ rerun=None, run_no_tests=(),
resource_denied=(),
randomize=False, interrupted=False,
fail_env_changed=False,
- *, stats):
+ *, stats, forever=False, filtered=False):
if isinstance(tests, str):
tests = [tests]
if isinstance(skipped, str):
@@ -445,11 +455,20 @@ class BaseTestCase(unittest.TestCase):
if isinstance(stats, int):
stats = TestStats(stats)
+ rerun_failed = []
+ if rerun is not None:
+ failed = [rerun.name]
+ if not rerun.success:
+ rerun_failed.append(rerun.name)
+
executed = self.parse_executed_tests(output)
+ total_tests = list(tests)
+ if rerun is not None:
+ total_tests.append(rerun.name)
if randomize:
- self.assertEqual(set(executed), set(tests), output)
+ self.assertEqual(set(executed), set(total_tests), output)
else:
- self.assertEqual(executed, tests, output)
+ self.assertEqual(executed, total_tests, output)
def plural(count):
return 's' if count != 1 else ''
@@ -465,6 +484,10 @@ class BaseTestCase(unittest.TestCase):
regex = list_regex('%s test%s skipped', skipped)
self.check_line(output, regex)
+ if resource_denied:
+ regex = list_regex(r'%s test%s skipped \(resource denied\)', resource_denied)
+ self.check_line(output, regex)
+
if failed:
regex = list_regex('%s test%s failed', failed)
self.check_line(output, regex)
@@ -478,32 +501,36 @@ class BaseTestCase(unittest.TestCase):
regex = list_regex('%s test%s omitted', omitted)
self.check_line(output, regex)
- if rerun:
- regex = list_regex('%s re-run test%s', rerun.keys())
+ if rerun is not None:
+ regex = list_regex('%s re-run test%s', [rerun.name])
self.check_line(output, regex)
- regex = LOG_PREFIX + r"Re-running failed tests in verbose mode"
+ regex = LOG_PREFIX + fr"Re-running 1 failed tests in verbose mode"
+ self.check_line(output, regex)
+ regex = fr"Re-running {rerun.name} in verbose mode"
+ if rerun.match:
+ regex = fr"{regex} \(matching: {rerun.match}\)"
self.check_line(output, regex)
- for name, match in rerun.items():
- regex = LOG_PREFIX + f"Re-running {name} in verbose mode \\(matching: {match}\\)"
- self.check_line(output, regex)
if run_no_tests:
regex = list_regex('%s test%s run no tests', run_no_tests)
self.check_line(output, regex)
- good = (len(tests) - len(skipped) - len(failed)
+ good = (len(tests) - len(skipped) - len(resource_denied) - len(failed)
- len(omitted) - len(env_changed) - len(run_no_tests))
if good:
- regex = r'%s test%s OK\.$' % (good, plural(good))
- if not skipped and not failed and good > 1:
+ regex = r'%s test%s OK\.' % (good, plural(good))
+ if not skipped and not failed and (rerun is None or rerun.success) and good > 1:
regex = 'All %s' % regex
- self.check_line(output, regex)
+ self.check_line(output, regex, full=True)
if interrupted:
self.check_line(output, 'Test suite interrupted by signal SIGINT.')
# Total tests
- parts = [f'run={stats.tests_run:,}']
+ text = f'run={stats.tests_run:,}'
+ if filtered:
+ text = fr'{text} \(filtered\)'
+ parts = [text]
if stats.failures:
parts.append(f'failures={stats.failures:,}')
if stats.skipped:
@@ -512,39 +539,52 @@ class BaseTestCase(unittest.TestCase):
self.check_line(output, line, full=True)
# Total test files
- report = [f'success={good}']
- if failed:
- report.append(f'failed={len(failed)}')
- if env_changed:
- report.append(f'env_changed={len(env_changed)}')
- if skipped:
- report.append(f'skipped={len(skipped)}')
- if resource_denied:
- report.append(f'resource_denied={len(resource_denied)}')
- if rerun:
- report.append(f'rerun={len(rerun)}')
- if run_no_tests:
- report.append(f'run_no_tests={len(run_no_tests)}')
+ run = len(total_tests) - len(resource_denied)
+ if rerun is not None:
+ total_failed = len(rerun_failed)
+ total_rerun = 1
+ else:
+ total_failed = len(failed)
+ total_rerun = 0
+ if interrupted:
+ run = 0
+ text = f'run={run}'
+ if not forever:
+ text = f'{text}/{len(tests)}'
+ if filtered:
+ text = fr'{text} \(filtered\)'
+ report = [text]
+ for name, ntest in (
+ ('failed', total_failed),
+ ('env_changed', len(env_changed)),
+ ('skipped', len(skipped)),
+ ('resource_denied', len(resource_denied)),
+ ('rerun', total_rerun),
+ ('run_no_tests', len(run_no_tests)),
+ ):
+ if ntest:
+ report.append(f'{name}={ntest}')
line = fr'Total test files: {" ".join(report)}'
self.check_line(output, line, full=True)
# Result
- result = []
+ state = []
if failed:
- result.append('FAILURE')
+ state.append('FAILURE')
elif fail_env_changed and env_changed:
- result.append('ENV CHANGED')
+ state.append('ENV CHANGED')
if interrupted:
- result.append('INTERRUPTED')
- if not any((good, result, failed, interrupted, skipped,
+ state.append('INTERRUPTED')
+ if not any((good, failed, interrupted, skipped,
env_changed, fail_env_changed)):
- result.append("NO TESTS RAN")
- elif not result:
- result.append('SUCCESS')
- result = ', '.join(result)
- if rerun:
- result = 'FAILURE then %s' % result
- self.check_line(output, f'Result: {result}', full=True)
+ state.append("NO TESTS RAN")
+ elif not state:
+ state.append('SUCCESS')
+ state = ', '.join(state)
+ if rerun is not None:
+ new_state = 'SUCCESS' if rerun.success else 'FAILURE'
+ state = 'FAILURE then ' + new_state
+ self.check_line(output, f'Result: {state}', full=True)
def parse_random_seed(self, output):
match = self.regex_search(r'Using random seed ([0-9]+)', output)
@@ -563,13 +603,13 @@ class BaseTestCase(unittest.TestCase):
stdout=subprocess.PIPE,
**kw)
if proc.returncode != exitcode:
- msg = ("Command %s failed with exit code %s\n"
+ msg = ("Command %s failed with exit code %s, but exit code %s expected!\n"
"\n"
"stdout:\n"
"---\n"
"%s\n"
"---\n"
- % (str(args), proc.returncode, proc.stdout))
+ % (str(args), proc.returncode, exitcode, proc.stdout))
if proc.stderr:
msg += ("\n"
"stderr:\n"
@@ -734,6 +774,40 @@ class ArgsTestCase(BaseTestCase):
cmdargs = ['-m', 'test', '--testdir=%s' % self.tmptestdir, *testargs]
return self.run_python(cmdargs, **kw)
+ def test_success(self):
+ code = textwrap.dedent("""
+ import unittest
+
+ class PassingTests(unittest.TestCase):
+ def test_test1(self):
+ pass
+
+ def test_test2(self):
+ pass
+
+ def test_test3(self):
+ pass
+ """)
+ tests = [self.create_test(f'ok{i}', code=code) for i in range(1, 6)]
+
+ output = self.run_tests(*tests)
+ self.check_executed_tests(output, tests,
+ stats=3 * len(tests))
+
+ def test_skip(self):
+ code = textwrap.dedent("""
+ import unittest
+ raise unittest.SkipTest("nope")
+ """)
+ test_ok = self.create_test('ok')
+ test_skip = self.create_test('skip', code=code)
+ tests = [test_ok, test_skip]
+
+ output = self.run_tests(*tests)
+ self.check_executed_tests(output, tests,
+ skipped=[test_skip],
+ stats=1)
+
def test_failing_test(self):
# test a failing test
code = textwrap.dedent("""
@@ -773,14 +847,12 @@ class ArgsTestCase(BaseTestCase):
# -u audio: 1 resource enabled
output = self.run_tests('-uaudio', *test_names)
self.check_executed_tests(output, test_names,
- skipped=tests['network'],
resource_denied=tests['network'],
stats=1)
# no option: 0 resources enabled
- output = self.run_tests(*test_names)
+ output = self.run_tests(*test_names, exitcode=EXITCODE_NO_TESTS_RAN)
self.check_executed_tests(output, test_names,
- skipped=test_names,
resource_denied=test_names,
stats=0)
@@ -926,9 +998,21 @@ class ArgsTestCase(BaseTestCase):
builtins.__dict__['RUN'] = 1
""")
test = self.create_test('forever', code=code)
+
+ # --forever
output = self.run_tests('--forever', test, exitcode=EXITCODE_BAD_TEST)
self.check_executed_tests(output, [test]*3, failed=test,
- stats=TestStats(1, 1))
+ stats=TestStats(3, 1),
+ forever=True)
+
+ # --forever --rerun
+ output = self.run_tests('--forever', '--rerun', test, exitcode=0)
+ self.check_executed_tests(output, [test]*3,
+ rerun=Rerun(test,
+ match='test_run',
+ success=True),
+ stats=TestStats(4, 1),
+ forever=True)
def check_leak(self, code, what):
test = self.create_test('huntrleaks', code=code)
@@ -1139,33 +1223,55 @@ class ArgsTestCase(BaseTestCase):
""")
testname = self.create_test(code=code)
- output = self.run_tests("-w", testname, exitcode=EXITCODE_BAD_TEST)
+ output = self.run_tests("--rerun", testname, exitcode=EXITCODE_BAD_TEST)
self.check_executed_tests(output, [testname],
- failed=testname,
- rerun={testname: "test_fail_always"},
- stats=TestStats(1, 1))
+ rerun=Rerun(testname,
+ "test_fail_always",
+ success=False),
+ stats=TestStats(3, 2))
def test_rerun_success(self):
# FAILURE then SUCCESS
- code = textwrap.dedent("""
- import builtins
+ marker_filename = os.path.abspath("regrtest_marker_filename")
+ self.addCleanup(os_helper.unlink, marker_filename)
+ self.assertFalse(os.path.exists(marker_filename))
+
+ code = textwrap.dedent(f"""
+ import os.path
import unittest
+ marker_filename = {marker_filename!r}
+
class Tests(unittest.TestCase):
def test_succeed(self):
return
def test_fail_once(self):
- if not hasattr(builtins, '_test_failed'):
- builtins._test_failed = True
+ if not os.path.exists(marker_filename):
+ open(marker_filename, "w").close()
self.fail("bug")
""")
testname = self.create_test(code=code)
- output = self.run_tests("-w", testname, exitcode=0)
+ # FAILURE then SUCCESS => exit code 0
+ output = self.run_tests("--rerun", testname, exitcode=0)
self.check_executed_tests(output, [testname],
- rerun={testname: "test_fail_once"},
- stats=1)
+ rerun=Rerun(testname,
+ match="test_fail_once",
+ success=True),
+ stats=TestStats(3, 1))
+ os_helper.unlink(marker_filename)
+
+ # with --fail-rerun, exit code EXITCODE_RERUN_FAIL
+ # on "FAILURE then SUCCESS" state.
+ output = self.run_tests("--rerun", "--fail-rerun", testname,
+ exitcode=EXITCODE_RERUN_FAIL)
+ self.check_executed_tests(output, [testname],
+ rerun=Rerun(testname,
+ match="test_fail_once",
+ success=True),
+ stats=TestStats(3, 1))
+ os_helper.unlink(marker_filename)
def test_rerun_setup_class_hook_failure(self):
# FAILURE then FAILURE
@@ -1182,10 +1288,12 @@ class ArgsTestCase(BaseTestCase):
""")
testname = self.create_test(code=code)
- output = self.run_tests("-w", testname, exitcode=EXITCODE_BAD_TEST)
+ output = self.run_tests("--rerun", testname, exitcode=EXITCODE_BAD_TEST)
self.check_executed_tests(output, testname,
failed=[testname],
- rerun={testname: "ExampleTests"},
+ rerun=Rerun(testname,
+ match="ExampleTests",
+ success=False),
stats=0)
def test_rerun_teardown_class_hook_failure(self):
@@ -1203,11 +1311,13 @@ class ArgsTestCase(BaseTestCase):
""")
testname = self.create_test(code=code)
- output = self.run_tests("-w", testname, exitcode=EXITCODE_BAD_TEST)
+ output = self.run_tests("--rerun", testname, exitcode=EXITCODE_BAD_TEST)
self.check_executed_tests(output, testname,
failed=[testname],
- rerun={testname: "ExampleTests"},
- stats=1)
+ rerun=Rerun(testname,
+ match="ExampleTests",
+ success=False),
+ stats=2)
def test_rerun_setup_module_hook_failure(self):
# FAILURE then FAILURE
@@ -1223,10 +1333,12 @@ class ArgsTestCase(BaseTestCase):
""")
testname = self.create_test(code=code)
- output = self.run_tests("-w", testname, exitcode=EXITCODE_BAD_TEST)
+ output = self.run_tests("--rerun", testname, exitcode=EXITCODE_BAD_TEST)
self.check_executed_tests(output, testname,
failed=[testname],
- rerun={testname: testname},
+ rerun=Rerun(testname,
+ match=None,
+ success=False),
stats=0)
def test_rerun_teardown_module_hook_failure(self):
@@ -1243,11 +1355,13 @@ class ArgsTestCase(BaseTestCase):
""")
testname = self.create_test(code=code)
- output = self.run_tests("-w", testname, exitcode=EXITCODE_BAD_TEST)
- self.check_executed_tests(output, testname,
+ output = self.run_tests("--rerun", testname, exitcode=EXITCODE_BAD_TEST)
+ self.check_executed_tests(output, [testname],
failed=[testname],
- rerun={testname: testname},
- stats=1)
+ rerun=Rerun(testname,
+ match=None,
+ success=False),
+ stats=2)
def test_rerun_setup_hook_failure(self):
# FAILURE then FAILURE
@@ -1263,11 +1377,13 @@ class ArgsTestCase(BaseTestCase):
""")
testname = self.create_test(code=code)
- output = self.run_tests("-w", testname, exitcode=EXITCODE_BAD_TEST)
+ output = self.run_tests("--rerun", testname, exitcode=EXITCODE_BAD_TEST)
self.check_executed_tests(output, testname,
failed=[testname],
- rerun={testname: "test_success"},
- stats=1)
+ rerun=Rerun(testname,
+ match="test_success",
+ success=False),
+ stats=2)
def test_rerun_teardown_hook_failure(self):
# FAILURE then FAILURE
@@ -1283,11 +1399,13 @@ class ArgsTestCase(BaseTestCase):
""")
testname = self.create_test(code=code)
- output = self.run_tests("-w", testname, exitcode=EXITCODE_BAD_TEST)
+ output = self.run_tests("--rerun", testname, exitcode=EXITCODE_BAD_TEST)
self.check_executed_tests(output, testname,
failed=[testname],
- rerun={testname: "test_success"},
- stats=1)
+ rerun=Rerun(testname,
+ match="test_success",
+ success=False),
+ stats=2)
def test_rerun_async_setup_hook_failure(self):
# FAILURE then FAILURE
@@ -1303,11 +1421,12 @@ class ArgsTestCase(BaseTestCase):
""")
testname = self.create_test(code=code)
- output = self.run_tests("-w", testname, exitcode=EXITCODE_BAD_TEST)
+ output = self.run_tests("--rerun", testname, exitcode=EXITCODE_BAD_TEST)
self.check_executed_tests(output, testname,
- failed=[testname],
- rerun={testname: "test_success"},
- stats=1)
+ rerun=Rerun(testname,
+ match="test_success",
+ success=False),
+ stats=2)
def test_rerun_async_teardown_hook_failure(self):
# FAILURE then FAILURE
@@ -1323,11 +1442,13 @@ class ArgsTestCase(BaseTestCase):
""")
testname = self.create_test(code=code)
- output = self.run_tests("-w", testname, exitcode=EXITCODE_BAD_TEST)
+ output = self.run_tests("--rerun", testname, exitcode=EXITCODE_BAD_TEST)
self.check_executed_tests(output, testname,
failed=[testname],
- rerun={testname: "test_success"},
- stats=1)
+ rerun=Rerun(testname,
+ match="test_success",
+ success=False),
+ stats=2)
def test_no_tests_ran(self):
code = textwrap.dedent("""
@@ -1343,7 +1464,7 @@ class ArgsTestCase(BaseTestCase):
exitcode=EXITCODE_NO_TESTS_RAN)
self.check_executed_tests(output, [testname],
run_no_tests=testname,
- stats=0)
+ stats=0, filtered=True)
def test_no_tests_ran_skip(self):
code = textwrap.dedent("""
@@ -1374,7 +1495,7 @@ class ArgsTestCase(BaseTestCase):
exitcode=EXITCODE_NO_TESTS_RAN)
self.check_executed_tests(output, [testname, testname2],
run_no_tests=[testname, testname2],
- stats=0)
+ stats=0, filtered=True)
def test_no_test_ran_some_test_exist_some_not(self):
code = textwrap.dedent("""
@@ -1398,7 +1519,7 @@ class ArgsTestCase(BaseTestCase):
"-m", "test_other_bug", exitcode=0)
self.check_executed_tests(output, [testname, testname2],
run_no_tests=[testname],
- stats=1)
+ stats=1, filtered=True)
@support.cpython_only
def test_uncollectable(self):
@@ -1715,6 +1836,17 @@ class TestUtils(unittest.TestCase):
self.assertEqual(utils.format_duration(3 * 3600 + 1),
'3 hour 1 sec')
+ def test_normalize_test_name(self):
+ normalize = normalize_test_name
+ self.assertEqual(normalize('test_access (test.test_os.FileTests.test_access)'),
+ 'test_access')
+ self.assertEqual(normalize('setUpClass (test.test_os.ChownFileTests)', is_error=True),
+ 'ChownFileTests')
+ self.assertEqual(normalize('test_success (test.test_bug.ExampleTests.test_success)', is_error=True),
+ 'test_success')
+ self.assertIsNone(normalize('setUpModule (test.test_x)', is_error=True))
+ self.assertIsNone(normalize('tearDownModule (test.test_module)', is_error=True))
+
if __name__ == '__main__':
unittest.main()
diff --git a/Lib/test/test_signal.py b/Lib/test/test_signal.py
index 25afd6a..2a1a1ee 100644
--- a/Lib/test/test_signal.py
+++ b/Lib/test/test_signal.py
@@ -745,6 +745,7 @@ class SiginterruptTest(unittest.TestCase):
interrupted = self.readpipe_interrupted(True)
self.assertTrue(interrupted)
+ @support.requires_resource('walltime')
def test_siginterrupt_off(self):
# If a signal handler is installed and siginterrupt is called with
# a false value for the second argument, when that signal arrives, it
diff --git a/Lib/test/test_smtpnet.py b/Lib/test/test_smtpnet.py
index 72f51cd..2e0dc1a 100644
--- a/Lib/test/test_smtpnet.py
+++ b/Lib/test/test_smtpnet.py
@@ -61,6 +61,7 @@ class SmtpSSLTest(unittest.TestCase):
server.ehlo()
server.quit()
+ @support.requires_resource('walltime')
def test_connect_using_sslcontext(self):
context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
context.check_hostname = False
diff --git a/Lib/test/test_ssl.py b/Lib/test/test_ssl.py
index 4e49dc5..2c32fec 100644
--- a/Lib/test/test_ssl.py
+++ b/Lib/test/test_ssl.py
@@ -2182,6 +2182,7 @@ class NetworkedTests(unittest.TestCase):
self.assertIn(rc, (errno.EAGAIN, errno.EWOULDBLOCK))
@unittest.skipUnless(socket_helper.IPV6_ENABLED, 'Needs IPv6')
+ @support.requires_resource('walltime')
def test_get_server_certificate_ipv6(self):
with socket_helper.transient_internet('ipv6.google.com'):
_test_get_server_certificate(self, 'ipv6.google.com', 443)
@@ -2740,6 +2741,7 @@ def try_protocol_combo(server_protocol, client_protocol, expect_success,
class ThreadedTests(unittest.TestCase):
+ @support.requires_resource('walltime')
def test_echo(self):
"""Basic test of an SSL client connecting to a server"""
if support.verbose:
diff --git a/Lib/test/test_subprocess.py b/Lib/test/test_subprocess.py
index 0b9e9e1..d95ef72 100644
--- a/Lib/test/test_subprocess.py
+++ b/Lib/test/test_subprocess.py
@@ -269,6 +269,7 @@ class ProcessTestCase(BaseTestCase):
self.assertIn('stdin', c.exception.args[0])
self.assertIn('input', c.exception.args[0])
+ @support.requires_resource('walltime')
def test_check_output_timeout(self):
# check_output() function with timeout arg
with self.assertRaises(subprocess.TimeoutExpired) as c:
@@ -1643,6 +1644,7 @@ class RunFuncTestCase(BaseTestCase):
self.assertIn('stdin', c.exception.args[0])
self.assertIn('input', c.exception.args[0])
+ @support.requires_resource('walltime')
def test_check_output_timeout(self):
with self.assertRaises(subprocess.TimeoutExpired) as c:
cp = self.run_python((
diff --git a/Lib/test/test_urllib2net.py b/Lib/test/test_urllib2net.py
index d8d882b..f0874d8 100644
--- a/Lib/test/test_urllib2net.py
+++ b/Lib/test/test_urllib2net.py
@@ -133,6 +133,7 @@ class OtherNetworkTests(unittest.TestCase):
# XXX The rest of these tests aren't very good -- they don't check much.
# They do sometimes catch some major disasters, though.
+ @support.requires_resource('walltime')
def test_ftp(self):
# Testing the same URL twice exercises the caching in CacheFTPHandler
urls = [
@@ -196,6 +197,7 @@ class OtherNetworkTests(unittest.TestCase):
self.assertEqual(res.geturl(),
"http://www.pythontest.net/index.html#frag")
+ @support.requires_resource('walltime')
def test_redirect_url_withfrag(self):
redirect_url_with_frag = "http://www.pythontest.net/redir/with_frag/"
with socket_helper.transient_internet(redirect_url_with_frag):
@@ -334,6 +336,7 @@ class TimeoutTest(unittest.TestCase):
FTP_HOST = 'ftp://www.pythontest.net/'
+ @support.requires_resource('walltime')
def test_ftp_basic(self):
self.assertIsNone(socket.getdefaulttimeout())
with socket_helper.transient_internet(self.FTP_HOST, timeout=None):
@@ -352,6 +355,7 @@ class TimeoutTest(unittest.TestCase):
socket.setdefaulttimeout(None)
self.assertEqual(u.fp.fp.raw._sock.gettimeout(), 60)
+ @support.requires_resource('walltime')
def test_ftp_no_timeout(self):
self.assertIsNone(socket.getdefaulttimeout())
with socket_helper.transient_internet(self.FTP_HOST):
@@ -363,6 +367,7 @@ class TimeoutTest(unittest.TestCase):
socket.setdefaulttimeout(None)
self.assertIsNone(u.fp.fp.raw._sock.gettimeout())
+ @support.requires_resource('walltime')
def test_ftp_timeout(self):
with socket_helper.transient_internet(self.FTP_HOST):
u = _urlopen_with_retry(self.FTP_HOST, timeout=60)
diff --git a/Lib/test/test_urllibnet.py b/Lib/test/test_urllibnet.py
index 773101c..49a3b5a 100644
--- a/Lib/test/test_urllibnet.py
+++ b/Lib/test/test_urllibnet.py
@@ -109,6 +109,7 @@ class urlopenNetworkTests(unittest.TestCase):
open_url.close()
self.assertEqual(code, 404)
+ @support.requires_resource('walltime')
def test_bad_address(self):
# Make sure proper exception is raised when connecting to a bogus
# address.
@@ -191,6 +192,7 @@ class urlretrieveNetworkTests(unittest.TestCase):
logo = "http://www.pythontest.net/"
+ @support.requires_resource('walltime')
def test_data_header(self):
with self.urlretrieve(self.logo) as (file_location, fileheaders):
datevalue = fileheaders.get('Date')
diff --git a/Lib/test/test_xmlrpc.py b/Lib/test/test_xmlrpc.py
index 9ff5545..7f517dc 100644
--- a/Lib/test/test_xmlrpc.py
+++ b/Lib/test/test_xmlrpc.py
@@ -1031,38 +1031,47 @@ class MultiPathServerTestCase(BaseServerTestCase):
self.assertEqual(p.add(6,8), 6+8)
self.assertRaises(xmlrpclib.Fault, p.pow, 6, 8)
+ @support.requires_resource('walltime')
def test_path3(self):
p = xmlrpclib.ServerProxy(URL+"/is/broken")
self.assertRaises(xmlrpclib.Fault, p.add, 6, 8)
+ @support.requires_resource('walltime')
def test_invalid_path(self):
p = xmlrpclib.ServerProxy(URL+"/invalid")
self.assertRaises(xmlrpclib.Fault, p.add, 6, 8)
+ @support.requires_resource('walltime')
def test_path_query_fragment(self):
p = xmlrpclib.ServerProxy(URL+"/foo?k=v#frag")
self.assertEqual(p.test(), "/foo?k=v#frag")
+ @support.requires_resource('walltime')
def test_path_fragment(self):
p = xmlrpclib.ServerProxy(URL+"/foo#frag")
self.assertEqual(p.test(), "/foo#frag")
+ @support.requires_resource('walltime')
def test_path_query(self):
p = xmlrpclib.ServerProxy(URL+"/foo?k=v")
self.assertEqual(p.test(), "/foo?k=v")
+ @support.requires_resource('walltime')
def test_empty_path(self):
p = xmlrpclib.ServerProxy(URL)
self.assertEqual(p.test(), "/RPC2")
+ @support.requires_resource('walltime')
def test_root_path(self):
p = xmlrpclib.ServerProxy(URL + "/")
self.assertEqual(p.test(), "/")
+ @support.requires_resource('walltime')
def test_empty_path_query(self):
p = xmlrpclib.ServerProxy(URL + "?k=v")
self.assertEqual(p.test(), "?k=v")
+ @support.requires_resource('walltime')
def test_empty_path_fragment(self):
p = xmlrpclib.ServerProxy(URL + "#frag")
self.assertEqual(p.test(), "#frag")
diff --git a/Misc/NEWS.d/next/Tests/2023-09-03-02-01-55.gh-issue-108834.iAwXzj.rst b/Misc/NEWS.d/next/Tests/2023-09-03-02-01-55.gh-issue-108834.iAwXzj.rst
new file mode 100644
index 0000000..43b9948
--- /dev/null
+++ b/Misc/NEWS.d/next/Tests/2023-09-03-02-01-55.gh-issue-108834.iAwXzj.rst
@@ -0,0 +1,6 @@
+When regrtest reruns failed tests in verbose mode (``./python -m test
+--rerun``), tests are now rerun in fresh worker processes rather than being
+executed in the main process. If a test does crash or is killed by a timeout,
+the main process can detect and handle the killed worker process. Tests are
+rerun in parallel if the ``-jN`` option is used to run tests in parallel.
+Patch by Victor Stinner.
diff --git a/Misc/NEWS.d/next/Tests/2023-09-03-06-17-12.gh-issue-108834.fjV-CJ.rst b/Misc/NEWS.d/next/Tests/2023-09-03-06-17-12.gh-issue-108834.fjV-CJ.rst
new file mode 100644
index 0000000..734cc66
--- /dev/null
+++ b/Misc/NEWS.d/next/Tests/2023-09-03-06-17-12.gh-issue-108834.fjV-CJ.rst
@@ -0,0 +1,2 @@
+Rename regrtest ``--verbose2`` option (``-w``) to ``--rerun``. Keep
+``--verbose2`` as a deprecated alias. Patch by Victor Stinner.
diff --git a/Misc/NEWS.d/next/Tests/2023-09-03-20-15-49.gh-issue-108834.Osvmhf.rst b/Misc/NEWS.d/next/Tests/2023-09-03-20-15-49.gh-issue-108834.Osvmhf.rst
new file mode 100644
index 0000000..098861f
--- /dev/null
+++ b/Misc/NEWS.d/next/Tests/2023-09-03-20-15-49.gh-issue-108834.Osvmhf.rst
@@ -0,0 +1,3 @@
+Add ``--fail-rerun option`` option to regrtest: if a test failed when then
+passed when rerun in verbose mode, exit the process with exit code 2
+(error), instead of exit code 0 (success). Patch by Victor Stinner.