diff options
author | Kyle Stanley <aeros167@gmail.com> | 2020-04-19 14:00:59 (GMT) |
---|---|---|
committer | GitHub <noreply@github.com> | 2020-04-19 14:00:59 (GMT) |
commit | 1ac6e379297cc1cf8acf6c1b011fccc7b3da2cbe (patch) | |
tree | 89df3b0a4f0a8775039b6d4c3ea7a90624a72f65 /Lib/concurrent/futures | |
parent | c12375aa0b838d34067efa3f1b9a1fbc632d0413 (diff) | |
download | cpython-1ac6e379297cc1cf8acf6c1b011fccc7b3da2cbe.zip cpython-1ac6e379297cc1cf8acf6c1b011fccc7b3da2cbe.tar.gz cpython-1ac6e379297cc1cf8acf6c1b011fccc7b3da2cbe.tar.bz2 |
bpo-39207: Spawn workers on demand in ProcessPoolExecutor (GH-19453)
Roughly based on https://github.com/python/cpython/commit/904e34d4e6b6007986dcc585d5c553ee8ae06f95, but with a few substantial differences.
/cc @pitrou @brianquinlan
Diffstat (limited to 'Lib/concurrent/futures')
-rw-r--r-- | Lib/concurrent/futures/process.py | 16 |
1 files changed, 14 insertions, 2 deletions
diff --git a/Lib/concurrent/futures/process.py b/Lib/concurrent/futures/process.py index 4c39500..36355ae 100644 --- a/Lib/concurrent/futures/process.py +++ b/Lib/concurrent/futures/process.py @@ -318,6 +318,12 @@ class _ExecutorManagerThread(threading.Thread): # while waiting on new results. del result_item + # attempt to increment idle process count + executor = self.executor_reference() + if executor is not None: + executor._idle_worker_semaphore.release() + del executor + if self.is_shutting_down(): self.flag_executor_shutting_down() @@ -601,6 +607,7 @@ class ProcessPoolExecutor(_base.Executor): # Shutdown is a two-step process. self._shutdown_thread = False self._shutdown_lock = threading.Lock() + self._idle_worker_semaphore = threading.Semaphore(0) self._broken = False self._queue_count = 0 self._pending_work_items = {} @@ -633,14 +640,18 @@ class ProcessPoolExecutor(_base.Executor): def _start_executor_manager_thread(self): if self._executor_manager_thread is None: # Start the processes so that their sentinels are known. - self._adjust_process_count() self._executor_manager_thread = _ExecutorManagerThread(self) self._executor_manager_thread.start() _threads_wakeups[self._executor_manager_thread] = \ self._executor_manager_thread_wakeup def _adjust_process_count(self): - for _ in range(len(self._processes), self._max_workers): + # if there's an idle process, we don't need to spawn a new one. + if self._idle_worker_semaphore.acquire(blocking=False): + return + + process_count = len(self._processes) + if process_count < self._max_workers: p = self._mp_context.Process( target=_process_worker, args=(self._call_queue, @@ -669,6 +680,7 @@ class ProcessPoolExecutor(_base.Executor): # Wake up queue management thread self._executor_manager_thread_wakeup.wakeup() + self._adjust_process_count() self._start_executor_manager_thread() return f submit.__doc__ = _base.Executor.submit.__doc__ |