summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorVictor Stinner <vstinner@python.org>2023-09-10 00:24:38 (GMT)
committerGitHub <noreply@github.com>2023-09-10 00:24:38 (GMT)
commit0553fdfe3040073307e8c53273041130148541d5 (patch)
tree6a13843168e13493b8ef07e01ffc2ac812bbe964
parent0c0f254230bbc9532a7e78077a639a3ac940953c (diff)
downloadcpython-0553fdfe3040073307e8c53273041130148541d5.zip
cpython-0553fdfe3040073307e8c53273041130148541d5.tar.gz
cpython-0553fdfe3040073307e8c53273041130148541d5.tar.bz2
gh-109162: Refactor libregrtest.runtest_mp (#109205)
* Add attributes to Regrtest and RunTests: * fail_env_changed * num_workers * Rename MultiprocessTestRunner to RunWorkers. Add num_workers parameters to RunWorkers constructor. Remove RunWorkers.ns attribute. * Rename TestWorkerProcess to WorkerThread. * get_running() now returns a string like: "running (...): ...". * Regrtest.action_run_tests() now selects the number of worker processes, instead of the command line parser.
-rw-r--r--Lib/test/libregrtest/cmdline.py6
-rw-r--r--Lib/test/libregrtest/main.py44
-rw-r--r--Lib/test/libregrtest/runtest.py1
-rw-r--r--Lib/test/libregrtest/runtest_mp.py62
4 files changed, 57 insertions, 56 deletions
diff --git a/Lib/test/libregrtest/cmdline.py b/Lib/test/libregrtest/cmdline.py
index 9afb132..2835546 100644
--- a/Lib/test/libregrtest/cmdline.py
+++ b/Lib/test/libregrtest/cmdline.py
@@ -1,5 +1,5 @@
import argparse
-import os
+import os.path
import shlex
import sys
from test.support import os_helper
@@ -410,10 +410,6 @@ def _parse_args(args, **kwargs):
if ns.timeout is not None:
if ns.timeout <= 0:
ns.timeout = None
- if ns.use_mp is not None:
- if ns.use_mp <= 0:
- # Use all cores + extras for tests that like to sleep
- ns.use_mp = 2 + (os.cpu_count() or 1)
if ns.use:
for a in ns.use:
for r in a:
diff --git a/Lib/test/libregrtest/main.py b/Lib/test/libregrtest/main.py
index 4066d06..1fa7b07 100644
--- a/Lib/test/libregrtest/main.py
+++ b/Lib/test/libregrtest/main.py
@@ -83,8 +83,18 @@ class Regrtest:
self.fromfile: str | None = ns.fromfile
self.starting_test: str | None = ns.start
+ # Run tests
+ if ns.use_mp is None:
+ num_workers = 0 # run sequentially
+ elif ns.use_mp <= 0:
+ num_workers = -1 # use the number of CPUs
+ else:
+ num_workers = ns.use_mp
+ self.num_workers: int = num_workers
+
# Options to run tests
self.fail_fast: bool = ns.failfast
+ self.fail_env_changed: bool = ns.fail_env_changed
self.forever: bool = ns.forever
self.randomize: bool = ns.randomize
self.random_seed: int | None = ns.random_seed
@@ -150,7 +160,6 @@ class Regrtest:
| set(self.run_no_tests))
def accumulate_result(self, result, rerun=False):
- fail_env_changed = self.ns.fail_env_changed
test_name = result.test_name
match result.state:
@@ -167,7 +176,7 @@ class Regrtest:
case State.DID_NOT_RUN:
self.run_no_tests.append(test_name)
case _:
- if result.is_failed(fail_env_changed):
+ if result.is_failed(self.fail_env_changed):
self.bad.append(test_name)
self.need_rerun.append(result)
else:
@@ -339,9 +348,8 @@ class Regrtest:
def _rerun_failed_tests(self, need_rerun, runtests: RunTests):
# Configure the runner to re-run tests
- ns = self.ns
- if ns.use_mp is None:
- ns.use_mp = 1
+ if self.num_workers == 0:
+ self.num_workers = 1
# Get tests to re-run
tests = [result.test_name for result in need_rerun]
@@ -363,7 +371,7 @@ class Regrtest:
match_tests_dict=match_tests_dict,
output_on_failure=False)
self.set_tests(runtests)
- self._run_tests_mp(runtests)
+ self._run_tests_mp(runtests, self.num_workers)
return runtests
def rerun_failed_tests(self, need_rerun, runtests: RunTests):
@@ -471,7 +479,6 @@ class Regrtest:
def run_tests_sequentially(self, runtests):
ns = self.ns
coverage = ns.trace
- fail_env_changed = ns.fail_env_changed
if coverage:
import trace
@@ -503,7 +510,7 @@ class Regrtest:
if module not in save_modules and module.startswith("test."):
support.unload(module)
- if result.must_stop(self.fail_fast, fail_env_changed):
+ if result.must_stop(self.fail_fast, self.fail_env_changed):
break
previous_test = str(result)
@@ -564,12 +571,10 @@ class Regrtest:
self.environment_changed))
def get_tests_state(self):
- fail_env_changed = self.ns.fail_env_changed
-
result = []
if self.bad:
result.append("FAILURE")
- elif fail_env_changed and self.environment_changed:
+ elif self.fail_env_changed and self.environment_changed:
result.append("ENV CHANGED")
elif self.no_tests_run():
result.append("NO TESTS RAN")
@@ -585,8 +590,9 @@ class Regrtest:
result = '%s then %s' % (self.first_state, result)
return result
- def _run_tests_mp(self, runtests: RunTests) -> None:
- from test.libregrtest.runtest_mp import run_tests_multiprocess
+ def _run_tests_mp(self, runtests: RunTests, num_workers: int) -> None:
+ from test.libregrtest.runtest_mp import RunWorkers
+
# If we're on windows and this is the parent runner (not a worker),
# track the load average.
if sys.platform == 'win32':
@@ -600,7 +606,7 @@ class Regrtest:
print(f'Failed to create WindowsLoadTracker: {error}')
try:
- run_tests_multiprocess(self, runtests)
+ RunWorkers(self, runtests, num_workers).run()
finally:
if self.win_load_tracker is not None:
self.win_load_tracker.close()
@@ -618,8 +624,8 @@ class Regrtest:
def run_tests(self, runtests: RunTests):
self.first_runtests = runtests
self.set_tests(runtests)
- if self.ns.use_mp:
- self._run_tests_mp(runtests)
+ if self.num_workers:
+ self._run_tests_mp(runtests, self.num_workers)
tracer = None
else:
tracer = self.run_tests_sequentially(runtests)
@@ -843,7 +849,7 @@ class Regrtest:
exitcode = EXITCODE_BAD_TEST
elif self.interrupted:
exitcode = EXITCODE_INTERRUPTED
- elif self.ns.fail_env_changed and self.environment_changed:
+ elif self.fail_env_changed and self.environment_changed:
exitcode = EXITCODE_ENV_CHANGED
elif self.no_tests_run():
exitcode = EXITCODE_NO_TESTS_RAN
@@ -866,6 +872,10 @@ class Regrtest:
if self.randomize:
print("Using random seed", self.random_seed)
+ if self.num_workers < 0:
+ # Use all cores + extras for tests that like to sleep
+ self.num_workers = 2 + (os.cpu_count() or 1)
+
runtests = RunTests(
tuple(self.selected),
fail_fast=self.fail_fast,
diff --git a/Lib/test/libregrtest/runtest.py b/Lib/test/libregrtest/runtest.py
index dc574ed..6677017 100644
--- a/Lib/test/libregrtest/runtest.py
+++ b/Lib/test/libregrtest/runtest.py
@@ -217,6 +217,7 @@ class TestResult:
class RunTests:
tests: TestTuple
fail_fast: bool = False
+ fail_env_changed: bool = False
match_tests: FilterTuple | None = None
ignore_tests: FilterTuple | None = None
match_tests_dict: FilterDict | None = None
diff --git a/Lib/test/libregrtest/runtest_mp.py b/Lib/test/libregrtest/runtest_mp.py
index e4a9301..28c05b5 100644
--- a/Lib/test/libregrtest/runtest_mp.py
+++ b/Lib/test/libregrtest/runtest_mp.py
@@ -16,7 +16,6 @@ from test import support
from test.support import os_helper
from test.support import TestStats
-from test.libregrtest.cmdline import Namespace
from test.libregrtest.main import Regrtest
from test.libregrtest.runtest import (
run_single_test, TestResult, State, PROGRESS_MIN_TIME,
@@ -150,14 +149,13 @@ class ExitThread(Exception):
pass
-class TestWorkerProcess(threading.Thread):
- def __init__(self, worker_id: int, runner: "MultiprocessTestRunner") -> None:
+class WorkerThread(threading.Thread):
+ def __init__(self, worker_id: int, runner: "RunWorkers") -> None:
super().__init__()
self.worker_id = worker_id
self.runtests = runner.runtests
self.pending = runner.pending
self.output = runner.output
- self.ns = runner.ns
self.timeout = runner.worker_timeout
self.regrtest = runner.regrtest
self.current_test_name = None
@@ -167,7 +165,7 @@ class TestWorkerProcess(threading.Thread):
self._stopped = False
def __repr__(self) -> str:
- info = [f'TestWorkerProcess #{self.worker_id}']
+ info = [f'WorkerThread #{self.worker_id}']
if self.is_alive():
info.append("running")
else:
@@ -203,7 +201,7 @@ class TestWorkerProcess(threading.Thread):
else:
popen.kill()
except ProcessLookupError:
- # popen.kill(): the process completed, the TestWorkerProcess thread
+ # popen.kill(): the process completed, the WorkerThread thread
# read its exit status, but Popen.send_signal() read the returncode
# just before Popen.wait() set returncode.
pass
@@ -362,7 +360,7 @@ class TestWorkerProcess(threading.Thread):
def run(self) -> None:
fail_fast = self.runtests.fail_fast
- fail_env_changed = self.ns.fail_env_changed
+ fail_env_changed = self.runtests.fail_env_changed
while not self._stopped:
try:
try:
@@ -394,10 +392,10 @@ class TestWorkerProcess(threading.Thread):
f"{exc!r}")
def wait_stopped(self, start_time: float) -> None:
- # bpo-38207: MultiprocessTestRunner.stop_workers() called self.stop()
+ # bpo-38207: RunWorkers.stop_workers() called self.stop()
# which killed the process. Sometimes, killing the process from the
# main thread does not interrupt popen.communicate() in
- # TestWorkerProcess thread. This loop with a timeout is a workaround
+ # WorkerThread thread. This loop with a timeout is a workaround
# for that.
#
# Moreover, if this method fails to join the thread, it is likely
@@ -417,7 +415,7 @@ class TestWorkerProcess(threading.Thread):
break
-def get_running(workers: list[TestWorkerProcess]) -> list[TestWorkerProcess]:
+def get_running(workers: list[WorkerThread]) -> list[str]:
running = []
for worker in workers:
current_test_name = worker.current_test_name
@@ -427,18 +425,17 @@ def get_running(workers: list[TestWorkerProcess]) -> list[TestWorkerProcess]:
if dt >= PROGRESS_MIN_TIME:
text = '%s (%s)' % (current_test_name, format_duration(dt))
running.append(text)
- return running
+ if not running:
+ return None
+ return f"running ({len(running)}): {', '.join(running)}"
-class MultiprocessTestRunner:
- def __init__(self, regrtest: Regrtest, runtests: RunTests) -> None:
- ns = regrtest.ns
-
+class RunWorkers:
+ def __init__(self, regrtest: Regrtest, runtests: RunTests, num_workers: int) -> None:
self.regrtest = regrtest
+ self.log = regrtest.log
+ self.num_workers = num_workers
self.runtests = runtests
- self.rerun = runtests.rerun
- self.log = self.regrtest.log
- self.ns = ns
self.output: queue.Queue[QueueOutput] = queue.Queue()
tests_iter = runtests.iter_tests()
self.pending = MultiprocessIterator(tests_iter)
@@ -453,9 +450,8 @@ class MultiprocessTestRunner:
self.workers = None
def start_workers(self) -> None:
- use_mp = self.ns.use_mp
- self.workers = [TestWorkerProcess(index, self)
- for index in range(1, use_mp + 1)]
+ self.workers = [WorkerThread(index, self)
+ for index in range(1, self.num_workers + 1)]
msg = f"Run tests in parallel using {len(self.workers)} child processes"
if self.timeout:
msg += (" (timeout: %s, worker timeout: %s)"
@@ -489,10 +485,11 @@ class MultiprocessTestRunner:
except queue.Empty:
pass
- # display progress
- running = get_running(self.workers)
- if running and not pgo:
- self.log('running: %s' % ', '.join(running))
+ if not pgo:
+ # display progress
+ running = get_running(self.workers)
+ if running:
+ self.log(running)
# all worker threads are done: consume pending results
try:
@@ -510,9 +507,10 @@ class MultiprocessTestRunner:
text += ' (%s)' % mp_result.err_msg
elif (result.duration >= PROGRESS_MIN_TIME and not pgo):
text += ' (%s)' % format_duration(result.duration)
- running = get_running(self.workers)
- if running and not pgo:
- text += ' -- running: %s' % ', '.join(running)
+ if not pgo:
+ running = get_running(self.workers)
+ if running:
+ text += f' -- {running}'
self.regrtest.display_progress(self.test_index, text)
def _process_result(self, item: QueueOutput) -> bool:
@@ -537,9 +535,9 @@ class MultiprocessTestRunner:
return result
- def run_tests(self) -> None:
+ def run(self) -> None:
fail_fast = self.runtests.fail_fast
- fail_env_changed = self.ns.fail_env_changed
+ fail_env_changed = self.runtests.fail_env_changed
self.start_workers()
@@ -566,10 +564,6 @@ class MultiprocessTestRunner:
self.stop_workers()
-def run_tests_multiprocess(regrtest: Regrtest, runtests: RunTests) -> None:
- MultiprocessTestRunner(regrtest, runtests).run_tests()
-
-
class EncodeTestResult(json.JSONEncoder):
"""Encode a TestResult (sub)class object into a JSON dict."""