summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--Lib/test/libregrtest/main.py88
-rw-r--r--Lib/test/libregrtest/refleak.py3
-rw-r--r--Lib/test/libregrtest/runtest.py274
-rw-r--r--Lib/test/libregrtest/runtest_mp.py85
-rw-r--r--Lib/test/libregrtest/save_env.py4
-rw-r--r--Lib/test/libregrtest/utils.py7
-rw-r--r--Lib/test/libregrtest/win_utils.py5
-rw-r--r--Lib/test/test_regrtest.py28
8 files changed, 286 insertions, 208 deletions
diff --git a/Lib/test/libregrtest/main.py b/Lib/test/libregrtest/main.py
index d20e174..ef1336a 100644
--- a/Lib/test/libregrtest/main.py
+++ b/Lib/test/libregrtest/main.py
@@ -105,26 +105,30 @@ class Regrtest:
# used by --junit-xml
self.testsuite_xml = None
- def accumulate_result(self, test, result):
- ok, test_time, xml_data = result
+ def accumulate_result(self, result):
+ test_name = result.test_name
+ ok = result.result
+
if ok not in (CHILD_ERROR, INTERRUPTED):
- self.test_times.append((test_time, test))
+ self.test_times.append((result.test_time, test_name))
+
if ok == PASSED:
- self.good.append(test)
+ self.good.append(test_name)
elif ok in (FAILED, CHILD_ERROR):
- self.bad.append(test)
+ self.bad.append(test_name)
elif ok == ENV_CHANGED:
- self.environment_changed.append(test)
+ self.environment_changed.append(test_name)
elif ok == SKIPPED:
- self.skipped.append(test)
+ self.skipped.append(test_name)
elif ok == RESOURCE_DENIED:
- self.skipped.append(test)
- self.resource_denieds.append(test)
+ self.skipped.append(test_name)
+ self.resource_denieds.append(test_name)
elif ok == TEST_DID_NOT_RUN:
- self.run_no_tests.append(test)
+ self.run_no_tests.append(test_name)
elif ok != INTERRUPTED:
raise ValueError("invalid test result: %r" % ok)
+ xml_data = result.xml_data
if xml_data:
import xml.etree.ElementTree as ET
for e in xml_data:
@@ -134,7 +138,7 @@ class Regrtest:
print(xml_data, file=sys.__stderr__)
raise
- def display_progress(self, test_index, test):
+ def display_progress(self, test_index, text):
if self.ns.quiet:
return
@@ -143,7 +147,7 @@ class Regrtest:
fails = len(self.bad) + len(self.environment_changed)
if fails and not self.ns.pgo:
line = f"{line}/{fails}"
- line = f"[{line}] {test}"
+ line = f"[{line}] {text}"
# add the system load prefix: "load avg: 1.80 "
if self.getloadavg:
@@ -275,13 +279,13 @@ class Regrtest:
support.verbose = False
support.set_match_tests(self.ns.match_tests)
- for test in self.selected:
- abstest = get_abs_module(self.ns, test)
+ for test_name in self.selected:
+ abstest = get_abs_module(self.ns, test_name)
try:
suite = unittest.defaultTestLoader.loadTestsFromName(abstest)
self._list_cases(suite)
except unittest.SkipTest:
- self.skipped.append(test)
+ self.skipped.append(test_name)
if self.skipped:
print(file=sys.stderr)
@@ -298,19 +302,19 @@ class Regrtest:
print()
print("Re-running failed tests in verbose mode")
self.rerun = self.bad[:]
- for test in self.rerun:
- print("Re-running test %r in verbose mode" % test, flush=True)
- try:
- self.ns.verbose = True
- ok = runtest(self.ns, test)
- except KeyboardInterrupt:
- self.interrupted = True
+ for test_name in self.rerun:
+ print("Re-running test %r in verbose mode" % test_name, flush=True)
+ self.ns.verbose = True
+ ok = runtest(self.ns, test_name)
+
+ if ok[0] in {PASSED, ENV_CHANGED, SKIPPED, RESOURCE_DENIED}:
+ self.bad.remove(test_name)
+
+ if ok.result == INTERRUPTED:
# print a newline separate from the ^C
print()
+ self.interrupted = True
break
- else:
- if ok[0] in {PASSED, ENV_CHANGED, SKIPPED, RESOURCE_DENIED}:
- self.bad.remove(test)
else:
if self.bad:
print(count(len(self.bad), 'test'), "failed again:")
@@ -348,8 +352,8 @@ class Regrtest:
self.test_times.sort(reverse=True)
print()
print("10 slowest tests:")
- for time, test in self.test_times[:10]:
- print("- %s: %s" % (test, format_duration(time)))
+ for test_time, test in self.test_times[:10]:
+ print("- %s: %s" % (test, format_duration(test_time)))
if self.bad:
print()
@@ -387,10 +391,10 @@ class Regrtest:
print("Run tests sequentially")
previous_test = None
- for test_index, test in enumerate(self.tests, 1):
+ for test_index, test_name in enumerate(self.tests, 1):
start_time = time.monotonic()
- text = test
+ text = test_name
if previous_test:
text = '%s -- %s' % (text, previous_test)
self.display_progress(test_index, text)
@@ -398,22 +402,20 @@ class Regrtest:
if self.tracer:
# If we're tracing code coverage, then we don't exit with status
# if on a false return value from main.
- cmd = ('result = runtest(self.ns, test); '
- 'self.accumulate_result(test, result)')
+ cmd = ('result = runtest(self.ns, test_name); '
+ 'self.accumulate_result(result)')
ns = dict(locals())
self.tracer.runctx(cmd, globals=globals(), locals=ns)
result = ns['result']
else:
- try:
- result = runtest(self.ns, test)
- except KeyboardInterrupt:
- self.interrupted = True
- self.accumulate_result(test, (INTERRUPTED, None, None))
- break
- else:
- self.accumulate_result(test, result)
-
- previous_test = format_test_result(test, result[0])
+ result = runtest(self.ns, test_name)
+ self.accumulate_result(result)
+
+ if result.result == INTERRUPTED:
+ self.interrupted = True
+ break
+
+ previous_test = format_test_result(result)
test_time = time.monotonic() - start_time
if test_time >= PROGRESS_MIN_TIME:
previous_test = "%s in %s" % (previous_test, format_duration(test_time))
@@ -441,8 +443,8 @@ class Regrtest:
def _test_forever(self, tests):
while True:
- for test in tests:
- yield test
+ for test_name in tests:
+ yield test_name
if self.bad:
return
if self.ns.fail_env_changed and self.environment_changed:
diff --git a/Lib/test/libregrtest/refleak.py b/Lib/test/libregrtest/refleak.py
index 235d6bf..8d22123 100644
--- a/Lib/test/libregrtest/refleak.py
+++ b/Lib/test/libregrtest/refleak.py
@@ -1,4 +1,3 @@
-import errno
import os
import re
import sys
@@ -18,7 +17,7 @@ except ImportError:
cls._abc_negative_cache, cls._abc_negative_cache_version)
-def dash_R(ns, the_module, test_name, test_func):
+def dash_R(ns, test_name, test_func):
"""Run a test multiple times, looking for reference leaks.
Returns:
diff --git a/Lib/test/libregrtest/runtest.py b/Lib/test/libregrtest/runtest.py
index 0a9533c..55913b3 100644
--- a/Lib/test/libregrtest/runtest.py
+++ b/Lib/test/libregrtest/runtest.py
@@ -1,4 +1,6 @@
+import collections
import faulthandler
+import functools
import importlib
import io
import os
@@ -9,6 +11,7 @@ import unittest
from test import support
from test.libregrtest.refleak import dash_R, clear_caches
from test.libregrtest.save_env import saved_test_environment
+from test.libregrtest.utils import print_warning
# Test result constants.
@@ -55,9 +58,17 @@ STDTESTS = [
NOTTESTS = set()
-def format_test_result(test_name, result):
- fmt = _FORMAT_TEST_RESULT.get(result, "%s")
- return fmt % test_name
+# used by --findleaks, store for gc.garbage
+found_garbage = []
+
+
+def format_test_result(result):
+ fmt = _FORMAT_TEST_RESULT.get(result.result, "%s")
+ return fmt % result.test_name
+
+
+def findtestdir(path=None):
+ return path or os.path.dirname(os.path.dirname(__file__)) or os.curdir
def findtests(testdir=None, stdtests=STDTESTS, nottests=NOTTESTS):
@@ -73,48 +84,34 @@ def findtests(testdir=None, stdtests=STDTESTS, nottests=NOTTESTS):
return stdtests + sorted(tests)
-def get_abs_module(ns, test):
- if test.startswith('test.') or ns.testdir:
- return test
+def get_abs_module(ns, test_name):
+ if test_name.startswith('test.') or ns.testdir:
+ return test_name
else:
- # Always import it from the test package
- return 'test.' + test
-
+ # Import it from the test package
+ return 'test.' + test_name
-def runtest(ns, test):
- """Run a single test.
-
- ns -- regrtest namespace of options
- test -- the name of the test
- Returns the tuple (result, test_time, xml_data), where result is one
- of the constants:
-
- INTERRUPTED KeyboardInterrupt when run under -j
- RESOURCE_DENIED test skipped because resource denied
- SKIPPED test skipped for some other reason
- ENV_CHANGED test failed because it changed the execution environment
- FAILED test failed
- PASSED test passed
- EMPTY_TEST_SUITE test ran no subtests.
+TestResult = collections.namedtuple('TestResult',
+ 'test_name result test_time xml_data')
- If ns.xmlpath is not None, xml_data is a list containing each
- generated testsuite element.
- """
+def _runtest(ns, test_name):
+ # Handle faulthandler timeout, capture stdout+stderr, XML serialization
+ # and measure time.
output_on_failure = ns.verbose3
use_timeout = (ns.timeout is not None)
if use_timeout:
faulthandler.dump_traceback_later(ns.timeout, exit=True)
+
+ start_time = time.perf_counter()
try:
support.set_match_tests(ns.match_tests)
- # reset the environment_altered flag to detect if a test altered
- # the environment
- support.environment_altered = False
support.junit_xml_list = xml_list = [] if ns.xmlpath else None
if ns.failfast:
support.failfast = True
+
if output_on_failure:
support.verbose = True
@@ -124,8 +121,9 @@ def runtest(ns, test):
try:
sys.stdout = stream
sys.stderr = stream
- result = runtest_inner(ns, test, display_failure=False)
- if result[0] != PASSED:
+ result = _runtest_inner(ns, test_name,
+ display_failure=False)
+ if result != PASSED:
output = stream.getvalue()
orig_stderr.write(output)
orig_stderr.flush()
@@ -133,98 +131,170 @@ def runtest(ns, test):
sys.stdout = orig_stdout
sys.stderr = orig_stderr
else:
- support.verbose = ns.verbose # Tell tests to be moderately quiet
- result = runtest_inner(ns, test, display_failure=not ns.verbose)
+ # Tell tests to be moderately quiet
+ support.verbose = ns.verbose
+
+ result = _runtest_inner(ns, test_name,
+ display_failure=not ns.verbose)
if xml_list:
import xml.etree.ElementTree as ET
xml_data = [ET.tostring(x).decode('us-ascii') for x in xml_list]
else:
xml_data = None
- return result + (xml_data,)
+
+ test_time = time.perf_counter() - start_time
+
+ return TestResult(test_name, result, test_time, xml_data)
finally:
if use_timeout:
faulthandler.cancel_dump_traceback_later()
- cleanup_test_droppings(test, ns.verbose)
support.junit_xml_list = None
+def runtest(ns, test_name):
+ """Run a single test.
+
+ ns -- regrtest namespace of options
+ test_name -- the name of the test
+
+ Returns the tuple (result, test_time, xml_data), where result is one
+ of the constants:
+
+ INTERRUPTED KeyboardInterrupt
+ RESOURCE_DENIED test skipped because resource denied
+ SKIPPED test skipped for some other reason
+ ENV_CHANGED test failed because it changed the execution environment
+ FAILED test failed
+ PASSED test passed
+ EMPTY_TEST_SUITE test ran no subtests.
+
+ If ns.xmlpath is not None, xml_data is a list containing each
+ generated testsuite element.
+ """
+ try:
+ return _runtest(ns, test_name)
+ except:
+ if not ns.pgo:
+ msg = traceback.format_exc()
+ print(f"test {test_name} crashed -- {msg}",
+ file=sys.stderr, flush=True)
+ return TestResult(test_name, FAILED, 0.0, None)
+
+
def post_test_cleanup():
+ support.gc_collect()
support.reap_children()
-def runtest_inner(ns, test, display_failure=True):
- support.unload(test)
+def _test_module(the_module):
+ loader = unittest.TestLoader()
+ tests = loader.loadTestsFromModule(the_module)
+ for error in loader.errors:
+ print(error, file=sys.stderr)
+ if loader.errors:
+ raise Exception("errors while loading tests")
+ support.run_unittest(tests)
+
+
+def _runtest_inner2(ns, test_name):
+ # Load the test function, run the test function, handle huntrleaks
+ # and findleaks to detect leaks
+
+ abstest = get_abs_module(ns, test_name)
+
+ # remove the module from sys.module to reload it if it was already imported
+ support.unload(abstest)
+
+ the_module = importlib.import_module(abstest)
+
+ # If the test has a test_main, that will run the appropriate
+ # tests. If not, use normal unittest test loading.
+ test_runner = getattr(the_module, "test_main", None)
+ if test_runner is None:
+ test_runner = functools.partial(_test_module, the_module)
+
+ try:
+ if ns.huntrleaks:
+ # Return True if the test leaked references
+ refleak = dash_R(ns, test_name, test_runner)
+ else:
+ test_runner()
+ refleak = False
+ finally:
+ cleanup_test_droppings(test_name, ns.verbose)
+
+ if ns.findleaks:
+ import gc
+ support.gc_collect()
+ if gc.garbage:
+ import gc
+ gc.garbage = [1]
+ print_warning(f"{test_name} created {len(gc.garbage)} "
+ f"uncollectable object(s).")
+ # move the uncollectable objects somewhere,
+ # so we don't see them again
+ found_garbage.extend(gc.garbage)
+ gc.garbage.clear()
+ support.environment_altered = True
+
+ post_test_cleanup()
+
+ return refleak
+
+
+def _runtest_inner(ns, test_name, display_failure=True):
+ # Detect environment changes, handle exceptions.
+
+ # Reset the environment_altered flag to detect if a test altered
+ # the environment
+ support.environment_altered = False
+
+ if ns.pgo:
+ display_failure = False
- test_time = 0.0
- refleak = False # True if the test leaked references.
try:
- abstest = get_abs_module(ns, test)
clear_caches()
- with saved_test_environment(test, ns.verbose, ns.quiet, pgo=ns.pgo) as environment:
- start_time = time.perf_counter()
- the_module = importlib.import_module(abstest)
- # If the test has a test_main, that will run the appropriate
- # tests. If not, use normal unittest test loading.
- test_runner = getattr(the_module, "test_main", None)
- if test_runner is None:
- def test_runner():
- loader = unittest.TestLoader()
- tests = loader.loadTestsFromModule(the_module)
- for error in loader.errors:
- print(error, file=sys.stderr)
- if loader.errors:
- raise Exception("errors while loading tests")
- support.run_unittest(tests)
- if ns.huntrleaks:
- refleak = dash_R(ns, the_module, test, test_runner)
- else:
- test_runner()
- test_time = time.perf_counter() - start_time
- post_test_cleanup()
+
+ with saved_test_environment(test_name, ns.verbose, ns.quiet, pgo=ns.pgo) as environment:
+ refleak = _runtest_inner2(ns, test_name)
except support.ResourceDenied as msg:
if not ns.quiet and not ns.pgo:
- print(test, "skipped --", msg, flush=True)
- return RESOURCE_DENIED, test_time
+ print(f"{test_name} skipped -- {msg}", flush=True)
+ return RESOURCE_DENIED
except unittest.SkipTest as msg:
if not ns.quiet and not ns.pgo:
- print(test, "skipped --", msg, flush=True)
- return SKIPPED, test_time
- except KeyboardInterrupt:
- raise
- except support.TestFailed as msg:
- if not ns.pgo:
- if display_failure:
- print("test", test, "failed --", msg, file=sys.stderr,
- flush=True)
- else:
- print("test", test, "failed", file=sys.stderr, flush=True)
- return FAILED, test_time
+ print(f"{test_name} skipped -- {msg}", flush=True)
+ return SKIPPED
+ except support.TestFailed as exc:
+ msg = f"test {test_name} failed"
+ if display_failure:
+ msg = f"{msg} -- {exc}"
+ print(msg, file=sys.stderr, flush=True)
+ return FAILED
except support.TestDidNotRun:
- return TEST_DID_NOT_RUN, test_time
+ return TEST_DID_NOT_RUN
+ except KeyboardInterrupt:
+ return INTERRUPTED
except:
- msg = traceback.format_exc()
if not ns.pgo:
- print("test", test, "crashed --", msg, file=sys.stderr,
- flush=True)
- return FAILED, test_time
- else:
- if refleak:
- return FAILED, test_time
- if environment.changed:
- return ENV_CHANGED, test_time
- return PASSED, test_time
+ msg = traceback.format_exc()
+ print(f"test {test_name} crashed -- {msg}",
+ file=sys.stderr, flush=True)
+ return FAILED
+ if refleak:
+ return FAILED
+ if environment.changed:
+ return ENV_CHANGED
+ return PASSED
-def cleanup_test_droppings(testname, verbose):
- import shutil
- import stat
- import gc
+def cleanup_test_droppings(test_name, verbose):
# First kill any dangling references to open files etc.
# This can also issue some ResourceWarnings which would otherwise get
# triggered during the following test run, and possibly produce failures.
- gc.collect()
+ support.gc_collect()
# Try to clean up junk commonly left behind. While tests shouldn't leave
# any files or directories behind, when a test fails that can be tedious
@@ -239,23 +309,23 @@ def cleanup_test_droppings(testname, verbose):
continue
if os.path.isdir(name):
+ import shutil
kind, nuker = "directory", shutil.rmtree
elif os.path.isfile(name):
kind, nuker = "file", os.unlink
else:
- raise SystemError("os.path says %r exists but is neither "
- "directory nor file" % name)
+ raise RuntimeError(f"os.path says {name!r} exists but is neither "
+ f"directory nor file")
if verbose:
- print("%r left behind %s %r" % (testname, kind, name))
+ print_warning("%r left behind %s %r" % (test_name, kind, name))
+ support.environment_altered = True
+
try:
+ import stat
# fix possible permissions problems that might prevent cleanup
os.chmod(name, stat.S_IRWXU | stat.S_IRWXG | stat.S_IRWXO)
nuker(name)
- except Exception as msg:
- print(("%r left behind %s %r and it couldn't be "
- "removed: %s" % (testname, kind, name, msg)), file=sys.stderr)
-
-
-def findtestdir(path=None):
- return path or os.path.dirname(os.path.dirname(__file__)) or os.curdir
+ except Exception as exc:
+ print_warning(f"{test_name} left behind {kind} {name!r} "
+ f"and it couldn't be removed: {exc}")
diff --git a/Lib/test/libregrtest/runtest_mp.py b/Lib/test/libregrtest/runtest_mp.py
index 6190574..0a95bf6 100644
--- a/Lib/test/libregrtest/runtest_mp.py
+++ b/Lib/test/libregrtest/runtest_mp.py
@@ -1,3 +1,4 @@
+import collections
import faulthandler
import json
import os
@@ -5,13 +6,12 @@ import queue
import sys
import threading
import time
-import traceback
import types
from test import support
from test.libregrtest.runtest import (
runtest, INTERRUPTED, CHILD_ERROR, PROGRESS_MIN_TIME,
- format_test_result)
+ format_test_result, TestResult)
from test.libregrtest.setup import setup_tests
from test.libregrtest.utils import format_duration
@@ -64,15 +64,9 @@ def run_tests_worker(worker_args):
setup_tests(ns)
- try:
- result = runtest(ns, testname)
- except KeyboardInterrupt:
- result = INTERRUPTED, '', None
- except BaseException as e:
- traceback.print_exc()
- result = CHILD_ERROR, str(e)
-
+ result = runtest(ns, testname)
print() # Force a newline (just in case)
+
print(json.dumps(result), flush=True)
sys.exit(0)
@@ -97,45 +91,51 @@ class MultiprocessIterator:
return next(self.tests)
+MultiprocessResult = collections.namedtuple('MultiprocessResult',
+ 'result stdout stderr error_msg')
+
class MultiprocessThread(threading.Thread):
def __init__(self, pending, output, ns):
super().__init__()
self.pending = pending
self.output = output
self.ns = ns
- self.current_test = None
+ self.current_test_name = None
self.start_time = None
def _runtest(self):
try:
- test = next(self.pending)
+ test_name = next(self.pending)
except StopIteration:
- self.output.put((None, None, None, None))
+ self.output.put(None)
return True
try:
self.start_time = time.monotonic()
- self.current_test = test
+ self.current_test_name = test_name
- retcode, stdout, stderr = run_test_in_subprocess(test, self.ns)
+ retcode, stdout, stderr = run_test_in_subprocess(test_name, self.ns)
finally:
- self.current_test = None
+ self.current_test_name = None
if retcode != 0:
- result = (CHILD_ERROR, "Exit code %s" % retcode, None)
- self.output.put((test, stdout.rstrip(), stderr.rstrip(),
- result))
+ test_time = time.monotonic() - self.start_time
+ result = TestResult(test_name, CHILD_ERROR, test_time, None)
+ err_msg = "Exit code %s" % retcode
+ mp_result = MultiprocessResult(result, stdout.rstrip(), stderr.rstrip(), err_msg)
+ self.output.put(mp_result)
return False
stdout, _, result = stdout.strip().rpartition("\n")
if not result:
- self.output.put((None, None, None, None))
+ self.output.put(None)
return True
+ # deserialize run_tests_worker() output
result = json.loads(result)
- assert len(result) == 3, f"Invalid result tuple: {result!r}"
- self.output.put((test, stdout.rstrip(), stderr.rstrip(),
- result))
+ result = TestResult(*result)
+ mp_result = MultiprocessResult(result, stdout.rstrip(), stderr.rstrip(), None)
+ self.output.put(mp_result)
return False
def run(self):
@@ -144,7 +144,7 @@ class MultiprocessThread(threading.Thread):
while not stop:
stop = self._runtest()
except BaseException:
- self.output.put((None, None, None, None))
+ self.output.put(None)
raise
@@ -164,12 +164,12 @@ def run_tests_multiprocess(regrtest):
def get_running(workers):
running = []
for worker in workers:
- current_test = worker.current_test
- if not current_test:
+ current_test_name = worker.current_test_name
+ if not current_test_name:
continue
dt = time.monotonic() - worker.start_time
if dt >= PROGRESS_MIN_TIME:
- text = '%s (%s)' % (current_test, format_duration(dt))
+ text = '%s (%s)' % (current_test_name, format_duration(dt))
running.append(text)
return running
@@ -182,40 +182,41 @@ def run_tests_multiprocess(regrtest):
faulthandler.dump_traceback_later(test_timeout, exit=True)
try:
- item = output.get(timeout=get_timeout)
+ mp_result = output.get(timeout=get_timeout)
except queue.Empty:
running = get_running(workers)
if running and not regrtest.ns.pgo:
print('running: %s' % ', '.join(running), flush=True)
continue
- test, stdout, stderr, result = item
- if test is None:
+ if mp_result is None:
finished += 1
continue
- regrtest.accumulate_result(test, result)
+ result = mp_result.result
+ regrtest.accumulate_result(result)
# Display progress
- ok, test_time, xml_data = result
- text = format_test_result(test, ok)
+ ok = result.result
+
+ text = format_test_result(result)
if (ok not in (CHILD_ERROR, INTERRUPTED)
- and test_time >= PROGRESS_MIN_TIME
+ and result.test_time >= PROGRESS_MIN_TIME
and not regrtest.ns.pgo):
- text += ' (%s)' % format_duration(test_time)
+ text += ' (%s)' % format_duration(result.test_time)
elif ok == CHILD_ERROR:
- text = '%s (%s)' % (text, test_time)
+ text = '%s (%s)' % (text, mp_result.error_msg)
running = get_running(workers)
if running and not regrtest.ns.pgo:
text += ' -- running: %s' % ', '.join(running)
regrtest.display_progress(test_index, text)
# Copy stdout and stderr from the child process
- if stdout:
- print(stdout, flush=True)
- if stderr and not regrtest.ns.pgo:
- print(stderr, file=sys.stderr, flush=True)
+ if mp_result.stdout:
+ print(mp_result.stdout, flush=True)
+ if mp_result.stderr and not regrtest.ns.pgo:
+ print(mp_result.stderr, file=sys.stderr, flush=True)
- if result[0] == INTERRUPTED:
+ if result.result == INTERRUPTED:
raise KeyboardInterrupt
test_index += 1
except KeyboardInterrupt:
@@ -229,7 +230,7 @@ def run_tests_multiprocess(regrtest):
# If tests are interrupted, wait until tests complete
wait_start = time.monotonic()
while True:
- running = [worker.current_test for worker in workers]
+ running = [worker.current_test_name for worker in workers]
running = list(filter(bool, running))
if not running:
break
diff --git a/Lib/test/libregrtest/save_env.py b/Lib/test/libregrtest/save_env.py
index 2313b71..31931f2 100644
--- a/Lib/test/libregrtest/save_env.py
+++ b/Lib/test/libregrtest/save_env.py
@@ -9,6 +9,7 @@ import sysconfig
import threading
import warnings
from test import support
+from test.libregrtest.utils import print_warning
try:
import _multiprocessing, multiprocessing.process
except ImportError:
@@ -283,8 +284,7 @@ class saved_test_environment:
self.changed = True
restore(original)
if not self.quiet and not self.pgo:
- print(f"Warning -- {name} was modified by {self.testname}",
- file=sys.stderr, flush=True)
+ print_warning(f"{name} was modified by {self.testname}")
print(f" Before: {original}\n After: {current} ",
file=sys.stderr, flush=True)
return False
diff --git a/Lib/test/libregrtest/utils.py b/Lib/test/libregrtest/utils.py
index d36bf91..fb9971a 100644
--- a/Lib/test/libregrtest/utils.py
+++ b/Lib/test/libregrtest/utils.py
@@ -1,5 +1,6 @@
-import os.path
import math
+import os.path
+import sys
import textwrap
@@ -54,3 +55,7 @@ def printlist(x, width=70, indent=4, file=None):
print(textwrap.fill(' '.join(str(elt) for elt in sorted(x)), width,
initial_indent=blanks, subsequent_indent=blanks),
file=file)
+
+
+def print_warning(msg):
+ print(f"Warning -- {msg}", file=sys.stderr, flush=True)
diff --git a/Lib/test/libregrtest/win_utils.py b/Lib/test/libregrtest/win_utils.py
index 2e64922..ca27f36 100644
--- a/Lib/test/libregrtest/win_utils.py
+++ b/Lib/test/libregrtest/win_utils.py
@@ -1,8 +1,7 @@
-import subprocess
-import sys
-import os
import _winapi
import msvcrt
+import os
+import subprocess
import uuid
from test import support
diff --git a/Lib/test/test_regrtest.py b/Lib/test/test_regrtest.py
index a9febd0..5c65e6d 100644
--- a/Lib/test/test_regrtest.py
+++ b/Lib/test/test_regrtest.py
@@ -26,8 +26,9 @@ ROOT_DIR = os.path.join(os.path.dirname(__file__), '..', '..')
ROOT_DIR = os.path.abspath(os.path.normpath(ROOT_DIR))
TEST_INTERRUPTED = textwrap.dedent("""
- from signal import SIGINT, raise_signal
+ from signal import SIGINT
try:
+ from signal import raise_signal
raise_signal(SIGINT)
except ImportError:
import os
@@ -108,7 +109,7 @@ class ParseArgsTestCase(unittest.TestCase):
self.assertTrue(ns.quiet)
self.assertEqual(ns.verbose, 0)
- def test_slow(self):
+ def test_slowest(self):
for opt in '-o', '--slowest':
with self.subTest(opt=opt):
ns = libregrtest._parse_args([opt])
@@ -780,22 +781,23 @@ class ArgsTestCase(BaseTestCase):
% (self.TESTNAME_REGEX, len(tests)))
self.check_line(output, regex)
- def test_slow_interrupted(self):
+ def test_slowest_interrupted(self):
# Issue #25373: test --slowest with an interrupted test
code = TEST_INTERRUPTED
test = self.create_test("sigint", code=code)
for multiprocessing in (False, True):
- if multiprocessing:
- args = ("--slowest", "-j2", test)
- else:
- args = ("--slowest", test)
- output = self.run_tests(*args, exitcode=130)
- self.check_executed_tests(output, test,
- omitted=test, interrupted=True)
-
- regex = ('10 slowest tests:\n')
- self.check_line(output, regex)
+ with self.subTest(multiprocessing=multiprocessing):
+ if multiprocessing:
+ args = ("--slowest", "-j2", test)
+ else:
+ args = ("--slowest", test)
+ output = self.run_tests(*args, exitcode=130)
+ self.check_executed_tests(output, test,
+ omitted=test, interrupted=True)
+
+ regex = ('10 slowest tests:\n')
+ self.check_line(output, regex)
def test_coverage(self):
# test --coverage