From 654ade3e6afa2527d1f642be28d69e219bd58b71 Mon Sep 17 00:00:00 2001
From: Jesse Noller
Date: Wed, 27 Jan 2010 03:05:57 +0000
Subject: Issue #6963: Added maxtasksperchild argument to multiprocessing.Pool

---
 Doc/library/multiprocessing.rst  | 17 ++++++-
 Lib/multiprocessing/__init__.py  |  4 +-
 Lib/multiprocessing/pool.py      | 96 +++++++++++++++++++++++++++++++++-------
 Lib/test/test_multiprocessing.py | 26 ++++++++++-
 Misc/ACKS                        |  1 +
 Misc/NEWS                        |  5 +++
 6 files changed, 130 insertions(+), 19 deletions(-)

diff --git a/Doc/library/multiprocessing.rst b/Doc/library/multiprocessing.rst
index 0782850..7404145 100644
--- a/Doc/library/multiprocessing.rst
+++ b/Doc/library/multiprocessing.rst
@@ -1537,7 +1537,7 @@ Process Pools
 One can create a pool of processes which will carry out tasks submitted to it
 with the :class:`Pool` class.
 
-.. class:: multiprocessing.Pool([processes[, initializer[, initargs]]])
+.. class:: multiprocessing.Pool([processes[, initializer[, initargs[, maxtasksperchild]]]])
 
    A process pool object which controls a pool of worker processes to which jobs
    can be submitted.  It supports asynchronous results with timeouts and
@@ -1548,6 +1548,21 @@ with the :class:`Pool` class.
    *initializer* is not ``None`` then each worker process will call
    ``initializer(*initargs)`` when it starts.
 
+   *maxtasksperchild* is the number of tasks a worker process can complete
+   before it will exit and be replaced with a fresh worker process, to enable
+   unused resources to be freed.  The default *maxtasksperchild* is ``None``,
+   which means worker processes will live as long as the pool.
+
+   .. note::
+
+      Worker processes within a :class:`Pool` typically live for the complete
+      duration of the Pool's work queue.  A frequent pattern found in other
+      systems (such as Apache, mod_wsgi, etc.) to free resources held by
+      workers is to allow a worker within a pool to complete only a set
+      amount of work before exiting, being cleaned up, and having a new
+      process spawned to replace the old one.  The *maxtasksperchild*
+      argument to the :class:`Pool` exposes this ability to the end user.
+
    .. method:: apply(func[, args[, kwds]])
 
       Equivalent of the :func:`apply` built-in function.  It blocks till the
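The semantics described in the note are easiest to see with a resource that accumulates per worker. The following sketch is not part of the patch; the leaky() task, the module-level _cache, and the pool sizes are illustrative names and values of mine:

    import multiprocessing

    _cache = []                        # lives separately in each worker process

    def leaky(i):
        _cache.append('x' * 10**6)     # pin roughly 1 MB per task in this worker
        return len(_cache)             # how many tasks this worker has absorbed

    if __name__ == '__main__':
        p = multiprocessing.Pool(2, maxtasksperchild=4)
        counts = [p.apply_async(leaky, (i,)).get() for i in range(16)]
        print(max(counts))             # never exceeds 4: each worker is
                                       # recycled after its fourth task
        p.close()
        p.join()

Without maxtasksperchild the two workers would live for all 16 tasks, and a single worker's _cache could grow to 16 entries; with the limit, no worker ever accumulates more than 4.
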
diff --git a/Lib/multiprocessing/__init__.py b/Lib/multiprocessing/__init__.py
index 4fb0bd0..0031a5e 100644
--- a/Lib/multiprocessing/__init__.py
+++ b/Lib/multiprocessing/__init__.py
@@ -219,12 +219,12 @@ def JoinableQueue(maxsize=0):
     from multiprocessing.queues import JoinableQueue
     return JoinableQueue(maxsize)
 
-def Pool(processes=None, initializer=None, initargs=()):
+def Pool(processes=None, initializer=None, initargs=(), maxtasksperchild=None):
     '''
     Returns a process pool object
     '''
     from multiprocessing.pool import Pool
-    return Pool(processes, initializer, initargs)
+    return Pool(processes, initializer, initargs, maxtasksperchild)
 
 def RawValue(typecode_or_type, *args):
     '''
diff --git a/Lib/multiprocessing/pool.py b/Lib/multiprocessing/pool.py
index b91b77d..e2b670d 100644
--- a/Lib/multiprocessing/pool.py
+++ b/Lib/multiprocessing/pool.py
@@ -42,7 +42,8 @@ def mapstar(args):
 # Code run by worker processes
 #
 
-def worker(inqueue, outqueue, initializer=None, initargs=()):
+def worker(inqueue, outqueue, initializer=None, initargs=(), maxtasks=None):
+    assert maxtasks is None or (type(maxtasks) == int and maxtasks > 0)
     put = outqueue.put
     get = inqueue.get
     if hasattr(inqueue, '_writer'):
@@ -52,7 +53,8 @@ def worker(inqueue, outqueue, initializer=None, initargs=()):
     if initializer is not None:
         initializer(*initargs)
 
-    while 1:
+    completed = 0
+    while maxtasks is None or (maxtasks and completed < maxtasks):
         try:
             task = get()
         except (EOFError, IOError):
@@ -69,6 +71,8 @@ def worker(inqueue, outqueue, initializer=None, initargs=()):
         except Exception, e:
             result = (False, e)
         put((job, i, result))
+        completed += 1
+    debug('worker exiting after %d tasks' % completed)
 
 #
 # Class representing a process pool
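The rewritten loop guard packs both modes into one expression: None means "no limit", while a positive integer bounds the number of completed tasks (the assert at the top of worker() rules out zero and negatives). A distilled restatement; the name keep_running is mine, not the patch's:

    # Equivalent of the while-condition added to worker()
    def keep_running(maxtasks, completed):
        # maxtasks is None         -> loop until the queue closes
        # maxtasks is a positive int -> stop once `completed` reaches it
        return maxtasks is None or (maxtasks and completed < maxtasks)

    assert keep_running(None, 10**9)    # unlimited worker never stops here
    assert keep_running(10, 9)          # below the limit: take another task
    assert not keep_running(10, 10)     # limit reached: fall out and exit
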
+ """ + cleaned = False + for i in reversed(range(len(self._pool))): + worker = self._pool[i] + if worker.exitcode is not None: + # worker exited + debug('cleaning up worker %d' % i) + worker.join() + cleaned = True + del self._pool[i] + return cleaned + + def _repopulate_pool(self): + """Bring the number of pool processes up to the specified number, + for use after reaping workers which have exited. + """ + for i in range(self._processes - len(self._pool)): + w = self.Process(target=worker, + args=(self._inqueue, self._outqueue, + self._initializer, + self._initargs, self._maxtasksperchild) + ) + self._pool.append(w) + w.name = w.name.replace('Process', 'PoolWorker') + w.daemon = True + w.start() + debug('added worker') + + def _maintain_pool(self): + """Clean up any exited workers and start replacements for them. + """ + if self._join_exited_workers(): + self._repopulate_pool() + def _setup_queues(self): from .queues import SimpleQueue self._inqueue = SimpleQueue() @@ -217,6 +265,13 @@ class Pool(object): return result @staticmethod + def _handle_workers(pool): + while pool._worker_handler._state == RUN and pool._state == RUN: + pool._maintain_pool() + time.sleep(0.1) + debug('worker handler exiting') + + @staticmethod def _handle_tasks(taskqueue, put, outqueue, pool): thread = threading.current_thread() @@ -331,16 +386,19 @@ class Pool(object): debug('closing pool') if self._state == RUN: self._state = CLOSE + self._worker_handler._state = CLOSE self._taskqueue.put(None) def terminate(self): debug('terminating pool') self._state = TERMINATE + self._worker_handler._state = TERMINATE self._terminate() def join(self): debug('joining pool') assert self._state in (CLOSE, TERMINATE) + self._worker_handler.join() self._task_handler.join() self._result_handler.join() for p in self._pool: @@ -357,10 +415,11 @@ class Pool(object): @classmethod def _terminate_pool(cls, taskqueue, inqueue, outqueue, pool, - task_handler, result_handler, cache): + worker_handler, task_handler, result_handler, cache): # this is guaranteed to only be called once debug('finalizing pool') + worker_handler._state = TERMINATE task_handler._state = TERMINATE taskqueue.put(None) # sentinel @@ -372,10 +431,12 @@ class Pool(object): result_handler._state = TERMINATE outqueue.put(None) # sentinel + # Terminate workers which haven't already finished. 
diff --git a/Lib/test/test_multiprocessing.py b/Lib/test/test_multiprocessing.py
index ae3b721..1f80c78 100644
--- a/Lib/test/test_multiprocessing.py
+++ b/Lib/test/test_multiprocessing.py
@@ -45,7 +45,7 @@ latin = str
 #
 
 LOG_LEVEL = util.SUBWARNING
-#LOG_LEVEL = logging.WARNING
+#LOG_LEVEL = logging.DEBUG
 
 DELTA = 0.1
 CHECK_TIMINGS = False     # making true makes tests take a lot longer
@@ -1052,6 +1052,30 @@ class _TestPool(BaseTestCase):
         join = TimingWrapper(self.pool.join)
         join()
         self.assertTrue(join.elapsed < 0.2)
+
+class _TestPoolWorkerLifetime(BaseTestCase):
+
+    ALLOWED_TYPES = ('processes', )
+    def test_pool_worker_lifetime(self):
+        p = multiprocessing.Pool(3, maxtasksperchild=10)
+        self.assertEqual(3, len(p._pool))
+        origworkerpids = [w.pid for w in p._pool]
+        # Run many tasks so each worker gets replaced (hopefully)
+        results = []
+        for i in range(100):
+            results.append(p.apply_async(sqr, (i, )))
+        # Fetch the results and verify we got the right answers,
+        # also ensuring all the tasks have completed.
+        for (j, res) in enumerate(results):
+            self.assertEqual(res.get(), sqr(j))
+        # Refill the pool
+        p._repopulate_pool()
+        # Finally, check that the worker pids have changed
+        finalworkerpids = [w.pid for w in p._pool]
+        self.assertNotEqual(sorted(origworkerpids), sorted(finalworkerpids))
+        p.close()
+        p.join()
+
 #
 # Test that manager has expected number of shared objects left
 #
diff --git a/Misc/ACKS b/Misc/ACKS
index 7c98382..877c326 100644
--- a/Misc/ACKS
+++ b/Misc/ACKS
@@ -121,6 +121,7 @@ Brett Cannon
 Mike Carlton
 Terry Carroll
 Donn Cave
+Charles Cazabon
 Per Cederqvist
 Octavian Cerna
 Hye-Shik Chang
diff --git a/Misc/NEWS b/Misc/NEWS
index ac1295a..e560204 100644
--- a/Misc/NEWS
+++ b/Misc/NEWS
@@ -173,6 +173,11 @@ Core and Builtins
 Library
 -------
 
+- Issue #6963: Added "maxtasksperchild" argument to multiprocessing.Pool,
+  allowing a worker within the pool to complete at most a set number of
+  tasks before it is terminated, and a new worker created to replace it.
+
 - Issue #7617: Make sure distutils.unixccompiler.UnixCCompiler recognizes
   gcc when it has a fully qualified configuration prefix. Initial patch
   by Arfrever.
-- 
cgit v0.12