diff options
author | Charles-François Natali <neologix@free.fr> | 2011-10-24 16:45:29 (GMT) |
---|---|---|
committer | Charles-François Natali <neologix@free.fr> | 2011-10-24 16:45:29 (GMT) |
commit | f8859e1808eff603b727d1adbeb38f745c9fedb5 (patch) | |
tree | 4491834ef0c5d5be6283d61894940e321d33b18f /Lib | |
parent | d6ca6c2b32118fec10d28b1a23d97f8e6b536d04 (diff) | |
download | cpython-f8859e1808eff603b727d1adbeb38f745c9fedb5.zip cpython-f8859e1808eff603b727d1adbeb38f745c9fedb5.tar.gz cpython-f8859e1808eff603b727d1adbeb38f745c9fedb5.tar.bz2 |
Issue #10332: multiprocessing: fix a race condition when a Pool is closed
before all tasks have completed.
Diffstat (limited to 'Lib')
-rw-r--r-- | Lib/multiprocessing/pool.py | 6 | ||||
-rw-r--r-- | Lib/test/test_multiprocessing.py | 14 |
2 files changed, 19 insertions, 1 deletions
diff --git a/Lib/multiprocessing/pool.py b/Lib/multiprocessing/pool.py index e450319..04e7c44 100644 --- a/Lib/multiprocessing/pool.py +++ b/Lib/multiprocessing/pool.py @@ -321,7 +321,11 @@ class Pool(object): @staticmethod def _handle_workers(pool): - while pool._worker_handler._state == RUN and pool._state == RUN: + thread = threading.current_thread() + + # Keep maintaining workers until the cache gets drained, unless the pool + # is terminated. + while thread._state == RUN or (pool._cache and thread._state != TERMINATE): pool._maintain_pool() time.sleep(0.1) # send sentinel to stop workers diff --git a/Lib/test/test_multiprocessing.py b/Lib/test/test_multiprocessing.py index 6940d0e..45bf454 100644 --- a/Lib/test/test_multiprocessing.py +++ b/Lib/test/test_multiprocessing.py @@ -1217,6 +1217,20 @@ class _TestPoolWorkerLifetime(BaseTestCase): p.close() p.join() + def test_pool_worker_lifetime_early_close(self): + # Issue #10332: closing a pool whose workers have limited lifetimes + # before all the tasks completed would make join() hang. + p = multiprocessing.Pool(3, maxtasksperchild=1) + results = [] + for i in range(6): + results.append(p.apply_async(sqr, (i, 0.3))) + p.close() + p.join() + # check the results + for (j, res) in enumerate(results): + self.assertEqual(res.get(), sqr(j)) + + # # Test that manager has expected number of shared objects left # |