diff options
author | Jesse Noller <jnoller@gmail.com> | 2010-01-27 03:36:01 (GMT) |
---|---|---|
committer | Jesse Noller <jnoller@gmail.com> | 2010-01-27 03:36:01 (GMT) |
commit | 1f0b6586387a3dae59cf28239effe14f1333229a (patch) | |
tree | 5113af47512d196c433d6bcb695985bcbc21a14f | |
parent | c3511a461de4e7370415b7de85eb193b6ee1548a (diff) | |
download | cpython-1f0b6586387a3dae59cf28239effe14f1333229a.zip cpython-1f0b6586387a3dae59cf28239effe14f1333229a.tar.gz cpython-1f0b6586387a3dae59cf28239effe14f1333229a.tar.bz2 |
Merged revisions 77794 via svnmerge from
svn+ssh://pythondev@svn.python.org/python/trunk
........
r77794 | jesse.noller | 2010-01-26 22:05:57 -0500 (Tue, 26 Jan 2010) | 1 line
Issue #6963: Added maxtasksperchild argument to multiprocessing.Pool
........
-rw-r--r-- | Doc/library/multiprocessing.rst | 17 | ||||
-rw-r--r-- | Lib/multiprocessing/__init__.py | 4 | ||||
-rw-r--r-- | Lib/multiprocessing/pool.py | 96 | ||||
-rw-r--r-- | Lib/test/test_multiprocessing.py | 26 | ||||
-rw-r--r-- | Misc/ACKS | 1 | ||||
-rw-r--r-- | Misc/NEWS | 5 |
6 files changed, 130 insertions, 19 deletions
diff --git a/Doc/library/multiprocessing.rst b/Doc/library/multiprocessing.rst index a2cdb02..6d7e768 100644 --- a/Doc/library/multiprocessing.rst +++ b/Doc/library/multiprocessing.rst @@ -1535,7 +1535,7 @@ Process Pools One can create a pool of processes which will carry out tasks submitted to it with the :class:`Pool` class. -.. class:: multiprocessing.Pool([processes[, initializer[, initargs]]]) +.. class:: multiprocessing.Pool([processes[, initializer[, initargs[, maxtasksperchild]]]]) A process pool object which controls a pool of worker processes to which jobs can be submitted. It supports asynchronous results with timeouts and @@ -1546,6 +1546,21 @@ with the :class:`Pool` class. *initializer* is not ``None`` then each worker process will call ``initializer(*initargs)`` when it starts. + *maxtasksperchild* is the number of tasks a worker process can complete + before it will exit and be replaced with a fresh worker process, to enable + unused resources to be freed. The default *maxtasksperchild* is None, which + means worker processes will live as long as the pool. + + .. note:: + + Worker processes within a :class:`Pool` typically live for the complete + duration of the Pool's work queue. A frequent pattern found in other + systems (such as Apache, mod_wsgi, etc) to free resources held by + workers is to allow a worker within a pool to complete only a set + amount of work before being exiting, being cleaned up and a new + process spawned to replace the old one. The *maxtasksperchild* + argument to the :class:`Pool` exposes this ability to the end user. + .. method:: apply(func[, args[, kwds]]) Call *func* with arguments *args* and keyword arguments *kwds*. It blocks diff --git a/Lib/multiprocessing/__init__.py b/Lib/multiprocessing/__init__.py index 5a13742..e4af68b 100644 --- a/Lib/multiprocessing/__init__.py +++ b/Lib/multiprocessing/__init__.py @@ -218,12 +218,12 @@ def JoinableQueue(maxsize=0): from multiprocessing.queues import JoinableQueue return JoinableQueue(maxsize) -def Pool(processes=None, initializer=None, initargs=()): +def Pool(processes=None, initializer=None, initargs=(), maxtasksperchild=None): ''' Returns a process pool object ''' from multiprocessing.pool import Pool - return Pool(processes, initializer, initargs) + return Pool(processes, initializer, initargs, maxtasksperchild) def RawValue(typecode_or_type, *args): ''' diff --git a/Lib/multiprocessing/pool.py b/Lib/multiprocessing/pool.py index d3ecc9b..6271b86 100644 --- a/Lib/multiprocessing/pool.py +++ b/Lib/multiprocessing/pool.py @@ -42,7 +42,8 @@ def mapstar(args): # Code run by worker processes # -def worker(inqueue, outqueue, initializer=None, initargs=()): +def worker(inqueue, outqueue, initializer=None, initargs=(), maxtasks=None): + assert maxtasks is None or (type(maxtasks) == int and maxtasks > 0) put = outqueue.put get = inqueue.get if hasattr(inqueue, '_writer'): @@ -52,7 +53,8 @@ def worker(inqueue, outqueue, initializer=None, initargs=()): if initializer is not None: initializer(*initargs) - while 1: + completed = 0 + while maxtasks is None or (maxtasks and completed < maxtasks): try: task = get() except (EOFError, IOError): @@ -69,6 +71,8 @@ def worker(inqueue, outqueue, initializer=None, initargs=()): except Exception as e: result = (False, e) put((job, i, result)) + completed += 1 + debug('worker exiting after %d tasks' % completed) # # Class representing a process pool @@ -80,11 +84,15 @@ class Pool(object): ''' Process = Process - def __init__(self, processes=None, initializer=None, initargs=()): + def __init__(self, processes=None, initializer=None, initargs=(), + maxtasksperchild=None): self._setup_queues() self._taskqueue = queue.Queue() self._cache = {} self._state = RUN + self._maxtasksperchild = maxtasksperchild + self._initializer = initializer + self._initargs = initargs if processes is None: try: @@ -95,16 +103,18 @@ class Pool(object): if initializer is not None and not hasattr(initializer, '__call__'): raise TypeError('initializer must be a callable') + self._processes = processes self._pool = [] - for i in range(processes): - w = self.Process( - target=worker, - args=(self._inqueue, self._outqueue, initializer, initargs) - ) - self._pool.append(w) - w.name = w.name.replace('Process', 'PoolWorker') - w.daemon = True - w.start() + self._repopulate_pool() + + self._worker_handler = threading.Thread( + target=Pool._handle_workers, + args=(self, ) + ) + self._worker_handler.daemon = True + self._worker_handler._state = RUN + self._worker_handler.start() + self._task_handler = threading.Thread( target=Pool._handle_tasks, @@ -125,10 +135,48 @@ class Pool(object): self._terminate = Finalize( self, self._terminate_pool, args=(self._taskqueue, self._inqueue, self._outqueue, self._pool, - self._task_handler, self._result_handler, self._cache), + self._worker_handler, self._task_handler, + self._result_handler, self._cache), exitpriority=15 ) + def _join_exited_workers(self): + """Cleanup after any worker processes which have exited due to reaching + their specified lifetime. Returns True if any workers were cleaned up. + """ + cleaned = False + for i in reversed(range(len(self._pool))): + worker = self._pool[i] + if worker.exitcode is not None: + # worker exited + debug('cleaning up worker %d' % i) + worker.join() + cleaned = True + del self._pool[i] + return cleaned + + def _repopulate_pool(self): + """Bring the number of pool processes up to the specified number, + for use after reaping workers which have exited. + """ + for i in range(self._processes - len(self._pool)): + w = self.Process(target=worker, + args=(self._inqueue, self._outqueue, + self._initializer, + self._initargs, self._maxtasksperchild) + ) + self._pool.append(w) + w.name = w.name.replace('Process', 'PoolWorker') + w.daemon = True + w.start() + debug('added worker') + + def _maintain_pool(self): + """Clean up any exited workers and start replacements for them. + """ + if self._join_exited_workers(): + self._repopulate_pool() + def _setup_queues(self): from .queues import SimpleQueue self._inqueue = SimpleQueue() @@ -218,6 +266,13 @@ class Pool(object): return result @staticmethod + def _handle_workers(pool): + while pool._worker_handler._state == RUN and pool._state == RUN: + pool._maintain_pool() + time.sleep(0.1) + debug('worker handler exiting') + + @staticmethod def _handle_tasks(taskqueue, put, outqueue, pool): thread = threading.current_thread() @@ -332,16 +387,19 @@ class Pool(object): debug('closing pool') if self._state == RUN: self._state = CLOSE + self._worker_handler._state = CLOSE self._taskqueue.put(None) def terminate(self): debug('terminating pool') self._state = TERMINATE + self._worker_handler._state = TERMINATE self._terminate() def join(self): debug('joining pool') assert self._state in (CLOSE, TERMINATE) + self._worker_handler.join() self._task_handler.join() self._result_handler.join() for p in self._pool: @@ -358,10 +416,11 @@ class Pool(object): @classmethod def _terminate_pool(cls, taskqueue, inqueue, outqueue, pool, - task_handler, result_handler, cache): + worker_handler, task_handler, result_handler, cache): # this is guaranteed to only be called once debug('finalizing pool') + worker_handler._state = TERMINATE task_handler._state = TERMINATE taskqueue.put(None) # sentinel @@ -373,10 +432,12 @@ class Pool(object): result_handler._state = TERMINATE outqueue.put(None) # sentinel + # Terminate workers which haven't already finished. if pool and hasattr(pool[0], 'terminate'): debug('terminating workers') for p in pool: - p.terminate() + if p.exitcode is None: + p.terminate() debug('joining task handler') task_handler.join(1e100) @@ -388,6 +449,11 @@ class Pool(object): debug('joining pool workers') for p in pool: p.join() + for w in pool: + if w.exitcode is None: + # worker has not yet exited + debug('cleaning up worker %d' % w.pid) + w.join() # # Class whose instances are returned by `Pool.apply_async()` diff --git a/Lib/test/test_multiprocessing.py b/Lib/test/test_multiprocessing.py index b65fbf7..be923bd 100644 --- a/Lib/test/test_multiprocessing.py +++ b/Lib/test/test_multiprocessing.py @@ -46,7 +46,7 @@ def latin(s): # LOG_LEVEL = util.SUBWARNING -#LOG_LEVEL = logging.WARNING +#LOG_LEVEL = logging.DEBUG DELTA = 0.1 CHECK_TIMINGS = False # making true makes tests take a lot longer @@ -1053,6 +1053,30 @@ class _TestPool(BaseTestCase): join = TimingWrapper(self.pool.join) join() self.assertTrue(join.elapsed < 0.2) + +class _TestPoolWorkerLifetime(BaseTestCase): + + ALLOWED_TYPES = ('processes', ) + def test_pool_worker_lifetime(self): + p = multiprocessing.Pool(3, maxtasksperchild=10) + self.assertEqual(3, len(p._pool)) + origworkerpids = [w.pid for w in p._pool] + # Run many tasks so each worker gets replaced (hopefully) + results = [] + for i in range(100): + results.append(p.apply_async(sqr, (i, ))) + # Fetch the results and verify we got the right answers, + # also ensuring all the tasks have completed. + for (j, res) in enumerate(results): + self.assertEqual(res.get(), sqr(j)) + # Refill the pool + p._repopulate_pool() + # Finally, check that the worker pids have changed + finalworkerpids = [w.pid for w in p._pool] + self.assertNotEqual(sorted(origworkerpids), sorted(finalworkerpids)) + p.close() + p.join() + # # Test that manager has expected number of shared objects left # @@ -120,6 +120,7 @@ Brett Cannon Mike Carlton Terry Carroll Donn Cave +Charles Cazabon Per Cederqvist Octavian Cerna Hye-Shik Chang @@ -234,6 +234,11 @@ C-API Library ------- +- Issue #6963: Added "maxtasksperchild" argument to multiprocessing.Pool, + allowing for a maximum number of tasks within the pool to be completed by + the worker before that worker is terminated, and a new one created to + replace it. + - Issue #7792: Registering non-classes to ABCs raised an obscure error. - Issue #7785: Don't accept bytes in FileIO.write(). |