summaryrefslogtreecommitdiffstats
path: root/Lib/test/test_concurrent_futures.py
diff options
context:
space:
mode:
Diffstat (limited to 'Lib/test/test_concurrent_futures.py')
-rw-r--r--Lib/test/test_concurrent_futures.py38
1 files changed, 28 insertions, 10 deletions
diff --git a/Lib/test/test_concurrent_futures.py b/Lib/test/test_concurrent_futures.py
index 2662af7..5968980 100644
--- a/Lib/test/test_concurrent_futures.py
+++ b/Lib/test/test_concurrent_futures.py
@@ -19,7 +19,7 @@ import unittest
from concurrent import futures
from concurrent.futures._base import (
PENDING, RUNNING, CANCELLED, CANCELLED_AND_NOTIFIED, FINISHED, Future)
-import concurrent.futures.process
+from concurrent.futures.process import BrokenProcessPool
def create_future(state=PENDING, exception=None, result=None):
@@ -154,7 +154,7 @@ class ProcessPoolShutdownTest(ProcessPoolMixin, ExecutorShutdownTest):
processes = self.executor._processes
self.executor.shutdown()
- for p in processes:
+ for p in processes.values():
p.join()
def test_context_manager_shutdown(self):
@@ -163,7 +163,7 @@ class ProcessPoolShutdownTest(ProcessPoolMixin, ExecutorShutdownTest):
self.assertEqual(list(e.map(abs, range(-5, 5))),
[5, 4, 3, 2, 1, 0, 1, 2, 3, 4])
- for p in processes:
+ for p in processes.values():
p.join()
def test_del_shutdown(self):
@@ -174,7 +174,7 @@ class ProcessPoolShutdownTest(ProcessPoolMixin, ExecutorShutdownTest):
del executor
queue_management_thread.join()
- for p in processes:
+ for p in processes.values():
p.join()
class WaitTests(unittest.TestCase):
@@ -260,14 +260,14 @@ class WaitTests(unittest.TestCase):
def test_timeout(self):
future1 = self.executor.submit(mul, 6, 7)
- future2 = self.executor.submit(time.sleep, 3)
+ future2 = self.executor.submit(time.sleep, 6)
finished, pending = futures.wait(
[CANCELLED_AND_NOTIFIED_FUTURE,
EXCEPTION_FUTURE,
SUCCESSFUL_FUTURE,
future1, future2],
- timeout=1.5,
+ timeout=5,
return_when=futures.ALL_COMPLETED)
self.assertEqual(set([CANCELLED_AND_NOTIFIED_FUTURE,
@@ -357,8 +357,8 @@ class ExecutorTest(unittest.TestCase):
results = []
try:
for i in self.executor.map(time.sleep,
- [0, 0, 3],
- timeout=1.5):
+ [0, 0, 6],
+ timeout=5):
results.append(i)
except futures.TimeoutError:
pass
@@ -369,11 +369,29 @@ class ExecutorTest(unittest.TestCase):
class ThreadPoolExecutorTest(ThreadPoolMixin, ExecutorTest):
- pass
+ def test_map_submits_without_iteration(self):
+ """Tests verifying issue 11777."""
+ finished = []
+ def record_finished(n):
+ finished.append(n)
+
+ self.executor.map(record_finished, range(10))
+ self.executor.shutdown(wait=True)
+ self.assertCountEqual(finished, range(10))
class ProcessPoolExecutorTest(ProcessPoolMixin, ExecutorTest):
- pass
+ def test_killed_child(self):
+ # When a child process is abruptly terminated, the whole pool gets
+ # "broken".
+ futures = [self.executor.submit(time.sleep, 3)]
+ # Get one of the processes, and terminate (kill) it
+ p = next(iter(self.executor._processes.values()))
+ p.terminate()
+ for fut in futures:
+ self.assertRaises(BrokenProcessPool, fut.result)
+ # Submitting other jobs fails as well.
+ self.assertRaises(BrokenProcessPool, self.executor.submit, pow, 2, 8)
class FutureTests(unittest.TestCase):