diff options
Diffstat (limited to 'Lib/test/libregrtest/runtest_mp.py')
-rw-r--r-- | Lib/test/libregrtest/runtest_mp.py | 52 |
1 files changed, 24 insertions, 28 deletions
diff --git a/Lib/test/libregrtest/runtest_mp.py b/Lib/test/libregrtest/runtest_mp.py index 45db24f..ecdde3a 100644 --- a/Lib/test/libregrtest/runtest_mp.py +++ b/Lib/test/libregrtest/runtest_mp.py @@ -10,7 +10,7 @@ import tempfile import threading import time import traceback -from typing import NamedTuple, NoReturn, Literal, Any, TextIO +from typing import NoReturn, Literal, Any, TextIO from test import support from test.support import os_helper @@ -19,9 +19,9 @@ from test.support import TestStats from test.libregrtest.cmdline import Namespace from test.libregrtest.main import Regrtest from test.libregrtest.runtest import ( - runtest, TestResult, State, PROGRESS_MIN_TIME, + run_single_test, TestResult, State, PROGRESS_MIN_TIME, FilterTuple, RunTests) -from test.libregrtest.setup import setup_tests +from test.libregrtest.setup import setup_tests, setup_test_dir from test.libregrtest.utils import format_duration, print_warning if sys.platform == 'win32': @@ -48,7 +48,6 @@ USE_PROCESS_GROUP = (hasattr(os, "setsid") and hasattr(os, "killpg")) class WorkerJob: runtests: RunTests namespace: Namespace - match_tests: FilterTuple | None = None class _EncodeWorkerJob(json.JSONEncoder): @@ -126,9 +125,10 @@ def worker_process(worker_json: str) -> NoReturn: runtests = worker_job.runtests ns = worker_job.namespace test_name = runtests.tests[0] - match_tests: FilterTuple | None = worker_job.match_tests + match_tests: FilterTuple | None = runtests.match_tests - setup_tests(ns) + setup_test_dir(ns.testdir) + setup_tests(runtests, ns) if runtests.rerun: if match_tests: @@ -138,10 +138,7 @@ def worker_process(worker_json: str) -> NoReturn: print(f"Re-running {test_name} in verbose mode", flush=True) ns.verbose = True - if match_tests is not None: - ns.match_tests = match_tests - - result = runtest(ns, test_name) + result = run_single_test(test_name, runtests, ns) print() # Force a newline (just in case) # Serialize TestResult as dict in JSON @@ -330,11 +327,13 @@ class TestWorkerProcess(threading.Thread): match_tests = self.runtests.get_match_tests(test_name) else: match_tests = None - worker_runtests = self.runtests.copy(tests=tests) + kwargs = {} + if match_tests: + kwargs['match_tests'] = match_tests + worker_runtests = self.runtests.copy(tests=tests, **kwargs) worker_job = WorkerJob( worker_runtests, - namespace=self.ns, - match_tests=match_tests) + namespace=self.ns) # gh-94026: Write stdout+stderr to a tempfile as workaround for # non-blocking pipes on Emscripten with NodeJS. @@ -401,7 +400,7 @@ class TestWorkerProcess(threading.Thread): return MultiprocessResult(result, stdout) def run(self) -> None: - fail_fast = self.ns.failfast + fail_fast = self.runtests.fail_fast fail_env_changed = self.ns.fail_env_changed while not self._stopped: try: @@ -473,7 +472,6 @@ def get_running(workers: list[TestWorkerProcess]) -> list[TestWorkerProcess]: class MultiprocessTestRunner: def __init__(self, regrtest: Regrtest, runtests: RunTests) -> None: ns = regrtest.ns - timeout = ns.timeout self.regrtest = regrtest self.runtests = runtests @@ -483,24 +481,24 @@ class MultiprocessTestRunner: self.output: queue.Queue[QueueOutput] = queue.Queue() tests_iter = runtests.iter_tests() self.pending = MultiprocessIterator(tests_iter) - if timeout is not None: + self.timeout = runtests.timeout + if self.timeout is not None: # Rely on faulthandler to kill a worker process. This timouet is # when faulthandler fails to kill a worker process. Give a maximum # of 5 minutes to faulthandler to kill the worker. - self.worker_timeout = min(timeout * 1.5, timeout + 5 * 60) + self.worker_timeout = min(self.timeout * 1.5, self.timeout + 5 * 60) else: self.worker_timeout = None self.workers = None def start_workers(self) -> None: use_mp = self.ns.use_mp - timeout = self.ns.timeout self.workers = [TestWorkerProcess(index, self) for index in range(1, use_mp + 1)] msg = f"Run tests in parallel using {len(self.workers)} child processes" - if timeout: + if self.timeout: msg += (" (timeout: %s, worker timeout: %s)" - % (format_duration(timeout), + % (format_duration(self.timeout), format_duration(self.worker_timeout))) self.log(msg) for worker in self.workers: @@ -514,9 +512,8 @@ class MultiprocessTestRunner: worker.wait_stopped(start_time) def _get_result(self) -> QueueOutput | None: - pgo = self.ns.pgo - use_faulthandler = (self.ns.timeout is not None) - timeout = PROGRESS_UPDATE + pgo = self.runtests.pgo + use_faulthandler = (self.timeout is not None) # bpo-46205: check the status of workers every iteration to avoid # waiting forever on an empty queue. @@ -527,7 +524,7 @@ class MultiprocessTestRunner: # wait for a thread try: - return self.output.get(timeout=timeout) + return self.output.get(timeout=PROGRESS_UPDATE) except queue.Empty: pass @@ -544,7 +541,7 @@ class MultiprocessTestRunner: def display_result(self, mp_result: MultiprocessResult) -> None: result = mp_result.result - pgo = self.ns.pgo + pgo = self.runtests.pgo text = str(result) if mp_result.err_msg: @@ -580,9 +577,8 @@ class MultiprocessTestRunner: return result def run_tests(self) -> None: - fail_fast = self.ns.failfast + fail_fast = self.runtests.fail_fast fail_env_changed = self.ns.fail_env_changed - timeout = self.ns.timeout self.start_workers() @@ -600,7 +596,7 @@ class MultiprocessTestRunner: print() self.regrtest.interrupted = True finally: - if timeout is not None: + if self.timeout is not None: faulthandler.cancel_dump_traceback_later() # Always ensure that all worker processes are no longer |