diff options
author | Victor Stinner <vstinner@redhat.com> | 2019-04-26 02:08:53 (GMT) |
---|---|---|
committer | GitHub <noreply@github.com> | 2019-04-26 02:08:53 (GMT) |
commit | 4d29983185bc12ca685a1eb3873bacb8a7b67416 (patch) | |
tree | 26457c1f05ba31ed1cb1421b7ea50b8f98630a21 | |
parent | 9db0324712f6982d89620b420f507a6aa5da152f (diff) | |
download | cpython-4d29983185bc12ca685a1eb3873bacb8a7b67416.zip cpython-4d29983185bc12ca685a1eb3873bacb8a7b67416.tar.gz cpython-4d29983185bc12ca685a1eb3873bacb8a7b67416.tar.bz2 |
bpo-36725: regrtest: add TestResult type (GH-12960)
* Add TestResult and MultiprocessResult types to ensure that results
always have the same fields.
* runtest() now handles KeyboardInterrupt
* accumulate_result() and format_test_result() now takes a TestResult
* cleanup_test_droppings() is now called by runtest() and mark the
test as ENV_CHANGED if the test leaks support.TESTFN file.
* runtest() now includes code "around" the test in the test timing
* Add print_warning() in test.libregrtest.utils to standardize how
libregrtest logs warnings to ease parsing the test output.
* support.unload() is now called with abstest rather than test_name
* Rename 'test' variable/parameter to 'test_name'
* dash_R(): remove unused the_module parameter
* Remove unused imports
-rw-r--r-- | Lib/test/libregrtest/main.py | 88 | ||||
-rw-r--r-- | Lib/test/libregrtest/refleak.py | 3 | ||||
-rw-r--r-- | Lib/test/libregrtest/runtest.py | 274 | ||||
-rw-r--r-- | Lib/test/libregrtest/runtest_mp.py | 85 | ||||
-rw-r--r-- | Lib/test/libregrtest/save_env.py | 4 | ||||
-rw-r--r-- | Lib/test/libregrtest/utils.py | 7 | ||||
-rw-r--r-- | Lib/test/libregrtest/win_utils.py | 5 | ||||
-rw-r--r-- | Lib/test/test_regrtest.py | 28 |
8 files changed, 286 insertions, 208 deletions
diff --git a/Lib/test/libregrtest/main.py b/Lib/test/libregrtest/main.py index d20e174..ef1336a 100644 --- a/Lib/test/libregrtest/main.py +++ b/Lib/test/libregrtest/main.py @@ -105,26 +105,30 @@ class Regrtest: # used by --junit-xml self.testsuite_xml = None - def accumulate_result(self, test, result): - ok, test_time, xml_data = result + def accumulate_result(self, result): + test_name = result.test_name + ok = result.result + if ok not in (CHILD_ERROR, INTERRUPTED): - self.test_times.append((test_time, test)) + self.test_times.append((result.test_time, test_name)) + if ok == PASSED: - self.good.append(test) + self.good.append(test_name) elif ok in (FAILED, CHILD_ERROR): - self.bad.append(test) + self.bad.append(test_name) elif ok == ENV_CHANGED: - self.environment_changed.append(test) + self.environment_changed.append(test_name) elif ok == SKIPPED: - self.skipped.append(test) + self.skipped.append(test_name) elif ok == RESOURCE_DENIED: - self.skipped.append(test) - self.resource_denieds.append(test) + self.skipped.append(test_name) + self.resource_denieds.append(test_name) elif ok == TEST_DID_NOT_RUN: - self.run_no_tests.append(test) + self.run_no_tests.append(test_name) elif ok != INTERRUPTED: raise ValueError("invalid test result: %r" % ok) + xml_data = result.xml_data if xml_data: import xml.etree.ElementTree as ET for e in xml_data: @@ -134,7 +138,7 @@ class Regrtest: print(xml_data, file=sys.__stderr__) raise - def display_progress(self, test_index, test): + def display_progress(self, test_index, text): if self.ns.quiet: return @@ -143,7 +147,7 @@ class Regrtest: fails = len(self.bad) + len(self.environment_changed) if fails and not self.ns.pgo: line = f"{line}/{fails}" - line = f"[{line}] {test}" + line = f"[{line}] {text}" # add the system load prefix: "load avg: 1.80 " if self.getloadavg: @@ -275,13 +279,13 @@ class Regrtest: support.verbose = False support.set_match_tests(self.ns.match_tests) - for test in self.selected: - abstest = get_abs_module(self.ns, test) + for test_name in self.selected: + abstest = get_abs_module(self.ns, test_name) try: suite = unittest.defaultTestLoader.loadTestsFromName(abstest) self._list_cases(suite) except unittest.SkipTest: - self.skipped.append(test) + self.skipped.append(test_name) if self.skipped: print(file=sys.stderr) @@ -298,19 +302,19 @@ class Regrtest: print() print("Re-running failed tests in verbose mode") self.rerun = self.bad[:] - for test in self.rerun: - print("Re-running test %r in verbose mode" % test, flush=True) - try: - self.ns.verbose = True - ok = runtest(self.ns, test) - except KeyboardInterrupt: - self.interrupted = True + for test_name in self.rerun: + print("Re-running test %r in verbose mode" % test_name, flush=True) + self.ns.verbose = True + ok = runtest(self.ns, test_name) + + if ok[0] in {PASSED, ENV_CHANGED, SKIPPED, RESOURCE_DENIED}: + self.bad.remove(test_name) + + if ok.result == INTERRUPTED: # print a newline separate from the ^C print() + self.interrupted = True break - else: - if ok[0] in {PASSED, ENV_CHANGED, SKIPPED, RESOURCE_DENIED}: - self.bad.remove(test) else: if self.bad: print(count(len(self.bad), 'test'), "failed again:") @@ -348,8 +352,8 @@ class Regrtest: self.test_times.sort(reverse=True) print() print("10 slowest tests:") - for time, test in self.test_times[:10]: - print("- %s: %s" % (test, format_duration(time))) + for test_time, test in self.test_times[:10]: + print("- %s: %s" % (test, format_duration(test_time))) if self.bad: print() @@ -387,10 +391,10 @@ class Regrtest: print("Run tests sequentially") previous_test = None - for test_index, test in enumerate(self.tests, 1): + for test_index, test_name in enumerate(self.tests, 1): start_time = time.monotonic() - text = test + text = test_name if previous_test: text = '%s -- %s' % (text, previous_test) self.display_progress(test_index, text) @@ -398,22 +402,20 @@ class Regrtest: if self.tracer: # If we're tracing code coverage, then we don't exit with status # if on a false return value from main. - cmd = ('result = runtest(self.ns, test); ' - 'self.accumulate_result(test, result)') + cmd = ('result = runtest(self.ns, test_name); ' + 'self.accumulate_result(result)') ns = dict(locals()) self.tracer.runctx(cmd, globals=globals(), locals=ns) result = ns['result'] else: - try: - result = runtest(self.ns, test) - except KeyboardInterrupt: - self.interrupted = True - self.accumulate_result(test, (INTERRUPTED, None, None)) - break - else: - self.accumulate_result(test, result) - - previous_test = format_test_result(test, result[0]) + result = runtest(self.ns, test_name) + self.accumulate_result(result) + + if result.result == INTERRUPTED: + self.interrupted = True + break + + previous_test = format_test_result(result) test_time = time.monotonic() - start_time if test_time >= PROGRESS_MIN_TIME: previous_test = "%s in %s" % (previous_test, format_duration(test_time)) @@ -441,8 +443,8 @@ class Regrtest: def _test_forever(self, tests): while True: - for test in tests: - yield test + for test_name in tests: + yield test_name if self.bad: return if self.ns.fail_env_changed and self.environment_changed: diff --git a/Lib/test/libregrtest/refleak.py b/Lib/test/libregrtest/refleak.py index 235d6bf..8d22123 100644 --- a/Lib/test/libregrtest/refleak.py +++ b/Lib/test/libregrtest/refleak.py @@ -1,4 +1,3 @@ -import errno import os import re import sys @@ -18,7 +17,7 @@ except ImportError: cls._abc_negative_cache, cls._abc_negative_cache_version) -def dash_R(ns, the_module, test_name, test_func): +def dash_R(ns, test_name, test_func): """Run a test multiple times, looking for reference leaks. Returns: diff --git a/Lib/test/libregrtest/runtest.py b/Lib/test/libregrtest/runtest.py index 0a9533c..55913b3 100644 --- a/Lib/test/libregrtest/runtest.py +++ b/Lib/test/libregrtest/runtest.py @@ -1,4 +1,6 @@ +import collections import faulthandler +import functools import importlib import io import os @@ -9,6 +11,7 @@ import unittest from test import support from test.libregrtest.refleak import dash_R, clear_caches from test.libregrtest.save_env import saved_test_environment +from test.libregrtest.utils import print_warning # Test result constants. @@ -55,9 +58,17 @@ STDTESTS = [ NOTTESTS = set() -def format_test_result(test_name, result): - fmt = _FORMAT_TEST_RESULT.get(result, "%s") - return fmt % test_name +# used by --findleaks, store for gc.garbage +found_garbage = [] + + +def format_test_result(result): + fmt = _FORMAT_TEST_RESULT.get(result.result, "%s") + return fmt % result.test_name + + +def findtestdir(path=None): + return path or os.path.dirname(os.path.dirname(__file__)) or os.curdir def findtests(testdir=None, stdtests=STDTESTS, nottests=NOTTESTS): @@ -73,48 +84,34 @@ def findtests(testdir=None, stdtests=STDTESTS, nottests=NOTTESTS): return stdtests + sorted(tests) -def get_abs_module(ns, test): - if test.startswith('test.') or ns.testdir: - return test +def get_abs_module(ns, test_name): + if test_name.startswith('test.') or ns.testdir: + return test_name else: - # Always import it from the test package - return 'test.' + test - + # Import it from the test package + return 'test.' + test_name -def runtest(ns, test): - """Run a single test. - - ns -- regrtest namespace of options - test -- the name of the test - Returns the tuple (result, test_time, xml_data), where result is one - of the constants: - - INTERRUPTED KeyboardInterrupt when run under -j - RESOURCE_DENIED test skipped because resource denied - SKIPPED test skipped for some other reason - ENV_CHANGED test failed because it changed the execution environment - FAILED test failed - PASSED test passed - EMPTY_TEST_SUITE test ran no subtests. +TestResult = collections.namedtuple('TestResult', + 'test_name result test_time xml_data') - If ns.xmlpath is not None, xml_data is a list containing each - generated testsuite element. - """ +def _runtest(ns, test_name): + # Handle faulthandler timeout, capture stdout+stderr, XML serialization + # and measure time. output_on_failure = ns.verbose3 use_timeout = (ns.timeout is not None) if use_timeout: faulthandler.dump_traceback_later(ns.timeout, exit=True) + + start_time = time.perf_counter() try: support.set_match_tests(ns.match_tests) - # reset the environment_altered flag to detect if a test altered - # the environment - support.environment_altered = False support.junit_xml_list = xml_list = [] if ns.xmlpath else None if ns.failfast: support.failfast = True + if output_on_failure: support.verbose = True @@ -124,8 +121,9 @@ def runtest(ns, test): try: sys.stdout = stream sys.stderr = stream - result = runtest_inner(ns, test, display_failure=False) - if result[0] != PASSED: + result = _runtest_inner(ns, test_name, + display_failure=False) + if result != PASSED: output = stream.getvalue() orig_stderr.write(output) orig_stderr.flush() @@ -133,98 +131,170 @@ def runtest(ns, test): sys.stdout = orig_stdout sys.stderr = orig_stderr else: - support.verbose = ns.verbose # Tell tests to be moderately quiet - result = runtest_inner(ns, test, display_failure=not ns.verbose) + # Tell tests to be moderately quiet + support.verbose = ns.verbose + + result = _runtest_inner(ns, test_name, + display_failure=not ns.verbose) if xml_list: import xml.etree.ElementTree as ET xml_data = [ET.tostring(x).decode('us-ascii') for x in xml_list] else: xml_data = None - return result + (xml_data,) + + test_time = time.perf_counter() - start_time + + return TestResult(test_name, result, test_time, xml_data) finally: if use_timeout: faulthandler.cancel_dump_traceback_later() - cleanup_test_droppings(test, ns.verbose) support.junit_xml_list = None +def runtest(ns, test_name): + """Run a single test. + + ns -- regrtest namespace of options + test_name -- the name of the test + + Returns the tuple (result, test_time, xml_data), where result is one + of the constants: + + INTERRUPTED KeyboardInterrupt + RESOURCE_DENIED test skipped because resource denied + SKIPPED test skipped for some other reason + ENV_CHANGED test failed because it changed the execution environment + FAILED test failed + PASSED test passed + EMPTY_TEST_SUITE test ran no subtests. + + If ns.xmlpath is not None, xml_data is a list containing each + generated testsuite element. + """ + try: + return _runtest(ns, test_name) + except: + if not ns.pgo: + msg = traceback.format_exc() + print(f"test {test_name} crashed -- {msg}", + file=sys.stderr, flush=True) + return TestResult(test_name, FAILED, 0.0, None) + + def post_test_cleanup(): + support.gc_collect() support.reap_children() -def runtest_inner(ns, test, display_failure=True): - support.unload(test) +def _test_module(the_module): + loader = unittest.TestLoader() + tests = loader.loadTestsFromModule(the_module) + for error in loader.errors: + print(error, file=sys.stderr) + if loader.errors: + raise Exception("errors while loading tests") + support.run_unittest(tests) + + +def _runtest_inner2(ns, test_name): + # Load the test function, run the test function, handle huntrleaks + # and findleaks to detect leaks + + abstest = get_abs_module(ns, test_name) + + # remove the module from sys.module to reload it if it was already imported + support.unload(abstest) + + the_module = importlib.import_module(abstest) + + # If the test has a test_main, that will run the appropriate + # tests. If not, use normal unittest test loading. + test_runner = getattr(the_module, "test_main", None) + if test_runner is None: + test_runner = functools.partial(_test_module, the_module) + + try: + if ns.huntrleaks: + # Return True if the test leaked references + refleak = dash_R(ns, test_name, test_runner) + else: + test_runner() + refleak = False + finally: + cleanup_test_droppings(test_name, ns.verbose) + + if ns.findleaks: + import gc + support.gc_collect() + if gc.garbage: + import gc + gc.garbage = [1] + print_warning(f"{test_name} created {len(gc.garbage)} " + f"uncollectable object(s).") + # move the uncollectable objects somewhere, + # so we don't see them again + found_garbage.extend(gc.garbage) + gc.garbage.clear() + support.environment_altered = True + + post_test_cleanup() + + return refleak + + +def _runtest_inner(ns, test_name, display_failure=True): + # Detect environment changes, handle exceptions. + + # Reset the environment_altered flag to detect if a test altered + # the environment + support.environment_altered = False + + if ns.pgo: + display_failure = False - test_time = 0.0 - refleak = False # True if the test leaked references. try: - abstest = get_abs_module(ns, test) clear_caches() - with saved_test_environment(test, ns.verbose, ns.quiet, pgo=ns.pgo) as environment: - start_time = time.perf_counter() - the_module = importlib.import_module(abstest) - # If the test has a test_main, that will run the appropriate - # tests. If not, use normal unittest test loading. - test_runner = getattr(the_module, "test_main", None) - if test_runner is None: - def test_runner(): - loader = unittest.TestLoader() - tests = loader.loadTestsFromModule(the_module) - for error in loader.errors: - print(error, file=sys.stderr) - if loader.errors: - raise Exception("errors while loading tests") - support.run_unittest(tests) - if ns.huntrleaks: - refleak = dash_R(ns, the_module, test, test_runner) - else: - test_runner() - test_time = time.perf_counter() - start_time - post_test_cleanup() + + with saved_test_environment(test_name, ns.verbose, ns.quiet, pgo=ns.pgo) as environment: + refleak = _runtest_inner2(ns, test_name) except support.ResourceDenied as msg: if not ns.quiet and not ns.pgo: - print(test, "skipped --", msg, flush=True) - return RESOURCE_DENIED, test_time + print(f"{test_name} skipped -- {msg}", flush=True) + return RESOURCE_DENIED except unittest.SkipTest as msg: if not ns.quiet and not ns.pgo: - print(test, "skipped --", msg, flush=True) - return SKIPPED, test_time - except KeyboardInterrupt: - raise - except support.TestFailed as msg: - if not ns.pgo: - if display_failure: - print("test", test, "failed --", msg, file=sys.stderr, - flush=True) - else: - print("test", test, "failed", file=sys.stderr, flush=True) - return FAILED, test_time + print(f"{test_name} skipped -- {msg}", flush=True) + return SKIPPED + except support.TestFailed as exc: + msg = f"test {test_name} failed" + if display_failure: + msg = f"{msg} -- {exc}" + print(msg, file=sys.stderr, flush=True) + return FAILED except support.TestDidNotRun: - return TEST_DID_NOT_RUN, test_time + return TEST_DID_NOT_RUN + except KeyboardInterrupt: + return INTERRUPTED except: - msg = traceback.format_exc() if not ns.pgo: - print("test", test, "crashed --", msg, file=sys.stderr, - flush=True) - return FAILED, test_time - else: - if refleak: - return FAILED, test_time - if environment.changed: - return ENV_CHANGED, test_time - return PASSED, test_time + msg = traceback.format_exc() + print(f"test {test_name} crashed -- {msg}", + file=sys.stderr, flush=True) + return FAILED + if refleak: + return FAILED + if environment.changed: + return ENV_CHANGED + return PASSED -def cleanup_test_droppings(testname, verbose): - import shutil - import stat - import gc +def cleanup_test_droppings(test_name, verbose): # First kill any dangling references to open files etc. # This can also issue some ResourceWarnings which would otherwise get # triggered during the following test run, and possibly produce failures. - gc.collect() + support.gc_collect() # Try to clean up junk commonly left behind. While tests shouldn't leave # any files or directories behind, when a test fails that can be tedious @@ -239,23 +309,23 @@ def cleanup_test_droppings(testname, verbose): continue if os.path.isdir(name): + import shutil kind, nuker = "directory", shutil.rmtree elif os.path.isfile(name): kind, nuker = "file", os.unlink else: - raise SystemError("os.path says %r exists but is neither " - "directory nor file" % name) + raise RuntimeError(f"os.path says {name!r} exists but is neither " + f"directory nor file") if verbose: - print("%r left behind %s %r" % (testname, kind, name)) + print_warning("%r left behind %s %r" % (test_name, kind, name)) + support.environment_altered = True + try: + import stat # fix possible permissions problems that might prevent cleanup os.chmod(name, stat.S_IRWXU | stat.S_IRWXG | stat.S_IRWXO) nuker(name) - except Exception as msg: - print(("%r left behind %s %r and it couldn't be " - "removed: %s" % (testname, kind, name, msg)), file=sys.stderr) - - -def findtestdir(path=None): - return path or os.path.dirname(os.path.dirname(__file__)) or os.curdir + except Exception as exc: + print_warning(f"{test_name} left behind {kind} {name!r} " + f"and it couldn't be removed: {exc}") diff --git a/Lib/test/libregrtest/runtest_mp.py b/Lib/test/libregrtest/runtest_mp.py index 6190574..0a95bf6 100644 --- a/Lib/test/libregrtest/runtest_mp.py +++ b/Lib/test/libregrtest/runtest_mp.py @@ -1,3 +1,4 @@ +import collections import faulthandler import json import os @@ -5,13 +6,12 @@ import queue import sys import threading import time -import traceback import types from test import support from test.libregrtest.runtest import ( runtest, INTERRUPTED, CHILD_ERROR, PROGRESS_MIN_TIME, - format_test_result) + format_test_result, TestResult) from test.libregrtest.setup import setup_tests from test.libregrtest.utils import format_duration @@ -64,15 +64,9 @@ def run_tests_worker(worker_args): setup_tests(ns) - try: - result = runtest(ns, testname) - except KeyboardInterrupt: - result = INTERRUPTED, '', None - except BaseException as e: - traceback.print_exc() - result = CHILD_ERROR, str(e) - + result = runtest(ns, testname) print() # Force a newline (just in case) + print(json.dumps(result), flush=True) sys.exit(0) @@ -97,45 +91,51 @@ class MultiprocessIterator: return next(self.tests) +MultiprocessResult = collections.namedtuple('MultiprocessResult', + 'result stdout stderr error_msg') + class MultiprocessThread(threading.Thread): def __init__(self, pending, output, ns): super().__init__() self.pending = pending self.output = output self.ns = ns - self.current_test = None + self.current_test_name = None self.start_time = None def _runtest(self): try: - test = next(self.pending) + test_name = next(self.pending) except StopIteration: - self.output.put((None, None, None, None)) + self.output.put(None) return True try: self.start_time = time.monotonic() - self.current_test = test + self.current_test_name = test_name - retcode, stdout, stderr = run_test_in_subprocess(test, self.ns) + retcode, stdout, stderr = run_test_in_subprocess(test_name, self.ns) finally: - self.current_test = None + self.current_test_name = None if retcode != 0: - result = (CHILD_ERROR, "Exit code %s" % retcode, None) - self.output.put((test, stdout.rstrip(), stderr.rstrip(), - result)) + test_time = time.monotonic() - self.start_time + result = TestResult(test_name, CHILD_ERROR, test_time, None) + err_msg = "Exit code %s" % retcode + mp_result = MultiprocessResult(result, stdout.rstrip(), stderr.rstrip(), err_msg) + self.output.put(mp_result) return False stdout, _, result = stdout.strip().rpartition("\n") if not result: - self.output.put((None, None, None, None)) + self.output.put(None) return True + # deserialize run_tests_worker() output result = json.loads(result) - assert len(result) == 3, f"Invalid result tuple: {result!r}" - self.output.put((test, stdout.rstrip(), stderr.rstrip(), - result)) + result = TestResult(*result) + mp_result = MultiprocessResult(result, stdout.rstrip(), stderr.rstrip(), None) + self.output.put(mp_result) return False def run(self): @@ -144,7 +144,7 @@ class MultiprocessThread(threading.Thread): while not stop: stop = self._runtest() except BaseException: - self.output.put((None, None, None, None)) + self.output.put(None) raise @@ -164,12 +164,12 @@ def run_tests_multiprocess(regrtest): def get_running(workers): running = [] for worker in workers: - current_test = worker.current_test - if not current_test: + current_test_name = worker.current_test_name + if not current_test_name: continue dt = time.monotonic() - worker.start_time if dt >= PROGRESS_MIN_TIME: - text = '%s (%s)' % (current_test, format_duration(dt)) + text = '%s (%s)' % (current_test_name, format_duration(dt)) running.append(text) return running @@ -182,40 +182,41 @@ def run_tests_multiprocess(regrtest): faulthandler.dump_traceback_later(test_timeout, exit=True) try: - item = output.get(timeout=get_timeout) + mp_result = output.get(timeout=get_timeout) except queue.Empty: running = get_running(workers) if running and not regrtest.ns.pgo: print('running: %s' % ', '.join(running), flush=True) continue - test, stdout, stderr, result = item - if test is None: + if mp_result is None: finished += 1 continue - regrtest.accumulate_result(test, result) + result = mp_result.result + regrtest.accumulate_result(result) # Display progress - ok, test_time, xml_data = result - text = format_test_result(test, ok) + ok = result.result + + text = format_test_result(result) if (ok not in (CHILD_ERROR, INTERRUPTED) - and test_time >= PROGRESS_MIN_TIME + and result.test_time >= PROGRESS_MIN_TIME and not regrtest.ns.pgo): - text += ' (%s)' % format_duration(test_time) + text += ' (%s)' % format_duration(result.test_time) elif ok == CHILD_ERROR: - text = '%s (%s)' % (text, test_time) + text = '%s (%s)' % (text, mp_result.error_msg) running = get_running(workers) if running and not regrtest.ns.pgo: text += ' -- running: %s' % ', '.join(running) regrtest.display_progress(test_index, text) # Copy stdout and stderr from the child process - if stdout: - print(stdout, flush=True) - if stderr and not regrtest.ns.pgo: - print(stderr, file=sys.stderr, flush=True) + if mp_result.stdout: + print(mp_result.stdout, flush=True) + if mp_result.stderr and not regrtest.ns.pgo: + print(mp_result.stderr, file=sys.stderr, flush=True) - if result[0] == INTERRUPTED: + if result.result == INTERRUPTED: raise KeyboardInterrupt test_index += 1 except KeyboardInterrupt: @@ -229,7 +230,7 @@ def run_tests_multiprocess(regrtest): # If tests are interrupted, wait until tests complete wait_start = time.monotonic() while True: - running = [worker.current_test for worker in workers] + running = [worker.current_test_name for worker in workers] running = list(filter(bool, running)) if not running: break diff --git a/Lib/test/libregrtest/save_env.py b/Lib/test/libregrtest/save_env.py index 2313b71..31931f2 100644 --- a/Lib/test/libregrtest/save_env.py +++ b/Lib/test/libregrtest/save_env.py @@ -9,6 +9,7 @@ import sysconfig import threading import warnings from test import support +from test.libregrtest.utils import print_warning try: import _multiprocessing, multiprocessing.process except ImportError: @@ -283,8 +284,7 @@ class saved_test_environment: self.changed = True restore(original) if not self.quiet and not self.pgo: - print(f"Warning -- {name} was modified by {self.testname}", - file=sys.stderr, flush=True) + print_warning(f"{name} was modified by {self.testname}") print(f" Before: {original}\n After: {current} ", file=sys.stderr, flush=True) return False diff --git a/Lib/test/libregrtest/utils.py b/Lib/test/libregrtest/utils.py index d36bf91..fb9971a 100644 --- a/Lib/test/libregrtest/utils.py +++ b/Lib/test/libregrtest/utils.py @@ -1,5 +1,6 @@ -import os.path import math +import os.path +import sys import textwrap @@ -54,3 +55,7 @@ def printlist(x, width=70, indent=4, file=None): print(textwrap.fill(' '.join(str(elt) for elt in sorted(x)), width, initial_indent=blanks, subsequent_indent=blanks), file=file) + + +def print_warning(msg): + print(f"Warning -- {msg}", file=sys.stderr, flush=True) diff --git a/Lib/test/libregrtest/win_utils.py b/Lib/test/libregrtest/win_utils.py index 2e64922..ca27f36 100644 --- a/Lib/test/libregrtest/win_utils.py +++ b/Lib/test/libregrtest/win_utils.py @@ -1,8 +1,7 @@ -import subprocess -import sys -import os import _winapi import msvcrt +import os +import subprocess import uuid from test import support diff --git a/Lib/test/test_regrtest.py b/Lib/test/test_regrtest.py index a9febd0..5c65e6d 100644 --- a/Lib/test/test_regrtest.py +++ b/Lib/test/test_regrtest.py @@ -26,8 +26,9 @@ ROOT_DIR = os.path.join(os.path.dirname(__file__), '..', '..') ROOT_DIR = os.path.abspath(os.path.normpath(ROOT_DIR)) TEST_INTERRUPTED = textwrap.dedent(""" - from signal import SIGINT, raise_signal + from signal import SIGINT try: + from signal import raise_signal raise_signal(SIGINT) except ImportError: import os @@ -108,7 +109,7 @@ class ParseArgsTestCase(unittest.TestCase): self.assertTrue(ns.quiet) self.assertEqual(ns.verbose, 0) - def test_slow(self): + def test_slowest(self): for opt in '-o', '--slowest': with self.subTest(opt=opt): ns = libregrtest._parse_args([opt]) @@ -780,22 +781,23 @@ class ArgsTestCase(BaseTestCase): % (self.TESTNAME_REGEX, len(tests))) self.check_line(output, regex) - def test_slow_interrupted(self): + def test_slowest_interrupted(self): # Issue #25373: test --slowest with an interrupted test code = TEST_INTERRUPTED test = self.create_test("sigint", code=code) for multiprocessing in (False, True): - if multiprocessing: - args = ("--slowest", "-j2", test) - else: - args = ("--slowest", test) - output = self.run_tests(*args, exitcode=130) - self.check_executed_tests(output, test, - omitted=test, interrupted=True) - - regex = ('10 slowest tests:\n') - self.check_line(output, regex) + with self.subTest(multiprocessing=multiprocessing): + if multiprocessing: + args = ("--slowest", "-j2", test) + else: + args = ("--slowest", test) + output = self.run_tests(*args, exitcode=130) + self.check_executed_tests(output, test, + omitted=test, interrupted=True) + + regex = ('10 slowest tests:\n') + self.check_line(output, regex) def test_coverage(self): # test --coverage |