import dataclasses import json from typing import Any from .utils import ( StrJSON, TestName, FilterTuple, format_duration, normalize_test_name, print_warning) @dataclasses.dataclass(slots=True) class TestStats: tests_run: int = 0 failures: int = 0 skipped: int = 0 @staticmethod def from_unittest(result): return TestStats(result.testsRun, len(result.failures), len(result.skipped)) @staticmethod def from_doctest(results): return TestStats(results.attempted, results.failed, results.skipped) def accumulate(self, stats): self.tests_run += stats.tests_run self.failures += stats.failures self.skipped += stats.skipped # Avoid enum.Enum to reduce the number of imports when tests are run class State: PASSED = "PASSED" FAILED = "FAILED" SKIPPED = "SKIPPED" UNCAUGHT_EXC = "UNCAUGHT_EXC" REFLEAK = "REFLEAK" ENV_CHANGED = "ENV_CHANGED" RESOURCE_DENIED = "RESOURCE_DENIED" INTERRUPTED = "INTERRUPTED" WORKER_FAILED = "WORKER_FAILED" # non-zero worker process exit code WORKER_BUG = "WORKER_BUG" # exception when running a worker DID_NOT_RUN = "DID_NOT_RUN" TIMEOUT = "TIMEOUT" @staticmethod def is_failed(state): return state in { State.FAILED, State.UNCAUGHT_EXC, State.REFLEAK, State.WORKER_FAILED, State.WORKER_BUG, State.TIMEOUT} @staticmethod def has_meaningful_duration(state): # Consider that the duration is meaningless for these cases. # For example, if a whole test file is skipped, its duration # is unlikely to be the duration of executing its tests, # but just the duration to execute code which skips the test. return state not in { State.SKIPPED, State.RESOURCE_DENIED, State.INTERRUPTED, State.WORKER_FAILED, State.WORKER_BUG, State.DID_NOT_RUN} @staticmethod def must_stop(state): return state in { State.INTERRUPTED, State.WORKER_BUG, } @dataclasses.dataclass(slots=True) class TestResult: test_name: TestName state: str | None = None # Test duration in seconds duration: float | None = None xml_data: list[str] | None = None stats: TestStats | None = None # errors and failures copied from support.TestFailedWithDetails errors: list[tuple[str, str]] | None = None failures: list[tuple[str, str]] | None = None def is_failed(self, fail_env_changed: bool) -> bool: if self.state == State.ENV_CHANGED: return fail_env_changed return State.is_failed(self.state) def _format_failed(self): if self.errors and self.failures: le = len(self.errors) lf = len(self.failures) error_s = "error" + ("s" if le > 1 else "") failure_s = "failure" + ("s" if lf > 1 else "") return f"{self.test_name} failed ({le} {error_s}, {lf} {failure_s})" if self.errors: le = len(self.errors) error_s = "error" + ("s" if le > 1 else "") return f"{self.test_name} failed ({le} {error_s})" if self.failures: lf = len(self.failures) failure_s = "failure" + ("s" if lf > 1 else "") return f"{self.test_name} failed ({lf} {failure_s})" return f"{self.test_name} failed" def __str__(self) -> str: match self.state: case State.PASSED: return f"{self.test_name} passed" case State.FAILED: return self._format_failed() case State.SKIPPED: return f"{self.test_name} skipped" case State.UNCAUGHT_EXC: return f"{self.test_name} failed (uncaught exception)" case State.REFLEAK: return f"{self.test_name} failed (reference leak)" case State.ENV_CHANGED: return f"{self.test_name} failed (env changed)" case State.RESOURCE_DENIED: return f"{self.test_name} skipped (resource denied)" case State.INTERRUPTED: return f"{self.test_name} interrupted" case State.WORKER_FAILED: return f"{self.test_name} worker non-zero exit code" case State.WORKER_BUG: return f"{self.test_name} worker bug" case State.DID_NOT_RUN: return f"{self.test_name} ran no tests" case State.TIMEOUT: return f"{self.test_name} timed out ({format_duration(self.duration)})" case _: raise ValueError("unknown result state: {state!r}") def has_meaningful_duration(self): return State.has_meaningful_duration(self.state) def set_env_changed(self): if self.state is None or self.state == State.PASSED: self.state = State.ENV_CHANGED def must_stop(self, fail_fast: bool, fail_env_changed: bool) -> bool: if State.must_stop(self.state): return True if fail_fast and self.is_failed(fail_env_changed): return True return False def get_rerun_match_tests(self) -> FilterTuple | None: match_tests = [] errors = self.errors or [] failures = self.failures or [] for error_list, is_error in ( (errors, True), (failures, False), ): for full_name, *_ in error_list: match_name = normalize_test_name(full_name, is_error=is_error) if match_name is None: # 'setUpModule (test.test_sys)': don't filter tests return None if not match_name: error_type = "ERROR" if is_error else "FAIL" print_warning(f"rerun failed to parse {error_type} test name: " f"{full_name!r}: don't filter tests") return None match_tests.append(match_name) if not match_tests: return None return tuple(match_tests) def write_json_into(self, file) -> None: json.dump(self, file, cls=_EncodeTestResult) @staticmethod def from_json(worker_json: StrJSON) -> 'TestResult': return json.loads(worker_json, object_hook=_decode_test_result) class _EncodeTestResult(json.JSONEncoder): def default(self, o: Any) -> dict[str, Any]: if isinstance(o, TestResult): result = dataclasses.asdict(o) result["__test_result__"] = o.__class__.__name__ return result else: return super().default(o) def _decode_test_result(data: dict[str, Any]) -> TestResult | dict[str, Any]: if "__test_result__" in data: data.pop('__test_result__') if data['stats'] is not None: data['stats'] = TestStats(**data['stats']) return TestResult(**data) else: return data