diff options
Diffstat (limited to 'Lib/test/libregrtest/runtest.py')
-rw-r--r-- | Lib/test/libregrtest/runtest.py | 234 |
1 files changed, 167 insertions, 67 deletions
diff --git a/Lib/test/libregrtest/runtest.py b/Lib/test/libregrtest/runtest.py index 6fa6069..6e3fab1 100644 --- a/Lib/test/libregrtest/runtest.py +++ b/Lib/test/libregrtest/runtest.py @@ -1,7 +1,6 @@ import dataclasses import doctest import faulthandler -import functools import gc import importlib import io @@ -20,6 +19,10 @@ from test.libregrtest.save_env import saved_test_environment from test.libregrtest.utils import clear_caches, format_duration, print_warning +MatchTests = list[str] +MatchTestsDict = dict[str, MatchTests] + + # Avoid enum.Enum to reduce the number of imports when tests are run class State: PASSED = "PASSED" @@ -56,6 +59,41 @@ class State: State.MULTIPROCESSING_ERROR, State.DID_NOT_RUN} + @staticmethod + def must_stop(state): + return state in { + State.INTERRUPTED, + State.MULTIPROCESSING_ERROR} + + +# gh-90681: When rerunning tests, we might need to rerun the whole +# class or module suite if some its life-cycle hooks fail. +# Test level hooks are not affected. +_TEST_LIFECYCLE_HOOKS = frozenset(( + 'setUpClass', 'tearDownClass', + 'setUpModule', 'tearDownModule', +)) + +def normalize_test_name(test_full_name, *, is_error=False): + short_name = test_full_name.split(" ")[0] + if is_error and short_name in _TEST_LIFECYCLE_HOOKS: + if test_full_name.startswith(('setUpModule (', 'tearDownModule (')): + # if setUpModule() or tearDownModule() failed, don't filter + # tests with the test file name, don't use use filters. + return None + + # This means that we have a failure in a life-cycle hook, + # we need to rerun the whole module or class suite. + # Basically the error looks like this: + # ERROR: setUpClass (test.test_reg_ex.RegTest) + # or + # ERROR: setUpModule (test.test_reg_ex) + # So, we need to parse the class / module name. + lpar = test_full_name.index('(') + rpar = test_full_name.index(')') + return test_full_name[lpar + 1: rpar].split('.')[-1] + return short_name + @dataclasses.dataclass(slots=True) class TestResult: @@ -129,6 +167,58 @@ class TestResult: if self.state is None or self.state == State.PASSED: self.state = State.ENV_CHANGED + def must_stop(self, fail_fast: bool, fail_env_changed: bool) -> bool: + if State.must_stop(self.state): + return True + if fail_fast and self.is_failed(fail_env_changed): + return True + return False + + def get_rerun_match_tests(self): + match_tests = [] + + errors = self.errors or [] + failures = self.failures or [] + for error_list, is_error in ( + (errors, True), + (failures, False), + ): + for full_name, *_ in error_list: + match_name = normalize_test_name(full_name, is_error=is_error) + if match_name is None: + # 'setUpModule (test.test_sys)': don't filter tests + return None + if not match_name: + error_type = "ERROR" if is_error else "FAIL" + print_warning(f"rerun failed to parse {error_type} test name: " + f"{full_name!r}: don't filter tests") + return None + match_tests.append(match_name) + + return match_tests + + +@dataclasses.dataclass(slots=True, frozen=True) +class RunTests: + tests: list[str] + match_tests: MatchTestsDict | None = None + rerun: bool = False + forever: bool = False + + def get_match_tests(self, test_name) -> MatchTests | None: + if self.match_tests is not None: + return self.match_tests.get(test_name, None) + else: + return None + + def iter_tests(self): + tests = tuple(self.tests) + if self.forever: + while True: + yield from tests + else: + yield from tests + # Minimum duration of a test to display its duration or to mention that # the test is running in background @@ -147,9 +237,6 @@ SPLITTESTDIRS = { "test_multiprocessing_spawn", } -# Storage of uncollectable objects -FOUND_GARBAGE = [] - def findtestdir(path=None): return path or os.path.dirname(os.path.dirname(__file__)) or os.curdir @@ -189,31 +276,41 @@ def split_test_packages(tests, *, testdir=None, exclude=(), return splitted -def get_abs_module(ns: Namespace, test_name: str) -> str: - if test_name.startswith('test.') or ns.testdir: +def abs_module_name(test_name: str, test_dir: str | None) -> str: + if test_name.startswith('test.') or test_dir: return test_name else: # Import it from the test package return 'test.' + test_name -def _runtest_capture_output_timeout_junit(result: TestResult, ns: Namespace) -> None: +def setup_support(ns: Namespace): + support.PGO = ns.pgo + support.PGO_EXTENDED = ns.pgo_extended + support.set_match_tests(ns.match_tests, ns.ignore_tests) + support.failfast = ns.failfast + support.verbose = ns.verbose + if ns.xmlpath: + support.junit_xml_list = [] + else: + support.junit_xml_list = None + + +def _runtest(result: TestResult, ns: Namespace) -> None: # Capture stdout and stderr, set faulthandler timeout, # and create JUnit XML report. - + verbose = ns.verbose output_on_failure = ns.verbose3 + timeout = ns.timeout use_timeout = ( - ns.timeout is not None and threading_helper.can_start_thread + timeout is not None and threading_helper.can_start_thread ) if use_timeout: - faulthandler.dump_traceback_later(ns.timeout, exit=True) + faulthandler.dump_traceback_later(timeout, exit=True) try: - support.set_match_tests(ns.match_tests, ns.ignore_tests) - support.junit_xml_list = xml_list = [] if ns.xmlpath else None - if ns.failfast: - support.failfast = True + setup_support(ns) if output_on_failure: support.verbose = True @@ -247,11 +344,10 @@ def _runtest_capture_output_timeout_junit(result: TestResult, ns: Namespace) -> sys.stderr.flush() else: # Tell tests to be moderately quiet - support.verbose = ns.verbose - - _runtest_env_changed_exc(result, ns, - display_failure=not ns.verbose) + support.verbose = verbose + _runtest_env_changed_exc(result, ns, display_failure=not verbose) + xml_list = support.junit_xml_list if xml_list: import xml.etree.ElementTree as ET result.xml_data = [ET.tostring(x).decode('us-ascii') @@ -276,7 +372,7 @@ def runtest(ns: Namespace, test_name: str) -> TestResult: start_time = time.perf_counter() result = TestResult(test_name) try: - _runtest_capture_output_timeout_junit(result, ns) + _runtest(result, ns) except: if not ns.pgo: msg = traceback.format_exc() @@ -287,9 +383,9 @@ def runtest(ns: Namespace, test_name: str) -> TestResult: return result -def _test_module(the_module): +def run_unittest(test_mod): loader = unittest.TestLoader() - tests = loader.loadTestsFromModule(the_module) + tests = loader.loadTestsFromModule(test_mod) for error in loader.errors: print(error, file=sys.stderr) if loader.errors: @@ -304,7 +400,6 @@ def save_env(ns: Namespace, test_name: str): def regrtest_runner(result, test_func, ns) -> None: # Run test_func(), collect statistics, and detect reference and memory # leaks. - if ns.huntrleaks: from test.libregrtest.refleak import dash_R refleak, test_result = dash_R(ns, result.test_name, test_func) @@ -332,24 +427,27 @@ def regrtest_runner(result, test_func, ns) -> None: result.stats = stats +# Storage of uncollectable objects +FOUND_GARBAGE = [] + + def _load_run_test(result: TestResult, ns: Namespace) -> None: # Load the test function, run the test function. + module_name = abs_module_name(result.test_name, ns.testdir) - abstest = get_abs_module(ns, result.test_name) - - # remove the module from sys.module to reload it if it was already imported - try: - del sys.modules[abstest] - except KeyError: - pass + # Remove the module from sys.module to reload it if it was already imported + sys.modules.pop(module_name, None) - the_module = importlib.import_module(abstest) + test_mod = importlib.import_module(module_name) # If the test has a test_main, that will run the appropriate - # tests. If not, use normal unittest test loading. - test_func = getattr(the_module, "test_main", None) - if test_func is None: - test_func = functools.partial(_test_module, the_module) + # tests. If not, use normal unittest test runner. + test_main = getattr(test_mod, "test_main", None) + if test_main is not None: + test_func = test_main + else: + def test_func(): + return run_unittest(test_mod) try: with save_env(ns, result.test_name): @@ -361,12 +459,12 @@ def _load_run_test(result: TestResult, ns: Namespace) -> None: # failures. support.gc_collect() - cleanup_test_droppings(result.test_name, ns.verbose) + remove_testfn(result.test_name, ns.verbose) if gc.garbage: support.environment_altered = True print_warning(f"{result.test_name} created {len(gc.garbage)} " - f"uncollectable object(s).") + f"uncollectable object(s)") # move the uncollectable objects somewhere, # so we don't see them again @@ -444,35 +542,37 @@ def _runtest_env_changed_exc(result: TestResult, ns: Namespace, result.state = State.PASSED -def cleanup_test_droppings(test_name: str, verbose: int) -> None: - # Try to clean up junk commonly left behind. While tests shouldn't leave - # any files or directories behind, when a test fails that can be tedious - # for it to arrange. The consequences can be especially nasty on Windows, - # since if a test leaves a file open, it cannot be deleted by name (while - # there's nothing we can do about that here either, we can display the - # name of the offending test, which is a real help). - for name in (os_helper.TESTFN,): - if not os.path.exists(name): - continue +def remove_testfn(test_name: str, verbose: int) -> None: + # Try to clean up os_helper.TESTFN if left behind. + # + # While tests shouldn't leave any files or directories behind, when a test + # fails that can be tedious for it to arrange. The consequences can be + # especially nasty on Windows, since if a test leaves a file open, it + # cannot be deleted by name (while there's nothing we can do about that + # here either, we can display the name of the offending test, which is a + # real help). + name = os_helper.TESTFN + if not os.path.exists(name): + return - if os.path.isdir(name): - import shutil - kind, nuker = "directory", shutil.rmtree - elif os.path.isfile(name): - kind, nuker = "file", os.unlink - else: - raise RuntimeError(f"os.path says {name!r} exists but is neither " - f"directory nor file") - - if verbose: - print_warning(f"{test_name} left behind {kind} {name!r}") - support.environment_altered = True - - try: - import stat - # fix possible permissions problems that might prevent cleanup - os.chmod(name, stat.S_IRWXU | stat.S_IRWXG | stat.S_IRWXO) - nuker(name) - except Exception as exc: - print_warning(f"{test_name} left behind {kind} {name!r} " - f"and it couldn't be removed: {exc}") + if os.path.isdir(name): + import shutil + kind, nuker = "directory", shutil.rmtree + elif os.path.isfile(name): + kind, nuker = "file", os.unlink + else: + raise RuntimeError(f"os.path says {name!r} exists but is neither " + f"directory nor file") + + if verbose: + print_warning(f"{test_name} left behind {kind} {name!r}") + support.environment_altered = True + + try: + import stat + # fix possible permissions problems that might prevent cleanup + os.chmod(name, stat.S_IRWXU | stat.S_IRWXG | stat.S_IRWXO) + nuker(name) + except Exception as exc: + print_warning(f"{test_name} left behind {kind} {name!r} " + f"and it couldn't be removed: {exc}") |