summaryrefslogtreecommitdiffstats
path: root/Lib/test
diff options
context:
space:
mode:
Diffstat (limited to 'Lib/test')
-rw-r--r--Lib/test/test_concurrent_futures.py39
1 files changed, 39 insertions, 0 deletions
diff --git a/Lib/test/test_concurrent_futures.py b/Lib/test/test_concurrent_futures.py
index c8fa35e..af77f81 100644
--- a/Lib/test/test_concurrent_futures.py
+++ b/Lib/test/test_concurrent_futures.py
@@ -342,6 +342,29 @@ class ExecutorShutdownTest:
for f in fs:
f.result()
+ def test_cancel_futures(self):
+ executor = self.executor_type(max_workers=3)
+ fs = [executor.submit(time.sleep, .1) for _ in range(50)]
+ executor.shutdown(cancel_futures=True)
+ # We can't guarantee the exact number of cancellations, but we can
+ # guarantee that *some* were cancelled. With setting max_workers to 3,
+ # most of the submitted futures should have been cancelled.
+ cancelled = [fut for fut in fs if fut.cancelled()]
+ self.assertTrue(len(cancelled) >= 35, msg=f"{len(cancelled)=}")
+
+ # Ensure the other futures were able to finish.
+ # Use "not fut.cancelled()" instead of "fut.done()" to include futures
+ # that may have been left in a pending state.
+ others = [fut for fut in fs if not fut.cancelled()]
+ for fut in others:
+ self.assertTrue(fut.done(), msg=f"{fut._state=}")
+ self.assertIsNone(fut.exception())
+
+ # Similar to the number of cancelled futures, we can't guarantee the
+ # exact number that completed. But, we can guarantee that at least
+ # one finished.
+ self.assertTrue(len(others) > 0, msg=f"{len(others)=}")
+
def test_hang_issue39205(self):
"""shutdown(wait=False) doesn't hang at exit with running futures.
@@ -422,6 +445,22 @@ class ThreadPoolShutdownTest(ThreadPoolMixin, ExecutorShutdownTest, BaseTestCase
self.assertRegex(t.name, r'ThreadPoolExecutor-\d+_[0-4]$')
t.join()
+ def test_cancel_futures_wait_false(self):
+ # Can only be reliably tested for TPE, since PPE often hangs with
+ # `wait=False` (even without *cancel_futures*).
+ rc, out, err = assert_python_ok('-c', """if True:
+ from concurrent.futures import ThreadPoolExecutor
+ from test.test_concurrent_futures import sleep_and_print
+ if __name__ == "__main__":
+ t = ThreadPoolExecutor()
+ t.submit(sleep_and_print, .1, "apple")
+ t.shutdown(wait=False, cancel_futures=True)
+ """.format(executor_type=self.executor_type.__name__))
+ # Errors in atexit hooks don't change the process exit code, check
+ # stderr manually.
+ self.assertFalse(err)
+ self.assertEqual(out.strip(), b"apple")
+
class ProcessPoolShutdownTest(ExecutorShutdownTest):
def _prime_executor(self):