diff options
-rw-r--r-- | Lib/concurrent/futures/process.py | 208 | ||||
-rw-r--r-- | Lib/multiprocessing/queues.py | 21 | ||||
-rw-r--r-- | Lib/test/_test_multiprocessing.py | 37 | ||||
-rw-r--r-- | Lib/test/test_concurrent_futures.py | 172 | ||||
-rw-r--r-- | Misc/NEWS.d/next/Library/2017-10-05-11-06-32.bpo-31699.MF47Y6.rst | 4 |
5 files changed, 386 insertions, 56 deletions
diff --git a/Lib/concurrent/futures/process.py b/Lib/concurrent/futures/process.py index 35af65d..aaa5151 100644 --- a/Lib/concurrent/futures/process.py +++ b/Lib/concurrent/futures/process.py @@ -8,10 +8,10 @@ The follow diagram and text describe the data-flow through the system: |======================= In-process =====================|== Out-of-process ==| +----------+ +----------+ +--------+ +-----------+ +---------+ -| | => | Work Ids | => | | => | Call Q | => | | -| | +----------+ | | +-----------+ | | -| | | ... | | | | ... | | | -| | | 6 | | | | 5, call() | | | +| | => | Work Ids | | | | Call Q | | Process | +| | +----------+ | | +-----------+ | Pool | +| | | ... | | | | ... | +---------+ +| | | 6 | => | | => | 5, call() | => | | | | | 7 | | | | ... | | | | Process | | ... | | Local | +-----------+ | Process | | Pool | +----------+ | Worker | | #1..n | @@ -52,6 +52,7 @@ import queue from queue import Full import multiprocessing as mp from multiprocessing.connection import wait +from multiprocessing.queues import Queue import threading import weakref from functools import partial @@ -72,16 +73,31 @@ import traceback # workers to exit when their work queues are empty and then waits until the # threads/processes finish. -_threads_queues = weakref.WeakKeyDictionary() +_threads_wakeups = weakref.WeakKeyDictionary() _global_shutdown = False + +class _ThreadWakeup: + __slot__ = ["_state"] + + def __init__(self): + self._reader, self._writer = mp.Pipe(duplex=False) + + def wakeup(self): + self._writer.send_bytes(b"") + + def clear(self): + while self._reader.poll(): + self._reader.recv_bytes() + + def _python_exit(): global _global_shutdown _global_shutdown = True - items = list(_threads_queues.items()) - for t, q in items: - q.put(None) - for t, q in items: + items = list(_threads_wakeups.items()) + for _, thread_wakeup in items: + thread_wakeup.wakeup() + for t, _ in items: t.join() # Controls how many more calls than processes will be queued in the call queue. @@ -90,6 +106,7 @@ def _python_exit(): # (Futures in the call queue cannot be cancelled). EXTRA_QUEUED_CALLS = 1 + # Hack to embed stringification of remote traceback in local traceback class _RemoteTraceback(Exception): @@ -132,6 +149,25 @@ class _CallItem(object): self.kwargs = kwargs +class _SafeQueue(Queue): + """Safe Queue set exception to the future object linked to a job""" + def __init__(self, max_size=0, *, ctx, pending_work_items): + self.pending_work_items = pending_work_items + super().__init__(max_size, ctx=ctx) + + def _on_queue_feeder_error(self, e, obj): + if isinstance(obj, _CallItem): + tb = traceback.format_exception(type(e), e, e.__traceback__) + e.__cause__ = _RemoteTraceback('\n"""\n{}"""'.format(''.join(tb))) + work_item = self.pending_work_items.pop(obj.work_id, None) + # work_item can be None if another process terminated. In this case, + # the queue_manager_thread fails all work_items with BrokenProcessPool + if work_item is not None: + work_item.future.set_exception(e) + else: + super()._on_queue_feeder_error(e, obj) + + def _get_chunks(*iterables, chunksize): """ Iterates over zip()ed iterables in chunks. """ it = zip(*iterables) @@ -152,6 +188,17 @@ def _process_chunk(fn, chunk): """ return [fn(*args) for args in chunk] + +def _sendback_result(result_queue, work_id, result=None, exception=None): + """Safely send back the given result or exception""" + try: + result_queue.put(_ResultItem(work_id, result=result, + exception=exception)) + except BaseException as e: + exc = _ExceptionWithTraceback(e, e.__traceback__) + result_queue.put(_ResultItem(work_id, exception=exc)) + + def _process_worker(call_queue, result_queue, initializer, initargs): """Evaluates calls from call_queue and places the results in result_queue. @@ -183,10 +230,9 @@ def _process_worker(call_queue, result_queue, initializer, initargs): r = call_item.fn(*call_item.args, **call_item.kwargs) except BaseException as e: exc = _ExceptionWithTraceback(e, e.__traceback__) - result_queue.put(_ResultItem(call_item.work_id, exception=exc)) + _sendback_result(result_queue, call_item.work_id, exception=exc) else: - result_queue.put(_ResultItem(call_item.work_id, - result=r)) + _sendback_result(result_queue, call_item.work_id, result=r) # Liberate the resource as soon as possible, to avoid holding onto # open files or shared memory that is not needed anymore @@ -230,12 +276,14 @@ def _add_call_item_to_queue(pending_work_items, del pending_work_items[work_id] continue + def _queue_management_worker(executor_reference, processes, pending_work_items, work_ids_queue, call_queue, - result_queue): + result_queue, + thread_wakeup): """Manages the communication between this process and the worker processes. This function is run in a local thread. @@ -253,6 +301,9 @@ def _queue_management_worker(executor_reference, derived from _WorkItems for processing by the process workers. result_queue: A ctx.SimpleQueue of _ResultItems generated by the process workers. + thread_wakeup: A _ThreadWakeup to allow waking up the + queue_manager_thread from the main Thread and avoid deadlocks + caused by permanently locked queues. """ executor = None @@ -261,10 +312,21 @@ def _queue_management_worker(executor_reference, or executor._shutdown_thread) def shutdown_worker(): - # This is an upper bound - nb_children_alive = sum(p.is_alive() for p in processes.values()) - for i in range(0, nb_children_alive): - call_queue.put_nowait(None) + # This is an upper bound on the number of children alive. + n_children_alive = sum(p.is_alive() for p in processes.values()) + n_children_to_stop = n_children_alive + n_sentinels_sent = 0 + # Send the right number of sentinels, to make sure all children are + # properly terminated. + while n_sentinels_sent < n_children_to_stop and n_children_alive > 0: + for i in range(n_children_to_stop - n_sentinels_sent): + try: + call_queue.put_nowait(None) + n_sentinels_sent += 1 + except Full: + break + n_children_alive = sum(p.is_alive() for p in processes.values()) + # Release the queue's resources as soon as possible. call_queue.close() # If .join() is not called on the created processes then @@ -272,19 +334,37 @@ def _queue_management_worker(executor_reference, for p in processes.values(): p.join() - reader = result_queue._reader + result_reader = result_queue._reader + wakeup_reader = thread_wakeup._reader + readers = [result_reader, wakeup_reader] while True: _add_call_item_to_queue(pending_work_items, work_ids_queue, call_queue) - sentinels = [p.sentinel for p in processes.values()] - assert sentinels - ready = wait([reader] + sentinels) - if reader in ready: - result_item = reader.recv() - else: + # Wait for a result to be ready in the result_queue while checking + # that all worker processes are still running, or for a wake up + # signal send. The wake up signals come either from new tasks being + # submitted, from the executor being shutdown/gc-ed, or from the + # shutdown of the python interpreter. + worker_sentinels = [p.sentinel for p in processes.values()] + ready = wait(readers + worker_sentinels) + + cause = None + is_broken = True + if result_reader in ready: + try: + result_item = result_reader.recv() + is_broken = False + except BaseException as e: + cause = traceback.format_exception(type(e), e, e.__traceback__) + + elif wakeup_reader in ready: + is_broken = False + result_item = None + thread_wakeup.clear() + if is_broken: # Mark the process pool broken so that submits fail right now. executor = executor_reference() if executor is not None: @@ -293,14 +373,15 @@ def _queue_management_worker(executor_reference, 'usable anymore') executor._shutdown_thread = True executor = None + bpe = BrokenProcessPool("A process in the process pool was " + "terminated abruptly while the future was " + "running or pending.") + if cause is not None: + bpe.__cause__ = _RemoteTraceback( + f"\n'''\n{''.join(cause)}'''") # All futures in flight must be marked failed for work_id, work_item in pending_work_items.items(): - work_item.future.set_exception( - BrokenProcessPool( - "A process in the process pool was " - "terminated abruptly while the future was " - "running or pending." - )) + work_item.future.set_exception(bpe) # Delete references to object. See issue16284 del work_item pending_work_items.clear() @@ -329,6 +410,9 @@ def _queue_management_worker(executor_reference, work_item.future.set_result(result_item.result) # Delete references to object. See issue16284 del work_item + # Delete reference to result_item + del result_item + # Check whether we should start shutting down. executor = executor_reference() # No more work items can be added if: @@ -348,8 +432,11 @@ def _queue_management_worker(executor_reference, pass executor = None + _system_limits_checked = False _system_limited = None + + def _check_system_limits(): global _system_limits_checked, _system_limited if _system_limits_checked: @@ -369,7 +456,8 @@ def _check_system_limits(): # minimum number of semaphores available # according to POSIX return - _system_limited = "system provides too few semaphores (%d available, 256 necessary)" % nsems_max + _system_limited = ("system provides too few semaphores (%d" + " available, 256 necessary)" % nsems_max) raise NotImplementedError(_system_limited) @@ -415,6 +503,7 @@ class ProcessPoolExecutor(_base.Executor): raise ValueError("max_workers must be greater than 0") self._max_workers = max_workers + if mp_context is None: mp_context = mp.get_context() self._mp_context = mp_context @@ -424,34 +513,52 @@ class ProcessPoolExecutor(_base.Executor): self._initializer = initializer self._initargs = initargs + # Management thread + self._queue_management_thread = None + + # Map of pids to processes + self._processes = {} + + # Shutdown is a two-step process. + self._shutdown_thread = False + self._shutdown_lock = threading.Lock() + self._broken = False + self._queue_count = 0 + self._pending_work_items = {} + + # Create communication channels for the executor # Make the call queue slightly larger than the number of processes to # prevent the worker processes from idling. But don't make it too big # because futures in the call queue cannot be cancelled. queue_size = self._max_workers + EXTRA_QUEUED_CALLS - self._call_queue = mp_context.Queue(queue_size) + self._call_queue = _SafeQueue( + max_size=queue_size, ctx=self._mp_context, + pending_work_items=self._pending_work_items) # Killed worker processes can produce spurious "broken pipe" # tracebacks in the queue's own worker thread. But we detect killed # processes anyway, so silence the tracebacks. self._call_queue._ignore_epipe = True self._result_queue = mp_context.SimpleQueue() self._work_ids = queue.Queue() - self._queue_management_thread = None - # Map of pids to processes - self._processes = {} - # Shutdown is a two-step process. - self._shutdown_thread = False - self._shutdown_lock = threading.Lock() - self._broken = False - self._queue_count = 0 - self._pending_work_items = {} + # _ThreadWakeup is a communication channel used to interrupt the wait + # of the main loop of queue_manager_thread from another thread (e.g. + # when calling executor.submit or executor.shutdown). We do not use the + # _result_queue to send the wakeup signal to the queue_manager_thread + # as it could result in a deadlock if a worker process dies with the + # _result_queue write lock still acquired. + self._queue_management_thread_wakeup = _ThreadWakeup() def _start_queue_management_thread(self): - # When the executor gets lost, the weakref callback will wake up - # the queue management thread. - def weakref_cb(_, q=self._result_queue): - q.put(None) if self._queue_management_thread is None: + # When the executor gets garbarge collected, the weakref callback + # will wake up the queue management thread so that it can terminate + # if there is no pending work item. + def weakref_cb(_, + thread_wakeup=self._queue_management_thread_wakeup): + mp.util.debug('Executor collected: triggering callback for' + ' QueueManager wakeup') + thread_wakeup.wakeup() # Start the processes so that their sentinels are known. self._adjust_process_count() self._queue_management_thread = threading.Thread( @@ -461,10 +568,13 @@ class ProcessPoolExecutor(_base.Executor): self._pending_work_items, self._work_ids, self._call_queue, - self._result_queue)) + self._result_queue, + self._queue_management_thread_wakeup), + name="QueueManagerThread") self._queue_management_thread.daemon = True self._queue_management_thread.start() - _threads_queues[self._queue_management_thread] = self._result_queue + _threads_wakeups[self._queue_management_thread] = \ + self._queue_management_thread_wakeup def _adjust_process_count(self): for _ in range(len(self._processes), self._max_workers): @@ -491,7 +601,7 @@ class ProcessPoolExecutor(_base.Executor): self._work_ids.put(self._queue_count) self._queue_count += 1 # Wake up queue management thread - self._result_queue.put(None) + self._queue_management_thread_wakeup.wakeup() self._start_queue_management_thread() return f @@ -531,7 +641,7 @@ class ProcessPoolExecutor(_base.Executor): self._shutdown_thread = True if self._queue_management_thread: # Wake up queue management thread - self._result_queue.put(None) + self._queue_management_thread_wakeup.wakeup() if wait: self._queue_management_thread.join() # To reduce the risk of opening too many files, remove references to diff --git a/Lib/multiprocessing/queues.py b/Lib/multiprocessing/queues.py index 328efbd..d66d37a 100644 --- a/Lib/multiprocessing/queues.py +++ b/Lib/multiprocessing/queues.py @@ -160,9 +160,10 @@ class Queue(object): self._thread = threading.Thread( target=Queue._feed, args=(self._buffer, self._notempty, self._send_bytes, - self._wlock, self._writer.close, self._ignore_epipe), + self._wlock, self._writer.close, self._ignore_epipe, + self._on_queue_feeder_error), name='QueueFeederThread' - ) + ) self._thread.daemon = True debug('doing self._thread.start()') @@ -201,7 +202,8 @@ class Queue(object): notempty.notify() @staticmethod - def _feed(buffer, notempty, send_bytes, writelock, close, ignore_epipe): + def _feed(buffer, notempty, send_bytes, writelock, close, ignore_epipe, + onerror): debug('starting thread to feed data to pipe') nacquire = notempty.acquire nrelease = notempty.release @@ -253,8 +255,17 @@ class Queue(object): info('error in queue thread: %s', e) return else: - import traceback - traceback.print_exc() + onerror(e, obj) + + @staticmethod + def _on_queue_feeder_error(e, obj): + """ + Private API hook called when feeding data in the background thread + raises an exception. For overriding by concurrent.futures. + """ + import traceback + traceback.print_exc() + _sentinel = object() diff --git a/Lib/test/_test_multiprocessing.py b/Lib/test/_test_multiprocessing.py index 7575c5d..05166b9 100644 --- a/Lib/test/_test_multiprocessing.py +++ b/Lib/test/_test_multiprocessing.py @@ -1029,6 +1029,43 @@ class _TestQueue(BaseTestCase): self.assertTrue(q.get(timeout=1.0)) close_queue(q) + def test_queue_feeder_on_queue_feeder_error(self): + # bpo-30006: verify feeder handles exceptions using the + # _on_queue_feeder_error hook. + if self.TYPE != 'processes': + self.skipTest('test not appropriate for {}'.format(self.TYPE)) + + class NotSerializable(object): + """Mock unserializable object""" + def __init__(self): + self.reduce_was_called = False + self.on_queue_feeder_error_was_called = False + + def __reduce__(self): + self.reduce_was_called = True + raise AttributeError + + class SafeQueue(multiprocessing.queues.Queue): + """Queue with overloaded _on_queue_feeder_error hook""" + @staticmethod + def _on_queue_feeder_error(e, obj): + if (isinstance(e, AttributeError) and + isinstance(obj, NotSerializable)): + obj.on_queue_feeder_error_was_called = True + + not_serializable_obj = NotSerializable() + # The captured_stderr reduces the noise in the test report + with test.support.captured_stderr(): + q = SafeQueue(ctx=multiprocessing.get_context()) + q.put(not_serializable_obj) + + # Verify that q is still functionning correctly + q.put(True) + self.assertTrue(q.get(timeout=1.0)) + + # Assert that the serialization and the hook have been called correctly + self.assertTrue(not_serializable_obj.reduce_was_called) + self.assertTrue(not_serializable_obj.on_queue_feeder_error_was_called) # # # diff --git a/Lib/test/test_concurrent_futures.py b/Lib/test/test_concurrent_futures.py index 7687899..675cd7a 100644 --- a/Lib/test/test_concurrent_futures.py +++ b/Lib/test/test_concurrent_futures.py @@ -18,6 +18,7 @@ import threading import time import unittest import weakref +from pickle import PicklingError from concurrent import futures from concurrent.futures._base import ( @@ -394,16 +395,17 @@ class ProcessPoolShutdownTest(ExecutorShutdownTest): queue_management_thread = executor._queue_management_thread processes = executor._processes call_queue = executor._call_queue + queue_management_thread = executor._queue_management_thread del executor + # Make sure that all the executor ressources were properly cleaned by + # the shutdown process queue_management_thread.join() for p in processes.values(): p.join() - call_queue.close() call_queue.join_thread() - create_executor_tests(ProcessPoolShutdownTest, executor_mixins=(ProcessPoolForkMixin, ProcessPoolForkserverMixin, @@ -784,6 +786,172 @@ create_executor_tests(ProcessPoolExecutorTest, ProcessPoolForkserverMixin, ProcessPoolSpawnMixin)) +def hide_process_stderr(): + import io + sys.stderr = io.StringIO() + + +def _crash(delay=None): + """Induces a segfault.""" + if delay: + time.sleep(delay) + import faulthandler + faulthandler.disable() + faulthandler._sigsegv() + + +def _exit(): + """Induces a sys exit with exitcode 1.""" + sys.exit(1) + + +def _raise_error(Err): + """Function that raises an Exception in process.""" + hide_process_stderr() + raise Err() + + +def _return_instance(cls): + """Function that returns a instance of cls.""" + hide_process_stderr() + return cls() + + +class CrashAtPickle(object): + """Bad object that triggers a segfault at pickling time.""" + def __reduce__(self): + _crash() + + +class CrashAtUnpickle(object): + """Bad object that triggers a segfault at unpickling time.""" + def __reduce__(self): + return _crash, () + + +class ExitAtPickle(object): + """Bad object that triggers a process exit at pickling time.""" + def __reduce__(self): + _exit() + + +class ExitAtUnpickle(object): + """Bad object that triggers a process exit at unpickling time.""" + def __reduce__(self): + return _exit, () + + +class ErrorAtPickle(object): + """Bad object that triggers an error at pickling time.""" + def __reduce__(self): + from pickle import PicklingError + raise PicklingError("Error in pickle") + + +class ErrorAtUnpickle(object): + """Bad object that triggers an error at unpickling time.""" + def __reduce__(self): + from pickle import UnpicklingError + return _raise_error, (UnpicklingError, ) + + +class ExecutorDeadlockTest: + TIMEOUT = 15 + + @classmethod + def _sleep_id(cls, x, delay): + time.sleep(delay) + return x + + def _fail_on_deadlock(self, executor): + # If we did not recover before TIMEOUT seconds, consider that the + # executor is in a deadlock state and forcefully clean all its + # composants. + import faulthandler + from tempfile import TemporaryFile + with TemporaryFile(mode="w+") as f: + faulthandler.dump_traceback(file=f) + f.seek(0) + tb = f.read() + for p in executor._processes.values(): + p.terminate() + # This should be safe to call executor.shutdown here as all possible + # deadlocks should have been broken. + executor.shutdown(wait=True) + print(f"\nTraceback:\n {tb}", file=sys.__stderr__) + self.fail(f"Executor deadlock:\n\n{tb}") + + + def test_crash(self): + # extensive testing for deadlock caused by crashes in a pool. + self.executor.shutdown(wait=True) + crash_cases = [ + # Check problem occuring while pickling a task in + # the task_handler thread + (id, (ErrorAtPickle(),), PicklingError, "error at task pickle"), + # Check problem occuring while unpickling a task on workers + (id, (ExitAtUnpickle(),), BrokenProcessPool, + "exit at task unpickle"), + (id, (ErrorAtUnpickle(),), BrokenProcessPool, + "error at task unpickle"), + (id, (CrashAtUnpickle(),), BrokenProcessPool, + "crash at task unpickle"), + # Check problem occuring during func execution on workers + (_crash, (), BrokenProcessPool, + "crash during func execution on worker"), + (_exit, (), SystemExit, + "exit during func execution on worker"), + (_raise_error, (RuntimeError, ), RuntimeError, + "error during func execution on worker"), + # Check problem occuring while pickling a task result + # on workers + (_return_instance, (CrashAtPickle,), BrokenProcessPool, + "crash during result pickle on worker"), + (_return_instance, (ExitAtPickle,), SystemExit, + "exit during result pickle on worker"), + (_return_instance, (ErrorAtPickle,), PicklingError, + "error during result pickle on worker"), + # Check problem occuring while unpickling a task in + # the result_handler thread + (_return_instance, (ErrorAtUnpickle,), BrokenProcessPool, + "error during result unpickle in result_handler"), + (_return_instance, (ExitAtUnpickle,), BrokenProcessPool, + "exit during result unpickle in result_handler") + ] + for func, args, error, name in crash_cases: + with self.subTest(name): + # The captured_stderr reduces the noise in the test report + with test.support.captured_stderr(): + executor = self.executor_type( + max_workers=2, mp_context=get_context(self.ctx)) + res = executor.submit(func, *args) + with self.assertRaises(error): + try: + res.result(timeout=self.TIMEOUT) + except futures.TimeoutError: + # If we did not recover before TIMEOUT seconds, + # consider that the executor is in a deadlock state + self._fail_on_deadlock(executor) + executor.shutdown(wait=True) + + def test_shutdown_deadlock(self): + # Test that the pool calling shutdown do not cause deadlock + # if a worker fails after the shutdown call. + self.executor.shutdown(wait=True) + with self.executor_type(max_workers=2, + mp_context=get_context(self.ctx)) as executor: + self.executor = executor # Allow clean up in fail_on_deadlock + f = executor.submit(_crash, delay=.1) + executor.shutdown(wait=True) + with self.assertRaises(BrokenProcessPool): + f.result() + + +create_executor_tests(ExecutorDeadlockTest, + executor_mixins=(ProcessPoolForkMixin, + ProcessPoolForkserverMixin, + ProcessPoolSpawnMixin)) + class FutureTests(BaseTestCase): def test_done_callback_with_result(self): diff --git a/Misc/NEWS.d/next/Library/2017-10-05-11-06-32.bpo-31699.MF47Y6.rst b/Misc/NEWS.d/next/Library/2017-10-05-11-06-32.bpo-31699.MF47Y6.rst new file mode 100644 index 0000000..49cbbb3 --- /dev/null +++ b/Misc/NEWS.d/next/Library/2017-10-05-11-06-32.bpo-31699.MF47Y6.rst @@ -0,0 +1,4 @@ +Fix deadlocks in :class:`concurrent.futures.ProcessPoolExecutor` when +task arguments or results cause pickling or unpickling errors. +This should make sure that calls to the :class:`ProcessPoolExecutor` API +always eventually return. |