diff options
author | Pablo Galindo <Pablogsal@gmail.com> | 2019-03-16 22:34:24 (GMT) |
---|---|---|
committer | GitHub <noreply@github.com> | 2019-03-16 22:34:24 (GMT) |
commit | 7c994549dcffd0d9d3bb37475e6374f356e7240e (patch) | |
tree | 7b59c744c1900c05e920b1eeca1526d9fa886f87 /Lib/multiprocessing | |
parent | 962bdeab191ee64459caa199209331005797ea7a (diff) | |
download | cpython-7c994549dcffd0d9d3bb37475e6374f356e7240e.zip cpython-7c994549dcffd0d9d3bb37475e6374f356e7240e.tar.gz cpython-7c994549dcffd0d9d3bb37475e6374f356e7240e.tar.bz2 |
bpo-35493: Use Process.sentinel instead of sleeping for polling worker status in multiprocessing.Pool (#11488)
* bpo-35493: Use Process.sentinel instead of sleeping for polling worker status in multiprocessing.Pool
* Use self-pipe pattern to avoid polling for changes
* Refactor some variable names and add comments
* Restore timeout and poll
* Use reader object only on wait()
* Recompute worker sentinels every time
* Remove timeout and use change notifier
* Refactor some methods to be overloaded by the ThreadPool, document the cache class and fix typos
Diffstat (limited to 'Lib/multiprocessing')
-rw-r--r-- | Lib/multiprocessing/pool.py | 88 |
1 files changed, 77 insertions, 11 deletions
diff --git a/Lib/multiprocessing/pool.py b/Lib/multiprocessing/pool.py index 18a56f8..665ca06 100644 --- a/Lib/multiprocessing/pool.py +++ b/Lib/multiprocessing/pool.py @@ -21,11 +21,13 @@ import threading import time import traceback import warnings +from queue import Empty # If threading is available then ThreadPool should be provided. Therefore # we avoid top-level imports which are liable to fail on some systems. from . import util from . import get_context, TimeoutError +from .connection import wait # # Constants representing the state of a pool @@ -145,6 +147,29 @@ def _helper_reraises_exception(ex): # Class representing a process pool # +class _PoolCache(dict): + """ + Class that implements a cache for the Pool class that will notify + the pool management threads every time the cache is emptied. The + notification is done by the use of a queue that is provided when + instantiating the cache. + """ + def __init__(self, *args, notifier=None, **kwds): + self.notifier = notifier + super().__init__(*args, **kwds) + + def __delitem__(self, item): + super().__delitem__(item) + + # Notify that the cache is empty. This is important because the + # pool keeps maintaining workers until the cache gets drained. This + # eliminates a race condition in which a task is finished after the + # the pool's _handle_workers method has enter another iteration of the + # loop. In this situation, the only event that can wake up the pool + # is the cache to be emptied (no more tasks available). + if not self: + self.notifier.put(None) + class Pool(object): ''' Class which supports an async version of applying functions to arguments. @@ -165,7 +190,11 @@ class Pool(object): self._ctx = context or get_context() self._setup_queues() self._taskqueue = queue.SimpleQueue() - self._cache = {} + # The _change_notifier queue exist to wake up self._handle_workers() + # when the cache (self._cache) is empty or when there is a change in + # the _state variable of the thread that runs _handle_workers. + self._change_notifier = self._ctx.SimpleQueue() + self._cache = _PoolCache(notifier=self._change_notifier) self._maxtasksperchild = maxtasksperchild self._initializer = initializer self._initargs = initargs @@ -189,12 +218,14 @@ class Pool(object): p.join() raise + sentinels = self._get_sentinels() + self._worker_handler = threading.Thread( target=Pool._handle_workers, args=(self._cache, self._taskqueue, self._ctx, self.Process, self._processes, self._pool, self._inqueue, self._outqueue, self._initializer, self._initargs, self._maxtasksperchild, - self._wrap_exception) + self._wrap_exception, sentinels, self._change_notifier) ) self._worker_handler.daemon = True self._worker_handler._state = RUN @@ -221,7 +252,7 @@ class Pool(object): self._terminate = util.Finalize( self, self._terminate_pool, args=(self._taskqueue, self._inqueue, self._outqueue, self._pool, - self._worker_handler, self._task_handler, + self._change_notifier, self._worker_handler, self._task_handler, self._result_handler, self._cache), exitpriority=15 ) @@ -233,6 +264,8 @@ class Pool(object): if self._state == RUN: _warn(f"unclosed running multiprocessing pool {self!r}", ResourceWarning, source=self) + if getattr(self, '_change_notifier', None) is not None: + self._change_notifier.put(None) def __repr__(self): cls = self.__class__ @@ -240,6 +273,16 @@ class Pool(object): f'state={self._state} ' f'pool_size={len(self._pool)}>') + def _get_sentinels(self): + task_queue_sentinels = [self._outqueue._reader] + self_notifier_sentinels = [self._change_notifier._reader] + return [*task_queue_sentinels, *self_notifier_sentinels] + + @staticmethod + def _get_worker_sentinels(workers): + return [worker.sentinel for worker in + workers if hasattr(worker, "sentinel")] + @staticmethod def _join_exited_workers(pool): """Cleanup after any worker processes which have exited due to reaching @@ -452,18 +495,28 @@ class Pool(object): return result @staticmethod - def _handle_workers(cache, taskqueue, ctx, Process, processes, pool, - inqueue, outqueue, initializer, initargs, - maxtasksperchild, wrap_exception): + def _wait_for_updates(sentinels, change_notifier, timeout=None): + wait(sentinels, timeout=timeout) + while not change_notifier.empty(): + change_notifier.get() + + @classmethod + def _handle_workers(cls, cache, taskqueue, ctx, Process, processes, + pool, inqueue, outqueue, initializer, initargs, + maxtasksperchild, wrap_exception, sentinels, + change_notifier): thread = threading.current_thread() # Keep maintaining workers until the cache gets drained, unless the pool # is terminated. while thread._state == RUN or (cache and thread._state != TERMINATE): - Pool._maintain_pool(ctx, Process, processes, pool, inqueue, - outqueue, initializer, initargs, - maxtasksperchild, wrap_exception) - time.sleep(0.1) + cls._maintain_pool(ctx, Process, processes, pool, inqueue, + outqueue, initializer, initargs, + maxtasksperchild, wrap_exception) + + current_sentinels = [*cls._get_worker_sentinels(pool), *sentinels] + + cls._wait_for_updates(current_sentinels, change_notifier) # send sentinel to stop workers taskqueue.put(None) util.debug('worker handler exiting') @@ -593,11 +646,13 @@ class Pool(object): if self._state == RUN: self._state = CLOSE self._worker_handler._state = CLOSE + self._change_notifier.put(None) def terminate(self): util.debug('terminating pool') self._state = TERMINATE self._worker_handler._state = TERMINATE + self._change_notifier.put(None) self._terminate() def join(self): @@ -622,7 +677,7 @@ class Pool(object): time.sleep(0) @classmethod - def _terminate_pool(cls, taskqueue, inqueue, outqueue, pool, + def _terminate_pool(cls, taskqueue, inqueue, outqueue, pool, change_notifier, worker_handler, task_handler, result_handler, cache): # this is guaranteed to only be called once util.debug('finalizing pool') @@ -638,6 +693,7 @@ class Pool(object): "Cannot have cache with result_hander not alive") result_handler._state = TERMINATE + change_notifier.put(None) outqueue.put(None) # sentinel # We must wait for the worker handler to exit before terminating @@ -871,6 +927,13 @@ class ThreadPool(Pool): self._quick_put = self._inqueue.put self._quick_get = self._outqueue.get + def _get_sentinels(self): + return [self._change_notifier._reader] + + @staticmethod + def _get_worker_sentinels(workers): + return [] + @staticmethod def _help_stuff_finish(inqueue, task_handler, size): # drain inqueue, and put sentinels at its head to make workers finish @@ -881,3 +944,6 @@ class ThreadPool(Pool): pass for i in range(size): inqueue.put(None) + + def _wait_for_updates(self, sentinels, change_notifier, timeout): + time.sleep(timeout) |