summaryrefslogtreecommitdiffstats
path: root/Lib/test
diff options
context:
space:
mode:
authorThomas Moreau <thomas.moreau.2010@gmail.com>2020-02-16 18:09:26 (GMT)
committerGitHub <noreply@github.com>2020-02-16 18:09:26 (GMT)
commita5cbab552d294d99fde864306632d7e511a75d3c (patch)
tree3f12da3e9bc19c5ae9e836a6694d90cc9ddd35d6 /Lib/test
parent1ed61617a4a6632905ad6a0b440cd2cafb8b6414 (diff)
downloadcpython-a5cbab552d294d99fde864306632d7e511a75d3c.zip
cpython-a5cbab552d294d99fde864306632d7e511a75d3c.tar.gz
cpython-a5cbab552d294d99fde864306632d7e511a75d3c.tar.bz2
bpo-39104: Fix hanging ProcessPoolExecutor on shutdown nowait with pickling failure (GH-17670)
As reported initially by @rad-pat in #6084, the following script causes a deadlock. ``` from concurrent.futures import ProcessPoolExecutor class ObjectWithPickleError(): """Triggers a RuntimeError when sending job to the workers""" def __reduce__(self): raise RuntimeError() if __name__ == "__main__": e = ProcessPoolExecutor() f = e.submit(id, ObjectWithPickleError()) e.shutdown(wait=False) f.result() # Deadlock on get ``` This is caused by the fact that the main process is closing communication channels that might be necessary to the `queue_management_thread` later. To avoid this, this PR let the `queue_management_thread` manage all the closing. https://bugs.python.org/issue39104 Automerge-Triggered-By: @pitrou
Diffstat (limited to 'Lib/test')
-rw-r--r--Lib/test/test_concurrent_futures.py74
1 files changed, 72 insertions, 2 deletions
diff --git a/Lib/test/test_concurrent_futures.py b/Lib/test/test_concurrent_futures.py
index af77f81..a7381f9 100644
--- a/Lib/test/test_concurrent_futures.py
+++ b/Lib/test/test_concurrent_futures.py
@@ -415,13 +415,32 @@ class ThreadPoolShutdownTest(ThreadPoolMixin, ExecutorShutdownTest, BaseTestCase
def test_del_shutdown(self):
executor = futures.ThreadPoolExecutor(max_workers=5)
- executor.map(abs, range(-5, 5))
+ res = executor.map(abs, range(-5, 5))
threads = executor._threads
del executor
for t in threads:
t.join()
+ # Make sure the results were all computed before the
+ # executor got shutdown.
+ assert all([r == abs(v) for r, v in zip(res, range(-5, 5))])
+
+ def test_shutdown_no_wait(self):
+ # Ensure that the executor cleans up the threads when calling
+ # shutdown with wait=False
+ executor = futures.ThreadPoolExecutor(max_workers=5)
+ res = executor.map(abs, range(-5, 5))
+ threads = executor._threads
+ executor.shutdown(wait=False)
+ for t in threads:
+ t.join()
+
+ # Make sure the results were all computed before the
+ # executor got shutdown.
+ assert all([r == abs(v) for r, v in zip(res, range(-5, 5))])
+
+
def test_thread_names_assigned(self):
executor = futures.ThreadPoolExecutor(
max_workers=5, thread_name_prefix='SpecialPool')
@@ -488,7 +507,7 @@ class ProcessPoolShutdownTest(ExecutorShutdownTest):
def test_del_shutdown(self):
executor = futures.ProcessPoolExecutor(max_workers=5)
- list(executor.map(abs, range(-5, 5)))
+ res = executor.map(abs, range(-5, 5))
queue_management_thread = executor._queue_management_thread
processes = executor._processes
call_queue = executor._call_queue
@@ -502,6 +521,31 @@ class ProcessPoolShutdownTest(ExecutorShutdownTest):
p.join()
call_queue.join_thread()
+ # Make sure the results were all computed before the
+ # executor got shutdown.
+ assert all([r == abs(v) for r, v in zip(res, range(-5, 5))])
+
+ def test_shutdown_no_wait(self):
+ # Ensure that the executor cleans up the processes when calling
+ # shutdown with wait=False
+ executor = futures.ProcessPoolExecutor(max_workers=5)
+ res = executor.map(abs, range(-5, 5))
+ processes = executor._processes
+ call_queue = executor._call_queue
+ queue_management_thread = executor._queue_management_thread
+ executor.shutdown(wait=False)
+
+ # Make sure that all the executor resources were properly cleaned by
+ # the shutdown process
+ queue_management_thread.join()
+ for p in processes.values():
+ p.join()
+ call_queue.join_thread()
+
+ # Make sure the results were all computed before the executor got
+ # shutdown.
+ assert all([r == abs(v) for r, v in zip(res, range(-5, 5))])
+
create_executor_tests(ProcessPoolShutdownTest,
executor_mixins=(ProcessPoolForkMixin,
@@ -1086,6 +1130,32 @@ class ExecutorDeadlockTest:
with self.assertRaises(BrokenProcessPool):
f.result()
+ def test_shutdown_deadlock_pickle(self):
+ # Test that the pool calling shutdown with wait=False does not cause
+ # a deadlock if a task fails at pickle after the shutdown call.
+ # Reported in bpo-39104.
+ self.executor.shutdown(wait=True)
+ with self.executor_type(max_workers=2,
+ mp_context=get_context(self.ctx)) as executor:
+ self.executor = executor # Allow clean up in fail_on_deadlock
+
+ # Start the executor and get the queue_management_thread to collect
+ # the threads and avoid dangling thread that should be cleaned up
+ # asynchronously.
+ executor.submit(id, 42).result()
+ queue_manager = executor._queue_management_thread
+
+ # Submit a task that fails at pickle and shutdown the executor
+ # without waiting
+ f = executor.submit(id, ErrorAtPickle())
+ executor.shutdown(wait=False)
+ with self.assertRaises(PicklingError):
+ f.result()
+
+ # Make sure the executor is eventually shutdown and do not leave
+ # dangling threads
+ queue_manager.join()
+
create_executor_tests(ExecutorDeadlockTest,
executor_mixins=(ProcessPoolForkMixin,