author    tzickel <tzickel@users.noreply.github.com>  2018-10-02 21:01:23 (GMT)
committer Antoine Pitrou <pitrou@free.fr>  2018-10-02 21:01:23 (GMT)
commit    97bfe8d3ebb0a54c8798f57555cb4152f9b2e1d0 (patch)
tree      b97ded93b7eece4244ce9719f75469f3410083f2 /Lib/multiprocessing/pool.py
parent    9012a0fb4c4ec1afef9efb9fdb0964554ea17983 (diff)
bpo-34172: multiprocessing.Pool leaks resources after being deleted (GH-8450)
Fix a reference issue inside multiprocessing.Pool that caused the pool to remain alive if it was deleted without being closed or terminated explicitly.
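
Before this change the worker-handler thread was started with args=(self, ), so the running daemon thread held a strong reference to the Pool instance until close() or terminate() was called; simply dropping the last user reference therefore left the pool object and its worker processes alive. A minimal sketch of the leaking usage pattern (illustrative only, not the reproducer from the bug report):

import multiprocessing

def use_pool():
    # The pool goes out of scope without close() or terminate().
    pool = multiprocessing.Pool(processes=2)
    return pool.map(abs, [-1, -2, -3])

if __name__ == '__main__':
    print(use_pool())
    # Before this fix, the _handle_workers thread still referenced the
    # Pool object at this point, so the pool was never reclaimed.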
Diffstat (limited to 'Lib/multiprocessing/pool.py')
-rw-r--r--  Lib/multiprocessing/pool.py  74
1 file changed, 50 insertions(+), 24 deletions(-)
diff --git a/Lib/multiprocessing/pool.py b/Lib/multiprocessing/pool.py
index 3e9a0d6..574b5db 100644
--- a/Lib/multiprocessing/pool.py
+++ b/Lib/multiprocessing/pool.py
@@ -149,8 +149,9 @@ class Pool(object):
'''
_wrap_exception = True
- def Process(self, *args, **kwds):
- return self._ctx.Process(*args, **kwds)
+ @staticmethod
+ def Process(ctx, *args, **kwds):
+ return ctx.Process(*args, **kwds)
def __init__(self, processes=None, initializer=None, initargs=(),
maxtasksperchild=None, context=None):
@@ -177,13 +178,15 @@ class Pool(object):
self._worker_handler = threading.Thread(
target=Pool._handle_workers,
- args=(self, )
+ args=(self._cache, self._taskqueue, self._ctx, self.Process,
+ self._processes, self._pool, self._inqueue, self._outqueue,
+ self._initializer, self._initargs, self._maxtasksperchild,
+ self._wrap_exception)
)
self._worker_handler.daemon = True
self._worker_handler._state = RUN
self._worker_handler.start()
-
self._task_handler = threading.Thread(
target=Pool._handle_tasks,
args=(self._taskqueue, self._quick_put, self._outqueue,
@@ -209,43 +212,62 @@ class Pool(object):
exitpriority=15
)
- def _join_exited_workers(self):
+ @staticmethod
+ def _join_exited_workers(pool):
"""Cleanup after any worker processes which have exited due to reaching
their specified lifetime. Returns True if any workers were cleaned up.
"""
cleaned = False
- for i in reversed(range(len(self._pool))):
- worker = self._pool[i]
+ for i in reversed(range(len(pool))):
+ worker = pool[i]
if worker.exitcode is not None:
# worker exited
util.debug('cleaning up worker %d' % i)
worker.join()
cleaned = True
- del self._pool[i]
+ del pool[i]
return cleaned
def _repopulate_pool(self):
+ return self._repopulate_pool_static(self._ctx, self.Process,
+ self._processes,
+ self._pool, self._inqueue,
+ self._outqueue, self._initializer,
+ self._initargs,
+ self._maxtasksperchild,
+ self._wrap_exception)
+
+ @staticmethod
+ def _repopulate_pool_static(ctx, Process, processes, pool, inqueue,
+ outqueue, initializer, initargs,
+ maxtasksperchild, wrap_exception):
"""Bring the number of pool processes up to the specified number,
for use after reaping workers which have exited.
"""
- for i in range(self._processes - len(self._pool)):
- w = self.Process(target=worker,
- args=(self._inqueue, self._outqueue,
- self._initializer,
- self._initargs, self._maxtasksperchild,
- self._wrap_exception)
- )
- self._pool.append(w)
+ for i in range(processes - len(pool)):
+ w = Process(ctx, target=worker,
+ args=(inqueue, outqueue,
+ initializer,
+ initargs, maxtasksperchild,
+ wrap_exception)
+ )
+ pool.append(w)
w.name = w.name.replace('Process', 'PoolWorker')
w.daemon = True
w.start()
util.debug('added worker')
- def _maintain_pool(self):
+ @staticmethod
+ def _maintain_pool(ctx, Process, processes, pool, inqueue, outqueue,
+ initializer, initargs, maxtasksperchild,
+ wrap_exception):
"""Clean up any exited workers and start replacements for them.
"""
- if self._join_exited_workers():
- self._repopulate_pool()
+ if Pool._join_exited_workers(pool):
+ Pool._repopulate_pool_static(ctx, Process, processes, pool,
+ inqueue, outqueue, initializer,
+ initargs, maxtasksperchild,
+ wrap_exception)
def _setup_queues(self):
self._inqueue = self._ctx.SimpleQueue()
@@ -403,16 +425,20 @@ class Pool(object):
return result
@staticmethod
- def _handle_workers(pool):
+ def _handle_workers(cache, taskqueue, ctx, Process, processes, pool,
+ inqueue, outqueue, initializer, initargs,
+ maxtasksperchild, wrap_exception):
thread = threading.current_thread()
# Keep maintaining workers until the cache gets drained, unless the pool
# is terminated.
- while thread._state == RUN or (pool._cache and thread._state != TERMINATE):
- pool._maintain_pool()
+ while thread._state == RUN or (cache and thread._state != TERMINATE):
+ Pool._maintain_pool(ctx, Process, processes, pool, inqueue,
+ outqueue, initializer, initargs,
+ maxtasksperchild, wrap_exception)
time.sleep(0.1)
# send sentinel to stop workers
- pool._taskqueue.put(None)
+ taskqueue.put(None)
util.debug('worker handler exiting')
@staticmethod
@@ -794,7 +820,7 @@ class ThreadPool(Pool):
_wrap_exception = False
@staticmethod
- def Process(*args, **kwds):
+ def Process(ctx, *args, **kwds):
from .dummy import Process
return Process(*args, **kwds)
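
With the handler logic moved to static methods that receive the pool's internals (cache, task queue, context, worker list, queues and initializer arguments) as plain arguments, no daemon thread keeps a strong reference to the Pool instance any more, so an abandoned pool can be garbage-collected and its finalizer can run. A rough way to observe the difference (a sketch under these assumptions, not the regression test added for bpo-34172):

import gc
import weakref
import multiprocessing

if __name__ == '__main__':
    pool = multiprocessing.Pool(processes=2)
    ref = weakref.ref(pool)   # track the pool without keeping it alive
    del pool                  # dropped without close() or terminate()
    gc.collect()
    # With this change the Pool object should become collectable once the
    # last user reference is dropped; previously ref() would still return it.
    print(ref() is None)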