diff options
author | Thomas Moreau <thomas.moreau.2010@gmail.com> | 2020-02-16 18:09:26 (GMT) |
---|---|---|
committer | GitHub <noreply@github.com> | 2020-02-16 18:09:26 (GMT) |
commit | a5cbab552d294d99fde864306632d7e511a75d3c (patch) | |
tree | 3f12da3e9bc19c5ae9e836a6694d90cc9ddd35d6 /Lib/test | |
parent | 1ed61617a4a6632905ad6a0b440cd2cafb8b6414 (diff) | |
download | cpython-a5cbab552d294d99fde864306632d7e511a75d3c.zip cpython-a5cbab552d294d99fde864306632d7e511a75d3c.tar.gz cpython-a5cbab552d294d99fde864306632d7e511a75d3c.tar.bz2 |
bpo-39104: Fix hanging ProcessPoolExecutor on shutdown nowait with pickling failure (GH-17670)
As reported initially by @rad-pat in #6084, the following script causes a deadlock.
```
from concurrent.futures import ProcessPoolExecutor
class ObjectWithPickleError():
"""Triggers a RuntimeError when sending job to the workers"""
def __reduce__(self):
raise RuntimeError()
if __name__ == "__main__":
e = ProcessPoolExecutor()
f = e.submit(id, ObjectWithPickleError())
e.shutdown(wait=False)
f.result() # Deadlock on get
```
This is caused by the fact that the main process is closing communication channels that might be necessary to the `queue_management_thread` later. To avoid this, this PR let the `queue_management_thread` manage all the closing.
https://bugs.python.org/issue39104
Automerge-Triggered-By: @pitrou
Diffstat (limited to 'Lib/test')
-rw-r--r-- | Lib/test/test_concurrent_futures.py | 74 |
1 files changed, 72 insertions, 2 deletions
diff --git a/Lib/test/test_concurrent_futures.py b/Lib/test/test_concurrent_futures.py index af77f81..a7381f9 100644 --- a/Lib/test/test_concurrent_futures.py +++ b/Lib/test/test_concurrent_futures.py @@ -415,13 +415,32 @@ class ThreadPoolShutdownTest(ThreadPoolMixin, ExecutorShutdownTest, BaseTestCase def test_del_shutdown(self): executor = futures.ThreadPoolExecutor(max_workers=5) - executor.map(abs, range(-5, 5)) + res = executor.map(abs, range(-5, 5)) threads = executor._threads del executor for t in threads: t.join() + # Make sure the results were all computed before the + # executor got shutdown. + assert all([r == abs(v) for r, v in zip(res, range(-5, 5))]) + + def test_shutdown_no_wait(self): + # Ensure that the executor cleans up the threads when calling + # shutdown with wait=False + executor = futures.ThreadPoolExecutor(max_workers=5) + res = executor.map(abs, range(-5, 5)) + threads = executor._threads + executor.shutdown(wait=False) + for t in threads: + t.join() + + # Make sure the results were all computed before the + # executor got shutdown. + assert all([r == abs(v) for r, v in zip(res, range(-5, 5))]) + + def test_thread_names_assigned(self): executor = futures.ThreadPoolExecutor( max_workers=5, thread_name_prefix='SpecialPool') @@ -488,7 +507,7 @@ class ProcessPoolShutdownTest(ExecutorShutdownTest): def test_del_shutdown(self): executor = futures.ProcessPoolExecutor(max_workers=5) - list(executor.map(abs, range(-5, 5))) + res = executor.map(abs, range(-5, 5)) queue_management_thread = executor._queue_management_thread processes = executor._processes call_queue = executor._call_queue @@ -502,6 +521,31 @@ class ProcessPoolShutdownTest(ExecutorShutdownTest): p.join() call_queue.join_thread() + # Make sure the results were all computed before the + # executor got shutdown. + assert all([r == abs(v) for r, v in zip(res, range(-5, 5))]) + + def test_shutdown_no_wait(self): + # Ensure that the executor cleans up the processes when calling + # shutdown with wait=False + executor = futures.ProcessPoolExecutor(max_workers=5) + res = executor.map(abs, range(-5, 5)) + processes = executor._processes + call_queue = executor._call_queue + queue_management_thread = executor._queue_management_thread + executor.shutdown(wait=False) + + # Make sure that all the executor resources were properly cleaned by + # the shutdown process + queue_management_thread.join() + for p in processes.values(): + p.join() + call_queue.join_thread() + + # Make sure the results were all computed before the executor got + # shutdown. + assert all([r == abs(v) for r, v in zip(res, range(-5, 5))]) + create_executor_tests(ProcessPoolShutdownTest, executor_mixins=(ProcessPoolForkMixin, @@ -1086,6 +1130,32 @@ class ExecutorDeadlockTest: with self.assertRaises(BrokenProcessPool): f.result() + def test_shutdown_deadlock_pickle(self): + # Test that the pool calling shutdown with wait=False does not cause + # a deadlock if a task fails at pickle after the shutdown call. + # Reported in bpo-39104. + self.executor.shutdown(wait=True) + with self.executor_type(max_workers=2, + mp_context=get_context(self.ctx)) as executor: + self.executor = executor # Allow clean up in fail_on_deadlock + + # Start the executor and get the queue_management_thread to collect + # the threads and avoid dangling thread that should be cleaned up + # asynchronously. + executor.submit(id, 42).result() + queue_manager = executor._queue_management_thread + + # Submit a task that fails at pickle and shutdown the executor + # without waiting + f = executor.submit(id, ErrorAtPickle()) + executor.shutdown(wait=False) + with self.assertRaises(PicklingError): + f.result() + + # Make sure the executor is eventually shutdown and do not leave + # dangling threads + queue_manager.join() + create_executor_tests(ExecutorDeadlockTest, executor_mixins=(ProcessPoolForkMixin, |