summaryrefslogtreecommitdiffstats
path: root/Lib/test/test_concurrent_futures
diff options
context:
space:
mode:
authorVictor Stinner <vstinner@python.org>2023-08-26 23:20:33 (GMT)
committerGitHub <noreply@github.com>2023-08-26 23:20:33 (GMT)
commitbba9aa60ae7e80fbc4bd8c74554b54fc1e35a1e6 (patch)
tree58e14573a598daab71f2a21de860e11f98788f34 /Lib/test/test_concurrent_futures
parentce37fbc778cbe330cc69e588ed89c7c8123d622c (diff)
downloadcpython-bba9aa60ae7e80fbc4bd8c74554b54fc1e35a1e6.zip
cpython-bba9aa60ae7e80fbc4bd8c74554b54fc1e35a1e6.tar.gz
cpython-bba9aa60ae7e80fbc4bd8c74554b54fc1e35a1e6.tar.bz2
[3.12] gh-108388: Convert test_concurrent_futures to package (#108401) (#108443)
gh-108388: Convert test_concurrent_futures to package (#108401) Convert test_concurrent_futures to a package of sub-tests. (cherry picked from commit aa6f787faa4bc45006da4dc2f942fb9b82c98836)
Diffstat (limited to 'Lib/test/test_concurrent_futures')
-rw-r--r--Lib/test/test_concurrent_futures/__init__.py16
-rw-r--r--Lib/test/test_concurrent_futures/executor.py107
-rw-r--r--Lib/test/test_concurrent_futures/test_as_completed.py115
-rw-r--r--Lib/test/test_concurrent_futures/test_deadlock.py253
-rw-r--r--Lib/test/test_concurrent_futures/test_future.py291
-rw-r--r--Lib/test/test_concurrent_futures/test_init.py117
-rw-r--r--Lib/test/test_concurrent_futures/test_process_pool.py202
-rw-r--r--Lib/test/test_concurrent_futures/test_shutdown.py343
-rw-r--r--Lib/test/test_concurrent_futures/test_thread_pool.py98
-rw-r--r--Lib/test/test_concurrent_futures/test_wait.py161
-rw-r--r--Lib/test/test_concurrent_futures/util.py141
11 files changed, 1844 insertions, 0 deletions
diff --git a/Lib/test/test_concurrent_futures/__init__.py b/Lib/test/test_concurrent_futures/__init__.py
new file mode 100644
index 0000000..430fa93
--- /dev/null
+++ b/Lib/test/test_concurrent_futures/__init__.py
@@ -0,0 +1,16 @@
+import os.path
+import unittest
+from test import support
+from test.support import import_helper
+
+# Skip tests if _multiprocessing wasn't built.
+import_helper.import_module('_multiprocessing')
+
+if support.check_sanitizer(address=True, memory=True):
+ # gh-90791: Skip the test because it is too slow when Python is built
+ # with ASAN/MSAN: between 5 and 20 minutes on GitHub Actions.
+ raise unittest.SkipTest("test too slow on ASAN/MSAN build")
+
+
+def load_tests(*args):
+ return support.load_package_tests(os.path.dirname(__file__), *args)
diff --git a/Lib/test/test_concurrent_futures/executor.py b/Lib/test/test_concurrent_futures/executor.py
new file mode 100644
index 0000000..36278bd
--- /dev/null
+++ b/Lib/test/test_concurrent_futures/executor.py
@@ -0,0 +1,107 @@
+import threading
+import time
+import weakref
+from concurrent import futures
+from test import support
+
+
+def mul(x, y):
+ return x * y
+
+def capture(*args, **kwargs):
+ return args, kwargs
+
+
+class MyObject(object):
+ def my_method(self):
+ pass
+
+
+def make_dummy_object(_):
+ return MyObject()
+
+
+class ExecutorTest:
+ # Executor.shutdown() and context manager usage is tested by
+ # ExecutorShutdownTest.
+ def test_submit(self):
+ future = self.executor.submit(pow, 2, 8)
+ self.assertEqual(256, future.result())
+
+ def test_submit_keyword(self):
+ future = self.executor.submit(mul, 2, y=8)
+ self.assertEqual(16, future.result())
+ future = self.executor.submit(capture, 1, self=2, fn=3)
+ self.assertEqual(future.result(), ((1,), {'self': 2, 'fn': 3}))
+ with self.assertRaises(TypeError):
+ self.executor.submit(fn=capture, arg=1)
+ with self.assertRaises(TypeError):
+ self.executor.submit(arg=1)
+
+ def test_map(self):
+ self.assertEqual(
+ list(self.executor.map(pow, range(10), range(10))),
+ list(map(pow, range(10), range(10))))
+
+ self.assertEqual(
+ list(self.executor.map(pow, range(10), range(10), chunksize=3)),
+ list(map(pow, range(10), range(10))))
+
+ def test_map_exception(self):
+ i = self.executor.map(divmod, [1, 1, 1, 1], [2, 3, 0, 5])
+ self.assertEqual(i.__next__(), (0, 1))
+ self.assertEqual(i.__next__(), (0, 1))
+ self.assertRaises(ZeroDivisionError, i.__next__)
+
+ def test_map_timeout(self):
+ results = []
+ try:
+ for i in self.executor.map(time.sleep,
+ [0, 0, 6],
+ timeout=5):
+ results.append(i)
+ except futures.TimeoutError:
+ pass
+ else:
+ self.fail('expected TimeoutError')
+
+ self.assertEqual([None, None], results)
+
+ def test_shutdown_race_issue12456(self):
+ # Issue #12456: race condition at shutdown where trying to post a
+ # sentinel in the call queue blocks (the queue is full while processes
+ # have exited).
+ self.executor.map(str, [2] * (self.worker_count + 1))
+ self.executor.shutdown()
+
+ @support.cpython_only
+ def test_no_stale_references(self):
+ # Issue #16284: check that the executors don't unnecessarily hang onto
+ # references.
+ my_object = MyObject()
+ my_object_collected = threading.Event()
+ my_object_callback = weakref.ref(
+ my_object, lambda obj: my_object_collected.set())
+ # Deliberately discarding the future.
+ self.executor.submit(my_object.my_method)
+ del my_object
+
+ collected = my_object_collected.wait(timeout=support.SHORT_TIMEOUT)
+ self.assertTrue(collected,
+ "Stale reference not collected within timeout.")
+
+ def test_max_workers_negative(self):
+ for number in (0, -1):
+ with self.assertRaisesRegex(ValueError,
+ "max_workers must be greater "
+ "than 0"):
+ self.executor_type(max_workers=number)
+
+ def test_free_reference(self):
+ # Issue #14406: Result iterator should not keep an internal
+ # reference to result objects.
+ for obj in self.executor.map(make_dummy_object, range(10)):
+ wr = weakref.ref(obj)
+ del obj
+ support.gc_collect() # For PyPy or other GCs.
+ self.assertIsNone(wr())
diff --git a/Lib/test/test_concurrent_futures/test_as_completed.py b/Lib/test/test_concurrent_futures/test_as_completed.py
new file mode 100644
index 0000000..2b3bec8
--- /dev/null
+++ b/Lib/test/test_concurrent_futures/test_as_completed.py
@@ -0,0 +1,115 @@
+import itertools
+import time
+import unittest
+import weakref
+from concurrent import futures
+from concurrent.futures._base import (
+ CANCELLED_AND_NOTIFIED, FINISHED, Future)
+
+from test import support
+
+from .util import (
+ PENDING_FUTURE, RUNNING_FUTURE,
+ CANCELLED_AND_NOTIFIED_FUTURE, EXCEPTION_FUTURE, SUCCESSFUL_FUTURE,
+ create_future, create_executor_tests, setup_module)
+
+
+def mul(x, y):
+ return x * y
+
+
+class AsCompletedTests:
+ def test_no_timeout(self):
+ future1 = self.executor.submit(mul, 2, 21)
+ future2 = self.executor.submit(mul, 7, 6)
+
+ completed = set(futures.as_completed(
+ [CANCELLED_AND_NOTIFIED_FUTURE,
+ EXCEPTION_FUTURE,
+ SUCCESSFUL_FUTURE,
+ future1, future2]))
+ self.assertEqual(set(
+ [CANCELLED_AND_NOTIFIED_FUTURE,
+ EXCEPTION_FUTURE,
+ SUCCESSFUL_FUTURE,
+ future1, future2]),
+ completed)
+
+ def test_future_times_out(self):
+ """Test ``futures.as_completed`` timing out before
+ completing it's final future."""
+ already_completed = {CANCELLED_AND_NOTIFIED_FUTURE,
+ EXCEPTION_FUTURE,
+ SUCCESSFUL_FUTURE}
+
+ for timeout in (0, 0.01):
+ with self.subTest(timeout):
+
+ future = self.executor.submit(time.sleep, 0.1)
+ completed_futures = set()
+ try:
+ for f in futures.as_completed(
+ already_completed | {future},
+ timeout
+ ):
+ completed_futures.add(f)
+ except futures.TimeoutError:
+ pass
+
+ # Check that ``future`` wasn't completed.
+ self.assertEqual(completed_futures, already_completed)
+
+ def test_duplicate_futures(self):
+ # Issue 20367. Duplicate futures should not raise exceptions or give
+ # duplicate responses.
+ # Issue #31641: accept arbitrary iterables.
+ future1 = self.executor.submit(time.sleep, 2)
+ completed = [
+ f for f in futures.as_completed(itertools.repeat(future1, 3))
+ ]
+ self.assertEqual(len(completed), 1)
+
+ def test_free_reference_yielded_future(self):
+ # Issue #14406: Generator should not keep references
+ # to finished futures.
+ futures_list = [Future() for _ in range(8)]
+ futures_list.append(create_future(state=CANCELLED_AND_NOTIFIED))
+ futures_list.append(create_future(state=FINISHED, result=42))
+
+ with self.assertRaises(futures.TimeoutError):
+ for future in futures.as_completed(futures_list, timeout=0):
+ futures_list.remove(future)
+ wr = weakref.ref(future)
+ del future
+ support.gc_collect() # For PyPy or other GCs.
+ self.assertIsNone(wr())
+
+ futures_list[0].set_result("test")
+ for future in futures.as_completed(futures_list):
+ futures_list.remove(future)
+ wr = weakref.ref(future)
+ del future
+ support.gc_collect() # For PyPy or other GCs.
+ self.assertIsNone(wr())
+ if futures_list:
+ futures_list[0].set_result("test")
+
+ def test_correct_timeout_exception_msg(self):
+ futures_list = [CANCELLED_AND_NOTIFIED_FUTURE, PENDING_FUTURE,
+ RUNNING_FUTURE, SUCCESSFUL_FUTURE]
+
+ with self.assertRaises(futures.TimeoutError) as cm:
+ list(futures.as_completed(futures_list, timeout=0))
+
+ self.assertEqual(str(cm.exception), '2 (of 4) futures unfinished')
+
+
+create_executor_tests(globals(), AsCompletedTests)
+
+
+def setUpModule():
+ setup_module()
+
+
+if __name__ == "__main__":
+ unittest.main()
diff --git a/Lib/test/test_concurrent_futures/test_deadlock.py b/Lib/test/test_concurrent_futures/test_deadlock.py
new file mode 100644
index 0000000..6b78b36
--- /dev/null
+++ b/Lib/test/test_concurrent_futures/test_deadlock.py
@@ -0,0 +1,253 @@
+import contextlib
+import sys
+import time
+import unittest
+from pickle import PicklingError
+from concurrent import futures
+from concurrent.futures.process import BrokenProcessPool
+
+from test import support
+
+from .util import (
+ create_executor_tests, setup_module,
+ ProcessPoolForkMixin, ProcessPoolForkserverMixin, ProcessPoolSpawnMixin)
+
+
+def _crash(delay=None):
+ """Induces a segfault."""
+ if delay:
+ time.sleep(delay)
+ import faulthandler
+ faulthandler.disable()
+ faulthandler._sigsegv()
+
+
+def _crash_with_data(data):
+ """Induces a segfault with dummy data in input."""
+ _crash()
+
+
+def _exit():
+ """Induces a sys exit with exitcode 1."""
+ sys.exit(1)
+
+
+def _raise_error(Err):
+ """Function that raises an Exception in process."""
+ raise Err()
+
+
+def _raise_error_ignore_stderr(Err):
+ """Function that raises an Exception in process and ignores stderr."""
+ import io
+ sys.stderr = io.StringIO()
+ raise Err()
+
+
+def _return_instance(cls):
+ """Function that returns a instance of cls."""
+ return cls()
+
+
+class CrashAtPickle(object):
+ """Bad object that triggers a segfault at pickling time."""
+ def __reduce__(self):
+ _crash()
+
+
+class CrashAtUnpickle(object):
+ """Bad object that triggers a segfault at unpickling time."""
+ def __reduce__(self):
+ return _crash, ()
+
+
+class ExitAtPickle(object):
+ """Bad object that triggers a process exit at pickling time."""
+ def __reduce__(self):
+ _exit()
+
+
+class ExitAtUnpickle(object):
+ """Bad object that triggers a process exit at unpickling time."""
+ def __reduce__(self):
+ return _exit, ()
+
+
+class ErrorAtPickle(object):
+ """Bad object that triggers an error at pickling time."""
+ def __reduce__(self):
+ from pickle import PicklingError
+ raise PicklingError("Error in pickle")
+
+
+class ErrorAtUnpickle(object):
+ """Bad object that triggers an error at unpickling time."""
+ def __reduce__(self):
+ from pickle import UnpicklingError
+ return _raise_error_ignore_stderr, (UnpicklingError, )
+
+
+class ExecutorDeadlockTest:
+ TIMEOUT = support.SHORT_TIMEOUT
+
+ def _fail_on_deadlock(self, executor):
+ # If we did not recover before TIMEOUT seconds, consider that the
+ # executor is in a deadlock state and forcefully clean all its
+ # composants.
+ import faulthandler
+ from tempfile import TemporaryFile
+ with TemporaryFile(mode="w+") as f:
+ faulthandler.dump_traceback(file=f)
+ f.seek(0)
+ tb = f.read()
+ for p in executor._processes.values():
+ p.terminate()
+ # This should be safe to call executor.shutdown here as all possible
+ # deadlocks should have been broken.
+ executor.shutdown(wait=True)
+ print(f"\nTraceback:\n {tb}", file=sys.__stderr__)
+ self.fail(f"Executor deadlock:\n\n{tb}")
+
+
+ def _check_crash(self, error, func, *args, ignore_stderr=False):
+ # test for deadlock caused by crashes in a pool
+ self.executor.shutdown(wait=True)
+
+ executor = self.executor_type(
+ max_workers=2, mp_context=self.get_context())
+ res = executor.submit(func, *args)
+
+ if ignore_stderr:
+ cm = support.captured_stderr()
+ else:
+ cm = contextlib.nullcontext()
+
+ try:
+ with self.assertRaises(error):
+ with cm:
+ res.result(timeout=self.TIMEOUT)
+ except futures.TimeoutError:
+ # If we did not recover before TIMEOUT seconds,
+ # consider that the executor is in a deadlock state
+ self._fail_on_deadlock(executor)
+ executor.shutdown(wait=True)
+
+ def test_error_at_task_pickle(self):
+ # Check problem occurring while pickling a task in
+ # the task_handler thread
+ self._check_crash(PicklingError, id, ErrorAtPickle())
+
+ def test_exit_at_task_unpickle(self):
+ # Check problem occurring while unpickling a task on workers
+ self._check_crash(BrokenProcessPool, id, ExitAtUnpickle())
+
+ def test_error_at_task_unpickle(self):
+ # Check problem occurring while unpickling a task on workers
+ self._check_crash(BrokenProcessPool, id, ErrorAtUnpickle())
+
+ def test_crash_at_task_unpickle(self):
+ # Check problem occurring while unpickling a task on workers
+ self._check_crash(BrokenProcessPool, id, CrashAtUnpickle())
+
+ def test_crash_during_func_exec_on_worker(self):
+ # Check problem occurring during func execution on workers
+ self._check_crash(BrokenProcessPool, _crash)
+
+ def test_exit_during_func_exec_on_worker(self):
+ # Check problem occurring during func execution on workers
+ self._check_crash(SystemExit, _exit)
+
+ def test_error_during_func_exec_on_worker(self):
+ # Check problem occurring during func execution on workers
+ self._check_crash(RuntimeError, _raise_error, RuntimeError)
+
+ def test_crash_during_result_pickle_on_worker(self):
+ # Check problem occurring while pickling a task result
+ # on workers
+ self._check_crash(BrokenProcessPool, _return_instance, CrashAtPickle)
+
+ def test_exit_during_result_pickle_on_worker(self):
+ # Check problem occurring while pickling a task result
+ # on workers
+ self._check_crash(SystemExit, _return_instance, ExitAtPickle)
+
+ def test_error_during_result_pickle_on_worker(self):
+ # Check problem occurring while pickling a task result
+ # on workers
+ self._check_crash(PicklingError, _return_instance, ErrorAtPickle)
+
+ def test_error_during_result_unpickle_in_result_handler(self):
+ # Check problem occurring while unpickling a task in
+ # the result_handler thread
+ self._check_crash(BrokenProcessPool,
+ _return_instance, ErrorAtUnpickle,
+ ignore_stderr=True)
+
+ def test_exit_during_result_unpickle_in_result_handler(self):
+ # Check problem occurring while unpickling a task in
+ # the result_handler thread
+ self._check_crash(BrokenProcessPool, _return_instance, ExitAtUnpickle)
+
+ def test_shutdown_deadlock(self):
+ # Test that the pool calling shutdown do not cause deadlock
+ # if a worker fails after the shutdown call.
+ self.executor.shutdown(wait=True)
+ with self.executor_type(max_workers=2,
+ mp_context=self.get_context()) as executor:
+ self.executor = executor # Allow clean up in fail_on_deadlock
+ f = executor.submit(_crash, delay=.1)
+ executor.shutdown(wait=True)
+ with self.assertRaises(BrokenProcessPool):
+ f.result()
+
+ def test_shutdown_deadlock_pickle(self):
+ # Test that the pool calling shutdown with wait=False does not cause
+ # a deadlock if a task fails at pickle after the shutdown call.
+ # Reported in bpo-39104.
+ self.executor.shutdown(wait=True)
+ with self.executor_type(max_workers=2,
+ mp_context=self.get_context()) as executor:
+ self.executor = executor # Allow clean up in fail_on_deadlock
+
+ # Start the executor and get the executor_manager_thread to collect
+ # the threads and avoid dangling thread that should be cleaned up
+ # asynchronously.
+ executor.submit(id, 42).result()
+ executor_manager = executor._executor_manager_thread
+
+ # Submit a task that fails at pickle and shutdown the executor
+ # without waiting
+ f = executor.submit(id, ErrorAtPickle())
+ executor.shutdown(wait=False)
+ with self.assertRaises(PicklingError):
+ f.result()
+
+ # Make sure the executor is eventually shutdown and do not leave
+ # dangling threads
+ executor_manager.join()
+
+ def test_crash_big_data(self):
+ # Test that there is a clean exception instad of a deadlock when a
+ # child process crashes while some data is being written into the
+ # queue.
+ # https://github.com/python/cpython/issues/94777
+ self.executor.shutdown(wait=True)
+ data = "a" * support.PIPE_MAX_SIZE
+ with self.executor_type(max_workers=2,
+ mp_context=self.get_context()) as executor:
+ self.executor = executor # Allow clean up in fail_on_deadlock
+ with self.assertRaises(BrokenProcessPool):
+ list(executor.map(_crash_with_data, [data] * 10))
+
+
+create_executor_tests(globals(), ExecutorDeadlockTest,
+ executor_mixins=(ProcessPoolForkMixin,
+ ProcessPoolForkserverMixin,
+ ProcessPoolSpawnMixin))
+
+def setUpModule():
+ setup_module()
+
+
+if __name__ == "__main__":
+ unittest.main()
diff --git a/Lib/test/test_concurrent_futures/test_future.py b/Lib/test/test_concurrent_futures/test_future.py
new file mode 100644
index 0000000..4066ea1
--- /dev/null
+++ b/Lib/test/test_concurrent_futures/test_future.py
@@ -0,0 +1,291 @@
+import threading
+import time
+import unittest
+from concurrent import futures
+from concurrent.futures._base import (
+ PENDING, RUNNING, CANCELLED, CANCELLED_AND_NOTIFIED, FINISHED, Future)
+
+from test import support
+
+from .util import (
+ PENDING_FUTURE, RUNNING_FUTURE, CANCELLED_FUTURE,
+ CANCELLED_AND_NOTIFIED_FUTURE, EXCEPTION_FUTURE, SUCCESSFUL_FUTURE,
+ BaseTestCase, create_future, setup_module)
+
+
+class FutureTests(BaseTestCase):
+ def test_done_callback_with_result(self):
+ callback_result = None
+ def fn(callback_future):
+ nonlocal callback_result
+ callback_result = callback_future.result()
+
+ f = Future()
+ f.add_done_callback(fn)
+ f.set_result(5)
+ self.assertEqual(5, callback_result)
+
+ def test_done_callback_with_exception(self):
+ callback_exception = None
+ def fn(callback_future):
+ nonlocal callback_exception
+ callback_exception = callback_future.exception()
+
+ f = Future()
+ f.add_done_callback(fn)
+ f.set_exception(Exception('test'))
+ self.assertEqual(('test',), callback_exception.args)
+
+ def test_done_callback_with_cancel(self):
+ was_cancelled = None
+ def fn(callback_future):
+ nonlocal was_cancelled
+ was_cancelled = callback_future.cancelled()
+
+ f = Future()
+ f.add_done_callback(fn)
+ self.assertTrue(f.cancel())
+ self.assertTrue(was_cancelled)
+
+ def test_done_callback_raises(self):
+ with support.captured_stderr() as stderr:
+ raising_was_called = False
+ fn_was_called = False
+
+ def raising_fn(callback_future):
+ nonlocal raising_was_called
+ raising_was_called = True
+ raise Exception('doh!')
+
+ def fn(callback_future):
+ nonlocal fn_was_called
+ fn_was_called = True
+
+ f = Future()
+ f.add_done_callback(raising_fn)
+ f.add_done_callback(fn)
+ f.set_result(5)
+ self.assertTrue(raising_was_called)
+ self.assertTrue(fn_was_called)
+ self.assertIn('Exception: doh!', stderr.getvalue())
+
+ def test_done_callback_already_successful(self):
+ callback_result = None
+ def fn(callback_future):
+ nonlocal callback_result
+ callback_result = callback_future.result()
+
+ f = Future()
+ f.set_result(5)
+ f.add_done_callback(fn)
+ self.assertEqual(5, callback_result)
+
+ def test_done_callback_already_failed(self):
+ callback_exception = None
+ def fn(callback_future):
+ nonlocal callback_exception
+ callback_exception = callback_future.exception()
+
+ f = Future()
+ f.set_exception(Exception('test'))
+ f.add_done_callback(fn)
+ self.assertEqual(('test',), callback_exception.args)
+
+ def test_done_callback_already_cancelled(self):
+ was_cancelled = None
+ def fn(callback_future):
+ nonlocal was_cancelled
+ was_cancelled = callback_future.cancelled()
+
+ f = Future()
+ self.assertTrue(f.cancel())
+ f.add_done_callback(fn)
+ self.assertTrue(was_cancelled)
+
+ def test_done_callback_raises_already_succeeded(self):
+ with support.captured_stderr() as stderr:
+ def raising_fn(callback_future):
+ raise Exception('doh!')
+
+ f = Future()
+
+ # Set the result first to simulate a future that runs instantly,
+ # effectively allowing the callback to be run immediately.
+ f.set_result(5)
+ f.add_done_callback(raising_fn)
+
+ self.assertIn('exception calling callback for', stderr.getvalue())
+ self.assertIn('doh!', stderr.getvalue())
+
+
+ def test_repr(self):
+ self.assertRegex(repr(PENDING_FUTURE),
+ '<Future at 0x[0-9a-f]+ state=pending>')
+ self.assertRegex(repr(RUNNING_FUTURE),
+ '<Future at 0x[0-9a-f]+ state=running>')
+ self.assertRegex(repr(CANCELLED_FUTURE),
+ '<Future at 0x[0-9a-f]+ state=cancelled>')
+ self.assertRegex(repr(CANCELLED_AND_NOTIFIED_FUTURE),
+ '<Future at 0x[0-9a-f]+ state=cancelled>')
+ self.assertRegex(
+ repr(EXCEPTION_FUTURE),
+ '<Future at 0x[0-9a-f]+ state=finished raised OSError>')
+ self.assertRegex(
+ repr(SUCCESSFUL_FUTURE),
+ '<Future at 0x[0-9a-f]+ state=finished returned int>')
+
+ def test_cancel(self):
+ f1 = create_future(state=PENDING)
+ f2 = create_future(state=RUNNING)
+ f3 = create_future(state=CANCELLED)
+ f4 = create_future(state=CANCELLED_AND_NOTIFIED)
+ f5 = create_future(state=FINISHED, exception=OSError())
+ f6 = create_future(state=FINISHED, result=5)
+
+ self.assertTrue(f1.cancel())
+ self.assertEqual(f1._state, CANCELLED)
+
+ self.assertFalse(f2.cancel())
+ self.assertEqual(f2._state, RUNNING)
+
+ self.assertTrue(f3.cancel())
+ self.assertEqual(f3._state, CANCELLED)
+
+ self.assertTrue(f4.cancel())
+ self.assertEqual(f4._state, CANCELLED_AND_NOTIFIED)
+
+ self.assertFalse(f5.cancel())
+ self.assertEqual(f5._state, FINISHED)
+
+ self.assertFalse(f6.cancel())
+ self.assertEqual(f6._state, FINISHED)
+
+ def test_cancelled(self):
+ self.assertFalse(PENDING_FUTURE.cancelled())
+ self.assertFalse(RUNNING_FUTURE.cancelled())
+ self.assertTrue(CANCELLED_FUTURE.cancelled())
+ self.assertTrue(CANCELLED_AND_NOTIFIED_FUTURE.cancelled())
+ self.assertFalse(EXCEPTION_FUTURE.cancelled())
+ self.assertFalse(SUCCESSFUL_FUTURE.cancelled())
+
+ def test_done(self):
+ self.assertFalse(PENDING_FUTURE.done())
+ self.assertFalse(RUNNING_FUTURE.done())
+ self.assertTrue(CANCELLED_FUTURE.done())
+ self.assertTrue(CANCELLED_AND_NOTIFIED_FUTURE.done())
+ self.assertTrue(EXCEPTION_FUTURE.done())
+ self.assertTrue(SUCCESSFUL_FUTURE.done())
+
+ def test_running(self):
+ self.assertFalse(PENDING_FUTURE.running())
+ self.assertTrue(RUNNING_FUTURE.running())
+ self.assertFalse(CANCELLED_FUTURE.running())
+ self.assertFalse(CANCELLED_AND_NOTIFIED_FUTURE.running())
+ self.assertFalse(EXCEPTION_FUTURE.running())
+ self.assertFalse(SUCCESSFUL_FUTURE.running())
+
+ def test_result_with_timeout(self):
+ self.assertRaises(futures.TimeoutError,
+ PENDING_FUTURE.result, timeout=0)
+ self.assertRaises(futures.TimeoutError,
+ RUNNING_FUTURE.result, timeout=0)
+ self.assertRaises(futures.CancelledError,
+ CANCELLED_FUTURE.result, timeout=0)
+ self.assertRaises(futures.CancelledError,
+ CANCELLED_AND_NOTIFIED_FUTURE.result, timeout=0)
+ self.assertRaises(OSError, EXCEPTION_FUTURE.result, timeout=0)
+ self.assertEqual(SUCCESSFUL_FUTURE.result(timeout=0), 42)
+
+ def test_result_with_success(self):
+ # TODO(brian@sweetapp.com): This test is timing dependent.
+ def notification():
+ # Wait until the main thread is waiting for the result.
+ time.sleep(1)
+ f1.set_result(42)
+
+ f1 = create_future(state=PENDING)
+ t = threading.Thread(target=notification)
+ t.start()
+
+ self.assertEqual(f1.result(timeout=5), 42)
+ t.join()
+
+ def test_result_with_cancel(self):
+ # TODO(brian@sweetapp.com): This test is timing dependent.
+ def notification():
+ # Wait until the main thread is waiting for the result.
+ time.sleep(1)
+ f1.cancel()
+
+ f1 = create_future(state=PENDING)
+ t = threading.Thread(target=notification)
+ t.start()
+
+ self.assertRaises(futures.CancelledError,
+ f1.result, timeout=support.SHORT_TIMEOUT)
+ t.join()
+
+ def test_exception_with_timeout(self):
+ self.assertRaises(futures.TimeoutError,
+ PENDING_FUTURE.exception, timeout=0)
+ self.assertRaises(futures.TimeoutError,
+ RUNNING_FUTURE.exception, timeout=0)
+ self.assertRaises(futures.CancelledError,
+ CANCELLED_FUTURE.exception, timeout=0)
+ self.assertRaises(futures.CancelledError,
+ CANCELLED_AND_NOTIFIED_FUTURE.exception, timeout=0)
+ self.assertTrue(isinstance(EXCEPTION_FUTURE.exception(timeout=0),
+ OSError))
+ self.assertEqual(SUCCESSFUL_FUTURE.exception(timeout=0), None)
+
+ def test_exception_with_success(self):
+ def notification():
+ # Wait until the main thread is waiting for the exception.
+ time.sleep(1)
+ with f1._condition:
+ f1._state = FINISHED
+ f1._exception = OSError()
+ f1._condition.notify_all()
+
+ f1 = create_future(state=PENDING)
+ t = threading.Thread(target=notification)
+ t.start()
+
+ self.assertTrue(isinstance(f1.exception(timeout=support.SHORT_TIMEOUT), OSError))
+ t.join()
+
+ def test_multiple_set_result(self):
+ f = create_future(state=PENDING)
+ f.set_result(1)
+
+ with self.assertRaisesRegex(
+ futures.InvalidStateError,
+ 'FINISHED: <Future at 0x[0-9a-f]+ '
+ 'state=finished returned int>'
+ ):
+ f.set_result(2)
+
+ self.assertTrue(f.done())
+ self.assertEqual(f.result(), 1)
+
+ def test_multiple_set_exception(self):
+ f = create_future(state=PENDING)
+ e = ValueError()
+ f.set_exception(e)
+
+ with self.assertRaisesRegex(
+ futures.InvalidStateError,
+ 'FINISHED: <Future at 0x[0-9a-f]+ '
+ 'state=finished raised ValueError>'
+ ):
+ f.set_exception(Exception())
+
+ self.assertEqual(f.exception(), e)
+
+
+def setUpModule():
+ setup_module()
+
+
+if __name__ == "__main__":
+ unittest.main()
diff --git a/Lib/test/test_concurrent_futures/test_init.py b/Lib/test/test_concurrent_futures/test_init.py
new file mode 100644
index 0000000..ce01e0f
--- /dev/null
+++ b/Lib/test/test_concurrent_futures/test_init.py
@@ -0,0 +1,117 @@
+import contextlib
+import logging
+import queue
+import time
+import unittest
+from concurrent.futures._base import BrokenExecutor
+from logging.handlers import QueueHandler
+
+from test import support
+
+from .util import ExecutorMixin, create_executor_tests, setup_module
+
+
+INITIALIZER_STATUS = 'uninitialized'
+
+def init(x):
+ global INITIALIZER_STATUS
+ INITIALIZER_STATUS = x
+
+def get_init_status():
+ return INITIALIZER_STATUS
+
+def init_fail(log_queue=None):
+ if log_queue is not None:
+ logger = logging.getLogger('concurrent.futures')
+ logger.addHandler(QueueHandler(log_queue))
+ logger.setLevel('CRITICAL')
+ logger.propagate = False
+ time.sleep(0.1) # let some futures be scheduled
+ raise ValueError('error in initializer')
+
+
+class InitializerMixin(ExecutorMixin):
+ worker_count = 2
+
+ def setUp(self):
+ global INITIALIZER_STATUS
+ INITIALIZER_STATUS = 'uninitialized'
+ self.executor_kwargs = dict(initializer=init,
+ initargs=('initialized',))
+ super().setUp()
+
+ def test_initializer(self):
+ futures = [self.executor.submit(get_init_status)
+ for _ in range(self.worker_count)]
+
+ for f in futures:
+ self.assertEqual(f.result(), 'initialized')
+
+
+class FailingInitializerMixin(ExecutorMixin):
+ worker_count = 2
+
+ def setUp(self):
+ if hasattr(self, "ctx"):
+ # Pass a queue to redirect the child's logging output
+ self.mp_context = self.get_context()
+ self.log_queue = self.mp_context.Queue()
+ self.executor_kwargs = dict(initializer=init_fail,
+ initargs=(self.log_queue,))
+ else:
+ # In a thread pool, the child shares our logging setup
+ # (see _assert_logged())
+ self.mp_context = None
+ self.log_queue = None
+ self.executor_kwargs = dict(initializer=init_fail)
+ super().setUp()
+
+ def test_initializer(self):
+ with self._assert_logged('ValueError: error in initializer'):
+ try:
+ future = self.executor.submit(get_init_status)
+ except BrokenExecutor:
+ # Perhaps the executor is already broken
+ pass
+ else:
+ with self.assertRaises(BrokenExecutor):
+ future.result()
+
+ # At some point, the executor should break
+ for _ in support.sleeping_retry(support.SHORT_TIMEOUT,
+ "executor not broken"):
+ if self.executor._broken:
+ break
+
+ # ... and from this point submit() is guaranteed to fail
+ with self.assertRaises(BrokenExecutor):
+ self.executor.submit(get_init_status)
+
+ @contextlib.contextmanager
+ def _assert_logged(self, msg):
+ if self.log_queue is not None:
+ yield
+ output = []
+ try:
+ while True:
+ output.append(self.log_queue.get_nowait().getMessage())
+ except queue.Empty:
+ pass
+ else:
+ with self.assertLogs('concurrent.futures', 'CRITICAL') as cm:
+ yield
+ output = cm.output
+ self.assertTrue(any(msg in line for line in output),
+ output)
+
+
+create_executor_tests(globals(), InitializerMixin)
+create_executor_tests(globals(), FailingInitializerMixin)
+
+
+def setUpModule():
+ setup_module()
+
+
+if __name__ == "__main__":
+ unittest.main()
diff --git a/Lib/test/test_concurrent_futures/test_process_pool.py b/Lib/test/test_concurrent_futures/test_process_pool.py
new file mode 100644
index 0000000..7763a49
--- /dev/null
+++ b/Lib/test/test_concurrent_futures/test_process_pool.py
@@ -0,0 +1,202 @@
+import os
+import sys
+import time
+import unittest
+from concurrent import futures
+from concurrent.futures.process import BrokenProcessPool
+
+from test import support
+from test.support import hashlib_helper
+
+from .executor import ExecutorTest, mul
+from .util import (
+ ProcessPoolForkMixin, ProcessPoolForkserverMixin, ProcessPoolSpawnMixin,
+ create_executor_tests, setup_module)
+
+
+class EventfulGCObj():
+ def __init__(self, mgr):
+ self.event = mgr.Event()
+
+ def __del__(self):
+ self.event.set()
+
+
+class ProcessPoolExecutorTest(ExecutorTest):
+
+ @unittest.skipUnless(sys.platform=='win32', 'Windows-only process limit')
+ def test_max_workers_too_large(self):
+ with self.assertRaisesRegex(ValueError,
+ "max_workers must be <= 61"):
+ futures.ProcessPoolExecutor(max_workers=62)
+
+ def test_killed_child(self):
+ # When a child process is abruptly terminated, the whole pool gets
+ # "broken".
+ futures = [self.executor.submit(time.sleep, 3)]
+ # Get one of the processes, and terminate (kill) it
+ p = next(iter(self.executor._processes.values()))
+ p.terminate()
+ for fut in futures:
+ self.assertRaises(BrokenProcessPool, fut.result)
+ # Submitting other jobs fails as well.
+ self.assertRaises(BrokenProcessPool, self.executor.submit, pow, 2, 8)
+
+ def test_map_chunksize(self):
+ def bad_map():
+ list(self.executor.map(pow, range(40), range(40), chunksize=-1))
+
+ ref = list(map(pow, range(40), range(40)))
+ self.assertEqual(
+ list(self.executor.map(pow, range(40), range(40), chunksize=6)),
+ ref)
+ self.assertEqual(
+ list(self.executor.map(pow, range(40), range(40), chunksize=50)),
+ ref)
+ self.assertEqual(
+ list(self.executor.map(pow, range(40), range(40), chunksize=40)),
+ ref)
+ self.assertRaises(ValueError, bad_map)
+
+ @classmethod
+ def _test_traceback(cls):
+ raise RuntimeError(123) # some comment
+
+ def test_traceback(self):
+ # We want ensure that the traceback from the child process is
+ # contained in the traceback raised in the main process.
+ future = self.executor.submit(self._test_traceback)
+ with self.assertRaises(Exception) as cm:
+ future.result()
+
+ exc = cm.exception
+ self.assertIs(type(exc), RuntimeError)
+ self.assertEqual(exc.args, (123,))
+ cause = exc.__cause__
+ self.assertIs(type(cause), futures.process._RemoteTraceback)
+ self.assertIn('raise RuntimeError(123) # some comment', cause.tb)
+
+ with support.captured_stderr() as f1:
+ try:
+ raise exc
+ except RuntimeError:
+ sys.excepthook(*sys.exc_info())
+ self.assertIn('raise RuntimeError(123) # some comment',
+ f1.getvalue())
+
+ @hashlib_helper.requires_hashdigest('md5')
+ def test_ressources_gced_in_workers(self):
+ # Ensure that argument for a job are correctly gc-ed after the job
+ # is finished
+ mgr = self.get_context().Manager()
+ obj = EventfulGCObj(mgr)
+ future = self.executor.submit(id, obj)
+ future.result()
+
+ self.assertTrue(obj.event.wait(timeout=1))
+
+ # explicitly destroy the object to ensure that EventfulGCObj.__del__()
+ # is called while manager is still running.
+ obj = None
+ support.gc_collect()
+
+ mgr.shutdown()
+ mgr.join()
+
+ def test_saturation(self):
+ executor = self.executor
+ mp_context = self.get_context()
+ sem = mp_context.Semaphore(0)
+ job_count = 15 * executor._max_workers
+ for _ in range(job_count):
+ executor.submit(sem.acquire)
+ self.assertEqual(len(executor._processes), executor._max_workers)
+ for _ in range(job_count):
+ sem.release()
+
+ def test_idle_process_reuse_one(self):
+ executor = self.executor
+ assert executor._max_workers >= 4
+ if self.get_context().get_start_method(allow_none=False) == "fork":
+ raise unittest.SkipTest("Incompatible with the fork start method.")
+ executor.submit(mul, 21, 2).result()
+ executor.submit(mul, 6, 7).result()
+ executor.submit(mul, 3, 14).result()
+ self.assertEqual(len(executor._processes), 1)
+
+ def test_idle_process_reuse_multiple(self):
+ executor = self.executor
+ assert executor._max_workers <= 5
+ if self.get_context().get_start_method(allow_none=False) == "fork":
+ raise unittest.SkipTest("Incompatible with the fork start method.")
+ executor.submit(mul, 12, 7).result()
+ executor.submit(mul, 33, 25)
+ executor.submit(mul, 25, 26).result()
+ executor.submit(mul, 18, 29)
+ executor.submit(mul, 1, 2).result()
+ executor.submit(mul, 0, 9)
+ self.assertLessEqual(len(executor._processes), 3)
+ executor.shutdown()
+
+ def test_max_tasks_per_child(self):
+ context = self.get_context()
+ if context.get_start_method(allow_none=False) == "fork":
+ with self.assertRaises(ValueError):
+ self.executor_type(1, mp_context=context, max_tasks_per_child=3)
+ return
+ # not using self.executor as we need to control construction.
+ # arguably this could go in another class w/o that mixin.
+ executor = self.executor_type(
+ 1, mp_context=context, max_tasks_per_child=3)
+ f1 = executor.submit(os.getpid)
+ original_pid = f1.result()
+ # The worker pid remains the same as the worker could be reused
+ f2 = executor.submit(os.getpid)
+ self.assertEqual(f2.result(), original_pid)
+ self.assertEqual(len(executor._processes), 1)
+ f3 = executor.submit(os.getpid)
+ self.assertEqual(f3.result(), original_pid)
+
+ # A new worker is spawned, with a statistically different pid,
+ # while the previous was reaped.
+ f4 = executor.submit(os.getpid)
+ new_pid = f4.result()
+ self.assertNotEqual(original_pid, new_pid)
+ self.assertEqual(len(executor._processes), 1)
+
+ executor.shutdown()
+
+ def test_max_tasks_per_child_defaults_to_spawn_context(self):
+ # not using self.executor as we need to control construction.
+ # arguably this could go in another class w/o that mixin.
+ executor = self.executor_type(1, max_tasks_per_child=3)
+ self.assertEqual(executor._mp_context.get_start_method(), "spawn")
+
+ def test_max_tasks_early_shutdown(self):
+ context = self.get_context()
+ if context.get_start_method(allow_none=False) == "fork":
+ raise unittest.SkipTest("Incompatible with the fork start method.")
+ # not using self.executor as we need to control construction.
+ # arguably this could go in another class w/o that mixin.
+ executor = self.executor_type(
+ 3, mp_context=context, max_tasks_per_child=1)
+ futures = []
+ for i in range(6):
+ futures.append(executor.submit(mul, i, i))
+ executor.shutdown()
+ for i, future in enumerate(futures):
+ self.assertEqual(future.result(), mul(i, i))
+
+
+create_executor_tests(globals(), ProcessPoolExecutorTest,
+ executor_mixins=(ProcessPoolForkMixin,
+ ProcessPoolForkserverMixin,
+ ProcessPoolSpawnMixin))
+
+
+def setUpModule():
+ setup_module()
+
+
+if __name__ == "__main__":
+ unittest.main()
diff --git a/Lib/test/test_concurrent_futures/test_shutdown.py b/Lib/test/test_concurrent_futures/test_shutdown.py
new file mode 100644
index 0000000..45dab7a
--- /dev/null
+++ b/Lib/test/test_concurrent_futures/test_shutdown.py
@@ -0,0 +1,343 @@
+import signal
+import sys
+import threading
+import time
+import unittest
+from concurrent import futures
+
+from test import support
+from test.support.script_helper import assert_python_ok
+
+from .util import (
+ BaseTestCase, ThreadPoolMixin, ProcessPoolForkMixin,
+ ProcessPoolForkserverMixin, ProcessPoolSpawnMixin,
+ create_executor_tests, setup_module)
+
+
+def sleep_and_print(t, msg):
+ time.sleep(t)
+ print(msg)
+ sys.stdout.flush()
+
+
+class ExecutorShutdownTest:
+ def test_run_after_shutdown(self):
+ self.executor.shutdown()
+ self.assertRaises(RuntimeError,
+ self.executor.submit,
+ pow, 2, 5)
+
+ def test_interpreter_shutdown(self):
+ # Test the atexit hook for shutdown of worker threads and processes
+ rc, out, err = assert_python_ok('-c', """if 1:
+ from concurrent.futures import {executor_type}
+ from time import sleep
+ from test.test_concurrent_futures.test_shutdown import sleep_and_print
+ if __name__ == "__main__":
+ context = '{context}'
+ if context == "":
+ t = {executor_type}(5)
+ else:
+ from multiprocessing import get_context
+ context = get_context(context)
+ t = {executor_type}(5, mp_context=context)
+ t.submit(sleep_and_print, 1.0, "apple")
+ """.format(executor_type=self.executor_type.__name__,
+ context=getattr(self, "ctx", "")))
+ # Errors in atexit hooks don't change the process exit code, check
+ # stderr manually.
+ self.assertFalse(err)
+ self.assertEqual(out.strip(), b"apple")
+
+ def test_submit_after_interpreter_shutdown(self):
+ # Test the atexit hook for shutdown of worker threads and processes
+ rc, out, err = assert_python_ok('-c', """if 1:
+ import atexit
+ @atexit.register
+ def run_last():
+ try:
+ t.submit(id, None)
+ except RuntimeError:
+ print("runtime-error")
+ raise
+ from concurrent.futures import {executor_type}
+ if __name__ == "__main__":
+ context = '{context}'
+ if not context:
+ t = {executor_type}(5)
+ else:
+ from multiprocessing import get_context
+ context = get_context(context)
+ t = {executor_type}(5, mp_context=context)
+ t.submit(id, 42).result()
+ """.format(executor_type=self.executor_type.__name__,
+ context=getattr(self, "ctx", "")))
+ # Errors in atexit hooks don't change the process exit code, check
+ # stderr manually.
+ self.assertIn("RuntimeError: cannot schedule new futures", err.decode())
+ self.assertEqual(out.strip(), b"runtime-error")
+
+ def test_hang_issue12364(self):
+ fs = [self.executor.submit(time.sleep, 0.1) for _ in range(50)]
+ self.executor.shutdown()
+ for f in fs:
+ f.result()
+
+ def test_cancel_futures(self):
+ assert self.worker_count <= 5, "test needs few workers"
+ fs = [self.executor.submit(time.sleep, .1) for _ in range(50)]
+ self.executor.shutdown(cancel_futures=True)
+ # We can't guarantee the exact number of cancellations, but we can
+ # guarantee that *some* were cancelled. With few workers, many of
+ # the submitted futures should have been cancelled.
+ cancelled = [fut for fut in fs if fut.cancelled()]
+ self.assertGreater(len(cancelled), 20)
+
+ # Ensure the other futures were able to finish.
+ # Use "not fut.cancelled()" instead of "fut.done()" to include futures
+ # that may have been left in a pending state.
+ others = [fut for fut in fs if not fut.cancelled()]
+ for fut in others:
+ self.assertTrue(fut.done(), msg=f"{fut._state=}")
+ self.assertIsNone(fut.exception())
+
+ # Similar to the number of cancelled futures, we can't guarantee the
+ # exact number that completed. But, we can guarantee that at least
+ # one finished.
+ self.assertGreater(len(others), 0)
+
+ def test_hang_gh83386(self):
+ """shutdown(wait=False) doesn't hang at exit with running futures.
+
+ See https://github.com/python/cpython/issues/83386.
+ """
+ if self.executor_type == futures.ProcessPoolExecutor:
+ raise unittest.SkipTest(
+ "Hangs, see https://github.com/python/cpython/issues/83386")
+
+ rc, out, err = assert_python_ok('-c', """if True:
+ from concurrent.futures import {executor_type}
+ from test.test_concurrent_futures.test_shutdown import sleep_and_print
+ if __name__ == "__main__":
+ if {context!r}: multiprocessing.set_start_method({context!r})
+ t = {executor_type}(max_workers=3)
+ t.submit(sleep_and_print, 1.0, "apple")
+ t.shutdown(wait=False)
+ """.format(executor_type=self.executor_type.__name__,
+ context=getattr(self, 'ctx', None)))
+ self.assertFalse(err)
+ self.assertEqual(out.strip(), b"apple")
+
+ def test_hang_gh94440(self):
+ """shutdown(wait=True) doesn't hang when a future was submitted and
+ quickly canceled right before shutdown.
+
+ See https://github.com/python/cpython/issues/94440.
+ """
+ if not hasattr(signal, 'alarm'):
+ raise unittest.SkipTest(
+ "Tested platform does not support the alarm signal")
+
+ def timeout(_signum, _frame):
+ raise RuntimeError("timed out waiting for shutdown")
+
+ kwargs = {}
+ if getattr(self, 'ctx', None):
+ kwargs['mp_context'] = self.get_context()
+ executor = self.executor_type(max_workers=1, **kwargs)
+ executor.submit(int).result()
+ old_handler = signal.signal(signal.SIGALRM, timeout)
+ try:
+ signal.alarm(5)
+ executor.submit(int).cancel()
+ executor.shutdown(wait=True)
+ finally:
+ signal.alarm(0)
+ signal.signal(signal.SIGALRM, old_handler)
+
+
+class ThreadPoolShutdownTest(ThreadPoolMixin, ExecutorShutdownTest, BaseTestCase):
+ def test_threads_terminate(self):
+ def acquire_lock(lock):
+ lock.acquire()
+
+ sem = threading.Semaphore(0)
+ for i in range(3):
+ self.executor.submit(acquire_lock, sem)
+ self.assertEqual(len(self.executor._threads), 3)
+ for i in range(3):
+ sem.release()
+ self.executor.shutdown()
+ for t in self.executor._threads:
+ t.join()
+
+ def test_context_manager_shutdown(self):
+ with futures.ThreadPoolExecutor(max_workers=5) as e:
+ executor = e
+ self.assertEqual(list(e.map(abs, range(-5, 5))),
+ [5, 4, 3, 2, 1, 0, 1, 2, 3, 4])
+
+ for t in executor._threads:
+ t.join()
+
+ def test_del_shutdown(self):
+ executor = futures.ThreadPoolExecutor(max_workers=5)
+ res = executor.map(abs, range(-5, 5))
+ threads = executor._threads
+ del executor
+
+ for t in threads:
+ t.join()
+
+ # Make sure the results were all computed before the
+ # executor got shutdown.
+ assert all([r == abs(v) for r, v in zip(res, range(-5, 5))])
+
+ def test_shutdown_no_wait(self):
+ # Ensure that the executor cleans up the threads when calling
+ # shutdown with wait=False
+ executor = futures.ThreadPoolExecutor(max_workers=5)
+ res = executor.map(abs, range(-5, 5))
+ threads = executor._threads
+ executor.shutdown(wait=False)
+ for t in threads:
+ t.join()
+
+ # Make sure the results were all computed before the
+ # executor got shutdown.
+ assert all([r == abs(v) for r, v in zip(res, range(-5, 5))])
+
+
+ def test_thread_names_assigned(self):
+ executor = futures.ThreadPoolExecutor(
+ max_workers=5, thread_name_prefix='SpecialPool')
+ executor.map(abs, range(-5, 5))
+ threads = executor._threads
+ del executor
+ support.gc_collect() # For PyPy or other GCs.
+
+ for t in threads:
+ self.assertRegex(t.name, r'^SpecialPool_[0-4]$')
+ t.join()
+
+ def test_thread_names_default(self):
+ executor = futures.ThreadPoolExecutor(max_workers=5)
+ executor.map(abs, range(-5, 5))
+ threads = executor._threads
+ del executor
+ support.gc_collect() # For PyPy or other GCs.
+
+ for t in threads:
+ # Ensure that our default name is reasonably sane and unique when
+ # no thread_name_prefix was supplied.
+ self.assertRegex(t.name, r'ThreadPoolExecutor-\d+_[0-4]$')
+ t.join()
+
+ def test_cancel_futures_wait_false(self):
+ # Can only be reliably tested for TPE, since PPE often hangs with
+ # `wait=False` (even without *cancel_futures*).
+ rc, out, err = assert_python_ok('-c', """if True:
+ from concurrent.futures import ThreadPoolExecutor
+ from test.test_concurrent_futures.test_shutdown import sleep_and_print
+ if __name__ == "__main__":
+ t = ThreadPoolExecutor()
+ t.submit(sleep_and_print, .1, "apple")
+ t.shutdown(wait=False, cancel_futures=True)
+ """)
+ # Errors in atexit hooks don't change the process exit code, check
+ # stderr manually.
+ self.assertFalse(err)
+ self.assertEqual(out.strip(), b"apple")
+
+
+class ProcessPoolShutdownTest(ExecutorShutdownTest):
+ def test_processes_terminate(self):
+ def acquire_lock(lock):
+ lock.acquire()
+
+ mp_context = self.get_context()
+ if mp_context.get_start_method(allow_none=False) == "fork":
+ # fork pre-spawns, not on demand.
+ expected_num_processes = self.worker_count
+ else:
+ expected_num_processes = 3
+
+ sem = mp_context.Semaphore(0)
+ for _ in range(3):
+ self.executor.submit(acquire_lock, sem)
+ self.assertEqual(len(self.executor._processes), expected_num_processes)
+ for _ in range(3):
+ sem.release()
+ processes = self.executor._processes
+ self.executor.shutdown()
+
+ for p in processes.values():
+ p.join()
+
+ def test_context_manager_shutdown(self):
+ with futures.ProcessPoolExecutor(
+ max_workers=5, mp_context=self.get_context()) as e:
+ processes = e._processes
+ self.assertEqual(list(e.map(abs, range(-5, 5))),
+ [5, 4, 3, 2, 1, 0, 1, 2, 3, 4])
+
+ for p in processes.values():
+ p.join()
+
+ def test_del_shutdown(self):
+ executor = futures.ProcessPoolExecutor(
+ max_workers=5, mp_context=self.get_context())
+ res = executor.map(abs, range(-5, 5))
+ executor_manager_thread = executor._executor_manager_thread
+ processes = executor._processes
+ call_queue = executor._call_queue
+ executor_manager_thread = executor._executor_manager_thread
+ del executor
+ support.gc_collect() # For PyPy or other GCs.
+
+ # Make sure that all the executor resources were properly cleaned by
+ # the shutdown process
+ executor_manager_thread.join()
+ for p in processes.values():
+ p.join()
+ call_queue.join_thread()
+
+ # Make sure the results were all computed before the
+ # executor got shutdown.
+ assert all([r == abs(v) for r, v in zip(res, range(-5, 5))])
+
+ def test_shutdown_no_wait(self):
+ # Ensure that the executor cleans up the processes when calling
+ # shutdown with wait=False
+ executor = futures.ProcessPoolExecutor(
+ max_workers=5, mp_context=self.get_context())
+ res = executor.map(abs, range(-5, 5))
+ processes = executor._processes
+ call_queue = executor._call_queue
+ executor_manager_thread = executor._executor_manager_thread
+ executor.shutdown(wait=False)
+
+ # Make sure that all the executor resources were properly cleaned by
+ # the shutdown process
+ executor_manager_thread.join()
+ for p in processes.values():
+ p.join()
+ call_queue.join_thread()
+
+ # Make sure the results were all computed before the executor got
+ # shutdown.
+ assert all([r == abs(v) for r, v in zip(res, range(-5, 5))])
+
+
+create_executor_tests(globals(), ProcessPoolShutdownTest,
+ executor_mixins=(ProcessPoolForkMixin,
+ ProcessPoolForkserverMixin,
+ ProcessPoolSpawnMixin))
+
+
+def setUpModule():
+ setup_module()
+
+
+if __name__ == "__main__":
+ unittest.main()
diff --git a/Lib/test/test_concurrent_futures/test_thread_pool.py b/Lib/test/test_concurrent_futures/test_thread_pool.py
new file mode 100644
index 0000000..daef7b5
--- /dev/null
+++ b/Lib/test/test_concurrent_futures/test_thread_pool.py
@@ -0,0 +1,98 @@
+import contextlib
+import multiprocessing as mp
+import multiprocessing.process
+import multiprocessing.util
+import os
+import threading
+import unittest
+from concurrent import futures
+
+from .executor import ExecutorTest, mul
+from .util import BaseTestCase, ThreadPoolMixin, setup_module
+
+
+class ThreadPoolExecutorTest(ThreadPoolMixin, ExecutorTest, BaseTestCase):
+ def test_map_submits_without_iteration(self):
+ """Tests verifying issue 11777."""
+ finished = []
+ def record_finished(n):
+ finished.append(n)
+
+ self.executor.map(record_finished, range(10))
+ self.executor.shutdown(wait=True)
+ self.assertCountEqual(finished, range(10))
+
+ def test_default_workers(self):
+ executor = self.executor_type()
+ expected = min(32, (os.cpu_count() or 1) + 4)
+ self.assertEqual(executor._max_workers, expected)
+
+ def test_saturation(self):
+ executor = self.executor_type(4)
+ def acquire_lock(lock):
+ lock.acquire()
+
+ sem = threading.Semaphore(0)
+ for i in range(15 * executor._max_workers):
+ executor.submit(acquire_lock, sem)
+ self.assertEqual(len(executor._threads), executor._max_workers)
+ for i in range(15 * executor._max_workers):
+ sem.release()
+ executor.shutdown(wait=True)
+
+ def test_idle_thread_reuse(self):
+ executor = self.executor_type()
+ executor.submit(mul, 21, 2).result()
+ executor.submit(mul, 6, 7).result()
+ executor.submit(mul, 3, 14).result()
+ self.assertEqual(len(executor._threads), 1)
+ executor.shutdown(wait=True)
+
+ @unittest.skipUnless(hasattr(os, 'register_at_fork'), 'need os.register_at_fork')
+ def test_hang_global_shutdown_lock(self):
+ # bpo-45021: _global_shutdown_lock should be reinitialized in the child
+ # process, otherwise it will never exit
+ def submit(pool):
+ pool.submit(submit, pool)
+
+ with futures.ThreadPoolExecutor(1) as pool:
+ pool.submit(submit, pool)
+
+ for _ in range(50):
+ with futures.ProcessPoolExecutor(1, mp_context=mp.get_context('fork')) as workers:
+ workers.submit(tuple)
+
+ def test_executor_map_current_future_cancel(self):
+ stop_event = threading.Event()
+ log = []
+
+ def log_n_wait(ident):
+ log.append(f"{ident=} started")
+ try:
+ stop_event.wait()
+ finally:
+ log.append(f"{ident=} stopped")
+
+ with self.executor_type(max_workers=1) as pool:
+ # submit work to saturate the pool
+ fut = pool.submit(log_n_wait, ident="first")
+ try:
+ with contextlib.closing(
+ pool.map(log_n_wait, ["second", "third"], timeout=0)
+ ) as gen:
+ with self.assertRaises(TimeoutError):
+ next(gen)
+ finally:
+ stop_event.set()
+ fut.result()
+ # ident='second' is cancelled as a result of raising a TimeoutError
+ # ident='third' is cancelled because it remained in the collection of futures
+ self.assertListEqual(log, ["ident='first' started", "ident='first' stopped"])
+
+
+def setUpModule():
+ setup_module()
+
+
+if __name__ == "__main__":
+ unittest.main()
diff --git a/Lib/test/test_concurrent_futures/test_wait.py b/Lib/test/test_concurrent_futures/test_wait.py
new file mode 100644
index 0000000..e4bea8b
--- /dev/null
+++ b/Lib/test/test_concurrent_futures/test_wait.py
@@ -0,0 +1,161 @@
+import sys
+import threading
+import time
+import unittest
+from concurrent import futures
+
+from .util import (
+ CANCELLED_FUTURE, CANCELLED_AND_NOTIFIED_FUTURE, EXCEPTION_FUTURE,
+ SUCCESSFUL_FUTURE,
+ create_executor_tests, setup_module,
+ BaseTestCase, ThreadPoolMixin,
+ ProcessPoolForkMixin, ProcessPoolForkserverMixin, ProcessPoolSpawnMixin)
+
+
+def mul(x, y):
+ return x * y
+
+def sleep_and_raise(t):
+ time.sleep(t)
+ raise Exception('this is an exception')
+
+
+class WaitTests:
+ def test_20369(self):
+ # See https://bugs.python.org/issue20369
+ future = self.executor.submit(time.sleep, 1.5)
+ done, not_done = futures.wait([future, future],
+ return_when=futures.ALL_COMPLETED)
+ self.assertEqual({future}, done)
+ self.assertEqual(set(), not_done)
+
+
+ def test_first_completed(self):
+ future1 = self.executor.submit(mul, 21, 2)
+ future2 = self.executor.submit(time.sleep, 1.5)
+
+ done, not_done = futures.wait(
+ [CANCELLED_FUTURE, future1, future2],
+ return_when=futures.FIRST_COMPLETED)
+
+ self.assertEqual(set([future1]), done)
+ self.assertEqual(set([CANCELLED_FUTURE, future2]), not_done)
+
+ def test_first_completed_some_already_completed(self):
+ future1 = self.executor.submit(time.sleep, 1.5)
+
+ finished, pending = futures.wait(
+ [CANCELLED_AND_NOTIFIED_FUTURE, SUCCESSFUL_FUTURE, future1],
+ return_when=futures.FIRST_COMPLETED)
+
+ self.assertEqual(
+ set([CANCELLED_AND_NOTIFIED_FUTURE, SUCCESSFUL_FUTURE]),
+ finished)
+ self.assertEqual(set([future1]), pending)
+
+ def test_first_exception(self):
+ future1 = self.executor.submit(mul, 2, 21)
+ future2 = self.executor.submit(sleep_and_raise, 1.5)
+ future3 = self.executor.submit(time.sleep, 3)
+
+ finished, pending = futures.wait(
+ [future1, future2, future3],
+ return_when=futures.FIRST_EXCEPTION)
+
+ self.assertEqual(set([future1, future2]), finished)
+ self.assertEqual(set([future3]), pending)
+
+ def test_first_exception_some_already_complete(self):
+ future1 = self.executor.submit(divmod, 21, 0)
+ future2 = self.executor.submit(time.sleep, 1.5)
+
+ finished, pending = futures.wait(
+ [SUCCESSFUL_FUTURE,
+ CANCELLED_FUTURE,
+ CANCELLED_AND_NOTIFIED_FUTURE,
+ future1, future2],
+ return_when=futures.FIRST_EXCEPTION)
+
+ self.assertEqual(set([SUCCESSFUL_FUTURE,
+ CANCELLED_AND_NOTIFIED_FUTURE,
+ future1]), finished)
+ self.assertEqual(set([CANCELLED_FUTURE, future2]), pending)
+
+ def test_first_exception_one_already_failed(self):
+ future1 = self.executor.submit(time.sleep, 2)
+
+ finished, pending = futures.wait(
+ [EXCEPTION_FUTURE, future1],
+ return_when=futures.FIRST_EXCEPTION)
+
+ self.assertEqual(set([EXCEPTION_FUTURE]), finished)
+ self.assertEqual(set([future1]), pending)
+
+ def test_all_completed(self):
+ future1 = self.executor.submit(divmod, 2, 0)
+ future2 = self.executor.submit(mul, 2, 21)
+
+ finished, pending = futures.wait(
+ [SUCCESSFUL_FUTURE,
+ CANCELLED_AND_NOTIFIED_FUTURE,
+ EXCEPTION_FUTURE,
+ future1,
+ future2],
+ return_when=futures.ALL_COMPLETED)
+
+ self.assertEqual(set([SUCCESSFUL_FUTURE,
+ CANCELLED_AND_NOTIFIED_FUTURE,
+ EXCEPTION_FUTURE,
+ future1,
+ future2]), finished)
+ self.assertEqual(set(), pending)
+
+ def test_timeout(self):
+ future1 = self.executor.submit(mul, 6, 7)
+ future2 = self.executor.submit(time.sleep, 6)
+
+ finished, pending = futures.wait(
+ [CANCELLED_AND_NOTIFIED_FUTURE,
+ EXCEPTION_FUTURE,
+ SUCCESSFUL_FUTURE,
+ future1, future2],
+ timeout=5,
+ return_when=futures.ALL_COMPLETED)
+
+ self.assertEqual(set([CANCELLED_AND_NOTIFIED_FUTURE,
+ EXCEPTION_FUTURE,
+ SUCCESSFUL_FUTURE,
+ future1]), finished)
+ self.assertEqual(set([future2]), pending)
+
+
+class ThreadPoolWaitTests(ThreadPoolMixin, WaitTests, BaseTestCase):
+
+ def test_pending_calls_race(self):
+ # Issue #14406: multi-threaded race condition when waiting on all
+ # futures.
+ event = threading.Event()
+ def future_func():
+ event.wait()
+ oldswitchinterval = sys.getswitchinterval()
+ sys.setswitchinterval(1e-6)
+ try:
+ fs = {self.executor.submit(future_func) for i in range(100)}
+ event.set()
+ futures.wait(fs, return_when=futures.ALL_COMPLETED)
+ finally:
+ sys.setswitchinterval(oldswitchinterval)
+
+
+create_executor_tests(globals(), WaitTests,
+ executor_mixins=(ProcessPoolForkMixin,
+ ProcessPoolForkserverMixin,
+ ProcessPoolSpawnMixin))
+
+
+def setUpModule():
+ setup_module()
+
+
+if __name__ == "__main__":
+ unittest.main()
diff --git a/Lib/test/test_concurrent_futures/util.py b/Lib/test/test_concurrent_futures/util.py
new file mode 100644
index 0000000..dc48bec
--- /dev/null
+++ b/Lib/test/test_concurrent_futures/util.py
@@ -0,0 +1,141 @@
+import multiprocessing
+import sys
+import time
+import unittest
+from concurrent import futures
+from concurrent.futures._base import (
+ PENDING, RUNNING, CANCELLED, CANCELLED_AND_NOTIFIED, FINISHED, Future,
+ )
+from concurrent.futures.process import _check_system_limits
+
+from test import support
+from test.support import threading_helper
+
+
+def create_future(state=PENDING, exception=None, result=None):
+ f = Future()
+ f._state = state
+ f._exception = exception
+ f._result = result
+ return f
+
+
+PENDING_FUTURE = create_future(state=PENDING)
+RUNNING_FUTURE = create_future(state=RUNNING)
+CANCELLED_FUTURE = create_future(state=CANCELLED)
+CANCELLED_AND_NOTIFIED_FUTURE = create_future(state=CANCELLED_AND_NOTIFIED)
+EXCEPTION_FUTURE = create_future(state=FINISHED, exception=OSError())
+SUCCESSFUL_FUTURE = create_future(state=FINISHED, result=42)
+
+
+class BaseTestCase(unittest.TestCase):
+ def setUp(self):
+ self._thread_key = threading_helper.threading_setup()
+
+ def tearDown(self):
+ support.reap_children()
+ threading_helper.threading_cleanup(*self._thread_key)
+
+
+class ExecutorMixin:
+ worker_count = 5
+ executor_kwargs = {}
+
+ def setUp(self):
+ super().setUp()
+
+ self.t1 = time.monotonic()
+ if hasattr(self, "ctx"):
+ self.executor = self.executor_type(
+ max_workers=self.worker_count,
+ mp_context=self.get_context(),
+ **self.executor_kwargs)
+ else:
+ self.executor = self.executor_type(
+ max_workers=self.worker_count,
+ **self.executor_kwargs)
+
+ def tearDown(self):
+ self.executor.shutdown(wait=True)
+ self.executor = None
+
+ dt = time.monotonic() - self.t1
+ if support.verbose:
+ print("%.2fs" % dt, end=' ')
+ self.assertLess(dt, 300, "synchronization issue: test lasted too long")
+
+ super().tearDown()
+
+ def get_context(self):
+ return multiprocessing.get_context(self.ctx)
+
+
+class ThreadPoolMixin(ExecutorMixin):
+ executor_type = futures.ThreadPoolExecutor
+
+
+class ProcessPoolForkMixin(ExecutorMixin):
+ executor_type = futures.ProcessPoolExecutor
+ ctx = "fork"
+
+ def get_context(self):
+ try:
+ _check_system_limits()
+ except NotImplementedError:
+ self.skipTest("ProcessPoolExecutor unavailable on this system")
+ if sys.platform == "win32":
+ self.skipTest("require unix system")
+ return super().get_context()
+
+
+class ProcessPoolSpawnMixin(ExecutorMixin):
+ executor_type = futures.ProcessPoolExecutor
+ ctx = "spawn"
+
+ def get_context(self):
+ try:
+ _check_system_limits()
+ except NotImplementedError:
+ self.skipTest("ProcessPoolExecutor unavailable on this system")
+ return super().get_context()
+
+
+class ProcessPoolForkserverMixin(ExecutorMixin):
+ executor_type = futures.ProcessPoolExecutor
+ ctx = "forkserver"
+
+ def get_context(self):
+ try:
+ _check_system_limits()
+ except NotImplementedError:
+ self.skipTest("ProcessPoolExecutor unavailable on this system")
+ if sys.platform == "win32":
+ self.skipTest("require unix system")
+ return super().get_context()
+
+
+def create_executor_tests(remote_globals, mixin, bases=(BaseTestCase,),
+ executor_mixins=(ThreadPoolMixin,
+ ProcessPoolForkMixin,
+ ProcessPoolForkserverMixin,
+ ProcessPoolSpawnMixin)):
+ def strip_mixin(name):
+ if name.endswith(('Mixin', 'Tests')):
+ return name[:-5]
+ elif name.endswith('Test'):
+ return name[:-4]
+ else:
+ return name
+
+ module = remote_globals['__name__']
+ for exe in executor_mixins:
+ name = ("%s%sTest"
+ % (strip_mixin(exe.__name__), strip_mixin(mixin.__name__)))
+ cls = type(name, (mixin,) + (exe,) + bases, {'__module__': module})
+ remote_globals[name] = cls
+
+
+def setup_module():
+ unittest.addModuleCleanup(multiprocessing.util._cleanup_tests)
+ thread_info = threading_helper.threading_setup()
+ unittest.addModuleCleanup(threading_helper.threading_cleanup, *thread_info)