summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorSam Gross <colesbury@gmail.com>2024-04-08 14:47:42 (GMT)
committerGitHub <noreply@github.com>2024-04-08 14:47:42 (GMT)
commit26a680a58524fe39eecb243e37adfa6e157466f6 (patch)
tree771940c33d30b40d10b1880465b45a2a0306ef7b
parent59864edd572b5c0cc3be58087a9ea3a700226146 (diff)
downloadcpython-26a680a58524fe39eecb243e37adfa6e157466f6.zip
cpython-26a680a58524fe39eecb243e37adfa6e157466f6.tar.gz
cpython-26a680a58524fe39eecb243e37adfa6e157466f6.tar.bz2
gh-117293: Fix race condition in run_workers.py (#117298)
The worker thread may still be alive after it enqueues it's last result, which can lead to a delay of 30 seconds after the test finishes. This happens much more frequently in the free-threaded build with the GIL disabled. This changes run_workers.py to track of live workers by enqueueing a `WorkerExited()` instance before the worker exits.
-rw-r--r--Lib/test/libregrtest/run_workers.py37
1 files changed, 21 insertions, 16 deletions
diff --git a/Lib/test/libregrtest/run_workers.py b/Lib/test/libregrtest/run_workers.py
index 9cfe1b9..235047c 100644
--- a/Lib/test/libregrtest/run_workers.py
+++ b/Lib/test/libregrtest/run_workers.py
@@ -79,8 +79,12 @@ class MultiprocessResult:
err_msg: str | None = None
+class WorkerThreadExited:
+ """Indicates that a worker thread has exited"""
+
ExcStr = str
QueueOutput = tuple[Literal[False], MultiprocessResult] | tuple[Literal[True], ExcStr]
+QueueContent = QueueOutput | WorkerThreadExited
class ExitThread(Exception):
@@ -376,8 +380,8 @@ class WorkerThread(threading.Thread):
def run(self) -> None:
fail_fast = self.runtests.fail_fast
fail_env_changed = self.runtests.fail_env_changed
- while not self._stopped:
- try:
+ try:
+ while not self._stopped:
try:
test_name = next(self.pending)
except StopIteration:
@@ -396,11 +400,12 @@ class WorkerThread(threading.Thread):
if mp_result.result.must_stop(fail_fast, fail_env_changed):
break
- except ExitThread:
- break
- except BaseException:
- self.output.put((True, traceback.format_exc()))
- break
+ except ExitThread:
+ pass
+ except BaseException:
+ self.output.put((True, traceback.format_exc()))
+ finally:
+ self.output.put(WorkerThreadExited())
def _wait_completed(self) -> None:
popen = self._popen
@@ -458,8 +463,9 @@ class RunWorkers:
self.log = logger.log
self.display_progress = logger.display_progress
self.results: TestResults = results
+ self.live_worker_count = 0
- self.output: queue.Queue[QueueOutput] = queue.Queue()
+ self.output: queue.Queue[QueueContent] = queue.Queue()
tests_iter = runtests.iter_tests()
self.pending = MultiprocessIterator(tests_iter)
self.timeout = runtests.timeout
@@ -497,6 +503,7 @@ class RunWorkers:
self.log(msg)
for worker in self.workers:
worker.start()
+ self.live_worker_count += 1
def stop_workers(self) -> None:
start_time = time.monotonic()
@@ -511,14 +518,18 @@ class RunWorkers:
# bpo-46205: check the status of workers every iteration to avoid
# waiting forever on an empty queue.
- while any(worker.is_alive() for worker in self.workers):
+ while self.live_worker_count > 0:
if use_faulthandler:
faulthandler.dump_traceback_later(MAIN_PROCESS_TIMEOUT,
exit=True)
# wait for a thread
try:
- return self.output.get(timeout=PROGRESS_UPDATE)
+ result = self.output.get(timeout=PROGRESS_UPDATE)
+ if isinstance(result, WorkerThreadExited):
+ self.live_worker_count -= 1
+ continue
+ return result
except queue.Empty:
pass
@@ -528,12 +539,6 @@ class RunWorkers:
if running:
self.log(running)
- # all worker threads are done: consume pending results
- try:
- return self.output.get(timeout=0)
- except queue.Empty:
- return None
-
def display_result(self, mp_result: MultiprocessResult) -> None:
result = mp_result.result
pgo = self.runtests.pgo