summaryrefslogtreecommitdiffstats
path: root/Lib/concurrent/futures
diff options
context:
space:
mode:
Diffstat (limited to 'Lib/concurrent/futures')
-rw-r--r--Lib/concurrent/futures/process.py112
-rw-r--r--Lib/concurrent/futures/thread.py60
2 files changed, 93 insertions, 79 deletions
diff --git a/Lib/concurrent/futures/process.py b/Lib/concurrent/futures/process.py
index 79c60c3..a899d5f 100644
--- a/Lib/concurrent/futures/process.py
+++ b/Lib/concurrent/futures/process.py
@@ -66,28 +66,17 @@ import weakref
# workers to exit when their work queues are empty and then waits until the
# threads/processes finish.
-_thread_references = set()
+_threads_queues = weakref.WeakKeyDictionary()
_shutdown = False
def _python_exit():
global _shutdown
_shutdown = True
- for thread_reference in _thread_references:
- thread = thread_reference()
- if thread is not None:
- thread.join()
-
-def _remove_dead_thread_references():
- """Remove inactive threads from _thread_references.
-
- Should be called periodically to prevent memory leaks in scenarios such as:
- >>> while True:
- >>> ... t = ThreadPoolExecutor(max_workers=5)
- >>> ... t.map(int, ['1', '2', '3', '4', '5'])
- """
- for thread_reference in set(_thread_references):
- if thread_reference() is None:
- _thread_references.discard(thread_reference)
+ items = list(_threads_queues.items())
+ for t, q in items:
+ q.put(None)
+ for t, q in items:
+ t.join()
# Controls how many more calls than processes will be queued in the call queue.
# A smaller number will mean that processes spend more time idle waiting for
@@ -130,11 +119,15 @@ def _process_worker(call_queue, result_queue, shutdown):
"""
while True:
try:
- call_item = call_queue.get(block=True, timeout=0.1)
+ call_item = call_queue.get(block=True)
except queue.Empty:
if shutdown.is_set():
return
else:
+ if call_item is None:
+ # Wake up queue management thread
+ result_queue.put(None)
+ return
try:
r = call_item.fn(*call_item.args, **call_item.kwargs)
except BaseException as e:
@@ -209,40 +202,56 @@ def _queue_manangement_worker(executor_reference,
process workers that they should exit when their work queue is
empty.
"""
+ nb_shutdown_processes = 0
+ def shutdown_one_process():
+ """Tell a worker to terminate, which will in turn wake us again"""
+ nonlocal nb_shutdown_processes
+ call_queue.put(None)
+ nb_shutdown_processes += 1
while True:
_add_call_item_to_queue(pending_work_items,
work_ids_queue,
call_queue)
try:
- result_item = result_queue.get(block=True, timeout=0.1)
+ result_item = result_queue.get(block=True)
except queue.Empty:
- executor = executor_reference()
- # No more work items can be added if:
- # - The interpreter is shutting down OR
- # - The executor that owns this worker has been collected OR
- # - The executor that owns this worker has been shutdown.
- if _shutdown or executor is None or executor._shutdown_thread:
- # Since no new work items can be added, it is safe to shutdown
- # this thread if there are no pending work items.
- if not pending_work_items:
- shutdown_process_event.set()
-
- # If .join() is not called on the created processes then
- # some multiprocessing.Queue methods may deadlock on Mac OS
- # X.
- for p in processes:
- p.join()
- return
- del executor
+ pass
else:
- work_item = pending_work_items[result_item.work_id]
- del pending_work_items[result_item.work_id]
-
- if result_item.exception:
- work_item.future.set_exception(result_item.exception)
+ if result_item is not None:
+ work_item = pending_work_items[result_item.work_id]
+ del pending_work_items[result_item.work_id]
+
+ if result_item.exception:
+ work_item.future.set_exception(result_item.exception)
+ else:
+ work_item.future.set_result(result_item.result)
+ continue
+ # If we come here, we either got a timeout or were explicitly woken up.
+ # In either case, check whether we should start shutting down.
+ executor = executor_reference()
+ # No more work items can be added if:
+ # - The interpreter is shutting down OR
+ # - The executor that owns this worker has been collected OR
+ # - The executor that owns this worker has been shutdown.
+ if _shutdown or executor is None or executor._shutdown_thread:
+ # Since no new work items can be added, it is safe to shutdown
+ # this thread if there are no pending work items.
+ if not pending_work_items:
+ shutdown_process_event.set()
+
+ while nb_shutdown_processes < len(processes):
+ shutdown_one_process()
+ # If .join() is not called on the created processes then
+ # some multiprocessing.Queue methods may deadlock on Mac OS
+ # X.
+ for p in processes:
+ p.join()
+ return
else:
- work_item.future.set_result(result_item.result)
+ # Start shutting down by telling a process it can exit.
+ shutdown_one_process()
+ del executor
_system_limits_checked = False
_system_limited = None
@@ -279,7 +288,6 @@ class ProcessPoolExecutor(_base.Executor):
worker processes will be created as the machine has processors.
"""
_check_system_limits()
- _remove_dead_thread_references()
if max_workers is None:
self._max_workers = multiprocessing.cpu_count()
@@ -304,10 +312,14 @@ class ProcessPoolExecutor(_base.Executor):
self._pending_work_items = {}
def _start_queue_management_thread(self):
+ # When the executor gets lost, the weakref callback will wake up
+ # the queue management thread.
+ def weakref_cb(_, q=self._result_queue):
+ q.put(None)
if self._queue_management_thread is None:
self._queue_management_thread = threading.Thread(
target=_queue_manangement_worker,
- args=(weakref.ref(self),
+ args=(weakref.ref(self, weakref_cb),
self._processes,
self._pending_work_items,
self._work_ids,
@@ -316,7 +328,7 @@ class ProcessPoolExecutor(_base.Executor):
self._shutdown_process_event))
self._queue_management_thread.daemon = True
self._queue_management_thread.start()
- _thread_references.add(weakref.ref(self._queue_management_thread))
+ _threads_queues[self._queue_management_thread] = self._result_queue
def _adjust_process_count(self):
for _ in range(len(self._processes), self._max_workers):
@@ -339,6 +351,8 @@ class ProcessPoolExecutor(_base.Executor):
self._pending_work_items[self._queue_count] = w
self._work_ids.put(self._queue_count)
self._queue_count += 1
+ # Wake up queue management thread
+ self._result_queue.put(None)
self._start_queue_management_thread()
self._adjust_process_count()
@@ -348,8 +362,10 @@ class ProcessPoolExecutor(_base.Executor):
def shutdown(self, wait=True):
with self._shutdown_lock:
self._shutdown_thread = True
- if wait:
- if self._queue_management_thread:
+ if self._queue_management_thread:
+ # Wake up queue management thread
+ self._result_queue.put(None)
+ if wait:
self._queue_management_thread.join()
# To reduce the risk of openning too many files, remove references to
# objects that use file descriptors.
diff --git a/Lib/concurrent/futures/thread.py b/Lib/concurrent/futures/thread.py
index 15736da..93b495f 100644
--- a/Lib/concurrent/futures/thread.py
+++ b/Lib/concurrent/futures/thread.py
@@ -25,28 +25,17 @@ import weakref
# workers to exit when their work queues are empty and then waits until the
# threads finish.
-_thread_references = set()
+_threads_queues = weakref.WeakKeyDictionary()
_shutdown = False
def _python_exit():
global _shutdown
_shutdown = True
- for thread_reference in _thread_references:
- thread = thread_reference()
- if thread is not None:
- thread.join()
-
-def _remove_dead_thread_references():
- """Remove inactive threads from _thread_references.
-
- Should be called periodically to prevent memory leaks in scenarios such as:
- >>> while True:
- ... t = ThreadPoolExecutor(max_workers=5)
- ... t.map(int, ['1', '2', '3', '4', '5'])
- """
- for thread_reference in set(_thread_references):
- if thread_reference() is None:
- _thread_references.discard(thread_reference)
+ items = list(_threads_queues.items())
+ for t, q in items:
+ q.put(None)
+ for t, q in items:
+ t.join()
atexit.register(_python_exit)
@@ -72,18 +61,23 @@ def _worker(executor_reference, work_queue):
try:
while True:
try:
- work_item = work_queue.get(block=True, timeout=0.1)
+ work_item = work_queue.get(block=True)
except queue.Empty:
- executor = executor_reference()
- # Exit if:
- # - The interpreter is shutting down OR
- # - The executor that owns the worker has been collected OR
- # - The executor that owns the worker has been shutdown.
- if _shutdown or executor is None or executor._shutdown:
- return
- del executor
+ pass
else:
- work_item.run()
+ if work_item is not None:
+ work_item.run()
+ continue
+ executor = executor_reference()
+ # Exit if:
+ # - The interpreter is shutting down OR
+ # - The executor that owns the worker has been collected OR
+ # - The executor that owns the worker has been shutdown.
+ if _shutdown or executor is None or executor._shutdown:
+ # Notice other workers
+ work_queue.put(None)
+ return
+ del executor
except BaseException as e:
_base.LOGGER.critical('Exception in worker', exc_info=True)
@@ -95,8 +89,6 @@ class ThreadPoolExecutor(_base.Executor):
max_workers: The maximum number of threads that can be used to
execute the given calls.
"""
- _remove_dead_thread_references()
-
self._max_workers = max_workers
self._work_queue = queue.Queue()
self._threads = set()
@@ -117,19 +109,25 @@ class ThreadPoolExecutor(_base.Executor):
submit.__doc__ = _base.Executor.submit.__doc__
def _adjust_thread_count(self):
+ # When the executor gets lost, the weakref callback will wake up
+ # the worker threads.
+ def weakref_cb(_, q=self._work_queue):
+ q.put(None)
# TODO(bquinlan): Should avoid creating new threads if there are more
# idle threads than items in the work queue.
if len(self._threads) < self._max_workers:
t = threading.Thread(target=_worker,
- args=(weakref.ref(self), self._work_queue))
+ args=(weakref.ref(self, weakref_cb),
+ self._work_queue))
t.daemon = True
t.start()
self._threads.add(t)
- _thread_references.add(weakref.ref(t))
+ _threads_queues[t] = self._work_queue
def shutdown(self, wait=True):
with self._shutdown_lock:
self._shutdown = True
+ self._work_queue.put(None)
if wait:
for t in self._threads:
t.join()