diff options
author | tzickel <tzickel@users.noreply.github.com> | 2018-10-02 21:01:23 (GMT) |
---|---|---|
committer | Antoine Pitrou <pitrou@free.fr> | 2018-10-02 21:01:23 (GMT) |
commit | 97bfe8d3ebb0a54c8798f57555cb4152f9b2e1d0 (patch) | |
tree | b97ded93b7eece4244ce9719f75469f3410083f2 /Lib/multiprocessing/pool.py | |
parent | 9012a0fb4c4ec1afef9efb9fdb0964554ea17983 (diff) | |
download | cpython-97bfe8d3ebb0a54c8798f57555cb4152f9b2e1d0.zip cpython-97bfe8d3ebb0a54c8798f57555cb4152f9b2e1d0.tar.gz cpython-97bfe8d3ebb0a54c8798f57555cb4152f9b2e1d0.tar.bz2 |
bpo-34172: multiprocessing.Pool leaks resources after being deleted (GH-8450)
Fix a reference issue inside multiprocessing.Pool that caused the pool to remain alive if it was deleted without being closed or terminated explicitly.
Diffstat (limited to 'Lib/multiprocessing/pool.py')
-rw-r--r-- | Lib/multiprocessing/pool.py | 74 |
1 files changed, 50 insertions, 24 deletions
diff --git a/Lib/multiprocessing/pool.py b/Lib/multiprocessing/pool.py index 3e9a0d6..574b5db 100644 --- a/Lib/multiprocessing/pool.py +++ b/Lib/multiprocessing/pool.py @@ -149,8 +149,9 @@ class Pool(object): ''' _wrap_exception = True - def Process(self, *args, **kwds): - return self._ctx.Process(*args, **kwds) + @staticmethod + def Process(ctx, *args, **kwds): + return ctx.Process(*args, **kwds) def __init__(self, processes=None, initializer=None, initargs=(), maxtasksperchild=None, context=None): @@ -177,13 +178,15 @@ class Pool(object): self._worker_handler = threading.Thread( target=Pool._handle_workers, - args=(self, ) + args=(self._cache, self._taskqueue, self._ctx, self.Process, + self._processes, self._pool, self._inqueue, self._outqueue, + self._initializer, self._initargs, self._maxtasksperchild, + self._wrap_exception) ) self._worker_handler.daemon = True self._worker_handler._state = RUN self._worker_handler.start() - self._task_handler = threading.Thread( target=Pool._handle_tasks, args=(self._taskqueue, self._quick_put, self._outqueue, @@ -209,43 +212,62 @@ class Pool(object): exitpriority=15 ) - def _join_exited_workers(self): + @staticmethod + def _join_exited_workers(pool): """Cleanup after any worker processes which have exited due to reaching their specified lifetime. Returns True if any workers were cleaned up. """ cleaned = False - for i in reversed(range(len(self._pool))): - worker = self._pool[i] + for i in reversed(range(len(pool))): + worker = pool[i] if worker.exitcode is not None: # worker exited util.debug('cleaning up worker %d' % i) worker.join() cleaned = True - del self._pool[i] + del pool[i] return cleaned def _repopulate_pool(self): + return self._repopulate_pool_static(self._ctx, self.Process, + self._processes, + self._pool, self._inqueue, + self._outqueue, self._initializer, + self._initargs, + self._maxtasksperchild, + self._wrap_exception) + + @staticmethod + def _repopulate_pool_static(ctx, Process, processes, pool, inqueue, + outqueue, initializer, initargs, + maxtasksperchild, wrap_exception): """Bring the number of pool processes up to the specified number, for use after reaping workers which have exited. """ - for i in range(self._processes - len(self._pool)): - w = self.Process(target=worker, - args=(self._inqueue, self._outqueue, - self._initializer, - self._initargs, self._maxtasksperchild, - self._wrap_exception) - ) - self._pool.append(w) + for i in range(processes - len(pool)): + w = Process(ctx, target=worker, + args=(inqueue, outqueue, + initializer, + initargs, maxtasksperchild, + wrap_exception) + ) + pool.append(w) w.name = w.name.replace('Process', 'PoolWorker') w.daemon = True w.start() util.debug('added worker') - def _maintain_pool(self): + @staticmethod + def _maintain_pool(ctx, Process, processes, pool, inqueue, outqueue, + initializer, initargs, maxtasksperchild, + wrap_exception): """Clean up any exited workers and start replacements for them. """ - if self._join_exited_workers(): - self._repopulate_pool() + if Pool._join_exited_workers(pool): + Pool._repopulate_pool_static(ctx, Process, processes, pool, + inqueue, outqueue, initializer, + initargs, maxtasksperchild, + wrap_exception) def _setup_queues(self): self._inqueue = self._ctx.SimpleQueue() @@ -403,16 +425,20 @@ class Pool(object): return result @staticmethod - def _handle_workers(pool): + def _handle_workers(cache, taskqueue, ctx, Process, processes, pool, + inqueue, outqueue, initializer, initargs, + maxtasksperchild, wrap_exception): thread = threading.current_thread() # Keep maintaining workers until the cache gets drained, unless the pool # is terminated. - while thread._state == RUN or (pool._cache and thread._state != TERMINATE): - pool._maintain_pool() + while thread._state == RUN or (cache and thread._state != TERMINATE): + Pool._maintain_pool(ctx, Process, processes, pool, inqueue, + outqueue, initializer, initargs, + maxtasksperchild, wrap_exception) time.sleep(0.1) # send sentinel to stop workers - pool._taskqueue.put(None) + taskqueue.put(None) util.debug('worker handler exiting') @staticmethod @@ -794,7 +820,7 @@ class ThreadPool(Pool): _wrap_exception = False @staticmethod - def Process(*args, **kwds): + def Process(ctx, *args, **kwds): from .dummy import Process return Process(*args, **kwds) |