summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorCharles-François Natali <neologix@free.fr>2011-10-24 16:45:29 (GMT)
committerCharles-François Natali <neologix@free.fr>2011-10-24 16:45:29 (GMT)
commitf8859e1808eff603b727d1adbeb38f745c9fedb5 (patch)
tree4491834ef0c5d5be6283d61894940e321d33b18f
parentd6ca6c2b32118fec10d28b1a23d97f8e6b536d04 (diff)
downloadcpython-f8859e1808eff603b727d1adbeb38f745c9fedb5.zip
cpython-f8859e1808eff603b727d1adbeb38f745c9fedb5.tar.gz
cpython-f8859e1808eff603b727d1adbeb38f745c9fedb5.tar.bz2
Issue #10332: multiprocessing: fix a race condition when a Pool is closed
before all tasks have completed.
-rw-r--r--Lib/multiprocessing/pool.py6
-rw-r--r--Lib/test/test_multiprocessing.py14
-rw-r--r--Misc/NEWS3
3 files changed, 22 insertions, 1 deletions
diff --git a/Lib/multiprocessing/pool.py b/Lib/multiprocessing/pool.py
index e450319..04e7c44 100644
--- a/Lib/multiprocessing/pool.py
+++ b/Lib/multiprocessing/pool.py
@@ -321,7 +321,11 @@ class Pool(object):
@staticmethod
def _handle_workers(pool):
- while pool._worker_handler._state == RUN and pool._state == RUN:
+ thread = threading.current_thread()
+
+ # Keep maintaining workers until the cache gets drained, unless the pool
+ # is terminated.
+ while thread._state == RUN or (pool._cache and thread._state != TERMINATE):
pool._maintain_pool()
time.sleep(0.1)
# send sentinel to stop workers
diff --git a/Lib/test/test_multiprocessing.py b/Lib/test/test_multiprocessing.py
index 6940d0e..45bf454 100644
--- a/Lib/test/test_multiprocessing.py
+++ b/Lib/test/test_multiprocessing.py
@@ -1217,6 +1217,20 @@ class _TestPoolWorkerLifetime(BaseTestCase):
p.close()
p.join()
+ def test_pool_worker_lifetime_early_close(self):
+ # Issue #10332: closing a pool whose workers have limited lifetimes
+ # before all the tasks completed would make join() hang.
+ p = multiprocessing.Pool(3, maxtasksperchild=1)
+ results = []
+ for i in range(6):
+ results.append(p.apply_async(sqr, (i, 0.3)))
+ p.close()
+ p.join()
+ # check the results
+ for (j, res) in enumerate(results):
+ self.assertEqual(res.get(), sqr(j))
+
+
#
# Test that manager has expected number of shared objects left
#
diff --git a/Misc/NEWS b/Misc/NEWS
index 3909420..f7a741a 100644
--- a/Misc/NEWS
+++ b/Misc/NEWS
@@ -58,6 +58,9 @@ Core and Builtins
Library
-------
+- Issue #10332: multiprocessing: fix a race condition when a Pool is closed
+ before all tasks have completed.
+
- Issue #13255: wrong docstrings in array module.
- Issue #9168: now smtpd is able to bind privileged port.