summaryrefslogtreecommitdiffstats
path: root/Lib
diff options
context:
space:
mode:
authorAntoine Pitrou <solipsis@pitrou.net>2011-07-02 19:20:25 (GMT)
committerAntoine Pitrou <solipsis@pitrou.net>2011-07-02 19:20:25 (GMT)
commit020436b0d4ae271638ed5d0881c1fa7f7c0a1b09 (patch)
treead620d4691d253b267ff7c3d03c7bc3811bceca9 /Lib
parentaac0f75b3b28513c45116534720e66b0262e2e72 (diff)
downloadcpython-020436b0d4ae271638ed5d0881c1fa7f7c0a1b09.zip
cpython-020436b0d4ae271638ed5d0881c1fa7f7c0a1b09.tar.gz
cpython-020436b0d4ae271638ed5d0881c1fa7f7c0a1b09.tar.bz2
Issue #12456: fix a possible hang on shutdown of a concurrent.futures.ProcessPoolExecutor.
Diffstat (limited to 'Lib')
-rw-r--r--Lib/concurrent/futures/process.py30
-rw-r--r--Lib/test/test_concurrent_futures.py7
2 files changed, 28 insertions, 9 deletions
diff --git a/Lib/concurrent/futures/process.py b/Lib/concurrent/futures/process.py
index c2331e7..689f9ba 100644
--- a/Lib/concurrent/futures/process.py
+++ b/Lib/concurrent/futures/process.py
@@ -50,7 +50,7 @@ import os
from concurrent.futures import _base
import queue
import multiprocessing
-from multiprocessing.queues import SimpleQueue, SentinelReady
+from multiprocessing.queues import SimpleQueue, SentinelReady, Full
import threading
import weakref
@@ -195,6 +195,10 @@ def _queue_management_worker(executor_reference,
result_queue: A multiprocessing.Queue of _ResultItems generated by the
process workers.
"""
+ executor = None
+
+ def shutting_down():
+ return _shutdown or executor is None or executor._shutdown_thread
def shutdown_worker():
# This is an upper bound
@@ -202,8 +206,7 @@ def _queue_management_worker(executor_reference,
for i in range(0, nb_children_alive):
call_queue.put(None)
# If .join() is not called on the created processes then
- # some multiprocessing.Queue methods may deadlock on Mac OS
- # X.
+ # some multiprocessing.Queue methods may deadlock on Mac OS X.
for p in processes.values():
p.join()
@@ -222,7 +225,7 @@ def _queue_management_worker(executor_reference,
if executor is not None:
executor._broken = True
executor._shutdown_thread = True
- del executor
+ executor = None
# All futures in flight must be marked failed
for work_id, work_item in pending_work_items.items():
work_item.future.set_exception(
@@ -242,7 +245,11 @@ def _queue_management_worker(executor_reference,
if isinstance(result_item, int):
# Clean shutdown of a worker using its PID
# (avoids marking the executor broken)
+ assert shutting_down()
del processes[result_item]
+ if not processes:
+ shutdown_worker()
+ return
elif result_item is not None:
work_item = pending_work_items.pop(result_item.work_id, None)
# work_item can be None if another process terminated (see above)
@@ -257,16 +264,21 @@ def _queue_management_worker(executor_reference,
# - The interpreter is shutting down OR
# - The executor that owns this worker has been collected OR
# - The executor that owns this worker has been shutdown.
- if _shutdown or executor is None or executor._shutdown_thread:
+ if shutting_down():
# Since no new work items can be added, it is safe to shutdown
# this thread if there are no pending work items.
- if not pending_work_items:
+ if not pending_work_items and call_queue.qsize() == 0:
shutdown_worker()
return
- else:
+ try:
# Start shutting down by telling a process it can exit.
- call_queue.put(None)
- del executor
+ call_queue.put_nowait(None)
+ except Full:
+ # This is not a problem: we will eventually be woken up (in
+ # result_queue.get()) and be able to send a sentinel again,
+ # if necessary.
+ pass
+ executor = None
_system_limits_checked = False
_system_limited = None
diff --git a/Lib/test/test_concurrent_futures.py b/Lib/test/test_concurrent_futures.py
index 5968980..fda6f5b 100644
--- a/Lib/test/test_concurrent_futures.py
+++ b/Lib/test/test_concurrent_futures.py
@@ -367,6 +367,13 @@ class ExecutorTest(unittest.TestCase):
self.assertEqual([None, None], results)
+ def test_shutdown_race_issue12456(self):
+ # Issue #12456: race condition at shutdown where trying to post a
+ # sentinel in the call queue blocks (the queue is full while processes
+ # have exited).
+ self.executor.map(str, [2] * (self.worker_count + 1))
+ self.executor.shutdown()
+
class ThreadPoolExecutorTest(ThreadPoolMixin, ExecutorTest):
def test_map_submits_without_iteration(self):