summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--Lib/concurrent/futures/thread.py15
-rw-r--r--Lib/test/test_concurrent_futures.py32
-rw-r--r--Misc/NEWS.d/next/Library/2018-04-04-14-54-30.bpo-24882.urybpa.rst1
3 files changed, 43 insertions, 5 deletions
diff --git a/Lib/concurrent/futures/thread.py b/Lib/concurrent/futures/thread.py
index 2af31a1..ad6b4c2 100644
--- a/Lib/concurrent/futures/thread.py
+++ b/Lib/concurrent/futures/thread.py
@@ -80,7 +80,14 @@ def _worker(executor_reference, work_queue, initializer, initargs):
work_item.run()
# Delete references to object. See issue16284
del work_item
+
+ # attempt to increment idle count
+ executor = executor_reference()
+ if executor is not None:
+ executor._idle_semaphore.release()
+ del executor
continue
+
executor = executor_reference()
# Exit if:
# - The interpreter is shutting down OR
@@ -133,6 +140,7 @@ class ThreadPoolExecutor(_base.Executor):
self._max_workers = max_workers
self._work_queue = queue.SimpleQueue()
+ self._idle_semaphore = threading.Semaphore(0)
self._threads = set()
self._broken = False
self._shutdown = False
@@ -178,12 +186,15 @@ class ThreadPoolExecutor(_base.Executor):
submit.__doc__ = _base.Executor.submit.__doc__
def _adjust_thread_count(self):
+ # if idle threads are available, don't spin new threads
+ if self._idle_semaphore.acquire(timeout=0):
+ return
+
# When the executor gets lost, the weakref callback will wake up
# the worker threads.
def weakref_cb(_, q=self._work_queue):
q.put(None)
- # TODO(bquinlan): Should avoid creating new threads if there are more
- # idle threads than items in the work queue.
+
num_threads = len(self._threads)
if num_threads < self._max_workers:
thread_name = '%s_%d' % (self._thread_name_prefix or self,
diff --git a/Lib/test/test_concurrent_futures.py b/Lib/test/test_concurrent_futures.py
index 212ccd8..de6ad8f 100644
--- a/Lib/test/test_concurrent_futures.py
+++ b/Lib/test/test_concurrent_futures.py
@@ -346,10 +346,15 @@ class ThreadPoolShutdownTest(ThreadPoolMixin, ExecutorShutdownTest, BaseTestCase
pass
def test_threads_terminate(self):
- self.executor.submit(mul, 21, 2)
- self.executor.submit(mul, 6, 7)
- self.executor.submit(mul, 3, 14)
+ def acquire_lock(lock):
+ lock.acquire()
+
+ sem = threading.Semaphore(0)
+ for i in range(3):
+ self.executor.submit(acquire_lock, sem)
self.assertEqual(len(self.executor._threads), 3)
+ for i in range(3):
+ sem.release()
self.executor.shutdown()
for t in self.executor._threads:
t.join()
@@ -753,6 +758,27 @@ class ThreadPoolExecutorTest(ThreadPoolMixin, ExecutorTest, BaseTestCase):
self.assertEqual(executor._max_workers,
(os.cpu_count() or 1) * 5)
+ def test_saturation(self):
+ executor = self.executor_type(4)
+ def acquire_lock(lock):
+ lock.acquire()
+
+ sem = threading.Semaphore(0)
+ for i in range(15 * executor._max_workers):
+ executor.submit(acquire_lock, sem)
+ self.assertEqual(len(executor._threads), executor._max_workers)
+ for i in range(15 * executor._max_workers):
+ sem.release()
+ executor.shutdown(wait=True)
+
+ def test_idle_thread_reuse(self):
+ executor = self.executor_type()
+ executor.submit(mul, 21, 2).result()
+ executor.submit(mul, 6, 7).result()
+ executor.submit(mul, 3, 14).result()
+ self.assertEqual(len(executor._threads), 1)
+ executor.shutdown(wait=True)
+
class ProcessPoolExecutorTest(ExecutorTest):
diff --git a/Misc/NEWS.d/next/Library/2018-04-04-14-54-30.bpo-24882.urybpa.rst b/Misc/NEWS.d/next/Library/2018-04-04-14-54-30.bpo-24882.urybpa.rst
new file mode 100644
index 0000000..8c41882
--- /dev/null
+++ b/Misc/NEWS.d/next/Library/2018-04-04-14-54-30.bpo-24882.urybpa.rst
@@ -0,0 +1 @@
+Change ThreadPoolExecutor to use existing idle threads before spinning up new ones. \ No newline at end of file