From 358fc87f53cf97a1768d5b1ded08f2a564f9fd85 Mon Sep 17 00:00:00 2001 From: Victor Stinner Date: Thu, 6 Dec 2018 01:49:41 +0100 Subject: Revert "[2.7] bpo-34172: multiprocessing.Pool leaks resources after being deleted (GH-9686)" (GH-10970) This reverts commit 4a7dd30f5810e8861a3834159a222ab32d5c97d0. --- Lib/multiprocessing/pool.py | 56 ++++++++-------------- Lib/test/test_multiprocessing.py | 7 --- .../2018-07-26-10-31-52.bpo-34172.8ovLNi.rst | 1 - 3 files changed, 19 insertions(+), 45 deletions(-) delete mode 100644 Misc/NEWS.d/next/Library/2018-07-26-10-31-52.bpo-34172.8ovLNi.rst diff --git a/Lib/multiprocessing/pool.py b/Lib/multiprocessing/pool.py index 489c7d6..a47cd0f 100644 --- a/Lib/multiprocessing/pool.py +++ b/Lib/multiprocessing/pool.py @@ -162,9 +162,7 @@ class Pool(object): self._worker_handler = threading.Thread( target=Pool._handle_workers, - args=(self._cache, self._processes, self._pool, self.Process, - self._inqueue, self._outqueue, self._initializer, - self._initargs, self._maxtasksperchild, self._taskqueue) + args=(self, ) ) self._worker_handler.daemon = True self._worker_handler._state = RUN @@ -196,56 +194,42 @@ class Pool(object): exitpriority=15 ) - @staticmethod - def _join_exited_workers(pool): + def _join_exited_workers(self): """Cleanup after any worker processes which have exited due to reaching their specified lifetime. Returns True if any workers were cleaned up. """ cleaned = False - for i in reversed(range(len(pool))): - worker = pool[i] + for i in reversed(range(len(self._pool))): + worker = self._pool[i] if worker.exitcode is not None: # worker exited debug('cleaning up worker %d' % i) worker.join() cleaned = True - del pool[i] + del self._pool[i] return cleaned def _repopulate_pool(self): - return self._repopulate_pool_static(self._processes, self._pool, - self.Process, self._inqueue, - self._outqueue, self._initializer, - self._initargs, - self._maxtasksperchild) - - @staticmethod - def _repopulate_pool_static(processes, pool, Process, inqueue, outqueue, - initializer, initargs, maxtasksperchild): """Bring the number of pool processes up to the specified number, for use after reaping workers which have exited. """ - for i in range(processes - len(pool)): - w = Process(target=worker, - args=(inqueue, outqueue, - initializer, - initargs, maxtasksperchild) - ) - pool.append(w) + for i in range(self._processes - len(self._pool)): + w = self.Process(target=worker, + args=(self._inqueue, self._outqueue, + self._initializer, + self._initargs, self._maxtasksperchild) + ) + self._pool.append(w) w.name = w.name.replace('Process', 'PoolWorker') w.daemon = True w.start() debug('added worker') - @staticmethod - def _maintain_pool(processes, pool, Process, inqueue, outqueue, - initializer, initargs, maxtasksperchild): + def _maintain_pool(self): """Clean up any exited workers and start replacements for them. """ - if Pool._join_exited_workers(pool): - Pool._repopulate_pool_static(processes, pool, Process, inqueue, - outqueue, initializer, initargs, - maxtasksperchild) + if self._join_exited_workers(): + self._repopulate_pool() def _setup_queues(self): from .queues import SimpleQueue @@ -335,18 +319,16 @@ class Pool(object): return result @staticmethod - def _handle_workers(cache, processes, pool, Process, inqueue, outqueue, - initializer, initargs, maxtasksperchild, taskqueue): + def _handle_workers(pool): thread = threading.current_thread() # Keep maintaining workers until the cache gets drained, unless the pool # is terminated. - while thread._state == RUN or (cache and thread._state != TERMINATE): - Pool._maintain_pool(processes, pool, Process, inqueue, outqueue, - initializer, initargs, maxtasksperchild) + while thread._state == RUN or (pool._cache and thread._state != TERMINATE): + pool._maintain_pool() time.sleep(0.1) # send sentinel to stop workers - taskqueue.put(None) + pool._taskqueue.put(None) debug('worker handler exiting') @staticmethod diff --git a/Lib/test/test_multiprocessing.py b/Lib/test/test_multiprocessing.py index d319218..ff299fe 100644 --- a/Lib/test/test_multiprocessing.py +++ b/Lib/test/test_multiprocessing.py @@ -1359,13 +1359,6 @@ class _TestPool(BaseTestCase): # they were released too. self.assertEqual(CountedObject.n_instances, 0) - def test_del_pool(self): - p = self.Pool(1) - wr = weakref.ref(p) - del p - gc.collect() - self.assertIsNone(wr()) - def unpickleable_result(): return lambda: 42 diff --git a/Misc/NEWS.d/next/Library/2018-07-26-10-31-52.bpo-34172.8ovLNi.rst b/Misc/NEWS.d/next/Library/2018-07-26-10-31-52.bpo-34172.8ovLNi.rst deleted file mode 100644 index d1c5a77..0000000 --- a/Misc/NEWS.d/next/Library/2018-07-26-10-31-52.bpo-34172.8ovLNi.rst +++ /dev/null @@ -1 +0,0 @@ -Fix a reference issue inside multiprocessing.Pool that caused the pool to remain alive if it was deleted without being closed or terminated explicitly. -- cgit v0.12