diff options
Diffstat (limited to 'Lib/test/test_concurrent_futures.py')
-rw-r--r-- | Lib/test/test_concurrent_futures.py | 74 |
1 files changed, 72 insertions, 2 deletions
diff --git a/Lib/test/test_concurrent_futures.py b/Lib/test/test_concurrent_futures.py index af77f81..a7381f9 100644 --- a/Lib/test/test_concurrent_futures.py +++ b/Lib/test/test_concurrent_futures.py @@ -415,13 +415,32 @@ class ThreadPoolShutdownTest(ThreadPoolMixin, ExecutorShutdownTest, BaseTestCase def test_del_shutdown(self): executor = futures.ThreadPoolExecutor(max_workers=5) - executor.map(abs, range(-5, 5)) + res = executor.map(abs, range(-5, 5)) threads = executor._threads del executor for t in threads: t.join() + # Make sure the results were all computed before the + # executor got shutdown. + assert all([r == abs(v) for r, v in zip(res, range(-5, 5))]) + + def test_shutdown_no_wait(self): + # Ensure that the executor cleans up the threads when calling + # shutdown with wait=False + executor = futures.ThreadPoolExecutor(max_workers=5) + res = executor.map(abs, range(-5, 5)) + threads = executor._threads + executor.shutdown(wait=False) + for t in threads: + t.join() + + # Make sure the results were all computed before the + # executor got shutdown. + assert all([r == abs(v) for r, v in zip(res, range(-5, 5))]) + + def test_thread_names_assigned(self): executor = futures.ThreadPoolExecutor( max_workers=5, thread_name_prefix='SpecialPool') @@ -488,7 +507,7 @@ class ProcessPoolShutdownTest(ExecutorShutdownTest): def test_del_shutdown(self): executor = futures.ProcessPoolExecutor(max_workers=5) - list(executor.map(abs, range(-5, 5))) + res = executor.map(abs, range(-5, 5)) queue_management_thread = executor._queue_management_thread processes = executor._processes call_queue = executor._call_queue @@ -502,6 +521,31 @@ class ProcessPoolShutdownTest(ExecutorShutdownTest): p.join() call_queue.join_thread() + # Make sure the results were all computed before the + # executor got shutdown. + assert all([r == abs(v) for r, v in zip(res, range(-5, 5))]) + + def test_shutdown_no_wait(self): + # Ensure that the executor cleans up the processes when calling + # shutdown with wait=False + executor = futures.ProcessPoolExecutor(max_workers=5) + res = executor.map(abs, range(-5, 5)) + processes = executor._processes + call_queue = executor._call_queue + queue_management_thread = executor._queue_management_thread + executor.shutdown(wait=False) + + # Make sure that all the executor resources were properly cleaned by + # the shutdown process + queue_management_thread.join() + for p in processes.values(): + p.join() + call_queue.join_thread() + + # Make sure the results were all computed before the executor got + # shutdown. + assert all([r == abs(v) for r, v in zip(res, range(-5, 5))]) + create_executor_tests(ProcessPoolShutdownTest, executor_mixins=(ProcessPoolForkMixin, @@ -1086,6 +1130,32 @@ class ExecutorDeadlockTest: with self.assertRaises(BrokenProcessPool): f.result() + def test_shutdown_deadlock_pickle(self): + # Test that the pool calling shutdown with wait=False does not cause + # a deadlock if a task fails at pickle after the shutdown call. + # Reported in bpo-39104. + self.executor.shutdown(wait=True) + with self.executor_type(max_workers=2, + mp_context=get_context(self.ctx)) as executor: + self.executor = executor # Allow clean up in fail_on_deadlock + + # Start the executor and get the queue_management_thread to collect + # the threads and avoid dangling thread that should be cleaned up + # asynchronously. + executor.submit(id, 42).result() + queue_manager = executor._queue_management_thread + + # Submit a task that fails at pickle and shutdown the executor + # without waiting + f = executor.submit(id, ErrorAtPickle()) + executor.shutdown(wait=False) + with self.assertRaises(PicklingError): + f.result() + + # Make sure the executor is eventually shutdown and do not leave + # dangling threads + queue_manager.join() + create_executor_tests(ExecutorDeadlockTest, executor_mixins=(ProcessPoolForkMixin, |