author    Thomas Moreau <thomas.moreau.2010@gmail.com>  2017-10-03 09:53:17 (GMT)
committer Antoine Pitrou <pitrou@free.fr>  2017-10-03 09:53:17 (GMT)
commit    e8c368df22c344183627e7ef882bea1683fe6dbe (patch)
tree      3023a6313e5eab67c87cf1ccfb3e13e8c524ea77 /Lib/concurrent
parent    efb560eee28b6b2418e1231573ca62574d6dc07b (diff)
bpo-31540: Allow passing multiprocessing context to ProcessPoolExecutor (#3682)
Diffstat (limited to 'Lib/concurrent')
-rw-r--r--  Lib/concurrent/futures/process.py  66
1 file changed, 37 insertions(+), 29 deletions(-)
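In practice, the new keyword lets callers pick a multiprocessing start method explicitly. A minimal usage sketch (assuming Python 3.7, where this change landed; the worker function and values are illustrative, not part of the patch):

    import multiprocessing as mp
    from concurrent.futures import ProcessPoolExecutor

    def square(x):
        return x * x

    if __name__ == '__main__':
        # Request the 'spawn' start method instead of the platform default.
        ctx = mp.get_context('spawn')
        with ProcessPoolExecutor(max_workers=2, mp_context=ctx) as executor:
            print(list(executor.map(square, range(4))))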
diff --git a/Lib/concurrent/futures/process.py b/Lib/concurrent/futures/process.py
index 50ee296..67ebbf5 100644
--- a/Lib/concurrent/futures/process.py
+++ b/Lib/concurrent/futures/process.py
@@ -50,8 +50,7 @@ import os
from concurrent.futures import _base
import queue
from queue import Full
-import multiprocessing
-from multiprocessing import SimpleQueue
+import multiprocessing as mp
from multiprocessing.connection import wait
import threading
import weakref
@@ -74,11 +73,11 @@ import traceback
# threads/processes finish.
_threads_queues = weakref.WeakKeyDictionary()
-_shutdown = False
+_global_shutdown = False
def _python_exit():
- global _shutdown
- _shutdown = True
+ global _global_shutdown
+ _global_shutdown = True
items = list(_threads_queues.items())
for t, q in items:
q.put(None)
@@ -158,12 +157,10 @@ def _process_worker(call_queue, result_queue):
This worker is run in a separate process.
Args:
- call_queue: A multiprocessing.Queue of _CallItems that will be read and
+ call_queue: A ctx.Queue of _CallItems that will be read and
evaluated by the worker.
- result_queue: A multiprocessing.Queue of _ResultItems that will be written
+ result_queue: A ctx.Queue of _ResultItems that will be written
to by the worker.
- shutdown: A multiprocessing.Event that will be set as a signal to the
- worker that it should exit when call_queue is empty.
"""
while True:
call_item = call_queue.get(block=True)
@@ -180,6 +177,11 @@ def _process_worker(call_queue, result_queue):
result_queue.put(_ResultItem(call_item.work_id,
result=r))
+ # Liberate the resource as soon as possible, to avoid holding onto
+ # open files or shared memory that is not needed anymore
+ del call_item
+
+
def _add_call_item_to_queue(pending_work_items,
work_ids,
call_queue):
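The `del call_item` added above is about reference lifetime: in a loop that blocks on get(), the previous item would otherwise stay bound to its name until the next iteration rebinds it. A hedged sketch of the same pattern outside the patch (names and payload are illustrative):

    import queue

    def drain(q):
        while True:
            item = q.get(block=True)
            if item is None:
                break
            len(item)          # stand-in for real work
            # Drop the reference now; otherwise the payload (and any open
            # files or shared memory it references) stays alive while
            # get() blocks on the next iteration.
            del item

    q = queue.Queue()
    q.put(b'payload')
    q.put(None)
    drain(q)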
@@ -231,20 +233,21 @@ def _queue_management_worker(executor_reference,
executor_reference: A weakref.ref to the ProcessPoolExecutor that owns
this thread. Used to determine if the ProcessPoolExecutor has been
garbage collected and that this function can exit.
- process: A list of the multiprocessing.Process instances used as
+ process: A list of the ctx.Process instances used as
workers.
pending_work_items: A dict mapping work ids to _WorkItems e.g.
{5: <_WorkItem...>, 6: <_WorkItem...>, ...}
work_ids_queue: A queue.Queue of work ids e.g. Queue([5, 6, ...]).
- call_queue: A multiprocessing.Queue that will be filled with _CallItems
+ call_queue: A ctx.Queue that will be filled with _CallItems
derived from _WorkItems for processing by the process workers.
- result_queue: A multiprocessing.Queue of _ResultItems generated by the
+ result_queue: A ctx.SimpleQueue of _ResultItems generated by the
process workers.
"""
executor = None
def shutting_down():
- return _shutdown or executor is None or executor._shutdown_thread
+ return (_global_shutdown or executor is None
+ or executor._shutdown_thread)
def shutdown_worker():
# This is an upper bound
@@ -254,7 +257,7 @@ def _queue_management_worker(executor_reference,
# Release the queue's resources as soon as possible.
call_queue.close()
# If .join() is not called on the created processes then
- # some multiprocessing.Queue methods may deadlock on Mac OS X.
+ # some ctx.Queue methods may deadlock on Mac OS X.
for p in processes.values():
p.join()
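The close-then-join ordering in shutdown_worker matters beyond style. A standalone hedged sketch of the same pattern (illustrative only, not part of the patch):

    import multiprocessing as mp

    def worker(q):
        q.get()                  # block until a sentinel arrives

    if __name__ == '__main__':
        ctx = mp.get_context()
        q = ctx.Queue()
        procs = [ctx.Process(target=worker, args=(q,)) for _ in range(2)]
        for p in procs:
            p.start()
        for _ in procs:
            q.put(None)          # one sentinel per worker
        q.close()                # release the queue's resources
        q.join_thread()          # let the feeder thread flush its buffer
        for p in procs:
            p.join()             # joining avoids the Mac OS X deadlock noted above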
@@ -377,13 +380,15 @@ class BrokenProcessPool(RuntimeError):
class ProcessPoolExecutor(_base.Executor):
- def __init__(self, max_workers=None):
+ def __init__(self, max_workers=None, mp_context=None):
"""Initializes a new ProcessPoolExecutor instance.
Args:
max_workers: The maximum number of processes that can be used to
execute the given calls. If None or not given then as many
worker processes will be created as the machine has processors.
+ mp_context: A multiprocessing context to launch the workers. This
+ object should provide SimpleQueue, Queue and Process.
"""
_check_system_limits()
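The docstring's requirement is duck-typed: any stock context satisfies it, since each exposes the three factories. A quick hedged check (assumes a standard CPython build):

    import multiprocessing as mp

    for method in mp.get_all_start_methods():   # e.g. ['fork', 'spawn', 'forkserver']
        ctx = mp.get_context(method)
        assert callable(ctx.Process)
        assert callable(ctx.Queue)
        assert callable(ctx.SimpleQueue)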
@@ -394,17 +399,20 @@ class ProcessPoolExecutor(_base.Executor):
raise ValueError("max_workers must be greater than 0")
self._max_workers = max_workers
+ if mp_context is None:
+ mp_context = mp.get_context()
+ self._mp_context = mp_context
# Make the call queue slightly larger than the number of processes to
# prevent the worker processes from idling. But don't make it too big
# because futures in the call queue cannot be cancelled.
- self._call_queue = multiprocessing.Queue(self._max_workers +
- EXTRA_QUEUED_CALLS)
+ queue_size = self._max_workers + EXTRA_QUEUED_CALLS
+ self._call_queue = mp_context.Queue(queue_size)
# Killed worker processes can produce spurious "broken pipe"
# tracebacks in the queue's own worker thread. But we detect killed
# processes anyway, so silence the tracebacks.
self._call_queue._ignore_epipe = True
- self._result_queue = SimpleQueue()
+ self._result_queue = mp_context.SimpleQueue()
self._work_ids = queue.Queue()
self._queue_management_thread = None
# Map of pids to processes
@@ -426,23 +434,23 @@ class ProcessPoolExecutor(_base.Executor):
# Start the processes so that their sentinels are known.
self._adjust_process_count()
self._queue_management_thread = threading.Thread(
- target=_queue_management_worker,
- args=(weakref.ref(self, weakref_cb),
- self._processes,
- self._pending_work_items,
- self._work_ids,
- self._call_queue,
- self._result_queue))
+ target=_queue_management_worker,
+ args=(weakref.ref(self, weakref_cb),
+ self._processes,
+ self._pending_work_items,
+ self._work_ids,
+ self._call_queue,
+ self._result_queue))
self._queue_management_thread.daemon = True
self._queue_management_thread.start()
_threads_queues[self._queue_management_thread] = self._result_queue
def _adjust_process_count(self):
for _ in range(len(self._processes), self._max_workers):
- p = multiprocessing.Process(
- target=_process_worker,
- args=(self._call_queue,
- self._result_queue))
+ p = self._mp_context.Process(
+ target=_process_worker,
+ args=(self._call_queue,
+ self._result_queue))
p.start()
self._processes[p.pid] = p
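Because _adjust_process_count now goes through self._mp_context.Process, the supplied context really does determine the worker type. A hedged end-to-end check (it peeks at the CPython-internal _processes mapping and the SpawnProcess class name, shown only for illustration):

    import multiprocessing as mp
    from concurrent.futures import ProcessPoolExecutor

    if __name__ == '__main__':
        ex = ProcessPoolExecutor(max_workers=1,
                                 mp_context=mp.get_context('spawn'))
        ex.submit(abs, -1).result()               # first submit starts a worker
        worker = next(iter(ex._processes.values()))
        print(type(worker).__name__)              # 'SpawnProcess' on CPython
        ex.shutdown()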