summaryrefslogtreecommitdiffstats
path: root/Lib
diff options
context:
space:
mode:
authorVictor Stinner <vstinner@python.org>2023-09-02 16:09:36 (GMT)
committerGitHub <noreply@github.com>2023-09-02 16:09:36 (GMT)
commitd4e534cbb35678c82b3a1276826af55d7bfc23b6 (patch)
tree6d161a616e91406d600e9e8ef1120705f47e7da4 /Lib
parente7de0c5901b85a5241386a33f98c27a4e08d5384 (diff)
downloadcpython-d4e534cbb35678c82b3a1276826af55d7bfc23b6.zip
cpython-d4e534cbb35678c82b3a1276826af55d7bfc23b6.tar.gz
cpython-d4e534cbb35678c82b3a1276826af55d7bfc23b6.tar.bz2
regrtest computes statistics (#108793)
test_netrc, test_pep646_syntax and test_xml_etree now return results in the test_main() function. Changes: * Rewrite TestResult as a dataclass with a new State class. * Add test.support.TestStats class and Regrtest.stats_dict attribute. * libregrtest.runtest functions now modify a TestResult instance in-place. * libregrtest summary lists the number of run tests and skipped tests, and denied resources. * Add TestResult.has_meaningful_duration() method. * Compute TestResult duration in the upper function. * Use time.perf_counter() instead of time.monotonic(). * Regrtest: rename 'resource_denieds' attribute to 'resource_denied'. * Rename CHILD_ERROR to MULTIPROCESSING_ERROR. * Use match/case syntadx to have different code depending on the test state. Co-authored-by: Alex Waygood <Alex.Waygood@Gmail.com>
Diffstat (limited to 'Lib')
-rw-r--r--Lib/test/libregrtest/main.py126
-rw-r--r--Lib/test/libregrtest/refleak.py5
-rw-r--r--Lib/test/libregrtest/runtest.py324
-rw-r--r--Lib/test/libregrtest/runtest_mp.py84
-rw-r--r--Lib/test/libregrtest/save_env.py8
-rw-r--r--Lib/test/support/__init__.py61
-rw-r--r--Lib/test/test_netrc.py2
-rw-r--r--Lib/test/test_pep646_syntax.py2
-rw-r--r--Lib/test/test_regrtest.py206
-rw-r--r--Lib/test/test_xml_etree.py2
10 files changed, 512 insertions, 308 deletions
diff --git a/Lib/test/libregrtest/main.py b/Lib/test/libregrtest/main.py
index 3d290c8..6e6423e 100644
--- a/Lib/test/libregrtest/main.py
+++ b/Lib/test/libregrtest/main.py
@@ -11,15 +11,14 @@ import time
import unittest
from test.libregrtest.cmdline import _parse_args
from test.libregrtest.runtest import (
- findtests, split_test_packages, runtest, get_abs_module, is_failed,
- PROGRESS_MIN_TIME,
- Passed, Failed, EnvChanged, Skipped, ResourceDenied, Interrupted,
- ChildError, DidNotRun)
+ findtests, split_test_packages, runtest, get_abs_module,
+ PROGRESS_MIN_TIME, State)
from test.libregrtest.setup import setup_tests
from test.libregrtest.pgo import setup_pgo_tests
from test.libregrtest.utils import (removepy, count, format_duration,
printlist, get_build_info)
from test import support
+from test.support import TestStats
from test.support import os_helper
from test.support import threading_helper
@@ -78,13 +77,14 @@ class Regrtest:
self.good = []
self.bad = []
self.skipped = []
- self.resource_denieds = []
+ self.resource_denied = []
self.environment_changed = []
self.run_no_tests = []
self.need_rerun = []
self.rerun = []
self.first_result = None
self.interrupted = False
+ self.stats_dict: dict[str, TestStats] = {}
# used by --slow
self.test_times = []
@@ -93,7 +93,7 @@ class Regrtest:
self.tracer = None
# used to display the progress bar "[ 3/100]"
- self.start_time = time.monotonic()
+ self.start_time = time.perf_counter()
self.test_count = ''
self.test_count_width = 1
@@ -111,36 +111,41 @@ class Regrtest:
def get_executed(self):
return (set(self.good) | set(self.bad) | set(self.skipped)
- | set(self.resource_denieds) | set(self.environment_changed)
+ | set(self.resource_denied) | set(self.environment_changed)
| set(self.run_no_tests))
def accumulate_result(self, result, rerun=False):
- test_name = result.name
-
- if not isinstance(result, (ChildError, Interrupted)) and not rerun:
- self.test_times.append((result.duration_sec, test_name))
-
- if isinstance(result, Passed):
- self.good.append(test_name)
- elif isinstance(result, ResourceDenied):
- self.skipped.append(test_name)
- self.resource_denieds.append(test_name)
- elif isinstance(result, Skipped):
- self.skipped.append(test_name)
- elif isinstance(result, EnvChanged):
- self.environment_changed.append(test_name)
- elif isinstance(result, Failed):
- if not rerun:
- self.bad.append(test_name)
- self.need_rerun.append(result)
- elif isinstance(result, DidNotRun):
- self.run_no_tests.append(test_name)
- elif isinstance(result, Interrupted):
- self.interrupted = True
- else:
- raise ValueError("invalid test result: %r" % result)
+ test_name = result.test_name
+
+ if result.has_meaningful_duration() and not rerun:
+ self.test_times.append((result.duration, test_name))
- if rerun and not isinstance(result, (Failed, Interrupted)):
+ match result.state:
+ case State.PASSED:
+ self.good.append(test_name)
+ case State.ENV_CHANGED:
+ self.environment_changed.append(test_name)
+ case State.SKIPPED:
+ self.skipped.append(test_name)
+ case State.RESOURCE_DENIED:
+ self.skipped.append(test_name)
+ self.resource_denied.append(test_name)
+ case State.INTERRUPTED:
+ self.interrupted = True
+ case State.DID_NOT_RUN:
+ self.run_no_tests.append(test_name)
+ case _:
+ if result.is_failed(self.ns.fail_env_changed):
+ if not rerun:
+ self.bad.append(test_name)
+ self.need_rerun.append(result)
+ else:
+ raise ValueError(f"invalid test state: {state!r}")
+
+ if result.stats is not None:
+ self.stats_dict[result.test_name] = result.stats
+
+ if rerun and not(result.is_failed(False) or result.state == State.INTERRUPTED):
self.bad.remove(test_name)
xml_data = result.xml_data
@@ -162,7 +167,7 @@ class Regrtest:
line = f"load avg: {load_avg:.2f} {line}"
# add the timestamp prefix: "0:01:05 "
- test_time = time.monotonic() - self.start_time
+ test_time = time.perf_counter() - self.start_time
mins, secs = divmod(int(test_time), 60)
hours, mins = divmod(mins, 60)
@@ -337,7 +342,7 @@ class Regrtest:
rerun_list = list(self.need_rerun)
self.need_rerun.clear()
for result in rerun_list:
- test_name = result.name
+ test_name = result.test_name
self.rerun.append(test_name)
errors = result.errors or []
@@ -364,7 +369,7 @@ class Regrtest:
self.accumulate_result(result, rerun=True)
- if isinstance(result, Interrupted):
+ if result.state == State.INTERRUPTED:
break
if self.bad:
@@ -461,7 +466,7 @@ class Regrtest:
previous_test = None
for test_index, test_name in enumerate(self.tests, 1):
- start_time = time.monotonic()
+ start_time = time.perf_counter()
text = test_name
if previous_test:
@@ -480,14 +485,14 @@ class Regrtest:
result = runtest(self.ns, test_name)
self.accumulate_result(result)
- if isinstance(result, Interrupted):
+ if result.state == State.INTERRUPTED:
break
previous_test = str(result)
- test_time = time.monotonic() - start_time
+ test_time = time.perf_counter() - start_time
if test_time >= PROGRESS_MIN_TIME:
previous_test = "%s in %s" % (previous_test, format_duration(test_time))
- elif isinstance(result, Passed):
+ elif result.state == State.PASSED:
# be quiet: say nothing if the test passed shortly
previous_test = None
@@ -496,7 +501,7 @@ class Regrtest:
if module not in save_modules and module.startswith("test."):
support.unload(module)
- if self.ns.failfast and is_failed(result, self.ns):
+ if self.ns.failfast and result.is_failed(self.ns.fail_env_changed):
break
if previous_test:
@@ -638,13 +643,48 @@ class Regrtest:
coverdir=self.ns.coverdir)
print()
- duration = time.monotonic() - self.start_time
- print("Total duration: %s" % format_duration(duration))
- print("Tests result: %s" % self.get_tests_result())
+ self.display_summary()
if self.ns.runleaks:
os.system("leaks %d" % os.getpid())
+ def display_summary(self):
+ duration = time.perf_counter() - self.start_time
+
+ # Total duration
+ print("Total duration: %s" % format_duration(duration))
+
+ # Total tests
+ total = TestStats()
+ for stats in self.stats_dict.values():
+ total.accumulate(stats)
+ stats = [f'run={total.tests_run:,}']
+ if total.failures:
+ stats.append(f'failures={total.failures:,}')
+ if total.skipped:
+ stats.append(f'skipped={total.skipped:,}')
+ print(f"Total tests: {' '.join(stats)}")
+
+ # Total test files
+ report = [f'success={len(self.good)}']
+ if self.bad:
+ report.append(f'failed={len(self.bad)}')
+ if self.environment_changed:
+ report.append(f'env_changed={len(self.environment_changed)}')
+ if self.skipped:
+ report.append(f'skipped={len(self.skipped)}')
+ if self.resource_denied:
+ report.append(f'resource_denied={len(self.resource_denied)}')
+ if self.rerun:
+ report.append(f'rerun={len(self.rerun)}')
+ if self.run_no_tests:
+ report.append(f'run_no_tests={len(self.run_no_tests)}')
+ print(f"Total test files: {' '.join(report)}")
+
+ # Result
+ result = self.get_tests_result()
+ print(f"Result: {result}")
+
def save_xml_result(self):
if not self.ns.xmlpath and not self.testsuite_xml:
return
diff --git a/Lib/test/libregrtest/refleak.py b/Lib/test/libregrtest/refleak.py
index cd11d38..206802b 100644
--- a/Lib/test/libregrtest/refleak.py
+++ b/Lib/test/libregrtest/refleak.py
@@ -83,11 +83,12 @@ def dash_R(ns, test_name, test_func):
print(("1234567890"*(repcount//10 + 1))[:repcount], file=sys.stderr,
flush=True)
+ results = None
dash_R_cleanup(fs, ps, pic, zdc, abcs)
support.gc_collect()
for i in rep_range:
- test_func()
+ results = test_func()
dash_R_cleanup(fs, ps, pic, zdc, abcs)
support.gc_collect()
@@ -151,7 +152,7 @@ def dash_R(ns, test_name, test_func):
print(msg, file=refrep)
refrep.flush()
failed = True
- return failed
+ return (failed, results)
def dash_R_cleanup(fs, ps, pic, zdc, abcs):
diff --git a/Lib/test/libregrtest/runtest.py b/Lib/test/libregrtest/runtest.py
index fd49927..6fa6069 100644
--- a/Lib/test/libregrtest/runtest.py
+++ b/Lib/test/libregrtest/runtest.py
@@ -1,3 +1,5 @@
+import dataclasses
+import doctest
import faulthandler
import functools
import gc
@@ -10,6 +12,7 @@ import traceback
import unittest
from test import support
+from test.support import TestStats
from test.support import os_helper
from test.support import threading_helper
from test.libregrtest.cmdline import Namespace
@@ -17,108 +20,114 @@ from test.libregrtest.save_env import saved_test_environment
from test.libregrtest.utils import clear_caches, format_duration, print_warning
-class TestResult:
- def __init__(
- self,
- name: str,
- duration_sec: float = 0.0,
- xml_data: list[str] | None = None,
- ) -> None:
- self.name = name
- self.duration_sec = duration_sec
- self.xml_data = xml_data
-
- def __str__(self) -> str:
- return f"{self.name} finished"
-
+# Avoid enum.Enum to reduce the number of imports when tests are run
+class State:
+ PASSED = "PASSED"
+ FAILED = "FAILED"
+ SKIPPED = "SKIPPED"
+ UNCAUGHT_EXC = "UNCAUGHT_EXC"
+ REFLEAK = "REFLEAK"
+ ENV_CHANGED = "ENV_CHANGED"
+ RESOURCE_DENIED = "RESOURCE_DENIED"
+ INTERRUPTED = "INTERRUPTED"
+ MULTIPROCESSING_ERROR = "MULTIPROCESSING_ERROR"
+ DID_NOT_RUN = "DID_NOT_RUN"
+ TIMEOUT = "TIMEOUT"
-class Passed(TestResult):
- def __str__(self) -> str:
- return f"{self.name} passed"
-
-
-class Failed(TestResult):
- def __init__(
- self,
- name: str,
- duration_sec: float = 0.0,
- xml_data: list[str] | None = None,
- errors: list[tuple[str, str]] | None = None,
- failures: list[tuple[str, str]] | None = None,
- ) -> None:
- super().__init__(name, duration_sec=duration_sec, xml_data=xml_data)
- self.errors = errors
- self.failures = failures
+ @staticmethod
+ def is_failed(state):
+ return state in {
+ State.FAILED,
+ State.UNCAUGHT_EXC,
+ State.REFLEAK,
+ State.MULTIPROCESSING_ERROR,
+ State.TIMEOUT}
- def __str__(self) -> str:
+ @staticmethod
+ def has_meaningful_duration(state):
+ # Consider that the duration is meaningless for these cases.
+ # For example, if a whole test file is skipped, its duration
+ # is unlikely to be the duration of executing its tests,
+ # but just the duration to execute code which skips the test.
+ return state not in {
+ State.SKIPPED,
+ State.RESOURCE_DENIED,
+ State.INTERRUPTED,
+ State.MULTIPROCESSING_ERROR,
+ State.DID_NOT_RUN}
+
+
+@dataclasses.dataclass(slots=True)
+class TestResult:
+ test_name: str
+ state: str | None = None
+ # Test duration in seconds
+ duration: float | None = None
+ xml_data: list[str] | None = None
+ stats: TestStats | None = None
+
+ # errors and failures copied from support.TestFailedWithDetails
+ errors: list[tuple[str, str]] | None = None
+ failures: list[tuple[str, str]] | None = None
+
+ def is_failed(self, fail_env_changed: bool) -> bool:
+ if self.state == State.ENV_CHANGED:
+ return fail_env_changed
+ return State.is_failed(self.state)
+
+ def _format_failed(self):
if self.errors and self.failures:
le = len(self.errors)
lf = len(self.failures)
error_s = "error" + ("s" if le > 1 else "")
failure_s = "failure" + ("s" if lf > 1 else "")
- return f"{self.name} failed ({le} {error_s}, {lf} {failure_s})"
+ return f"{self.test_name} failed ({le} {error_s}, {lf} {failure_s})"
if self.errors:
le = len(self.errors)
error_s = "error" + ("s" if le > 1 else "")
- return f"{self.name} failed ({le} {error_s})"
+ return f"{self.test_name} failed ({le} {error_s})"
if self.failures:
lf = len(self.failures)
failure_s = "failure" + ("s" if lf > 1 else "")
- return f"{self.name} failed ({lf} {failure_s})"
-
- return f"{self.name} failed"
+ return f"{self.test_name} failed ({lf} {failure_s})"
+ return f"{self.test_name} failed"
-class UncaughtException(Failed):
def __str__(self) -> str:
- return f"{self.name} failed (uncaught exception)"
-
-
-class EnvChanged(Failed):
- def __str__(self) -> str:
- return f"{self.name} failed (env changed)"
-
- # Convert Passed to EnvChanged
- @staticmethod
- def from_passed(other):
- return EnvChanged(other.name, other.duration_sec, other.xml_data)
-
-
-class RefLeak(Failed):
- def __str__(self) -> str:
- return f"{self.name} failed (reference leak)"
-
-
-class Skipped(TestResult):
- def __str__(self) -> str:
- return f"{self.name} skipped"
-
-
-class ResourceDenied(Skipped):
- def __str__(self) -> str:
- return f"{self.name} skipped (resource denied)"
-
-
-class Interrupted(TestResult):
- def __str__(self) -> str:
- return f"{self.name} interrupted"
-
-
-class ChildError(Failed):
- def __str__(self) -> str:
- return f"{self.name} crashed"
-
-
-class DidNotRun(TestResult):
- def __str__(self) -> str:
- return f"{self.name} ran no tests"
-
-
-class Timeout(Failed):
- def __str__(self) -> str:
- return f"{self.name} timed out ({format_duration(self.duration_sec)})"
+ match self.state:
+ case State.PASSED:
+ return f"{self.test_name} passed"
+ case State.FAILED:
+ return self._format_failed()
+ case State.SKIPPED:
+ return f"{self.test_name} skipped"
+ case State.UNCAUGHT_EXC:
+ return f"{self.test_name} failed (uncaught exception)"
+ case State.REFLEAK:
+ return f"{self.test_name} failed (reference leak)"
+ case State.ENV_CHANGED:
+ return f"{self.test_name} failed (env changed)"
+ case State.RESOURCE_DENIED:
+ return f"{self.test_name} skipped (resource denied)"
+ case State.INTERRUPTED:
+ return f"{self.test_name} interrupted"
+ case State.MULTIPROCESSING_ERROR:
+ return f"{self.test_name} process crashed"
+ case State.DID_NOT_RUN:
+ return f"{self.test_name} ran no tests"
+ case State.TIMEOUT:
+ return f"{self.test_name} timed out ({format_duration(self.duration)})"
+ case _:
+ raise ValueError("unknown result state: {state!r}")
+
+ def has_meaningful_duration(self):
+ return State.has_meaningful_duration(self.state)
+
+ def set_env_changed(self):
+ if self.state is None or self.state == State.PASSED:
+ self.state = State.ENV_CHANGED
# Minimum duration of a test to display its duration or to mention that
@@ -142,12 +151,6 @@ SPLITTESTDIRS = {
FOUND_GARBAGE = []
-def is_failed(result: TestResult, ns: Namespace) -> bool:
- if isinstance(result, EnvChanged):
- return ns.fail_env_changed
- return isinstance(result, Failed)
-
-
def findtestdir(path=None):
return path or os.path.dirname(os.path.dirname(__file__)) or os.curdir
@@ -194,9 +197,9 @@ def get_abs_module(ns: Namespace, test_name: str) -> str:
return 'test.' + test_name
-def _runtest(ns: Namespace, test_name: str) -> TestResult:
- # Handle faulthandler timeout, capture stdout+stderr, XML serialization
- # and measure time.
+def _runtest_capture_output_timeout_junit(result: TestResult, ns: Namespace) -> None:
+ # Capture stdout and stderr, set faulthandler timeout,
+ # and create JUnit XML report.
output_on_failure = ns.verbose3
@@ -206,7 +209,6 @@ def _runtest(ns: Namespace, test_name: str) -> TestResult:
if use_timeout:
faulthandler.dump_traceback_later(ns.timeout, exit=True)
- start_time = time.perf_counter()
try:
support.set_match_tests(ns.match_tests, ns.ignore_tests)
support.junit_xml_list = xml_list = [] if ns.xmlpath else None
@@ -231,9 +233,9 @@ def _runtest(ns: Namespace, test_name: str) -> TestResult:
# warnings will be written to sys.stderr below.
print_warning.orig_stderr = stream
- result = _runtest_inner(ns, test_name,
- display_failure=False)
- if not isinstance(result, Passed):
+ _runtest_env_changed_exc(result, ns, display_failure=False)
+ # Ignore output if the test passed successfully
+ if result.state != State.PASSED:
output = stream.getvalue()
finally:
sys.stdout = orig_stdout
@@ -247,18 +249,13 @@ def _runtest(ns: Namespace, test_name: str) -> TestResult:
# Tell tests to be moderately quiet
support.verbose = ns.verbose
- result = _runtest_inner(ns, test_name,
- display_failure=not ns.verbose)
+ _runtest_env_changed_exc(result, ns,
+ display_failure=not ns.verbose)
if xml_list:
import xml.etree.ElementTree as ET
- result.xml_data = [
- ET.tostring(x).decode('us-ascii')
- for x in xml_list
- ]
-
- result.duration_sec = time.perf_counter() - start_time
- return result
+ result.xml_data = [ET.tostring(x).decode('us-ascii')
+ for x in xml_list]
finally:
if use_timeout:
faulthandler.cancel_dump_traceback_later()
@@ -271,19 +268,23 @@ def runtest(ns: Namespace, test_name: str) -> TestResult:
ns -- regrtest namespace of options
test_name -- the name of the test
- Returns a TestResult sub-class depending on the kind of result received.
+ Returns a TestResult.
If ns.xmlpath is not None, xml_data is a list containing each
generated testsuite element.
"""
+ start_time = time.perf_counter()
+ result = TestResult(test_name)
try:
- return _runtest(ns, test_name)
+ _runtest_capture_output_timeout_junit(result, ns)
except:
if not ns.pgo:
msg = traceback.format_exc()
print(f"test {test_name} crashed -- {msg}",
file=sys.stderr, flush=True)
- return Failed(test_name)
+ result.state = State.UNCAUGHT_EXC
+ result.duration = time.perf_counter() - start_time
+ return result
def _test_module(the_module):
@@ -293,18 +294,48 @@ def _test_module(the_module):
print(error, file=sys.stderr)
if loader.errors:
raise Exception("errors while loading tests")
- support.run_unittest(tests)
+ return support.run_unittest(tests)
def save_env(ns: Namespace, test_name: str):
return saved_test_environment(test_name, ns.verbose, ns.quiet, pgo=ns.pgo)
-def _runtest_inner2(ns: Namespace, test_name: str) -> bool:
- # Load the test function, run the test function, handle huntrleaks
- # to detect leaks.
+def regrtest_runner(result, test_func, ns) -> None:
+ # Run test_func(), collect statistics, and detect reference and memory
+ # leaks.
+
+ if ns.huntrleaks:
+ from test.libregrtest.refleak import dash_R
+ refleak, test_result = dash_R(ns, result.test_name, test_func)
+ else:
+ test_result = test_func()
+ refleak = False
+
+ if refleak:
+ result.state = State.REFLEAK
+
+ match test_result:
+ case TestStats():
+ stats = test_result
+ case unittest.TestResult():
+ stats = TestStats.from_unittest(test_result)
+ case doctest.TestResults():
+ stats = TestStats.from_doctest(test_result)
+ case None:
+ print_warning(f"{result.test_name} test runner returned None: {test_func}")
+ stats = None
+ case _:
+ print_warning(f"Unknown test result type: {type(test_result)}")
+ stats = None
+
+ result.stats = stats
+
- abstest = get_abs_module(ns, test_name)
+def _load_run_test(result: TestResult, ns: Namespace) -> None:
+ # Load the test function, run the test function.
+
+ abstest = get_abs_module(ns, result.test_name)
# remove the module from sys.module to reload it if it was already imported
try:
@@ -314,23 +345,15 @@ def _runtest_inner2(ns: Namespace, test_name: str) -> bool:
the_module = importlib.import_module(abstest)
- if ns.huntrleaks:
- from test.libregrtest.refleak import dash_R
-
# If the test has a test_main, that will run the appropriate
# tests. If not, use normal unittest test loading.
- test_runner = getattr(the_module, "test_main", None)
- if test_runner is None:
- test_runner = functools.partial(_test_module, the_module)
+ test_func = getattr(the_module, "test_main", None)
+ if test_func is None:
+ test_func = functools.partial(_test_module, the_module)
try:
- with save_env(ns, test_name):
- if ns.huntrleaks:
- # Return True if the test leaked references
- refleak = dash_R(ns, test_name, test_runner)
- else:
- test_runner()
- refleak = False
+ with save_env(ns, result.test_name):
+ regrtest_runner(result, test_func, ns)
finally:
# First kill any dangling references to open files etc.
# This can also issue some ResourceWarnings which would otherwise get
@@ -338,11 +361,11 @@ def _runtest_inner2(ns: Namespace, test_name: str) -> bool:
# failures.
support.gc_collect()
- cleanup_test_droppings(test_name, ns.verbose)
+ cleanup_test_droppings(result.test_name, ns.verbose)
if gc.garbage:
support.environment_altered = True
- print_warning(f"{test_name} created {len(gc.garbage)} "
+ print_warning(f"{result.test_name} created {len(gc.garbage)} "
f"uncollectable object(s).")
# move the uncollectable objects somewhere,
@@ -352,12 +375,9 @@ def _runtest_inner2(ns: Namespace, test_name: str) -> bool:
support.reap_children()
- return refleak
-
-def _runtest_inner(
- ns: Namespace, test_name: str, display_failure: bool = True
-) -> TestResult:
+def _runtest_env_changed_exc(result: TestResult, ns: Namespace,
+ display_failure: bool = True) -> None:
# Detect environment changes, handle exceptions.
# Reset the environment_altered flag to detect if a test altered
@@ -367,49 +387,61 @@ def _runtest_inner(
if ns.pgo:
display_failure = False
+ test_name = result.test_name
try:
clear_caches()
support.gc_collect()
with save_env(ns, test_name):
- refleak = _runtest_inner2(ns, test_name)
+ _load_run_test(result, ns)
except support.ResourceDenied as msg:
if not ns.quiet and not ns.pgo:
print(f"{test_name} skipped -- {msg}", flush=True)
- return ResourceDenied(test_name)
+ result.state = State.RESOURCE_DENIED
+ return
except unittest.SkipTest as msg:
if not ns.quiet and not ns.pgo:
print(f"{test_name} skipped -- {msg}", flush=True)
- return Skipped(test_name)
+ result.state = State.SKIPPED
+ return
except support.TestFailedWithDetails as exc:
msg = f"test {test_name} failed"
if display_failure:
msg = f"{msg} -- {exc}"
print(msg, file=sys.stderr, flush=True)
- return Failed(test_name, errors=exc.errors, failures=exc.failures)
+ result.state = State.FAILED
+ result.errors = exc.errors
+ result.failures = exc.failures
+ result.stats = exc.stats
+ return
except support.TestFailed as exc:
msg = f"test {test_name} failed"
if display_failure:
msg = f"{msg} -- {exc}"
print(msg, file=sys.stderr, flush=True)
- return Failed(test_name)
+ result.state = State.FAILED
+ result.stats = exc.stats
+ return
except support.TestDidNotRun:
- return DidNotRun(test_name)
+ result.state = State.DID_NOT_RUN
+ return
except KeyboardInterrupt:
print()
- return Interrupted(test_name)
+ result.state = State.INTERRUPTED
+ return
except:
if not ns.pgo:
msg = traceback.format_exc()
print(f"test {test_name} crashed -- {msg}",
file=sys.stderr, flush=True)
- return UncaughtException(test_name)
+ result.state = State.UNCAUGHT_EXC
+ return
- if refleak:
- return RefLeak(test_name)
if support.environment_altered:
- return EnvChanged(test_name)
- return Passed(test_name)
+ result.set_env_changed()
+ # Don't override the state if it was already set (REFLEAK or ENV_CHANGED)
+ if result.state is None:
+ result.state = State.PASSED
def cleanup_test_droppings(test_name: str, verbose: int) -> None:
diff --git a/Lib/test/libregrtest/runtest_mp.py b/Lib/test/libregrtest/runtest_mp.py
index 62e6c6d..fb1f80b 100644
--- a/Lib/test/libregrtest/runtest_mp.py
+++ b/Lib/test/libregrtest/runtest_mp.py
@@ -1,3 +1,4 @@
+import dataclasses
import faulthandler
import json
import os.path
@@ -13,12 +14,13 @@ from typing import NamedTuple, NoReturn, Literal, Any, TextIO
from test import support
from test.support import os_helper
+from test.support import TestStats
from test.libregrtest.cmdline import Namespace
from test.libregrtest.main import Regrtest
from test.libregrtest.runtest import (
- runtest, is_failed, TestResult, Interrupted, Timeout, ChildError,
- PROGRESS_MIN_TIME, Passed, EnvChanged)
+ runtest, TestResult, State,
+ PROGRESS_MIN_TIME)
from test.libregrtest.setup import setup_tests
from test.libregrtest.utils import format_duration, print_warning
@@ -43,9 +45,9 @@ USE_PROCESS_GROUP = (hasattr(os, "setsid") and hasattr(os, "killpg"))
def must_stop(result: TestResult, ns: Namespace) -> bool:
- if isinstance(result, Interrupted):
+ if result.state == State.INTERRUPTED:
return True
- if ns.failfast and is_failed(result, ns):
+ if ns.failfast and result.is_failed(ns.fail_env_changed):
return True
return False
@@ -130,8 +132,8 @@ class MultiprocessIterator:
class MultiprocessResult(NamedTuple):
result: TestResult
# bpo-45410: stderr is written into stdout to keep messages order
- stdout: str
- error_msg: str
+ worker_stdout: str | None = None
+ err_msg: str | None = None
ExcStr = str
@@ -209,15 +211,12 @@ class TestWorkerProcess(threading.Thread):
def mp_result_error(
self,
test_result: TestResult,
- stdout: str = '',
+ stdout: str | None = None,
err_msg=None
) -> MultiprocessResult:
- test_result.duration_sec = time.monotonic() - self.start_time
return MultiprocessResult(test_result, stdout, err_msg)
def _run_process(self, test_name: str, tmp_dir: str, stdout_fh: TextIO) -> int:
- self.start_time = time.monotonic()
-
self.current_test_name = test_name
try:
popen = run_test_in_subprocess(test_name, self.ns, tmp_dir, stdout_fh)
@@ -306,38 +305,41 @@ class TestWorkerProcess(threading.Thread):
# gh-101634: Catch UnicodeDecodeError if stdout cannot be
# decoded from encoding
err_msg = f"Cannot read process stdout: {exc}"
- return self.mp_result_error(ChildError(test_name), '', err_msg)
+ result = TestResult(test_name, state=State.MULTIPROCESSING_ERROR)
+ return self.mp_result_error(result, err_msg=err_msg)
if retcode is None:
- return self.mp_result_error(Timeout(test_name), stdout)
+ result = TestResult(test_name, state=State.TIMEOUT)
+ return self.mp_result_error(result, stdout)
err_msg = None
if retcode != 0:
err_msg = "Exit code %s" % retcode
else:
- stdout, _, result = stdout.rpartition("\n")
+ stdout, _, worker_json = stdout.rpartition("\n")
stdout = stdout.rstrip()
- if not result:
+ if not worker_json:
err_msg = "Failed to parse worker stdout"
else:
try:
# deserialize run_tests_worker() output
- result = json.loads(result, object_hook=decode_test_result)
+ result = json.loads(worker_json,
+ object_hook=decode_test_result)
except Exception as exc:
err_msg = "Failed to parse worker JSON: %s" % exc
- if err_msg is not None:
- return self.mp_result_error(ChildError(test_name), stdout, err_msg)
+ if err_msg:
+ result = TestResult(test_name, state=State.MULTIPROCESSING_ERROR)
+ return self.mp_result_error(result, stdout, err_msg)
if tmp_files:
msg = (f'\n\n'
f'Warning -- {test_name} leaked temporary files '
f'({len(tmp_files)}): {", ".join(sorted(tmp_files))}')
stdout += msg
- if isinstance(result, Passed):
- result = EnvChanged.from_passed(result)
+ result.set_env_changed()
- return MultiprocessResult(result, stdout, err_msg)
+ return MultiprocessResult(result, stdout)
def run(self) -> None:
while not self._stopped:
@@ -347,7 +349,9 @@ class TestWorkerProcess(threading.Thread):
except StopIteration:
break
+ self.start_time = time.monotonic()
mp_result = self._runtest(test_name)
+ mp_result.result.duration = time.monotonic() - self.start_time
self.output.put((False, mp_result))
if must_stop(mp_result.result, self.ns):
@@ -473,11 +477,11 @@ class MultiprocessTestRunner:
result = mp_result.result
text = str(result)
- if mp_result.error_msg is not None:
- # CHILD_ERROR
- text += ' (%s)' % mp_result.error_msg
- elif (result.duration_sec >= PROGRESS_MIN_TIME and not self.ns.pgo):
- text += ' (%s)' % format_duration(result.duration_sec)
+ if mp_result.err_msg:
+ # MULTIPROCESSING_ERROR
+ text += ' (%s)' % mp_result.err_msg
+ elif (result.duration >= PROGRESS_MIN_TIME and not self.ns.pgo):
+ text += ' (%s)' % format_duration(result.duration)
running = get_running(self.workers)
if running and not self.ns.pgo:
text += ' -- running: %s' % ', '.join(running)
@@ -489,7 +493,7 @@ class MultiprocessTestRunner:
# Thread got an exception
format_exc = item[1]
print_warning(f"regrtest worker thread failed: {format_exc}")
- result = ChildError("<regrtest worker>")
+ result = TestResult("<regrtest worker>", state=State.MULTIPROCESSING_ERROR)
self.regrtest.accumulate_result(result)
return True
@@ -498,8 +502,8 @@ class MultiprocessTestRunner:
self.regrtest.accumulate_result(mp_result.result)
self.display_result(mp_result)
- if mp_result.stdout:
- print(mp_result.stdout, flush=True)
+ if mp_result.worker_stdout:
+ print(mp_result.worker_stdout, flush=True)
if must_stop(mp_result.result, self.ns):
return True
@@ -541,32 +545,20 @@ class EncodeTestResult(json.JSONEncoder):
def default(self, o: Any) -> dict[str, Any]:
if isinstance(o, TestResult):
- result = vars(o)
+ result = dataclasses.asdict(o)
result["__test_result__"] = o.__class__.__name__
return result
return super().default(o)
-def decode_test_result(d: dict[str, Any]) -> TestResult | dict[str, Any]:
+def decode_test_result(d: dict[str, Any]) -> TestResult | TestStats | dict[str, Any]:
"""Decode a TestResult (sub)class object from a JSON dict."""
if "__test_result__" not in d:
return d
- cls_name = d.pop("__test_result__")
- for cls in get_all_test_result_classes():
- if cls.__name__ == cls_name:
- return cls(**d)
-
-
-def get_all_test_result_classes() -> set[type[TestResult]]:
- prev_count = 0
- classes = {TestResult}
- while len(classes) > prev_count:
- prev_count = len(classes)
- to_add = []
- for cls in classes:
- to_add.extend(cls.__subclasses__())
- classes.update(to_add)
- return classes
+ d.pop('__test_result__')
+ if d['stats'] is not None:
+ d['stats'] = TestStats(**d['stats'])
+ return TestResult(**d)
diff --git a/Lib/test/libregrtest/save_env.py b/Lib/test/libregrtest/save_env.py
index c7801b7..164fe98 100644
--- a/Lib/test/libregrtest/save_env.py
+++ b/Lib/test/libregrtest/save_env.py
@@ -23,7 +23,7 @@ class SkipTestEnvironment(Exception):
class saved_test_environment:
"""Save bits of the test environment and restore them at block exit.
- with saved_test_environment(testname, verbose, quiet):
+ with saved_test_environment(test_name, verbose, quiet):
#stuff
Unless quiet is True, a warning is printed to stderr if any of
@@ -34,8 +34,8 @@ class saved_test_environment:
items is also printed.
"""
- def __init__(self, testname, verbose=0, quiet=False, *, pgo=False):
- self.testname = testname
+ def __init__(self, test_name, verbose=0, quiet=False, *, pgo=False):
+ self.test_name = test_name
self.verbose = verbose
self.quiet = quiet
self.pgo = pgo
@@ -323,7 +323,7 @@ class saved_test_environment:
restore(original)
if not self.quiet and not self.pgo:
print_warning(
- f"{name} was modified by {self.testname}\n"
+ f"{name} was modified by {self.test_name}\n"
f" Before: {original}\n"
f" After: {current} ")
return False
diff --git a/Lib/test/support/__init__.py b/Lib/test/support/__init__.py
index 16a5056..f28a3a2 100644
--- a/Lib/test/support/__init__.py
+++ b/Lib/test/support/__init__.py
@@ -4,6 +4,7 @@ if __name__ != 'test.support':
raise ImportError('support must be imported from the test package')
import contextlib
+import dataclasses
import functools
import getpass
import _opcode
@@ -118,17 +119,20 @@ class Error(Exception):
class TestFailed(Error):
"""Test failed."""
+ def __init__(self, msg, *args, stats=None):
+ self.msg = msg
+ self.stats = stats
+ super().__init__(msg, *args)
+
+ def __str__(self):
+ return self.msg
class TestFailedWithDetails(TestFailed):
"""Test failed."""
- def __init__(self, msg, errors, failures):
- self.msg = msg
+ def __init__(self, msg, errors, failures, stats):
self.errors = errors
self.failures = failures
- super().__init__(msg, errors, failures)
-
- def __str__(self):
- return self.msg
+ super().__init__(msg, errors, failures, stats=stats)
class TestDidNotRun(Error):
"""Test did not run any subtests."""
@@ -1105,6 +1109,30 @@ def _filter_suite(suite, pred):
newtests.append(test)
suite._tests = newtests
+@dataclasses.dataclass(slots=True)
+class TestStats:
+ tests_run: int = 0
+ failures: int = 0
+ skipped: int = 0
+
+ @staticmethod
+ def from_unittest(result):
+ return TestStats(result.testsRun,
+ len(result.failures),
+ len(result.skipped))
+
+ @staticmethod
+ def from_doctest(results):
+ return TestStats(results.attempted,
+ results.failed,
+ results.skipped)
+
+ def accumulate(self, stats):
+ self.tests_run += stats.tests_run
+ self.failures += stats.failures
+ self.skipped += stats.skipped
+
+
def _run_suite(suite):
"""Run tests from a unittest.TestSuite-derived class."""
runner = get_test_runner(sys.stdout,
@@ -1119,6 +1147,7 @@ def _run_suite(suite):
if not result.testsRun and not result.skipped and not result.errors:
raise TestDidNotRun
if not result.wasSuccessful():
+ stats = TestStats.from_unittest(result)
if len(result.errors) == 1 and not result.failures:
err = result.errors[0][1]
elif len(result.failures) == 1 and not result.errors:
@@ -1128,7 +1157,8 @@ def _run_suite(suite):
if not verbose: err += "; run in verbose mode for details"
errors = [(str(tc), exc_str) for tc, exc_str in result.errors]
failures = [(str(tc), exc_str) for tc, exc_str in result.failures]
- raise TestFailedWithDetails(err, errors, failures)
+ raise TestFailedWithDetails(err, errors, failures, stats=stats)
+ return result
# By default, don't filter tests
@@ -1237,7 +1267,7 @@ def run_unittest(*classes):
else:
suite.addTest(loader.loadTestsFromTestCase(cls))
_filter_suite(suite, match_test)
- _run_suite(suite)
+ return _run_suite(suite)
#=======================================================================
# Check for the presence of docstrings.
@@ -1277,13 +1307,18 @@ def run_doctest(module, verbosity=None, optionflags=0):
else:
verbosity = None
- f, t = doctest.testmod(module, verbose=verbosity, optionflags=optionflags)
- if f:
- raise TestFailed("%d of %d doctests failed" % (f, t))
+ results = doctest.testmod(module,
+ verbose=verbosity,
+ optionflags=optionflags)
+ if results.failed:
+ stats = TestStats.from_doctest(results)
+ raise TestFailed(f"{results.failed} of {results.attempted} "
+ f"doctests failed",
+ stats=stats)
if verbose:
print('doctest (%s) ... %d tests with zero failures' %
- (module.__name__, t))
- return f, t
+ (module.__name__, results.attempted))
+ return results
#=======================================================================
diff --git a/Lib/test/test_netrc.py b/Lib/test/test_netrc.py
index 573d636..b38cb32 100644
--- a/Lib/test/test_netrc.py
+++ b/Lib/test/test_netrc.py
@@ -309,7 +309,7 @@ class NetrcTestCase(unittest.TestCase):
('anonymous', '', 'pass'))
def test_main():
- run_unittest(NetrcTestCase)
+ return run_unittest(NetrcTestCase)
if __name__ == "__main__":
test_main()
diff --git a/Lib/test/test_pep646_syntax.py b/Lib/test/test_pep646_syntax.py
index 3ffa82d..12a4227 100644
--- a/Lib/test/test_pep646_syntax.py
+++ b/Lib/test/test_pep646_syntax.py
@@ -320,7 +320,7 @@ __test__ = {'doctests' : doctests}
def test_main(verbose=False):
from test import support
from test import test_pep646_syntax
- support.run_doctest(test_pep646_syntax, verbose)
+ return support.run_doctest(test_pep646_syntax, verbose)
if __name__ == "__main__":
test_main(verbose=True)
diff --git a/Lib/test/test_regrtest.py b/Lib/test/test_regrtest.py
index 0c1400c..1c02d80 100644
--- a/Lib/test/test_regrtest.py
+++ b/Lib/test/test_regrtest.py
@@ -19,7 +19,7 @@ import textwrap
import unittest
from test import libregrtest
from test import support
-from test.support import os_helper
+from test.support import os_helper, TestStats
from test.libregrtest import utils, setup
if not support.has_subprocess_support:
@@ -409,7 +409,9 @@ class BaseTestCase(unittest.TestCase):
self.fail("%r not found in %r" % (regex, output))
return match
- def check_line(self, output, regex):
+ def check_line(self, output, regex, full=False):
+ if full:
+ regex += '\n'
regex = re.compile(r'^' + regex, re.MULTILINE)
self.assertRegex(output, regex)
@@ -421,21 +423,27 @@ class BaseTestCase(unittest.TestCase):
def check_executed_tests(self, output, tests, skipped=(), failed=(),
env_changed=(), omitted=(),
- rerun={}, no_test_ran=(),
+ rerun={}, run_no_tests=(),
+ resource_denied=(),
randomize=False, interrupted=False,
- fail_env_changed=False):
+ fail_env_changed=False,
+ *, stats):
if isinstance(tests, str):
tests = [tests]
if isinstance(skipped, str):
skipped = [skipped]
+ if isinstance(resource_denied, str):
+ resource_denied = [resource_denied]
if isinstance(failed, str):
failed = [failed]
if isinstance(env_changed, str):
env_changed = [env_changed]
if isinstance(omitted, str):
omitted = [omitted]
- if isinstance(no_test_ran, str):
- no_test_ran = [no_test_ran]
+ if isinstance(run_no_tests, str):
+ run_no_tests = [run_no_tests]
+ if isinstance(stats, int):
+ stats = TestStats(stats)
executed = self.parse_executed_tests(output)
if randomize:
@@ -479,12 +487,12 @@ class BaseTestCase(unittest.TestCase):
regex = LOG_PREFIX + f"Re-running {name} in verbose mode \\(matching: {match}\\)"
self.check_line(output, regex)
- if no_test_ran:
- regex = list_regex('%s test%s run no tests', no_test_ran)
+ if run_no_tests:
+ regex = list_regex('%s test%s run no tests', run_no_tests)
self.check_line(output, regex)
good = (len(tests) - len(skipped) - len(failed)
- - len(omitted) - len(env_changed) - len(no_test_ran))
+ - len(omitted) - len(env_changed) - len(run_no_tests))
if good:
regex = r'%s test%s OK\.$' % (good, plural(good))
if not skipped and not failed and good > 1:
@@ -494,6 +502,33 @@ class BaseTestCase(unittest.TestCase):
if interrupted:
self.check_line(output, 'Test suite interrupted by signal SIGINT.')
+ # Total tests
+ parts = [f'run={stats.tests_run:,}']
+ if stats.failures:
+ parts.append(f'failures={stats.failures:,}')
+ if stats.skipped:
+ parts.append(f'skipped={stats.skipped:,}')
+ line = fr'Total tests: {" ".join(parts)}'
+ self.check_line(output, line, full=True)
+
+ # Total test files
+ report = [f'success={good}']
+ if failed:
+ report.append(f'failed={len(failed)}')
+ if env_changed:
+ report.append(f'env_changed={len(env_changed)}')
+ if skipped:
+ report.append(f'skipped={len(skipped)}')
+ if resource_denied:
+ report.append(f'resource_denied={len(resource_denied)}')
+ if rerun:
+ report.append(f'rerun={len(rerun)}')
+ if run_no_tests:
+ report.append(f'run_no_tests={len(run_no_tests)}')
+ line = fr'Total test files: {" ".join(report)}'
+ self.check_line(output, line, full=True)
+
+ # Result
result = []
if failed:
result.append('FAILURE')
@@ -508,10 +543,8 @@ class BaseTestCase(unittest.TestCase):
result.append('SUCCESS')
result = ', '.join(result)
if rerun:
- self.check_line(output, 'Tests result: FAILURE')
result = 'FAILURE then %s' % result
-
- self.check_line(output, 'Tests result: %s' % result)
+ self.check_line(output, f'Result: {result}', full=True)
def parse_random_seed(self, output):
match = self.regex_search(r'Using random seed ([0-9]+)', output)
@@ -604,7 +637,8 @@ class ProgramsTestCase(BaseTestCase):
def check_output(self, output):
self.parse_random_seed(output)
- self.check_executed_tests(output, self.tests, randomize=True)
+ self.check_executed_tests(output, self.tests,
+ randomize=True, stats=len(self.tests))
def run_tests(self, args):
output = self.run_python(args)
@@ -718,7 +752,8 @@ class ArgsTestCase(BaseTestCase):
tests = [test_ok, test_failing]
output = self.run_tests(*tests, exitcode=EXITCODE_BAD_TEST)
- self.check_executed_tests(output, tests, failed=test_failing)
+ self.check_executed_tests(output, tests, failed=test_failing,
+ stats=TestStats(2, 1))
def test_resources(self):
# test -u command line option
@@ -737,17 +772,21 @@ class ArgsTestCase(BaseTestCase):
# -u all: 2 resources enabled
output = self.run_tests('-u', 'all', *test_names)
- self.check_executed_tests(output, test_names)
+ self.check_executed_tests(output, test_names, stats=2)
# -u audio: 1 resource enabled
output = self.run_tests('-uaudio', *test_names)
self.check_executed_tests(output, test_names,
- skipped=tests['network'])
+ skipped=tests['network'],
+ resource_denied=tests['network'],
+ stats=1)
# no option: 0 resources enabled
output = self.run_tests(*test_names)
self.check_executed_tests(output, test_names,
- skipped=test_names)
+ skipped=test_names,
+ resource_denied=test_names,
+ stats=0)
def test_random(self):
# test -r and --randseed command line option
@@ -795,7 +834,8 @@ class ArgsTestCase(BaseTestCase):
previous = name
output = self.run_tests('--fromfile', filename)
- self.check_executed_tests(output, tests)
+ stats = len(tests)
+ self.check_executed_tests(output, tests, stats=stats)
# test format '[2/7] test_opcodes'
with open(filename, "w") as fp:
@@ -803,7 +843,7 @@ class ArgsTestCase(BaseTestCase):
print("[%s/%s] %s" % (index, len(tests), name), file=fp)
output = self.run_tests('--fromfile', filename)
- self.check_executed_tests(output, tests)
+ self.check_executed_tests(output, tests, stats=stats)
# test format 'test_opcodes'
with open(filename, "w") as fp:
@@ -811,7 +851,7 @@ class ArgsTestCase(BaseTestCase):
print(name, file=fp)
output = self.run_tests('--fromfile', filename)
- self.check_executed_tests(output, tests)
+ self.check_executed_tests(output, tests, stats=stats)
# test format 'Lib/test/test_opcodes.py'
with open(filename, "w") as fp:
@@ -819,20 +859,20 @@ class ArgsTestCase(BaseTestCase):
print('Lib/test/%s.py' % name, file=fp)
output = self.run_tests('--fromfile', filename)
- self.check_executed_tests(output, tests)
+ self.check_executed_tests(output, tests, stats=stats)
def test_interrupted(self):
code = TEST_INTERRUPTED
test = self.create_test('sigint', code=code)
output = self.run_tests(test, exitcode=EXITCODE_INTERRUPTED)
self.check_executed_tests(output, test, omitted=test,
- interrupted=True)
+ interrupted=True, stats=0)
def test_slowest(self):
# test --slowest
tests = [self.create_test() for index in range(3)]
output = self.run_tests("--slowest", *tests)
- self.check_executed_tests(output, tests)
+ self.check_executed_tests(output, tests, stats=len(tests))
regex = ('10 slowest tests:\n'
'(?:- %s: .*\n){%s}'
% (self.TESTNAME_REGEX, len(tests)))
@@ -851,7 +891,8 @@ class ArgsTestCase(BaseTestCase):
args = ("--slowest", test)
output = self.run_tests(*args, exitcode=EXITCODE_INTERRUPTED)
self.check_executed_tests(output, test,
- omitted=test, interrupted=True)
+ omitted=test, interrupted=True,
+ stats=0)
regex = ('10 slowest tests:\n')
self.check_line(output, regex)
@@ -860,7 +901,7 @@ class ArgsTestCase(BaseTestCase):
# test --coverage
test = self.create_test('coverage')
output = self.run_tests("--coverage", test)
- self.check_executed_tests(output, [test])
+ self.check_executed_tests(output, [test], stats=1)
regex = (r'lines +cov% +module +\(path\)\n'
r'(?: *[0-9]+ *[0-9]{1,2}% *[^ ]+ +\([^)]+\)+)+')
self.check_line(output, regex)
@@ -890,7 +931,8 @@ class ArgsTestCase(BaseTestCase):
""")
test = self.create_test('forever', code=code)
output = self.run_tests('--forever', test, exitcode=EXITCODE_BAD_TEST)
- self.check_executed_tests(output, [test]*3, failed=test)
+ self.check_executed_tests(output, [test]*3, failed=test,
+ stats=TestStats(1, 1))
def check_leak(self, code, what):
test = self.create_test('huntrleaks', code=code)
@@ -900,7 +942,7 @@ class ArgsTestCase(BaseTestCase):
output = self.run_tests('--huntrleaks', '6:3:', test,
exitcode=EXITCODE_BAD_TEST,
stderr=subprocess.STDOUT)
- self.check_executed_tests(output, [test], failed=test)
+ self.check_executed_tests(output, [test], failed=test, stats=1)
line = 'beginning 9 repetitions\n123456789\n.........\n'
self.check_line(output, re.escape(line))
@@ -982,7 +1024,7 @@ class ArgsTestCase(BaseTestCase):
tests = [crash_test]
output = self.run_tests("-j2", *tests, exitcode=EXITCODE_BAD_TEST)
self.check_executed_tests(output, tests, failed=crash_test,
- randomize=True)
+ randomize=True, stats=0)
def parse_methods(self, output):
regex = re.compile("^(test[^ ]+).*ok$", flags=re.MULTILINE)
@@ -1077,13 +1119,14 @@ class ArgsTestCase(BaseTestCase):
# don't fail by default
output = self.run_tests(testname)
- self.check_executed_tests(output, [testname], env_changed=testname)
+ self.check_executed_tests(output, [testname],
+ env_changed=testname, stats=1)
# fail with --fail-env-changed
output = self.run_tests("--fail-env-changed", testname,
exitcode=EXITCODE_ENV_CHANGED)
self.check_executed_tests(output, [testname], env_changed=testname,
- fail_env_changed=True)
+ fail_env_changed=True, stats=1)
def test_rerun_fail(self):
# FAILURE then FAILURE
@@ -1102,7 +1145,9 @@ class ArgsTestCase(BaseTestCase):
output = self.run_tests("-w", testname, exitcode=EXITCODE_BAD_TEST)
self.check_executed_tests(output, [testname],
- failed=testname, rerun={testname: "test_fail_always"})
+ failed=testname,
+ rerun={testname: "test_fail_always"},
+ stats=TestStats(1, 1))
def test_rerun_success(self):
# FAILURE then SUCCESS
@@ -1123,7 +1168,8 @@ class ArgsTestCase(BaseTestCase):
output = self.run_tests("-w", testname, exitcode=0)
self.check_executed_tests(output, [testname],
- rerun={testname: "test_fail_once"})
+ rerun={testname: "test_fail_once"},
+ stats=1)
def test_rerun_setup_class_hook_failure(self):
# FAILURE then FAILURE
@@ -1143,7 +1189,8 @@ class ArgsTestCase(BaseTestCase):
output = self.run_tests("-w", testname, exitcode=EXITCODE_BAD_TEST)
self.check_executed_tests(output, testname,
failed=[testname],
- rerun={testname: "ExampleTests"})
+ rerun={testname: "ExampleTests"},
+ stats=0)
def test_rerun_teardown_class_hook_failure(self):
# FAILURE then FAILURE
@@ -1163,7 +1210,8 @@ class ArgsTestCase(BaseTestCase):
output = self.run_tests("-w", testname, exitcode=EXITCODE_BAD_TEST)
self.check_executed_tests(output, testname,
failed=[testname],
- rerun={testname: "ExampleTests"})
+ rerun={testname: "ExampleTests"},
+ stats=1)
def test_rerun_setup_module_hook_failure(self):
# FAILURE then FAILURE
@@ -1182,7 +1230,8 @@ class ArgsTestCase(BaseTestCase):
output = self.run_tests("-w", testname, exitcode=EXITCODE_BAD_TEST)
self.check_executed_tests(output, testname,
failed=[testname],
- rerun={testname: testname})
+ rerun={testname: testname},
+ stats=0)
def test_rerun_teardown_module_hook_failure(self):
# FAILURE then FAILURE
@@ -1201,7 +1250,8 @@ class ArgsTestCase(BaseTestCase):
output = self.run_tests("-w", testname, exitcode=EXITCODE_BAD_TEST)
self.check_executed_tests(output, testname,
failed=[testname],
- rerun={testname: testname})
+ rerun={testname: testname},
+ stats=1)
def test_rerun_setup_hook_failure(self):
# FAILURE then FAILURE
@@ -1220,7 +1270,8 @@ class ArgsTestCase(BaseTestCase):
output = self.run_tests("-w", testname, exitcode=EXITCODE_BAD_TEST)
self.check_executed_tests(output, testname,
failed=[testname],
- rerun={testname: "test_success"})
+ rerun={testname: "test_success"},
+ stats=1)
def test_rerun_teardown_hook_failure(self):
# FAILURE then FAILURE
@@ -1239,7 +1290,8 @@ class ArgsTestCase(BaseTestCase):
output = self.run_tests("-w", testname, exitcode=EXITCODE_BAD_TEST)
self.check_executed_tests(output, testname,
failed=[testname],
- rerun={testname: "test_success"})
+ rerun={testname: "test_success"},
+ stats=1)
def test_rerun_async_setup_hook_failure(self):
# FAILURE then FAILURE
@@ -1258,7 +1310,8 @@ class ArgsTestCase(BaseTestCase):
output = self.run_tests("-w", testname, exitcode=EXITCODE_BAD_TEST)
self.check_executed_tests(output, testname,
failed=[testname],
- rerun={testname: "test_success"})
+ rerun={testname: "test_success"},
+ stats=1)
def test_rerun_async_teardown_hook_failure(self):
# FAILURE then FAILURE
@@ -1277,7 +1330,8 @@ class ArgsTestCase(BaseTestCase):
output = self.run_tests("-w", testname, exitcode=EXITCODE_BAD_TEST)
self.check_executed_tests(output, testname,
failed=[testname],
- rerun={testname: "test_success"})
+ rerun={testname: "test_success"},
+ stats=1)
def test_no_tests_ran(self):
code = textwrap.dedent("""
@@ -1291,7 +1345,9 @@ class ArgsTestCase(BaseTestCase):
output = self.run_tests(testname, "-m", "nosuchtest",
exitcode=EXITCODE_NO_TESTS_RAN)
- self.check_executed_tests(output, [testname], no_test_ran=testname)
+ self.check_executed_tests(output, [testname],
+ run_no_tests=testname,
+ stats=0)
def test_no_tests_ran_skip(self):
code = textwrap.dedent("""
@@ -1304,7 +1360,8 @@ class ArgsTestCase(BaseTestCase):
testname = self.create_test(code=code)
output = self.run_tests(testname)
- self.check_executed_tests(output, [testname])
+ self.check_executed_tests(output, [testname],
+ stats=TestStats(1, skipped=1))
def test_no_tests_ran_multiple_tests_nonexistent(self):
code = textwrap.dedent("""
@@ -1320,7 +1377,8 @@ class ArgsTestCase(BaseTestCase):
output = self.run_tests(testname, testname2, "-m", "nosuchtest",
exitcode=EXITCODE_NO_TESTS_RAN)
self.check_executed_tests(output, [testname, testname2],
- no_test_ran=[testname, testname2])
+ run_no_tests=[testname, testname2],
+ stats=0)
def test_no_test_ran_some_test_exist_some_not(self):
code = textwrap.dedent("""
@@ -1343,7 +1401,8 @@ class ArgsTestCase(BaseTestCase):
output = self.run_tests(testname, testname2, "-m", "nosuchtest",
"-m", "test_other_bug", exitcode=0)
self.check_executed_tests(output, [testname, testname2],
- no_test_ran=[testname])
+ run_no_tests=[testname],
+ stats=1)
@support.cpython_only
def test_uncollectable(self):
@@ -1370,7 +1429,8 @@ class ArgsTestCase(BaseTestCase):
exitcode=EXITCODE_ENV_CHANGED)
self.check_executed_tests(output, [testname],
env_changed=[testname],
- fail_env_changed=True)
+ fail_env_changed=True,
+ stats=1)
def test_multiprocessing_timeout(self):
code = textwrap.dedent(r"""
@@ -1396,7 +1456,7 @@ class ArgsTestCase(BaseTestCase):
output = self.run_tests("-j2", "--timeout=1.0", testname,
exitcode=EXITCODE_BAD_TEST)
self.check_executed_tests(output, [testname],
- failed=testname)
+ failed=testname, stats=0)
self.assertRegex(output,
re.compile('%s timed out' % testname, re.MULTILINE))
@@ -1430,7 +1490,8 @@ class ArgsTestCase(BaseTestCase):
exitcode=EXITCODE_ENV_CHANGED)
self.check_executed_tests(output, [testname],
env_changed=[testname],
- fail_env_changed=True)
+ fail_env_changed=True,
+ stats=1)
self.assertIn("Warning -- Unraisable exception", output)
self.assertIn("Exception: weakref callback bug", output)
@@ -1462,7 +1523,8 @@ class ArgsTestCase(BaseTestCase):
exitcode=EXITCODE_ENV_CHANGED)
self.check_executed_tests(output, [testname],
env_changed=[testname],
- fail_env_changed=True)
+ fail_env_changed=True,
+ stats=1)
self.assertIn("Warning -- Uncaught thread exception", output)
self.assertIn("Exception: bug in thread", output)
@@ -1503,7 +1565,8 @@ class ArgsTestCase(BaseTestCase):
output = self.run_tests(*cmd, exitcode=EXITCODE_ENV_CHANGED)
self.check_executed_tests(output, [testname],
env_changed=[testname],
- fail_env_changed=True)
+ fail_env_changed=True,
+ stats=1)
self.assertRegex(output, regex)
def test_unicode_guard_env(self):
@@ -1550,7 +1613,8 @@ class ArgsTestCase(BaseTestCase):
self.check_executed_tests(output, testnames,
env_changed=testnames,
fail_env_changed=True,
- randomize=True)
+ randomize=True,
+ stats=len(testnames))
for testname in testnames:
self.assertIn(f"Warning -- {testname} leaked temporary "
f"files (1): mytmpfile",
@@ -1589,7 +1653,47 @@ class ArgsTestCase(BaseTestCase):
exitcode=EXITCODE_BAD_TEST)
self.check_executed_tests(output, [testname],
failed=[testname],
- randomize=True)
+ randomize=True,
+ stats=0)
+
+ def test_doctest(self):
+ code = textwrap.dedent(fr'''
+ import doctest
+ import sys
+ from test import support
+
+ def my_function():
+ """
+ Pass:
+
+ >>> 1 + 1
+ 2
+
+ Failure:
+
+ >>> 2 + 3
+ 23
+ >>> 1 + 1
+ 11
+
+ Skipped test (ignored):
+
+ >>> id(1.0) # doctest: +SKIP
+ 7948648
+ """
+
+ def test_main():
+ testmod = sys.modules[__name__]
+ return support.run_doctest(testmod)
+ ''')
+ testname = self.create_test(code=code)
+
+ output = self.run_tests("--fail-env-changed", "-v", "-j1", testname,
+ exitcode=EXITCODE_BAD_TEST)
+ self.check_executed_tests(output, [testname],
+ failed=[testname],
+ randomize=True,
+ stats=TestStats(4, 2, 1))
class TestUtils(unittest.TestCase):
diff --git a/Lib/test/test_xml_etree.py b/Lib/test/test_xml_etree.py
index b9352cb..da2828c 100644
--- a/Lib/test/test_xml_etree.py
+++ b/Lib/test/test_xml_etree.py
@@ -4250,7 +4250,7 @@ def test_main(module=None):
old_factories = None
try:
- support.run_unittest(*test_classes)
+ return support.run_unittest(*test_classes)
finally:
from xml.etree import ElementPath
# Restore mapping and path cache