summaryrefslogtreecommitdiffstats
path: root/Lib/test/libregrtest/runtest.py
diff options
context:
space:
mode:
Diffstat (limited to 'Lib/test/libregrtest/runtest.py')
-rw-r--r--Lib/test/libregrtest/runtest.py234
1 files changed, 167 insertions, 67 deletions
diff --git a/Lib/test/libregrtest/runtest.py b/Lib/test/libregrtest/runtest.py
index 6fa6069..6e3fab1 100644
--- a/Lib/test/libregrtest/runtest.py
+++ b/Lib/test/libregrtest/runtest.py
@@ -1,7 +1,6 @@
import dataclasses
import doctest
import faulthandler
-import functools
import gc
import importlib
import io
@@ -20,6 +19,10 @@ from test.libregrtest.save_env import saved_test_environment
from test.libregrtest.utils import clear_caches, format_duration, print_warning
+MatchTests = list[str]
+MatchTestsDict = dict[str, MatchTests]
+
+
# Avoid enum.Enum to reduce the number of imports when tests are run
class State:
PASSED = "PASSED"
@@ -56,6 +59,41 @@ class State:
State.MULTIPROCESSING_ERROR,
State.DID_NOT_RUN}
+ @staticmethod
+ def must_stop(state):
+ return state in {
+ State.INTERRUPTED,
+ State.MULTIPROCESSING_ERROR}
+
+
+# gh-90681: When rerunning tests, we might need to rerun the whole
+# class or module suite if some its life-cycle hooks fail.
+# Test level hooks are not affected.
+_TEST_LIFECYCLE_HOOKS = frozenset((
+ 'setUpClass', 'tearDownClass',
+ 'setUpModule', 'tearDownModule',
+))
+
+def normalize_test_name(test_full_name, *, is_error=False):
+ short_name = test_full_name.split(" ")[0]
+ if is_error and short_name in _TEST_LIFECYCLE_HOOKS:
+ if test_full_name.startswith(('setUpModule (', 'tearDownModule (')):
+ # if setUpModule() or tearDownModule() failed, don't filter
+ # tests with the test file name, don't use use filters.
+ return None
+
+ # This means that we have a failure in a life-cycle hook,
+ # we need to rerun the whole module or class suite.
+ # Basically the error looks like this:
+ # ERROR: setUpClass (test.test_reg_ex.RegTest)
+ # or
+ # ERROR: setUpModule (test.test_reg_ex)
+ # So, we need to parse the class / module name.
+ lpar = test_full_name.index('(')
+ rpar = test_full_name.index(')')
+ return test_full_name[lpar + 1: rpar].split('.')[-1]
+ return short_name
+
@dataclasses.dataclass(slots=True)
class TestResult:
@@ -129,6 +167,58 @@ class TestResult:
if self.state is None or self.state == State.PASSED:
self.state = State.ENV_CHANGED
+ def must_stop(self, fail_fast: bool, fail_env_changed: bool) -> bool:
+ if State.must_stop(self.state):
+ return True
+ if fail_fast and self.is_failed(fail_env_changed):
+ return True
+ return False
+
+ def get_rerun_match_tests(self):
+ match_tests = []
+
+ errors = self.errors or []
+ failures = self.failures or []
+ for error_list, is_error in (
+ (errors, True),
+ (failures, False),
+ ):
+ for full_name, *_ in error_list:
+ match_name = normalize_test_name(full_name, is_error=is_error)
+ if match_name is None:
+ # 'setUpModule (test.test_sys)': don't filter tests
+ return None
+ if not match_name:
+ error_type = "ERROR" if is_error else "FAIL"
+ print_warning(f"rerun failed to parse {error_type} test name: "
+ f"{full_name!r}: don't filter tests")
+ return None
+ match_tests.append(match_name)
+
+ return match_tests
+
+
+@dataclasses.dataclass(slots=True, frozen=True)
+class RunTests:
+ tests: list[str]
+ match_tests: MatchTestsDict | None = None
+ rerun: bool = False
+ forever: bool = False
+
+ def get_match_tests(self, test_name) -> MatchTests | None:
+ if self.match_tests is not None:
+ return self.match_tests.get(test_name, None)
+ else:
+ return None
+
+ def iter_tests(self):
+ tests = tuple(self.tests)
+ if self.forever:
+ while True:
+ yield from tests
+ else:
+ yield from tests
+
# Minimum duration of a test to display its duration or to mention that
# the test is running in background
@@ -147,9 +237,6 @@ SPLITTESTDIRS = {
"test_multiprocessing_spawn",
}
-# Storage of uncollectable objects
-FOUND_GARBAGE = []
-
def findtestdir(path=None):
return path or os.path.dirname(os.path.dirname(__file__)) or os.curdir
@@ -189,31 +276,41 @@ def split_test_packages(tests, *, testdir=None, exclude=(),
return splitted
-def get_abs_module(ns: Namespace, test_name: str) -> str:
- if test_name.startswith('test.') or ns.testdir:
+def abs_module_name(test_name: str, test_dir: str | None) -> str:
+ if test_name.startswith('test.') or test_dir:
return test_name
else:
# Import it from the test package
return 'test.' + test_name
-def _runtest_capture_output_timeout_junit(result: TestResult, ns: Namespace) -> None:
+def setup_support(ns: Namespace):
+ support.PGO = ns.pgo
+ support.PGO_EXTENDED = ns.pgo_extended
+ support.set_match_tests(ns.match_tests, ns.ignore_tests)
+ support.failfast = ns.failfast
+ support.verbose = ns.verbose
+ if ns.xmlpath:
+ support.junit_xml_list = []
+ else:
+ support.junit_xml_list = None
+
+
+def _runtest(result: TestResult, ns: Namespace) -> None:
# Capture stdout and stderr, set faulthandler timeout,
# and create JUnit XML report.
-
+ verbose = ns.verbose
output_on_failure = ns.verbose3
+ timeout = ns.timeout
use_timeout = (
- ns.timeout is not None and threading_helper.can_start_thread
+ timeout is not None and threading_helper.can_start_thread
)
if use_timeout:
- faulthandler.dump_traceback_later(ns.timeout, exit=True)
+ faulthandler.dump_traceback_later(timeout, exit=True)
try:
- support.set_match_tests(ns.match_tests, ns.ignore_tests)
- support.junit_xml_list = xml_list = [] if ns.xmlpath else None
- if ns.failfast:
- support.failfast = True
+ setup_support(ns)
if output_on_failure:
support.verbose = True
@@ -247,11 +344,10 @@ def _runtest_capture_output_timeout_junit(result: TestResult, ns: Namespace) ->
sys.stderr.flush()
else:
# Tell tests to be moderately quiet
- support.verbose = ns.verbose
-
- _runtest_env_changed_exc(result, ns,
- display_failure=not ns.verbose)
+ support.verbose = verbose
+ _runtest_env_changed_exc(result, ns, display_failure=not verbose)
+ xml_list = support.junit_xml_list
if xml_list:
import xml.etree.ElementTree as ET
result.xml_data = [ET.tostring(x).decode('us-ascii')
@@ -276,7 +372,7 @@ def runtest(ns: Namespace, test_name: str) -> TestResult:
start_time = time.perf_counter()
result = TestResult(test_name)
try:
- _runtest_capture_output_timeout_junit(result, ns)
+ _runtest(result, ns)
except:
if not ns.pgo:
msg = traceback.format_exc()
@@ -287,9 +383,9 @@ def runtest(ns: Namespace, test_name: str) -> TestResult:
return result
-def _test_module(the_module):
+def run_unittest(test_mod):
loader = unittest.TestLoader()
- tests = loader.loadTestsFromModule(the_module)
+ tests = loader.loadTestsFromModule(test_mod)
for error in loader.errors:
print(error, file=sys.stderr)
if loader.errors:
@@ -304,7 +400,6 @@ def save_env(ns: Namespace, test_name: str):
def regrtest_runner(result, test_func, ns) -> None:
# Run test_func(), collect statistics, and detect reference and memory
# leaks.
-
if ns.huntrleaks:
from test.libregrtest.refleak import dash_R
refleak, test_result = dash_R(ns, result.test_name, test_func)
@@ -332,24 +427,27 @@ def regrtest_runner(result, test_func, ns) -> None:
result.stats = stats
+# Storage of uncollectable objects
+FOUND_GARBAGE = []
+
+
def _load_run_test(result: TestResult, ns: Namespace) -> None:
# Load the test function, run the test function.
+ module_name = abs_module_name(result.test_name, ns.testdir)
- abstest = get_abs_module(ns, result.test_name)
-
- # remove the module from sys.module to reload it if it was already imported
- try:
- del sys.modules[abstest]
- except KeyError:
- pass
+ # Remove the module from sys.module to reload it if it was already imported
+ sys.modules.pop(module_name, None)
- the_module = importlib.import_module(abstest)
+ test_mod = importlib.import_module(module_name)
# If the test has a test_main, that will run the appropriate
- # tests. If not, use normal unittest test loading.
- test_func = getattr(the_module, "test_main", None)
- if test_func is None:
- test_func = functools.partial(_test_module, the_module)
+ # tests. If not, use normal unittest test runner.
+ test_main = getattr(test_mod, "test_main", None)
+ if test_main is not None:
+ test_func = test_main
+ else:
+ def test_func():
+ return run_unittest(test_mod)
try:
with save_env(ns, result.test_name):
@@ -361,12 +459,12 @@ def _load_run_test(result: TestResult, ns: Namespace) -> None:
# failures.
support.gc_collect()
- cleanup_test_droppings(result.test_name, ns.verbose)
+ remove_testfn(result.test_name, ns.verbose)
if gc.garbage:
support.environment_altered = True
print_warning(f"{result.test_name} created {len(gc.garbage)} "
- f"uncollectable object(s).")
+ f"uncollectable object(s)")
# move the uncollectable objects somewhere,
# so we don't see them again
@@ -444,35 +542,37 @@ def _runtest_env_changed_exc(result: TestResult, ns: Namespace,
result.state = State.PASSED
-def cleanup_test_droppings(test_name: str, verbose: int) -> None:
- # Try to clean up junk commonly left behind. While tests shouldn't leave
- # any files or directories behind, when a test fails that can be tedious
- # for it to arrange. The consequences can be especially nasty on Windows,
- # since if a test leaves a file open, it cannot be deleted by name (while
- # there's nothing we can do about that here either, we can display the
- # name of the offending test, which is a real help).
- for name in (os_helper.TESTFN,):
- if not os.path.exists(name):
- continue
+def remove_testfn(test_name: str, verbose: int) -> None:
+ # Try to clean up os_helper.TESTFN if left behind.
+ #
+ # While tests shouldn't leave any files or directories behind, when a test
+ # fails that can be tedious for it to arrange. The consequences can be
+ # especially nasty on Windows, since if a test leaves a file open, it
+ # cannot be deleted by name (while there's nothing we can do about that
+ # here either, we can display the name of the offending test, which is a
+ # real help).
+ name = os_helper.TESTFN
+ if not os.path.exists(name):
+ return
- if os.path.isdir(name):
- import shutil
- kind, nuker = "directory", shutil.rmtree
- elif os.path.isfile(name):
- kind, nuker = "file", os.unlink
- else:
- raise RuntimeError(f"os.path says {name!r} exists but is neither "
- f"directory nor file")
-
- if verbose:
- print_warning(f"{test_name} left behind {kind} {name!r}")
- support.environment_altered = True
-
- try:
- import stat
- # fix possible permissions problems that might prevent cleanup
- os.chmod(name, stat.S_IRWXU | stat.S_IRWXG | stat.S_IRWXO)
- nuker(name)
- except Exception as exc:
- print_warning(f"{test_name} left behind {kind} {name!r} "
- f"and it couldn't be removed: {exc}")
+ if os.path.isdir(name):
+ import shutil
+ kind, nuker = "directory", shutil.rmtree
+ elif os.path.isfile(name):
+ kind, nuker = "file", os.unlink
+ else:
+ raise RuntimeError(f"os.path says {name!r} exists but is neither "
+ f"directory nor file")
+
+ if verbose:
+ print_warning(f"{test_name} left behind {kind} {name!r}")
+ support.environment_altered = True
+
+ try:
+ import stat
+ # fix possible permissions problems that might prevent cleanup
+ os.chmod(name, stat.S_IRWXU | stat.S_IRWXG | stat.S_IRWXO)
+ nuker(name)
+ except Exception as exc:
+ print_warning(f"{test_name} left behind {kind} {name!r} "
+ f"and it couldn't be removed: {exc}")