diff options
Diffstat (limited to 'Lib/concurrent/futures/process.py')
-rw-r--r-- | Lib/concurrent/futures/process.py | 47 |
1 files changed, 26 insertions, 21 deletions
diff --git a/Lib/concurrent/futures/process.py b/Lib/concurrent/futures/process.py index fd9f572..d773228 100644 --- a/Lib/concurrent/futures/process.py +++ b/Lib/concurrent/futures/process.py @@ -80,18 +80,23 @@ _global_shutdown = False class _ThreadWakeup: def __init__(self): + self._closed = False self._reader, self._writer = mp.Pipe(duplex=False) def close(self): - self._writer.close() - self._reader.close() + if not self._closed: + self._closed = True + self._writer.close() + self._reader.close() def wakeup(self): - self._writer.send_bytes(b"") + if not self._closed: + self._writer.send_bytes(b"") def clear(self): - while self._reader.poll(): - self._reader.recv_bytes() + if not self._closed: + while self._reader.poll(): + self._reader.recv_bytes() def _python_exit(): @@ -160,8 +165,9 @@ class _CallItem(object): class _SafeQueue(Queue): """Safe Queue set exception to the future object linked to a job""" - def __init__(self, max_size=0, *, ctx, pending_work_items): + def __init__(self, max_size=0, *, ctx, pending_work_items, thread_wakeup): self.pending_work_items = pending_work_items + self.thread_wakeup = thread_wakeup super().__init__(max_size, ctx=ctx) def _on_queue_feeder_error(self, e, obj): @@ -169,6 +175,7 @@ class _SafeQueue(Queue): tb = traceback.format_exception(type(e), e, e.__traceback__) e.__cause__ = _RemoteTraceback('\n"""\n{}"""'.format(''.join(tb))) work_item = self.pending_work_items.pop(obj.work_id, None) + self.thread_wakeup.wakeup() # work_item can be None if another process terminated. In this case, # the queue_manager_thread fails all work_items with BrokenProcessPool if work_item is not None: @@ -339,6 +346,8 @@ def _queue_management_worker(executor_reference, # Release the queue's resources as soon as possible. call_queue.close() + call_queue.join_thread() + thread_wakeup.close() # If .join() is not called on the created processes then # some ctx.Queue methods may deadlock on Mac OS X. for p in processes.values(): @@ -566,6 +575,14 @@ class ProcessPoolExecutor(_base.Executor): self._pending_work_items = {} self._cancel_pending_futures = False + # _ThreadWakeup is a communication channel used to interrupt the wait + # of the main loop of queue_manager_thread from another thread (e.g. + # when calling executor.submit or executor.shutdown). We do not use the + # _result_queue to send the wakeup signal to the queue_manager_thread + # as it could result in a deadlock if a worker process dies with the + # _result_queue write lock still acquired. + self._queue_management_thread_wakeup = _ThreadWakeup() + # Create communication channels for the executor # Make the call queue slightly larger than the number of processes to # prevent the worker processes from idling. But don't make it too big @@ -573,7 +590,8 @@ class ProcessPoolExecutor(_base.Executor): queue_size = self._max_workers + EXTRA_QUEUED_CALLS self._call_queue = _SafeQueue( max_size=queue_size, ctx=self._mp_context, - pending_work_items=self._pending_work_items) + pending_work_items=self._pending_work_items, + thread_wakeup=self._queue_management_thread_wakeup) # Killed worker processes can produce spurious "broken pipe" # tracebacks in the queue's own worker thread. But we detect killed # processes anyway, so silence the tracebacks. @@ -581,14 +599,6 @@ class ProcessPoolExecutor(_base.Executor): self._result_queue = mp_context.SimpleQueue() self._work_ids = queue.Queue() - # _ThreadWakeup is a communication channel used to interrupt the wait - # of the main loop of queue_manager_thread from another thread (e.g. - # when calling executor.submit or executor.shutdown). We do not use the - # _result_queue to send the wakeup signal to the queue_manager_thread - # as it could result in a deadlock if a worker process dies with the - # _result_queue write lock still acquired. - self._queue_management_thread_wakeup = _ThreadWakeup() - def _start_queue_management_thread(self): if self._queue_management_thread is None: # When the executor gets garbarge collected, the weakref callback @@ -692,16 +702,11 @@ class ProcessPoolExecutor(_base.Executor): # To reduce the risk of opening too many files, remove references to # objects that use file descriptors. self._queue_management_thread = None - if self._call_queue is not None: - self._call_queue.close() - if wait: - self._call_queue.join_thread() - self._call_queue = None + self._call_queue = None self._result_queue = None self._processes = None if self._queue_management_thread_wakeup: - self._queue_management_thread_wakeup.close() self._queue_management_thread_wakeup = None shutdown.__doc__ = _base.Executor.shutdown.__doc__ |