summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorVictor Stinner <vstinner@python.org>2023-09-11 00:07:18 (GMT)
committerGitHub <noreply@github.com>2023-09-11 00:07:18 (GMT)
commit1ec45378e97a2d3812ed8bc18e1bd1eb8ddcc9a0 (patch)
treeba25a061da1e444e058c1ef12a1a058f8390ca27
parenta939b65aa63e2cae1eec7d27e1dc56324aee01d7 (diff)
downloadcpython-1ec45378e97a2d3812ed8bc18e1bd1eb8ddcc9a0.zip
cpython-1ec45378e97a2d3812ed8bc18e1bd1eb8ddcc9a0.tar.gz
cpython-1ec45378e97a2d3812ed8bc18e1bd1eb8ddcc9a0.tar.bz2
gh-109162: libregrtest: add single.py and result.py (#109243)
* Add single.py and result.py files. * Rename runtest.py to runtests.py. * Move run_single_test() function and its helper functions to single.py. * Move remove_testfn(), abs_module_name() and normalize_test_name() to utils.py. * Move setup_support() to setup.py. * Move type hints like TestName to utils.py. * Rename runtest.py to runtests.py.
-rw-r--r--Lib/test/libregrtest/findtests.py58
-rw-r--r--Lib/test/libregrtest/logger.py2
-rw-r--r--Lib/test/libregrtest/main.py14
-rw-r--r--Lib/test/libregrtest/refleak.py5
-rw-r--r--Lib/test/libregrtest/result.py184
-rw-r--r--Lib/test/libregrtest/results.py8
-rw-r--r--Lib/test/libregrtest/runtest.py676
-rw-r--r--Lib/test/libregrtest/runtest_mp.py9
-rw-r--r--Lib/test/libregrtest/runtests.py83
-rw-r--r--Lib/test/libregrtest/setup.py13
-rw-r--r--Lib/test/libregrtest/single.py275
-rw-r--r--Lib/test/libregrtest/utils.py82
-rw-r--r--Lib/test/libregrtest/worker.py8
-rw-r--r--Lib/test/test_regrtest.py2
14 files changed, 722 insertions, 697 deletions
diff --git a/Lib/test/libregrtest/findtests.py b/Lib/test/libregrtest/findtests.py
new file mode 100644
index 0000000..88570be
--- /dev/null
+++ b/Lib/test/libregrtest/findtests.py
@@ -0,0 +1,58 @@
+import os
+
+from test.libregrtest.utils import StrPath, TestName, TestList
+
+
+# If these test directories are encountered recurse into them and treat each
+# "test_*.py" file or each sub-directory as a separate test module. This can
+# increase parallelism.
+#
+# Beware this can't generally be done for any directory with sub-tests as the
+# __init__.py may do things which alter what tests are to be run.
+SPLITTESTDIRS: set[TestName] = {
+ "test_asyncio",
+ "test_concurrent_futures",
+ "test_multiprocessing_fork",
+ "test_multiprocessing_forkserver",
+ "test_multiprocessing_spawn",
+}
+
+
+def findtestdir(path=None):
+ return path or os.path.dirname(os.path.dirname(__file__)) or os.curdir
+
+
+def findtests(*, testdir: StrPath | None = None, exclude=(),
+ split_test_dirs: set[TestName] = SPLITTESTDIRS,
+ base_mod: str = "") -> TestList:
+ """Return a list of all applicable test modules."""
+ testdir = findtestdir(testdir)
+ tests = []
+ for name in os.listdir(testdir):
+ mod, ext = os.path.splitext(name)
+ if (not mod.startswith("test_")) or (mod in exclude):
+ continue
+ if mod in split_test_dirs:
+ subdir = os.path.join(testdir, mod)
+ mod = f"{base_mod or 'test'}.{mod}"
+ tests.extend(findtests(testdir=subdir, exclude=exclude,
+ split_test_dirs=split_test_dirs,
+ base_mod=mod))
+ elif ext in (".py", ""):
+ tests.append(f"{base_mod}.{mod}" if base_mod else mod)
+ return sorted(tests)
+
+
+def split_test_packages(tests, *, testdir: StrPath | None = None, exclude=(),
+ split_test_dirs=SPLITTESTDIRS):
+ testdir = findtestdir(testdir)
+ splitted = []
+ for name in tests:
+ if name in split_test_dirs:
+ subdir = os.path.join(testdir, name)
+ splitted.extend(findtests(testdir=subdir, exclude=exclude,
+ split_test_dirs=split_test_dirs,
+ base_mod=name))
+ else:
+ splitted.append(name)
+ return splitted
diff --git a/Lib/test/libregrtest/logger.py b/Lib/test/libregrtest/logger.py
index c4498a4..05b9307 100644
--- a/Lib/test/libregrtest/logger.py
+++ b/Lib/test/libregrtest/logger.py
@@ -1,7 +1,7 @@
import os
import time
-from test.libregrtest.runtest import RunTests
+from test.libregrtest.runtests import RunTests
from test.libregrtest.utils import print_warning, MS_WINDOWS
if MS_WINDOWS:
diff --git a/Lib/test/libregrtest/main.py b/Lib/test/libregrtest/main.py
index ed0813d..0227a2d 100644
--- a/Lib/test/libregrtest/main.py
+++ b/Lib/test/libregrtest/main.py
@@ -11,17 +11,19 @@ from test import support
from test.support import os_helper
from test.libregrtest.cmdline import _parse_args, Namespace
+from test.libregrtest.findtests import findtests, split_test_packages
from test.libregrtest.logger import Logger
-from test.libregrtest.runtest import (
- findtests, split_test_packages, run_single_test, abs_module_name,
- PROGRESS_MIN_TIME, State, RunTests, HuntRefleak,
- FilterTuple, TestList, StrJSON, TestName)
+from test.libregrtest.result import State
+from test.libregrtest.runtests import RunTests, HuntRefleak
from test.libregrtest.setup import setup_tests, setup_test_dir
+from test.libregrtest.single import run_single_test, PROGRESS_MIN_TIME
from test.libregrtest.pgo import setup_pgo_tests
from test.libregrtest.results import TestResults
from test.libregrtest.utils import (
- strip_py_suffix, count, format_duration, StrPath,
- printlist, get_build_info, get_temp_dir, get_work_dir, exit_timeout)
+ StrPath, StrJSON, TestName, TestList, FilterTuple,
+ strip_py_suffix, count, format_duration,
+ printlist, get_build_info, get_temp_dir, get_work_dir, exit_timeout,
+ abs_module_name)
class Regrtest:
diff --git a/Lib/test/libregrtest/refleak.py b/Lib/test/libregrtest/refleak.py
index 81f163c..6b1d708 100644
--- a/Lib/test/libregrtest/refleak.py
+++ b/Lib/test/libregrtest/refleak.py
@@ -1,10 +1,11 @@
-import os
import sys
import warnings
from inspect import isabstract
+
from test import support
from test.support import os_helper
-from test.libregrtest.runtest import HuntRefleak
+
+from test.libregrtest.runtests import HuntRefleak
from test.libregrtest.utils import clear_caches
try:
diff --git a/Lib/test/libregrtest/result.py b/Lib/test/libregrtest/result.py
new file mode 100644
index 0000000..4a68872
--- /dev/null
+++ b/Lib/test/libregrtest/result.py
@@ -0,0 +1,184 @@
+import dataclasses
+import json
+from typing import Any
+
+from test.support import TestStats
+
+from test.libregrtest.utils import (
+ TestName, FilterTuple,
+ format_duration, normalize_test_name, print_warning)
+
+
+# Avoid enum.Enum to reduce the number of imports when tests are run
+class State:
+ PASSED = "PASSED"
+ FAILED = "FAILED"
+ SKIPPED = "SKIPPED"
+ UNCAUGHT_EXC = "UNCAUGHT_EXC"
+ REFLEAK = "REFLEAK"
+ ENV_CHANGED = "ENV_CHANGED"
+ RESOURCE_DENIED = "RESOURCE_DENIED"
+ INTERRUPTED = "INTERRUPTED"
+ MULTIPROCESSING_ERROR = "MULTIPROCESSING_ERROR"
+ DID_NOT_RUN = "DID_NOT_RUN"
+ TIMEOUT = "TIMEOUT"
+
+ @staticmethod
+ def is_failed(state):
+ return state in {
+ State.FAILED,
+ State.UNCAUGHT_EXC,
+ State.REFLEAK,
+ State.MULTIPROCESSING_ERROR,
+ State.TIMEOUT}
+
+ @staticmethod
+ def has_meaningful_duration(state):
+ # Consider that the duration is meaningless for these cases.
+ # For example, if a whole test file is skipped, its duration
+ # is unlikely to be the duration of executing its tests,
+ # but just the duration to execute code which skips the test.
+ return state not in {
+ State.SKIPPED,
+ State.RESOURCE_DENIED,
+ State.INTERRUPTED,
+ State.MULTIPROCESSING_ERROR,
+ State.DID_NOT_RUN}
+
+ @staticmethod
+ def must_stop(state):
+ return state in {
+ State.INTERRUPTED,
+ State.MULTIPROCESSING_ERROR}
+
+
+@dataclasses.dataclass(slots=True)
+class TestResult:
+ test_name: TestName
+ state: str | None = None
+ # Test duration in seconds
+ duration: float | None = None
+ xml_data: list[str] | None = None
+ stats: TestStats | None = None
+
+ # errors and failures copied from support.TestFailedWithDetails
+ errors: list[tuple[str, str]] | None = None
+ failures: list[tuple[str, str]] | None = None
+
+ def is_failed(self, fail_env_changed: bool) -> bool:
+ if self.state == State.ENV_CHANGED:
+ return fail_env_changed
+ return State.is_failed(self.state)
+
+ def _format_failed(self):
+ if self.errors and self.failures:
+ le = len(self.errors)
+ lf = len(self.failures)
+ error_s = "error" + ("s" if le > 1 else "")
+ failure_s = "failure" + ("s" if lf > 1 else "")
+ return f"{self.test_name} failed ({le} {error_s}, {lf} {failure_s})"
+
+ if self.errors:
+ le = len(self.errors)
+ error_s = "error" + ("s" if le > 1 else "")
+ return f"{self.test_name} failed ({le} {error_s})"
+
+ if self.failures:
+ lf = len(self.failures)
+ failure_s = "failure" + ("s" if lf > 1 else "")
+ return f"{self.test_name} failed ({lf} {failure_s})"
+
+ return f"{self.test_name} failed"
+
+ def __str__(self) -> str:
+ match self.state:
+ case State.PASSED:
+ return f"{self.test_name} passed"
+ case State.FAILED:
+ return self._format_failed()
+ case State.SKIPPED:
+ return f"{self.test_name} skipped"
+ case State.UNCAUGHT_EXC:
+ return f"{self.test_name} failed (uncaught exception)"
+ case State.REFLEAK:
+ return f"{self.test_name} failed (reference leak)"
+ case State.ENV_CHANGED:
+ return f"{self.test_name} failed (env changed)"
+ case State.RESOURCE_DENIED:
+ return f"{self.test_name} skipped (resource denied)"
+ case State.INTERRUPTED:
+ return f"{self.test_name} interrupted"
+ case State.MULTIPROCESSING_ERROR:
+ return f"{self.test_name} process crashed"
+ case State.DID_NOT_RUN:
+ return f"{self.test_name} ran no tests"
+ case State.TIMEOUT:
+ return f"{self.test_name} timed out ({format_duration(self.duration)})"
+ case _:
+ raise ValueError("unknown result state: {state!r}")
+
+ def has_meaningful_duration(self):
+ return State.has_meaningful_duration(self.state)
+
+ def set_env_changed(self):
+ if self.state is None or self.state == State.PASSED:
+ self.state = State.ENV_CHANGED
+
+ def must_stop(self, fail_fast: bool, fail_env_changed: bool) -> bool:
+ if State.must_stop(self.state):
+ return True
+ if fail_fast and self.is_failed(fail_env_changed):
+ return True
+ return False
+
+ def get_rerun_match_tests(self) -> FilterTuple | None:
+ match_tests = []
+
+ errors = self.errors or []
+ failures = self.failures or []
+ for error_list, is_error in (
+ (errors, True),
+ (failures, False),
+ ):
+ for full_name, *_ in error_list:
+ match_name = normalize_test_name(full_name, is_error=is_error)
+ if match_name is None:
+ # 'setUpModule (test.test_sys)': don't filter tests
+ return None
+ if not match_name:
+ error_type = "ERROR" if is_error else "FAIL"
+ print_warning(f"rerun failed to parse {error_type} test name: "
+ f"{full_name!r}: don't filter tests")
+ return None
+ match_tests.append(match_name)
+
+ if not match_tests:
+ return None
+ return tuple(match_tests)
+
+ def write_json(self, file) -> None:
+ json.dump(self, file, cls=_EncodeTestResult)
+
+ @staticmethod
+ def from_json(worker_json) -> 'TestResult':
+ return json.loads(worker_json, object_hook=_decode_test_result)
+
+
+class _EncodeTestResult(json.JSONEncoder):
+ def default(self, o: Any) -> dict[str, Any]:
+ if isinstance(o, TestResult):
+ result = dataclasses.asdict(o)
+ result["__test_result__"] = o.__class__.__name__
+ return result
+ else:
+ return super().default(o)
+
+
+def _decode_test_result(data: dict[str, Any]) -> TestResult | dict[str, Any]:
+ if "__test_result__" in data:
+ data.pop('__test_result__')
+ if data['stats'] is not None:
+ data['stats'] = TestStats(**data['stats'])
+ return TestResult(**data)
+ else:
+ return data
diff --git a/Lib/test/libregrtest/results.py b/Lib/test/libregrtest/results.py
index e443019..b7a044e 100644
--- a/Lib/test/libregrtest/results.py
+++ b/Lib/test/libregrtest/results.py
@@ -1,11 +1,11 @@
import sys
from test.support import TestStats
-from test.libregrtest.runtest import (
- TestName, TestTuple, TestList, FilterDict, State,
- TestResult, RunTests)
+from test.libregrtest.runtests import RunTests
+from test.libregrtest.result import State, TestResult
from test.libregrtest.utils import (
- printlist, count, format_duration, StrPath)
+ StrPath, TestName, TestTuple, TestList, FilterDict,
+ printlist, count, format_duration)
EXITCODE_BAD_TEST = 2
diff --git a/Lib/test/libregrtest/runtest.py b/Lib/test/libregrtest/runtest.py
deleted file mode 100644
index a12c7fc..0000000
--- a/Lib/test/libregrtest/runtest.py
+++ /dev/null
@@ -1,676 +0,0 @@
-import dataclasses
-import doctest
-import faulthandler
-import gc
-import importlib
-import io
-import json
-import os
-import sys
-import time
-import traceback
-import unittest
-from typing import Any
-
-from test import support
-from test.support import TestStats
-from test.support import os_helper
-from test.support import threading_helper
-from test.libregrtest.save_env import saved_test_environment
-from test.libregrtest.utils import (
- clear_caches, format_duration, print_warning, StrPath)
-
-
-StrJSON = str
-TestName = str
-TestTuple = tuple[TestName, ...]
-TestList = list[TestName]
-
-# --match and --ignore options: list of patterns
-# ('*' joker character can be used)
-FilterTuple = tuple[TestName, ...]
-FilterDict = dict[TestName, FilterTuple]
-
-
-@dataclasses.dataclass(slots=True, frozen=True)
-class HuntRefleak:
- warmups: int
- runs: int
- filename: StrPath
-
-
-# Avoid enum.Enum to reduce the number of imports when tests are run
-class State:
- PASSED = "PASSED"
- FAILED = "FAILED"
- SKIPPED = "SKIPPED"
- UNCAUGHT_EXC = "UNCAUGHT_EXC"
- REFLEAK = "REFLEAK"
- ENV_CHANGED = "ENV_CHANGED"
- RESOURCE_DENIED = "RESOURCE_DENIED"
- INTERRUPTED = "INTERRUPTED"
- MULTIPROCESSING_ERROR = "MULTIPROCESSING_ERROR"
- DID_NOT_RUN = "DID_NOT_RUN"
- TIMEOUT = "TIMEOUT"
-
- @staticmethod
- def is_failed(state):
- return state in {
- State.FAILED,
- State.UNCAUGHT_EXC,
- State.REFLEAK,
- State.MULTIPROCESSING_ERROR,
- State.TIMEOUT}
-
- @staticmethod
- def has_meaningful_duration(state):
- # Consider that the duration is meaningless for these cases.
- # For example, if a whole test file is skipped, its duration
- # is unlikely to be the duration of executing its tests,
- # but just the duration to execute code which skips the test.
- return state not in {
- State.SKIPPED,
- State.RESOURCE_DENIED,
- State.INTERRUPTED,
- State.MULTIPROCESSING_ERROR,
- State.DID_NOT_RUN}
-
- @staticmethod
- def must_stop(state):
- return state in {
- State.INTERRUPTED,
- State.MULTIPROCESSING_ERROR}
-
-
-# gh-90681: When rerunning tests, we might need to rerun the whole
-# class or module suite if some its life-cycle hooks fail.
-# Test level hooks are not affected.
-_TEST_LIFECYCLE_HOOKS = frozenset((
- 'setUpClass', 'tearDownClass',
- 'setUpModule', 'tearDownModule',
-))
-
-def normalize_test_name(test_full_name, *, is_error=False):
- short_name = test_full_name.split(" ")[0]
- if is_error and short_name in _TEST_LIFECYCLE_HOOKS:
- if test_full_name.startswith(('setUpModule (', 'tearDownModule (')):
- # if setUpModule() or tearDownModule() failed, don't filter
- # tests with the test file name, don't use use filters.
- return None
-
- # This means that we have a failure in a life-cycle hook,
- # we need to rerun the whole module or class suite.
- # Basically the error looks like this:
- # ERROR: setUpClass (test.test_reg_ex.RegTest)
- # or
- # ERROR: setUpModule (test.test_reg_ex)
- # So, we need to parse the class / module name.
- lpar = test_full_name.index('(')
- rpar = test_full_name.index(')')
- return test_full_name[lpar + 1: rpar].split('.')[-1]
- return short_name
-
-
-@dataclasses.dataclass(slots=True)
-class TestResult:
- test_name: TestName
- state: str | None = None
- # Test duration in seconds
- duration: float | None = None
- xml_data: list[str] | None = None
- stats: TestStats | None = None
-
- # errors and failures copied from support.TestFailedWithDetails
- errors: list[tuple[str, str]] | None = None
- failures: list[tuple[str, str]] | None = None
-
- def is_failed(self, fail_env_changed: bool) -> bool:
- if self.state == State.ENV_CHANGED:
- return fail_env_changed
- return State.is_failed(self.state)
-
- def _format_failed(self):
- if self.errors and self.failures:
- le = len(self.errors)
- lf = len(self.failures)
- error_s = "error" + ("s" if le > 1 else "")
- failure_s = "failure" + ("s" if lf > 1 else "")
- return f"{self.test_name} failed ({le} {error_s}, {lf} {failure_s})"
-
- if self.errors:
- le = len(self.errors)
- error_s = "error" + ("s" if le > 1 else "")
- return f"{self.test_name} failed ({le} {error_s})"
-
- if self.failures:
- lf = len(self.failures)
- failure_s = "failure" + ("s" if lf > 1 else "")
- return f"{self.test_name} failed ({lf} {failure_s})"
-
- return f"{self.test_name} failed"
-
- def __str__(self) -> str:
- match self.state:
- case State.PASSED:
- return f"{self.test_name} passed"
- case State.FAILED:
- return self._format_failed()
- case State.SKIPPED:
- return f"{self.test_name} skipped"
- case State.UNCAUGHT_EXC:
- return f"{self.test_name} failed (uncaught exception)"
- case State.REFLEAK:
- return f"{self.test_name} failed (reference leak)"
- case State.ENV_CHANGED:
- return f"{self.test_name} failed (env changed)"
- case State.RESOURCE_DENIED:
- return f"{self.test_name} skipped (resource denied)"
- case State.INTERRUPTED:
- return f"{self.test_name} interrupted"
- case State.MULTIPROCESSING_ERROR:
- return f"{self.test_name} process crashed"
- case State.DID_NOT_RUN:
- return f"{self.test_name} ran no tests"
- case State.TIMEOUT:
- return f"{self.test_name} timed out ({format_duration(self.duration)})"
- case _:
- raise ValueError("unknown result state: {state!r}")
-
- def has_meaningful_duration(self):
- return State.has_meaningful_duration(self.state)
-
- def set_env_changed(self):
- if self.state is None or self.state == State.PASSED:
- self.state = State.ENV_CHANGED
-
- def must_stop(self, fail_fast: bool, fail_env_changed: bool) -> bool:
- if State.must_stop(self.state):
- return True
- if fail_fast and self.is_failed(fail_env_changed):
- return True
- return False
-
- def get_rerun_match_tests(self) -> FilterTuple | None:
- match_tests = []
-
- errors = self.errors or []
- failures = self.failures or []
- for error_list, is_error in (
- (errors, True),
- (failures, False),
- ):
- for full_name, *_ in error_list:
- match_name = normalize_test_name(full_name, is_error=is_error)
- if match_name is None:
- # 'setUpModule (test.test_sys)': don't filter tests
- return None
- if not match_name:
- error_type = "ERROR" if is_error else "FAIL"
- print_warning(f"rerun failed to parse {error_type} test name: "
- f"{full_name!r}: don't filter tests")
- return None
- match_tests.append(match_name)
-
- if not match_tests:
- return None
- return tuple(match_tests)
-
- def write_json(self, file) -> None:
- json.dump(self, file, cls=_EncodeTestResult)
-
- @staticmethod
- def from_json(worker_json) -> 'TestResult':
- return json.loads(worker_json, object_hook=_decode_test_result)
-
-
-class _EncodeTestResult(json.JSONEncoder):
- def default(self, o: Any) -> dict[str, Any]:
- if isinstance(o, TestResult):
- result = dataclasses.asdict(o)
- result["__test_result__"] = o.__class__.__name__
- return result
- else:
- return super().default(o)
-
-
-def _decode_test_result(data: dict[str, Any]) -> TestResult | dict[str, Any]:
- if "__test_result__" in data:
- data.pop('__test_result__')
- if data['stats'] is not None:
- data['stats'] = TestStats(**data['stats'])
- return TestResult(**data)
- else:
- return data
-
-
-@dataclasses.dataclass(slots=True, frozen=True)
-class RunTests:
- tests: TestTuple
- fail_fast: bool = False
- fail_env_changed: bool = False
- match_tests: FilterTuple | None = None
- ignore_tests: FilterTuple | None = None
- match_tests_dict: FilterDict | None = None
- rerun: bool = False
- forever: bool = False
- pgo: bool = False
- pgo_extended: bool = False
- output_on_failure: bool = False
- timeout: float | None = None
- verbose: bool = False
- quiet: bool = False
- hunt_refleak: HuntRefleak | None = None
- test_dir: StrPath | None = None
- use_junit: bool = False
- memory_limit: str | None = None
- gc_threshold: int | None = None
- use_resources: list[str] = None
- python_cmd: list[str] | None = None
-
- def copy(self, **override):
- state = dataclasses.asdict(self)
- state.update(override)
- return RunTests(**state)
-
- def get_match_tests(self, test_name) -> FilterTuple | None:
- if self.match_tests_dict is not None:
- return self.match_tests_dict.get(test_name, None)
- else:
- return None
-
- def iter_tests(self):
- if self.forever:
- while True:
- yield from self.tests
- else:
- yield from self.tests
-
- def as_json(self) -> StrJSON:
- return json.dumps(self, cls=_EncodeRunTests)
-
- @staticmethod
- def from_json(worker_json: StrJSON) -> 'RunTests':
- return json.loads(worker_json, object_hook=_decode_runtests)
-
-
-class _EncodeRunTests(json.JSONEncoder):
- def default(self, o: Any) -> dict[str, Any]:
- if isinstance(o, RunTests):
- result = dataclasses.asdict(o)
- result["__runtests__"] = True
- return result
- else:
- return super().default(o)
-
-
-def _decode_runtests(data: dict[str, Any]) -> RunTests | dict[str, Any]:
- if "__runtests__" in data:
- data.pop('__runtests__')
- if data['hunt_refleak']:
- data['hunt_refleak'] = HuntRefleak(**data['hunt_refleak'])
- return RunTests(**data)
- else:
- return data
-
-
-# Minimum duration of a test to display its duration or to mention that
-# the test is running in background
-PROGRESS_MIN_TIME = 30.0 # seconds
-
-#If these test directories are encountered recurse into them and treat each
-# test_ .py or dir as a separate test module. This can increase parallelism.
-# Beware this can't generally be done for any directory with sub-tests as the
-# __init__.py may do things which alter what tests are to be run.
-
-SPLITTESTDIRS: set[TestName] = {
- "test_asyncio",
- "test_concurrent_futures",
- "test_multiprocessing_fork",
- "test_multiprocessing_forkserver",
- "test_multiprocessing_spawn",
-}
-
-
-def findtestdir(path=None):
- return path or os.path.dirname(os.path.dirname(__file__)) or os.curdir
-
-
-def findtests(*, testdir: StrPath | None = None, exclude=(),
- split_test_dirs: set[TestName] = SPLITTESTDIRS,
- base_mod: str = "") -> TestList:
- """Return a list of all applicable test modules."""
- testdir = findtestdir(testdir)
- tests = []
- for name in os.listdir(testdir):
- mod, ext = os.path.splitext(name)
- if (not mod.startswith("test_")) or (mod in exclude):
- continue
- if mod in split_test_dirs:
- subdir = os.path.join(testdir, mod)
- mod = f"{base_mod or 'test'}.{mod}"
- tests.extend(findtests(testdir=subdir, exclude=exclude,
- split_test_dirs=split_test_dirs,
- base_mod=mod))
- elif ext in (".py", ""):
- tests.append(f"{base_mod}.{mod}" if base_mod else mod)
- return sorted(tests)
-
-
-def split_test_packages(tests, *, testdir: StrPath | None = None, exclude=(),
- split_test_dirs=SPLITTESTDIRS):
- testdir = findtestdir(testdir)
- splitted = []
- for name in tests:
- if name in split_test_dirs:
- subdir = os.path.join(testdir, name)
- splitted.extend(findtests(testdir=subdir, exclude=exclude,
- split_test_dirs=split_test_dirs,
- base_mod=name))
- else:
- splitted.append(name)
- return splitted
-
-
-def abs_module_name(test_name: TestName, test_dir: StrPath | None) -> TestName:
- if test_name.startswith('test.') or test_dir:
- return test_name
- else:
- # Import it from the test package
- return 'test.' + test_name
-
-
-def setup_support(runtests: RunTests):
- support.PGO = runtests.pgo
- support.PGO_EXTENDED = runtests.pgo_extended
- support.set_match_tests(runtests.match_tests, runtests.ignore_tests)
- support.failfast = runtests.fail_fast
- support.verbose = runtests.verbose
- if runtests.use_junit:
- support.junit_xml_list = []
- else:
- support.junit_xml_list = None
-
-
-def _runtest(result: TestResult, runtests: RunTests) -> None:
- # Capture stdout and stderr, set faulthandler timeout,
- # and create JUnit XML report.
- verbose = runtests.verbose
- output_on_failure = runtests.output_on_failure
- timeout = runtests.timeout
-
- use_timeout = (
- timeout is not None and threading_helper.can_start_thread
- )
- if use_timeout:
- faulthandler.dump_traceback_later(timeout, exit=True)
-
- try:
- setup_support(runtests)
-
- if output_on_failure:
- support.verbose = True
-
- stream = io.StringIO()
- orig_stdout = sys.stdout
- orig_stderr = sys.stderr
- print_warning = support.print_warning
- orig_print_warnings_stderr = print_warning.orig_stderr
-
- output = None
- try:
- sys.stdout = stream
- sys.stderr = stream
- # print_warning() writes into the temporary stream to preserve
- # messages order. If support.environment_altered becomes true,
- # warnings will be written to sys.stderr below.
- print_warning.orig_stderr = stream
-
- _runtest_env_changed_exc(result, runtests, display_failure=False)
- # Ignore output if the test passed successfully
- if result.state != State.PASSED:
- output = stream.getvalue()
- finally:
- sys.stdout = orig_stdout
- sys.stderr = orig_stderr
- print_warning.orig_stderr = orig_print_warnings_stderr
-
- if output is not None:
- sys.stderr.write(output)
- sys.stderr.flush()
- else:
- # Tell tests to be moderately quiet
- support.verbose = verbose
- _runtest_env_changed_exc(result, runtests,
- display_failure=not verbose)
-
- xml_list = support.junit_xml_list
- if xml_list:
- import xml.etree.ElementTree as ET
- result.xml_data = [ET.tostring(x).decode('us-ascii')
- for x in xml_list]
- finally:
- if use_timeout:
- faulthandler.cancel_dump_traceback_later()
- support.junit_xml_list = None
-
-
-def run_single_test(test_name: TestName, runtests: RunTests) -> TestResult:
- """Run a single test.
-
- test_name -- the name of the test
-
- Returns a TestResult.
-
- If runtests.use_junit, xml_data is a list containing each generated
- testsuite element.
- """
- start_time = time.perf_counter()
- result = TestResult(test_name)
- pgo = runtests.pgo
- try:
- _runtest(result, runtests)
- except:
- if not pgo:
- msg = traceback.format_exc()
- print(f"test {test_name} crashed -- {msg}",
- file=sys.stderr, flush=True)
- result.state = State.UNCAUGHT_EXC
- result.duration = time.perf_counter() - start_time
- return result
-
-
-def run_unittest(test_mod):
- loader = unittest.TestLoader()
- tests = loader.loadTestsFromModule(test_mod)
- for error in loader.errors:
- print(error, file=sys.stderr)
- if loader.errors:
- raise Exception("errors while loading tests")
- return support.run_unittest(tests)
-
-
-def save_env(test_name: TestName, runtests: RunTests):
- return saved_test_environment(test_name, runtests.verbose, runtests.quiet,
- pgo=runtests.pgo)
-
-
-def regrtest_runner(result: TestResult, test_func, runtests: RunTests) -> None:
- # Run test_func(), collect statistics, and detect reference and memory
- # leaks.
- if runtests.hunt_refleak:
- from test.libregrtest.refleak import runtest_refleak
- refleak, test_result = runtest_refleak(result.test_name, test_func,
- runtests.hunt_refleak,
- runtests.quiet)
- else:
- test_result = test_func()
- refleak = False
-
- if refleak:
- result.state = State.REFLEAK
-
- match test_result:
- case TestStats():
- stats = test_result
- case unittest.TestResult():
- stats = TestStats.from_unittest(test_result)
- case doctest.TestResults():
- stats = TestStats.from_doctest(test_result)
- case None:
- print_warning(f"{result.test_name} test runner returned None: {test_func}")
- stats = None
- case _:
- print_warning(f"Unknown test result type: {type(test_result)}")
- stats = None
-
- result.stats = stats
-
-
-# Storage of uncollectable objects
-FOUND_GARBAGE = []
-
-
-def _load_run_test(result: TestResult, runtests: RunTests) -> None:
- # Load the test function, run the test function.
- module_name = abs_module_name(result.test_name, runtests.test_dir)
-
- # Remove the module from sys.module to reload it if it was already imported
- sys.modules.pop(module_name, None)
-
- test_mod = importlib.import_module(module_name)
-
- if hasattr(test_mod, "test_main"):
- # https://github.com/python/cpython/issues/89392
- raise Exception(f"Module {result.test_name} defines test_main() which is no longer supported by regrtest")
- def test_func():
- return run_unittest(test_mod)
-
- try:
- with save_env(result.test_name, runtests):
- regrtest_runner(result, test_func, runtests)
- finally:
- # First kill any dangling references to open files etc.
- # This can also issue some ResourceWarnings which would otherwise get
- # triggered during the following test run, and possibly produce
- # failures.
- support.gc_collect()
-
- remove_testfn(result.test_name, runtests.verbose)
-
- if gc.garbage:
- support.environment_altered = True
- print_warning(f"{result.test_name} created {len(gc.garbage)} "
- f"uncollectable object(s)")
-
- # move the uncollectable objects somewhere,
- # so we don't see them again
- FOUND_GARBAGE.extend(gc.garbage)
- gc.garbage.clear()
-
- support.reap_children()
-
-
-def _runtest_env_changed_exc(result: TestResult, runtests: RunTests,
- display_failure: bool = True) -> None:
- # Detect environment changes, handle exceptions.
-
- # Reset the environment_altered flag to detect if a test altered
- # the environment
- support.environment_altered = False
-
- pgo = runtests.pgo
- if pgo:
- display_failure = False
- quiet = runtests.quiet
-
- test_name = result.test_name
- try:
- clear_caches()
- support.gc_collect()
-
- with save_env(test_name, runtests):
- _load_run_test(result, runtests)
- except support.ResourceDenied as msg:
- if not quiet and not pgo:
- print(f"{test_name} skipped -- {msg}", flush=True)
- result.state = State.RESOURCE_DENIED
- return
- except unittest.SkipTest as msg:
- if not quiet and not pgo:
- print(f"{test_name} skipped -- {msg}", flush=True)
- result.state = State.SKIPPED
- return
- except support.TestFailedWithDetails as exc:
- msg = f"test {test_name} failed"
- if display_failure:
- msg = f"{msg} -- {exc}"
- print(msg, file=sys.stderr, flush=True)
- result.state = State.FAILED
- result.errors = exc.errors
- result.failures = exc.failures
- result.stats = exc.stats
- return
- except support.TestFailed as exc:
- msg = f"test {test_name} failed"
- if display_failure:
- msg = f"{msg} -- {exc}"
- print(msg, file=sys.stderr, flush=True)
- result.state = State.FAILED
- result.stats = exc.stats
- return
- except support.TestDidNotRun:
- result.state = State.DID_NOT_RUN
- return
- except KeyboardInterrupt:
- print()
- result.state = State.INTERRUPTED
- return
- except:
- if not pgo:
- msg = traceback.format_exc()
- print(f"test {test_name} crashed -- {msg}",
- file=sys.stderr, flush=True)
- result.state = State.UNCAUGHT_EXC
- return
-
- if support.environment_altered:
- result.set_env_changed()
- # Don't override the state if it was already set (REFLEAK or ENV_CHANGED)
- if result.state is None:
- result.state = State.PASSED
-
-
-def remove_testfn(test_name: TestName, verbose: int) -> None:
- # Try to clean up os_helper.TESTFN if left behind.
- #
- # While tests shouldn't leave any files or directories behind, when a test
- # fails that can be tedious for it to arrange. The consequences can be
- # especially nasty on Windows, since if a test leaves a file open, it
- # cannot be deleted by name (while there's nothing we can do about that
- # here either, we can display the name of the offending test, which is a
- # real help).
- name = os_helper.TESTFN
- if not os.path.exists(name):
- return
-
- if os.path.isdir(name):
- import shutil
- kind, nuker = "directory", shutil.rmtree
- elif os.path.isfile(name):
- kind, nuker = "file", os.unlink
- else:
- raise RuntimeError(f"os.path says {name!r} exists but is neither "
- f"directory nor file")
-
- if verbose:
- print_warning(f"{test_name} left behind {kind} {name!r}")
- support.environment_altered = True
-
- try:
- import stat
- # fix possible permissions problems that might prevent cleanup
- os.chmod(name, stat.S_IRWXU | stat.S_IRWXG | stat.S_IRWXO)
- nuker(name)
- except Exception as exc:
- print_warning(f"{test_name} left behind {kind} {name!r} "
- f"and it couldn't be removed: {exc}")
diff --git a/Lib/test/libregrtest/runtest_mp.py b/Lib/test/libregrtest/runtest_mp.py
index c1bd911..f576d49 100644
--- a/Lib/test/libregrtest/runtest_mp.py
+++ b/Lib/test/libregrtest/runtest_mp.py
@@ -15,12 +15,13 @@ from test import support
from test.support import os_helper
from test.libregrtest.main import Regrtest
-from test.libregrtest.runtest import (
- TestResult, State, PROGRESS_MIN_TIME,
- RunTests, TestName)
+from test.libregrtest.result import TestResult, State
from test.libregrtest.results import TestResults
+from test.libregrtest.runtests import RunTests
+from test.libregrtest.single import PROGRESS_MIN_TIME
from test.libregrtest.utils import (
- format_duration, print_warning, StrPath)
+ StrPath, TestName,
+ format_duration, print_warning)
from test.libregrtest.worker import create_worker_process, USE_PROCESS_GROUP
if sys.platform == 'win32':
diff --git a/Lib/test/libregrtest/runtests.py b/Lib/test/libregrtest/runtests.py
new file mode 100644
index 0000000..366c6f1
--- /dev/null
+++ b/Lib/test/libregrtest/runtests.py
@@ -0,0 +1,83 @@
+import dataclasses
+import json
+from typing import Any
+
+from test.libregrtest.utils import (
+ StrPath, StrJSON, TestTuple, FilterTuple, FilterDict)
+
+
+@dataclasses.dataclass(slots=True, frozen=True)
+class HuntRefleak:
+ warmups: int
+ runs: int
+ filename: StrPath
+
+
+@dataclasses.dataclass(slots=True, frozen=True)
+class RunTests:
+ tests: TestTuple
+ fail_fast: bool = False
+ fail_env_changed: bool = False
+ match_tests: FilterTuple | None = None
+ ignore_tests: FilterTuple | None = None
+ match_tests_dict: FilterDict | None = None
+ rerun: bool = False
+ forever: bool = False
+ pgo: bool = False
+ pgo_extended: bool = False
+ output_on_failure: bool = False
+ timeout: float | None = None
+ verbose: bool = False
+ quiet: bool = False
+ hunt_refleak: HuntRefleak | None = None
+ test_dir: StrPath | None = None
+ use_junit: bool = False
+ memory_limit: str | None = None
+ gc_threshold: int | None = None
+ use_resources: list[str] = None
+ python_cmd: list[str] | None = None
+
+ def copy(self, **override):
+ state = dataclasses.asdict(self)
+ state.update(override)
+ return RunTests(**state)
+
+ def get_match_tests(self, test_name) -> FilterTuple | None:
+ if self.match_tests_dict is not None:
+ return self.match_tests_dict.get(test_name, None)
+ else:
+ return None
+
+ def iter_tests(self):
+ if self.forever:
+ while True:
+ yield from self.tests
+ else:
+ yield from self.tests
+
+ def as_json(self) -> StrJSON:
+ return json.dumps(self, cls=_EncodeRunTests)
+
+ @staticmethod
+ def from_json(worker_json: StrJSON) -> 'RunTests':
+ return json.loads(worker_json, object_hook=_decode_runtests)
+
+
+class _EncodeRunTests(json.JSONEncoder):
+ def default(self, o: Any) -> dict[str, Any]:
+ if isinstance(o, RunTests):
+ result = dataclasses.asdict(o)
+ result["__runtests__"] = True
+ return result
+ else:
+ return super().default(o)
+
+
+def _decode_runtests(data: dict[str, Any]) -> RunTests | dict[str, Any]:
+ if "__runtests__" in data:
+ data.pop('__runtests__')
+ if data['hunt_refleak']:
+ data['hunt_refleak'] = HuntRefleak(**data['hunt_refleak'])
+ return RunTests(**data)
+ else:
+ return data
diff --git a/Lib/test/libregrtest/setup.py b/Lib/test/libregrtest/setup.py
index 48eb8b6..20ef3dc 100644
--- a/Lib/test/libregrtest/setup.py
+++ b/Lib/test/libregrtest/setup.py
@@ -11,6 +11,7 @@ try:
except ImportError:
gc = None
+from test.libregrtest.runtests import RunTests
from test.libregrtest.utils import (
setup_unraisable_hook, setup_threading_excepthook, fix_umask)
@@ -25,6 +26,18 @@ def setup_test_dir(testdir: str | None) -> None:
sys.path.insert(0, os.path.abspath(testdir))
+def setup_support(runtests: RunTests):
+ support.PGO = runtests.pgo
+ support.PGO_EXTENDED = runtests.pgo_extended
+ support.set_match_tests(runtests.match_tests, runtests.ignore_tests)
+ support.failfast = runtests.fail_fast
+ support.verbose = runtests.verbose
+ if runtests.use_junit:
+ support.junit_xml_list = []
+ else:
+ support.junit_xml_list = None
+
+
def setup_tests(runtests):
fix_umask()
diff --git a/Lib/test/libregrtest/single.py b/Lib/test/libregrtest/single.py
new file mode 100644
index 0000000..bb33387
--- /dev/null
+++ b/Lib/test/libregrtest/single.py
@@ -0,0 +1,275 @@
+import doctest
+import faulthandler
+import gc
+import importlib
+import io
+import sys
+import time
+import traceback
+import unittest
+
+from test import support
+from test.support import TestStats
+from test.support import threading_helper
+
+from test.libregrtest.result import State, TestResult
+from test.libregrtest.runtests import RunTests
+from test.libregrtest.save_env import saved_test_environment
+from test.libregrtest.setup import setup_support
+from test.libregrtest.utils import (
+ TestName,
+ clear_caches, remove_testfn, abs_module_name, print_warning)
+
+
+# Minimum duration of a test to display its duration or to mention that
+# the test is running in background
+PROGRESS_MIN_TIME = 30.0 # seconds
+
+
+def run_unittest(test_mod):
+ loader = unittest.TestLoader()
+ tests = loader.loadTestsFromModule(test_mod)
+ for error in loader.errors:
+ print(error, file=sys.stderr)
+ if loader.errors:
+ raise Exception("errors while loading tests")
+ return support.run_unittest(tests)
+
+
+def regrtest_runner(result: TestResult, test_func, runtests: RunTests) -> None:
+ # Run test_func(), collect statistics, and detect reference and memory
+ # leaks.
+ if runtests.hunt_refleak:
+ from test.libregrtest.refleak import runtest_refleak
+ refleak, test_result = runtest_refleak(result.test_name, test_func,
+ runtests.hunt_refleak,
+ runtests.quiet)
+ else:
+ test_result = test_func()
+ refleak = False
+
+ if refleak:
+ result.state = State.REFLEAK
+
+ match test_result:
+ case TestStats():
+ stats = test_result
+ case unittest.TestResult():
+ stats = TestStats.from_unittest(test_result)
+ case doctest.TestResults():
+ stats = TestStats.from_doctest(test_result)
+ case None:
+ print_warning(f"{result.test_name} test runner returned None: {test_func}")
+ stats = None
+ case _:
+ print_warning(f"Unknown test result type: {type(test_result)}")
+ stats = None
+
+ result.stats = stats
+
+
+def save_env(test_name: TestName, runtests: RunTests):
+ return saved_test_environment(test_name, runtests.verbose, runtests.quiet,
+ pgo=runtests.pgo)
+
+
+# Storage of uncollectable GC objects (gc.garbage)
+GC_GARBAGE = []
+
+
+def _load_run_test(result: TestResult, runtests: RunTests) -> None:
+ # Load the test function, run the test function.
+ module_name = abs_module_name(result.test_name, runtests.test_dir)
+
+ # Remove the module from sys.module to reload it if it was already imported
+ sys.modules.pop(module_name, None)
+
+ test_mod = importlib.import_module(module_name)
+
+ if hasattr(test_mod, "test_main"):
+ # https://github.com/python/cpython/issues/89392
+ raise Exception(f"Module {result.test_name} defines test_main() which is no longer supported by regrtest")
+ def test_func():
+ return run_unittest(test_mod)
+
+ try:
+ with save_env(result.test_name, runtests):
+ regrtest_runner(result, test_func, runtests)
+ finally:
+ # First kill any dangling references to open files etc.
+ # This can also issue some ResourceWarnings which would otherwise get
+ # triggered during the following test run, and possibly produce
+ # failures.
+ support.gc_collect()
+
+ remove_testfn(result.test_name, runtests.verbose)
+
+ if gc.garbage:
+ support.environment_altered = True
+ print_warning(f"{result.test_name} created {len(gc.garbage)} "
+ f"uncollectable object(s)")
+
+ # move the uncollectable objects somewhere,
+ # so we don't see them again
+ GC_GARBAGE.extend(gc.garbage)
+ gc.garbage.clear()
+
+ support.reap_children()
+
+
+def _runtest_env_changed_exc(result: TestResult, runtests: RunTests,
+ display_failure: bool = True) -> None:
+ # Detect environment changes, handle exceptions.
+
+ # Reset the environment_altered flag to detect if a test altered
+ # the environment
+ support.environment_altered = False
+
+ pgo = runtests.pgo
+ if pgo:
+ display_failure = False
+ quiet = runtests.quiet
+
+ test_name = result.test_name
+ try:
+ clear_caches()
+ support.gc_collect()
+
+ with save_env(test_name, runtests):
+ _load_run_test(result, runtests)
+ except support.ResourceDenied as msg:
+ if not quiet and not pgo:
+ print(f"{test_name} skipped -- {msg}", flush=True)
+ result.state = State.RESOURCE_DENIED
+ return
+ except unittest.SkipTest as msg:
+ if not quiet and not pgo:
+ print(f"{test_name} skipped -- {msg}", flush=True)
+ result.state = State.SKIPPED
+ return
+ except support.TestFailedWithDetails as exc:
+ msg = f"test {test_name} failed"
+ if display_failure:
+ msg = f"{msg} -- {exc}"
+ print(msg, file=sys.stderr, flush=True)
+ result.state = State.FAILED
+ result.errors = exc.errors
+ result.failures = exc.failures
+ result.stats = exc.stats
+ return
+ except support.TestFailed as exc:
+ msg = f"test {test_name} failed"
+ if display_failure:
+ msg = f"{msg} -- {exc}"
+ print(msg, file=sys.stderr, flush=True)
+ result.state = State.FAILED
+ result.stats = exc.stats
+ return
+ except support.TestDidNotRun:
+ result.state = State.DID_NOT_RUN
+ return
+ except KeyboardInterrupt:
+ print()
+ result.state = State.INTERRUPTED
+ return
+ except:
+ if not pgo:
+ msg = traceback.format_exc()
+ print(f"test {test_name} crashed -- {msg}",
+ file=sys.stderr, flush=True)
+ result.state = State.UNCAUGHT_EXC
+ return
+
+ if support.environment_altered:
+ result.set_env_changed()
+ # Don't override the state if it was already set (REFLEAK or ENV_CHANGED)
+ if result.state is None:
+ result.state = State.PASSED
+
+
+def _runtest(result: TestResult, runtests: RunTests) -> None:
+ # Capture stdout and stderr, set faulthandler timeout,
+ # and create JUnit XML report.
+ verbose = runtests.verbose
+ output_on_failure = runtests.output_on_failure
+ timeout = runtests.timeout
+
+ use_timeout = (
+ timeout is not None and threading_helper.can_start_thread
+ )
+ if use_timeout:
+ faulthandler.dump_traceback_later(timeout, exit=True)
+
+ try:
+ setup_support(runtests)
+
+ if output_on_failure:
+ support.verbose = True
+
+ stream = io.StringIO()
+ orig_stdout = sys.stdout
+ orig_stderr = sys.stderr
+ print_warning = support.print_warning
+ orig_print_warnings_stderr = print_warning.orig_stderr
+
+ output = None
+ try:
+ sys.stdout = stream
+ sys.stderr = stream
+ # print_warning() writes into the temporary stream to preserve
+ # messages order. If support.environment_altered becomes true,
+ # warnings will be written to sys.stderr below.
+ print_warning.orig_stderr = stream
+
+ _runtest_env_changed_exc(result, runtests, display_failure=False)
+ # Ignore output if the test passed successfully
+ if result.state != State.PASSED:
+ output = stream.getvalue()
+ finally:
+ sys.stdout = orig_stdout
+ sys.stderr = orig_stderr
+ print_warning.orig_stderr = orig_print_warnings_stderr
+
+ if output is not None:
+ sys.stderr.write(output)
+ sys.stderr.flush()
+ else:
+ # Tell tests to be moderately quiet
+ support.verbose = verbose
+ _runtest_env_changed_exc(result, runtests,
+ display_failure=not verbose)
+
+ xml_list = support.junit_xml_list
+ if xml_list:
+ import xml.etree.ElementTree as ET
+ result.xml_data = [ET.tostring(x).decode('us-ascii')
+ for x in xml_list]
+ finally:
+ if use_timeout:
+ faulthandler.cancel_dump_traceback_later()
+ support.junit_xml_list = None
+
+
+def run_single_test(test_name: TestName, runtests: RunTests) -> TestResult:
+ """Run a single test.
+
+ test_name -- the name of the test
+
+ Returns a TestResult.
+
+ If runtests.use_junit, xml_data is a list containing each generated
+ testsuite element.
+ """
+ start_time = time.perf_counter()
+ result = TestResult(test_name)
+ pgo = runtests.pgo
+ try:
+ _runtest(result, runtests)
+ except:
+ if not pgo:
+ msg = traceback.format_exc()
+ print(f"test {test_name} crashed -- {msg}",
+ file=sys.stderr, flush=True)
+ result.state = State.UNCAUGHT_EXC
+ result.duration = time.perf_counter() - start_time
+ return result
diff --git a/Lib/test/libregrtest/utils.py b/Lib/test/libregrtest/utils.py
index e77772c..011d287 100644
--- a/Lib/test/libregrtest/utils.py
+++ b/Lib/test/libregrtest/utils.py
@@ -21,7 +21,16 @@ MS_WINDOWS = (sys.platform == 'win32')
EXIT_TIMEOUT = 120.0
+# Types for types hints
StrPath = str
+TestName = str
+StrJSON = str
+TestTuple = tuple[TestName, ...]
+TestList = list[TestName]
+# --match and --ignore options: list of patterns
+# ('*' joker character can be used)
+FilterTuple = tuple[TestName, ...]
+FilterDict = dict[TestName, FilterTuple]
def format_duration(seconds):
@@ -389,3 +398,76 @@ def exit_timeout():
if threading_helper.can_start_thread:
faulthandler.dump_traceback_later(EXIT_TIMEOUT, exit=True)
sys.exit(exc.code)
+
+
+def remove_testfn(test_name: TestName, verbose: int) -> None:
+ # Try to clean up os_helper.TESTFN if left behind.
+ #
+ # While tests shouldn't leave any files or directories behind, when a test
+ # fails that can be tedious for it to arrange. The consequences can be
+ # especially nasty on Windows, since if a test leaves a file open, it
+ # cannot be deleted by name (while there's nothing we can do about that
+ # here either, we can display the name of the offending test, which is a
+ # real help).
+ name = os_helper.TESTFN
+ if not os.path.exists(name):
+ return
+
+ if os.path.isdir(name):
+ import shutil
+ kind, nuker = "directory", shutil.rmtree
+ elif os.path.isfile(name):
+ kind, nuker = "file", os.unlink
+ else:
+ raise RuntimeError(f"os.path says {name!r} exists but is neither "
+ f"directory nor file")
+
+ if verbose:
+ print_warning(f"{test_name} left behind {kind} {name!r}")
+ support.environment_altered = True
+
+ try:
+ import stat
+ # fix possible permissions problems that might prevent cleanup
+ os.chmod(name, stat.S_IRWXU | stat.S_IRWXG | stat.S_IRWXO)
+ nuker(name)
+ except Exception as exc:
+ print_warning(f"{test_name} left behind {kind} {name!r} "
+ f"and it couldn't be removed: {exc}")
+
+
+def abs_module_name(test_name: TestName, test_dir: StrPath | None) -> TestName:
+ if test_name.startswith('test.') or test_dir:
+ return test_name
+ else:
+ # Import it from the test package
+ return 'test.' + test_name
+
+
+# gh-90681: When rerunning tests, we might need to rerun the whole
+# class or module suite if some its life-cycle hooks fail.
+# Test level hooks are not affected.
+_TEST_LIFECYCLE_HOOKS = frozenset((
+ 'setUpClass', 'tearDownClass',
+ 'setUpModule', 'tearDownModule',
+))
+
+def normalize_test_name(test_full_name, *, is_error=False):
+ short_name = test_full_name.split(" ")[0]
+ if is_error and short_name in _TEST_LIFECYCLE_HOOKS:
+ if test_full_name.startswith(('setUpModule (', 'tearDownModule (')):
+ # if setUpModule() or tearDownModule() failed, don't filter
+ # tests with the test file name, don't use use filters.
+ return None
+
+ # This means that we have a failure in a life-cycle hook,
+ # we need to rerun the whole module or class suite.
+ # Basically the error looks like this:
+ # ERROR: setUpClass (test.test_reg_ex.RegTest)
+ # or
+ # ERROR: setUpModule (test.test_reg_ex)
+ # So, we need to parse the class / module name.
+ lpar = test_full_name.index('(')
+ rpar = test_full_name.index(')')
+ return test_full_name[lpar + 1: rpar].split('.')[-1]
+ return short_name
diff --git a/Lib/test/libregrtest/worker.py b/Lib/test/libregrtest/worker.py
index 033a0a3..24251c3 100644
--- a/Lib/test/libregrtest/worker.py
+++ b/Lib/test/libregrtest/worker.py
@@ -7,9 +7,11 @@ from test import support
from test.support import os_helper
from test.libregrtest.setup import setup_tests, setup_test_dir
-from test.libregrtest.runtest import (
- run_single_test, StrJSON, FilterTuple, RunTests)
-from test.libregrtest.utils import get_work_dir, exit_timeout, StrPath
+from test.libregrtest.runtests import RunTests
+from test.libregrtest.single import run_single_test
+from test.libregrtest.utils import (
+ StrPath, StrJSON, FilterTuple,
+ get_work_dir, exit_timeout)
USE_PROCESS_GROUP = (hasattr(os, "setsid") and hasattr(os, "killpg"))
diff --git a/Lib/test/test_regrtest.py b/Lib/test/test_regrtest.py
index a5ee4c2..21babb5 100644
--- a/Lib/test/test_regrtest.py
+++ b/Lib/test/test_regrtest.py
@@ -22,7 +22,7 @@ from test import libregrtest
from test import support
from test.support import os_helper, TestStats
from test.libregrtest import utils, setup
-from test.libregrtest.runtest import normalize_test_name
+from test.libregrtest.utils import normalize_test_name
if not support.has_subprocess_support:
raise unittest.SkipTest("test module requires subprocess")