summaryrefslogtreecommitdiffstats
path: root/Lib/concurrent/futures
diff options
context:
space:
mode:
authorKyle Stanley <aeros167@gmail.com>2020-04-19 14:00:59 (GMT)
committerGitHub <noreply@github.com>2020-04-19 14:00:59 (GMT)
commit1ac6e379297cc1cf8acf6c1b011fccc7b3da2cbe (patch)
tree89df3b0a4f0a8775039b6d4c3ea7a90624a72f65 /Lib/concurrent/futures
parentc12375aa0b838d34067efa3f1b9a1fbc632d0413 (diff)
downloadcpython-1ac6e379297cc1cf8acf6c1b011fccc7b3da2cbe.zip
cpython-1ac6e379297cc1cf8acf6c1b011fccc7b3da2cbe.tar.gz
cpython-1ac6e379297cc1cf8acf6c1b011fccc7b3da2cbe.tar.bz2
bpo-39207: Spawn workers on demand in ProcessPoolExecutor (GH-19453)
Roughly based on https://github.com/python/cpython/commit/904e34d4e6b6007986dcc585d5c553ee8ae06f95, but with a few substantial differences. /cc @pitrou @brianquinlan
Diffstat (limited to 'Lib/concurrent/futures')
-rw-r--r--Lib/concurrent/futures/process.py16
1 files changed, 14 insertions, 2 deletions
diff --git a/Lib/concurrent/futures/process.py b/Lib/concurrent/futures/process.py
index 4c39500..36355ae 100644
--- a/Lib/concurrent/futures/process.py
+++ b/Lib/concurrent/futures/process.py
@@ -318,6 +318,12 @@ class _ExecutorManagerThread(threading.Thread):
# while waiting on new results.
del result_item
+ # attempt to increment idle process count
+ executor = self.executor_reference()
+ if executor is not None:
+ executor._idle_worker_semaphore.release()
+ del executor
+
if self.is_shutting_down():
self.flag_executor_shutting_down()
@@ -601,6 +607,7 @@ class ProcessPoolExecutor(_base.Executor):
# Shutdown is a two-step process.
self._shutdown_thread = False
self._shutdown_lock = threading.Lock()
+ self._idle_worker_semaphore = threading.Semaphore(0)
self._broken = False
self._queue_count = 0
self._pending_work_items = {}
@@ -633,14 +640,18 @@ class ProcessPoolExecutor(_base.Executor):
def _start_executor_manager_thread(self):
if self._executor_manager_thread is None:
# Start the processes so that their sentinels are known.
- self._adjust_process_count()
self._executor_manager_thread = _ExecutorManagerThread(self)
self._executor_manager_thread.start()
_threads_wakeups[self._executor_manager_thread] = \
self._executor_manager_thread_wakeup
def _adjust_process_count(self):
- for _ in range(len(self._processes), self._max_workers):
+ # if there's an idle process, we don't need to spawn a new one.
+ if self._idle_worker_semaphore.acquire(blocking=False):
+ return
+
+ process_count = len(self._processes)
+ if process_count < self._max_workers:
p = self._mp_context.Process(
target=_process_worker,
args=(self._call_queue,
@@ -669,6 +680,7 @@ class ProcessPoolExecutor(_base.Executor):
# Wake up queue management thread
self._executor_manager_thread_wakeup.wakeup()
+ self._adjust_process_count()
self._start_executor_manager_thread()
return f
submit.__doc__ = _base.Executor.submit.__doc__