diff options
Diffstat (limited to 'Lib/test/test_concurrent_futures.py')
-rw-r--r-- | Lib/test/test_concurrent_futures.py | 62 |
1 files changed, 44 insertions, 18 deletions
diff --git a/Lib/test/test_concurrent_futures.py b/Lib/test/test_concurrent_futures.py index 372da27..04ee246 100644 --- a/Lib/test/test_concurrent_futures.py +++ b/Lib/test/test_concurrent_futures.py @@ -19,7 +19,7 @@ import unittest from concurrent import futures from concurrent.futures._base import ( PENDING, RUNNING, CANCELLED, CANCELLED_AND_NOTIFIED, FINISHED, Future) -import concurrent.futures.process +from concurrent.futures.process import BrokenProcessPool def create_future(state=PENDING, exception=None, result=None): @@ -34,7 +34,7 @@ PENDING_FUTURE = create_future(state=PENDING) RUNNING_FUTURE = create_future(state=RUNNING) CANCELLED_FUTURE = create_future(state=CANCELLED) CANCELLED_AND_NOTIFIED_FUTURE = create_future(state=CANCELLED_AND_NOTIFIED) -EXCEPTION_FUTURE = create_future(state=FINISHED, exception=IOError()) +EXCEPTION_FUTURE = create_future(state=FINISHED, exception=OSError()) SUCCESSFUL_FUTURE = create_future(state=FINISHED, result=42) @@ -160,7 +160,7 @@ class ProcessPoolShutdownTest(ProcessPoolMixin, ExecutorShutdownTest): processes = self.executor._processes self.executor.shutdown() - for p in processes: + for p in processes.values(): p.join() def test_context_manager_shutdown(self): @@ -169,7 +169,7 @@ class ProcessPoolShutdownTest(ProcessPoolMixin, ExecutorShutdownTest): self.assertEqual(list(e.map(abs, range(-5, 5))), [5, 4, 3, 2, 1, 0, 1, 2, 3, 4]) - for p in processes: + for p in processes.values(): p.join() def test_del_shutdown(self): @@ -180,7 +180,7 @@ class ProcessPoolShutdownTest(ProcessPoolMixin, ExecutorShutdownTest): del executor queue_management_thread.join() - for p in processes: + for p in processes.values(): p.join() class WaitTests(unittest.TestCase): @@ -266,14 +266,14 @@ class WaitTests(unittest.TestCase): def test_timeout(self): future1 = self.executor.submit(mul, 6, 7) - future2 = self.executor.submit(time.sleep, 3) + future2 = self.executor.submit(time.sleep, 6) finished, pending = futures.wait( [CANCELLED_AND_NOTIFIED_FUTURE, EXCEPTION_FUTURE, SUCCESSFUL_FUTURE, future1, future2], - timeout=1.5, + timeout=5, return_when=futures.ALL_COMPLETED) self.assertEqual(set([CANCELLED_AND_NOTIFIED_FUTURE, @@ -363,8 +363,8 @@ class ExecutorTest(unittest.TestCase): results = [] try: for i in self.executor.map(time.sleep, - [0, 0, 3], - timeout=1.5): + [0, 0, 6], + timeout=5): results.append(i) except futures.TimeoutError: pass @@ -373,13 +373,38 @@ class ExecutorTest(unittest.TestCase): self.assertEqual([None, None], results) + def test_shutdown_race_issue12456(self): + # Issue #12456: race condition at shutdown where trying to post a + # sentinel in the call queue blocks (the queue is full while processes + # have exited). + self.executor.map(str, [2] * (self.worker_count + 1)) + self.executor.shutdown() + class ThreadPoolExecutorTest(ThreadPoolMixin, ExecutorTest): - pass + def test_map_submits_without_iteration(self): + """Tests verifying issue 11777.""" + finished = [] + def record_finished(n): + finished.append(n) + + self.executor.map(record_finished, range(10)) + self.executor.shutdown(wait=True) + self.assertCountEqual(finished, range(10)) class ProcessPoolExecutorTest(ProcessPoolMixin, ExecutorTest): - pass + def test_killed_child(self): + # When a child process is abruptly terminated, the whole pool gets + # "broken". + futures = [self.executor.submit(time.sleep, 3)] + # Get one of the processes, and terminate (kill) it + p = next(iter(self.executor._processes.values())) + p.terminate() + for fut in futures: + self.assertRaises(BrokenProcessPool, fut.result) + # Submitting other jobs fails as well. + self.assertRaises(BrokenProcessPool, self.executor.submit, pow, 2, 8) class FutureTests(unittest.TestCase): @@ -482,7 +507,7 @@ class FutureTests(unittest.TestCase): '<Future at 0x[0-9a-f]+ state=cancelled>') self.assertRegex( repr(EXCEPTION_FUTURE), - '<Future at 0x[0-9a-f]+ state=finished raised IOError>') + '<Future at 0x[0-9a-f]+ state=finished raised OSError>') self.assertRegex( repr(SUCCESSFUL_FUTURE), '<Future at 0x[0-9a-f]+ state=finished returned int>') @@ -493,7 +518,7 @@ class FutureTests(unittest.TestCase): f2 = create_future(state=RUNNING) f3 = create_future(state=CANCELLED) f4 = create_future(state=CANCELLED_AND_NOTIFIED) - f5 = create_future(state=FINISHED, exception=IOError()) + f5 = create_future(state=FINISHED, exception=OSError()) f6 = create_future(state=FINISHED, result=5) self.assertTrue(f1.cancel()) @@ -547,7 +572,7 @@ class FutureTests(unittest.TestCase): CANCELLED_FUTURE.result, timeout=0) self.assertRaises(futures.CancelledError, CANCELLED_AND_NOTIFIED_FUTURE.result, timeout=0) - self.assertRaises(IOError, EXCEPTION_FUTURE.result, timeout=0) + self.assertRaises(OSError, EXCEPTION_FUTURE.result, timeout=0) self.assertEqual(SUCCESSFUL_FUTURE.result(timeout=0), 42) def test_result_with_success(self): @@ -586,7 +611,7 @@ class FutureTests(unittest.TestCase): self.assertRaises(futures.CancelledError, CANCELLED_AND_NOTIFIED_FUTURE.exception, timeout=0) self.assertTrue(isinstance(EXCEPTION_FUTURE.exception(timeout=0), - IOError)) + OSError)) self.assertEqual(SUCCESSFUL_FUTURE.exception(timeout=0), None) def test_exception_with_success(self): @@ -595,14 +620,14 @@ class FutureTests(unittest.TestCase): time.sleep(1) with f1._condition: f1._state = FINISHED - f1._exception = IOError() + f1._exception = OSError() f1._condition.notify_all() f1 = create_future(state=PENDING) t = threading.Thread(target=notification) t.start() - self.assertTrue(isinstance(f1.exception(timeout=5), IOError)) + self.assertTrue(isinstance(f1.exception(timeout=5), OSError)) @test.support.reap_threads def test_main(): @@ -615,7 +640,8 @@ def test_main(): ThreadPoolAsCompletedTests, FutureTests, ProcessPoolShutdownTest, - ThreadPoolShutdownTest) + ThreadPoolShutdownTest, + ) finally: test.support.reap_children() |