diff options
author | Victor Stinner <vstinner@python.org> | 2023-08-26 23:20:33 (GMT) |
---|---|---|
committer | GitHub <noreply@github.com> | 2023-08-26 23:20:33 (GMT) |
commit | bba9aa60ae7e80fbc4bd8c74554b54fc1e35a1e6 (patch) | |
tree | 58e14573a598daab71f2a21de860e11f98788f34 /Lib/test/test_concurrent_futures | |
parent | ce37fbc778cbe330cc69e588ed89c7c8123d622c (diff) | |
download | cpython-bba9aa60ae7e80fbc4bd8c74554b54fc1e35a1e6.zip cpython-bba9aa60ae7e80fbc4bd8c74554b54fc1e35a1e6.tar.gz cpython-bba9aa60ae7e80fbc4bd8c74554b54fc1e35a1e6.tar.bz2 |
[3.12] gh-108388: Convert test_concurrent_futures to package (#108401) (#108443)
gh-108388: Convert test_concurrent_futures to package (#108401)
Convert test_concurrent_futures to a package of sub-tests.
(cherry picked from commit aa6f787faa4bc45006da4dc2f942fb9b82c98836)
Diffstat (limited to 'Lib/test/test_concurrent_futures')
-rw-r--r-- | Lib/test/test_concurrent_futures/__init__.py | 16 | ||||
-rw-r--r-- | Lib/test/test_concurrent_futures/executor.py | 107 | ||||
-rw-r--r-- | Lib/test/test_concurrent_futures/test_as_completed.py | 115 | ||||
-rw-r--r-- | Lib/test/test_concurrent_futures/test_deadlock.py | 253 | ||||
-rw-r--r-- | Lib/test/test_concurrent_futures/test_future.py | 291 | ||||
-rw-r--r-- | Lib/test/test_concurrent_futures/test_init.py | 117 | ||||
-rw-r--r-- | Lib/test/test_concurrent_futures/test_process_pool.py | 202 | ||||
-rw-r--r-- | Lib/test/test_concurrent_futures/test_shutdown.py | 343 | ||||
-rw-r--r-- | Lib/test/test_concurrent_futures/test_thread_pool.py | 98 | ||||
-rw-r--r-- | Lib/test/test_concurrent_futures/test_wait.py | 161 | ||||
-rw-r--r-- | Lib/test/test_concurrent_futures/util.py | 141 |
11 files changed, 1844 insertions, 0 deletions
diff --git a/Lib/test/test_concurrent_futures/__init__.py b/Lib/test/test_concurrent_futures/__init__.py new file mode 100644 index 0000000..430fa93 --- /dev/null +++ b/Lib/test/test_concurrent_futures/__init__.py @@ -0,0 +1,16 @@ +import os.path +import unittest +from test import support +from test.support import import_helper + +# Skip tests if _multiprocessing wasn't built. +import_helper.import_module('_multiprocessing') + +if support.check_sanitizer(address=True, memory=True): + # gh-90791: Skip the test because it is too slow when Python is built + # with ASAN/MSAN: between 5 and 20 minutes on GitHub Actions. + raise unittest.SkipTest("test too slow on ASAN/MSAN build") + + +def load_tests(*args): + return support.load_package_tests(os.path.dirname(__file__), *args) diff --git a/Lib/test/test_concurrent_futures/executor.py b/Lib/test/test_concurrent_futures/executor.py new file mode 100644 index 0000000..36278bd --- /dev/null +++ b/Lib/test/test_concurrent_futures/executor.py @@ -0,0 +1,107 @@ +import threading +import time +import weakref +from concurrent import futures +from test import support + + +def mul(x, y): + return x * y + +def capture(*args, **kwargs): + return args, kwargs + + +class MyObject(object): + def my_method(self): + pass + + +def make_dummy_object(_): + return MyObject() + + +class ExecutorTest: + # Executor.shutdown() and context manager usage is tested by + # ExecutorShutdownTest. + def test_submit(self): + future = self.executor.submit(pow, 2, 8) + self.assertEqual(256, future.result()) + + def test_submit_keyword(self): + future = self.executor.submit(mul, 2, y=8) + self.assertEqual(16, future.result()) + future = self.executor.submit(capture, 1, self=2, fn=3) + self.assertEqual(future.result(), ((1,), {'self': 2, 'fn': 3})) + with self.assertRaises(TypeError): + self.executor.submit(fn=capture, arg=1) + with self.assertRaises(TypeError): + self.executor.submit(arg=1) + + def test_map(self): + self.assertEqual( + list(self.executor.map(pow, range(10), range(10))), + list(map(pow, range(10), range(10)))) + + self.assertEqual( + list(self.executor.map(pow, range(10), range(10), chunksize=3)), + list(map(pow, range(10), range(10)))) + + def test_map_exception(self): + i = self.executor.map(divmod, [1, 1, 1, 1], [2, 3, 0, 5]) + self.assertEqual(i.__next__(), (0, 1)) + self.assertEqual(i.__next__(), (0, 1)) + self.assertRaises(ZeroDivisionError, i.__next__) + + def test_map_timeout(self): + results = [] + try: + for i in self.executor.map(time.sleep, + [0, 0, 6], + timeout=5): + results.append(i) + except futures.TimeoutError: + pass + else: + self.fail('expected TimeoutError') + + self.assertEqual([None, None], results) + + def test_shutdown_race_issue12456(self): + # Issue #12456: race condition at shutdown where trying to post a + # sentinel in the call queue blocks (the queue is full while processes + # have exited). + self.executor.map(str, [2] * (self.worker_count + 1)) + self.executor.shutdown() + + @support.cpython_only + def test_no_stale_references(self): + # Issue #16284: check that the executors don't unnecessarily hang onto + # references. + my_object = MyObject() + my_object_collected = threading.Event() + my_object_callback = weakref.ref( + my_object, lambda obj: my_object_collected.set()) + # Deliberately discarding the future. + self.executor.submit(my_object.my_method) + del my_object + + collected = my_object_collected.wait(timeout=support.SHORT_TIMEOUT) + self.assertTrue(collected, + "Stale reference not collected within timeout.") + + def test_max_workers_negative(self): + for number in (0, -1): + with self.assertRaisesRegex(ValueError, + "max_workers must be greater " + "than 0"): + self.executor_type(max_workers=number) + + def test_free_reference(self): + # Issue #14406: Result iterator should not keep an internal + # reference to result objects. + for obj in self.executor.map(make_dummy_object, range(10)): + wr = weakref.ref(obj) + del obj + support.gc_collect() # For PyPy or other GCs. + self.assertIsNone(wr()) diff --git a/Lib/test/test_concurrent_futures/test_as_completed.py b/Lib/test/test_concurrent_futures/test_as_completed.py new file mode 100644 index 0000000..2b3bec8 --- /dev/null +++ b/Lib/test/test_concurrent_futures/test_as_completed.py @@ -0,0 +1,115 @@ +import itertools +import time +import unittest +import weakref +from concurrent import futures +from concurrent.futures._base import ( + CANCELLED_AND_NOTIFIED, FINISHED, Future) + +from test import support + +from .util import ( + PENDING_FUTURE, RUNNING_FUTURE, + CANCELLED_AND_NOTIFIED_FUTURE, EXCEPTION_FUTURE, SUCCESSFUL_FUTURE, + create_future, create_executor_tests, setup_module) + + +def mul(x, y): + return x * y + + +class AsCompletedTests: + def test_no_timeout(self): + future1 = self.executor.submit(mul, 2, 21) + future2 = self.executor.submit(mul, 7, 6) + + completed = set(futures.as_completed( + [CANCELLED_AND_NOTIFIED_FUTURE, + EXCEPTION_FUTURE, + SUCCESSFUL_FUTURE, + future1, future2])) + self.assertEqual(set( + [CANCELLED_AND_NOTIFIED_FUTURE, + EXCEPTION_FUTURE, + SUCCESSFUL_FUTURE, + future1, future2]), + completed) + + def test_future_times_out(self): + """Test ``futures.as_completed`` timing out before + completing it's final future.""" + already_completed = {CANCELLED_AND_NOTIFIED_FUTURE, + EXCEPTION_FUTURE, + SUCCESSFUL_FUTURE} + + for timeout in (0, 0.01): + with self.subTest(timeout): + + future = self.executor.submit(time.sleep, 0.1) + completed_futures = set() + try: + for f in futures.as_completed( + already_completed | {future}, + timeout + ): + completed_futures.add(f) + except futures.TimeoutError: + pass + + # Check that ``future`` wasn't completed. + self.assertEqual(completed_futures, already_completed) + + def test_duplicate_futures(self): + # Issue 20367. Duplicate futures should not raise exceptions or give + # duplicate responses. + # Issue #31641: accept arbitrary iterables. + future1 = self.executor.submit(time.sleep, 2) + completed = [ + f for f in futures.as_completed(itertools.repeat(future1, 3)) + ] + self.assertEqual(len(completed), 1) + + def test_free_reference_yielded_future(self): + # Issue #14406: Generator should not keep references + # to finished futures. + futures_list = [Future() for _ in range(8)] + futures_list.append(create_future(state=CANCELLED_AND_NOTIFIED)) + futures_list.append(create_future(state=FINISHED, result=42)) + + with self.assertRaises(futures.TimeoutError): + for future in futures.as_completed(futures_list, timeout=0): + futures_list.remove(future) + wr = weakref.ref(future) + del future + support.gc_collect() # For PyPy or other GCs. + self.assertIsNone(wr()) + + futures_list[0].set_result("test") + for future in futures.as_completed(futures_list): + futures_list.remove(future) + wr = weakref.ref(future) + del future + support.gc_collect() # For PyPy or other GCs. + self.assertIsNone(wr()) + if futures_list: + futures_list[0].set_result("test") + + def test_correct_timeout_exception_msg(self): + futures_list = [CANCELLED_AND_NOTIFIED_FUTURE, PENDING_FUTURE, + RUNNING_FUTURE, SUCCESSFUL_FUTURE] + + with self.assertRaises(futures.TimeoutError) as cm: + list(futures.as_completed(futures_list, timeout=0)) + + self.assertEqual(str(cm.exception), '2 (of 4) futures unfinished') + + +create_executor_tests(globals(), AsCompletedTests) + + +def setUpModule(): + setup_module() + + +if __name__ == "__main__": + unittest.main() diff --git a/Lib/test/test_concurrent_futures/test_deadlock.py b/Lib/test/test_concurrent_futures/test_deadlock.py new file mode 100644 index 0000000..6b78b36 --- /dev/null +++ b/Lib/test/test_concurrent_futures/test_deadlock.py @@ -0,0 +1,253 @@ +import contextlib +import sys +import time +import unittest +from pickle import PicklingError +from concurrent import futures +from concurrent.futures.process import BrokenProcessPool + +from test import support + +from .util import ( + create_executor_tests, setup_module, + ProcessPoolForkMixin, ProcessPoolForkserverMixin, ProcessPoolSpawnMixin) + + +def _crash(delay=None): + """Induces a segfault.""" + if delay: + time.sleep(delay) + import faulthandler + faulthandler.disable() + faulthandler._sigsegv() + + +def _crash_with_data(data): + """Induces a segfault with dummy data in input.""" + _crash() + + +def _exit(): + """Induces a sys exit with exitcode 1.""" + sys.exit(1) + + +def _raise_error(Err): + """Function that raises an Exception in process.""" + raise Err() + + +def _raise_error_ignore_stderr(Err): + """Function that raises an Exception in process and ignores stderr.""" + import io + sys.stderr = io.StringIO() + raise Err() + + +def _return_instance(cls): + """Function that returns a instance of cls.""" + return cls() + + +class CrashAtPickle(object): + """Bad object that triggers a segfault at pickling time.""" + def __reduce__(self): + _crash() + + +class CrashAtUnpickle(object): + """Bad object that triggers a segfault at unpickling time.""" + def __reduce__(self): + return _crash, () + + +class ExitAtPickle(object): + """Bad object that triggers a process exit at pickling time.""" + def __reduce__(self): + _exit() + + +class ExitAtUnpickle(object): + """Bad object that triggers a process exit at unpickling time.""" + def __reduce__(self): + return _exit, () + + +class ErrorAtPickle(object): + """Bad object that triggers an error at pickling time.""" + def __reduce__(self): + from pickle import PicklingError + raise PicklingError("Error in pickle") + + +class ErrorAtUnpickle(object): + """Bad object that triggers an error at unpickling time.""" + def __reduce__(self): + from pickle import UnpicklingError + return _raise_error_ignore_stderr, (UnpicklingError, ) + + +class ExecutorDeadlockTest: + TIMEOUT = support.SHORT_TIMEOUT + + def _fail_on_deadlock(self, executor): + # If we did not recover before TIMEOUT seconds, consider that the + # executor is in a deadlock state and forcefully clean all its + # composants. + import faulthandler + from tempfile import TemporaryFile + with TemporaryFile(mode="w+") as f: + faulthandler.dump_traceback(file=f) + f.seek(0) + tb = f.read() + for p in executor._processes.values(): + p.terminate() + # This should be safe to call executor.shutdown here as all possible + # deadlocks should have been broken. + executor.shutdown(wait=True) + print(f"\nTraceback:\n {tb}", file=sys.__stderr__) + self.fail(f"Executor deadlock:\n\n{tb}") + + + def _check_crash(self, error, func, *args, ignore_stderr=False): + # test for deadlock caused by crashes in a pool + self.executor.shutdown(wait=True) + + executor = self.executor_type( + max_workers=2, mp_context=self.get_context()) + res = executor.submit(func, *args) + + if ignore_stderr: + cm = support.captured_stderr() + else: + cm = contextlib.nullcontext() + + try: + with self.assertRaises(error): + with cm: + res.result(timeout=self.TIMEOUT) + except futures.TimeoutError: + # If we did not recover before TIMEOUT seconds, + # consider that the executor is in a deadlock state + self._fail_on_deadlock(executor) + executor.shutdown(wait=True) + + def test_error_at_task_pickle(self): + # Check problem occurring while pickling a task in + # the task_handler thread + self._check_crash(PicklingError, id, ErrorAtPickle()) + + def test_exit_at_task_unpickle(self): + # Check problem occurring while unpickling a task on workers + self._check_crash(BrokenProcessPool, id, ExitAtUnpickle()) + + def test_error_at_task_unpickle(self): + # Check problem occurring while unpickling a task on workers + self._check_crash(BrokenProcessPool, id, ErrorAtUnpickle()) + + def test_crash_at_task_unpickle(self): + # Check problem occurring while unpickling a task on workers + self._check_crash(BrokenProcessPool, id, CrashAtUnpickle()) + + def test_crash_during_func_exec_on_worker(self): + # Check problem occurring during func execution on workers + self._check_crash(BrokenProcessPool, _crash) + + def test_exit_during_func_exec_on_worker(self): + # Check problem occurring during func execution on workers + self._check_crash(SystemExit, _exit) + + def test_error_during_func_exec_on_worker(self): + # Check problem occurring during func execution on workers + self._check_crash(RuntimeError, _raise_error, RuntimeError) + + def test_crash_during_result_pickle_on_worker(self): + # Check problem occurring while pickling a task result + # on workers + self._check_crash(BrokenProcessPool, _return_instance, CrashAtPickle) + + def test_exit_during_result_pickle_on_worker(self): + # Check problem occurring while pickling a task result + # on workers + self._check_crash(SystemExit, _return_instance, ExitAtPickle) + + def test_error_during_result_pickle_on_worker(self): + # Check problem occurring while pickling a task result + # on workers + self._check_crash(PicklingError, _return_instance, ErrorAtPickle) + + def test_error_during_result_unpickle_in_result_handler(self): + # Check problem occurring while unpickling a task in + # the result_handler thread + self._check_crash(BrokenProcessPool, + _return_instance, ErrorAtUnpickle, + ignore_stderr=True) + + def test_exit_during_result_unpickle_in_result_handler(self): + # Check problem occurring while unpickling a task in + # the result_handler thread + self._check_crash(BrokenProcessPool, _return_instance, ExitAtUnpickle) + + def test_shutdown_deadlock(self): + # Test that the pool calling shutdown do not cause deadlock + # if a worker fails after the shutdown call. + self.executor.shutdown(wait=True) + with self.executor_type(max_workers=2, + mp_context=self.get_context()) as executor: + self.executor = executor # Allow clean up in fail_on_deadlock + f = executor.submit(_crash, delay=.1) + executor.shutdown(wait=True) + with self.assertRaises(BrokenProcessPool): + f.result() + + def test_shutdown_deadlock_pickle(self): + # Test that the pool calling shutdown with wait=False does not cause + # a deadlock if a task fails at pickle after the shutdown call. + # Reported in bpo-39104. + self.executor.shutdown(wait=True) + with self.executor_type(max_workers=2, + mp_context=self.get_context()) as executor: + self.executor = executor # Allow clean up in fail_on_deadlock + + # Start the executor and get the executor_manager_thread to collect + # the threads and avoid dangling thread that should be cleaned up + # asynchronously. + executor.submit(id, 42).result() + executor_manager = executor._executor_manager_thread + + # Submit a task that fails at pickle and shutdown the executor + # without waiting + f = executor.submit(id, ErrorAtPickle()) + executor.shutdown(wait=False) + with self.assertRaises(PicklingError): + f.result() + + # Make sure the executor is eventually shutdown and do not leave + # dangling threads + executor_manager.join() + + def test_crash_big_data(self): + # Test that there is a clean exception instad of a deadlock when a + # child process crashes while some data is being written into the + # queue. + # https://github.com/python/cpython/issues/94777 + self.executor.shutdown(wait=True) + data = "a" * support.PIPE_MAX_SIZE + with self.executor_type(max_workers=2, + mp_context=self.get_context()) as executor: + self.executor = executor # Allow clean up in fail_on_deadlock + with self.assertRaises(BrokenProcessPool): + list(executor.map(_crash_with_data, [data] * 10)) + + +create_executor_tests(globals(), ExecutorDeadlockTest, + executor_mixins=(ProcessPoolForkMixin, + ProcessPoolForkserverMixin, + ProcessPoolSpawnMixin)) + +def setUpModule(): + setup_module() + + +if __name__ == "__main__": + unittest.main() diff --git a/Lib/test/test_concurrent_futures/test_future.py b/Lib/test/test_concurrent_futures/test_future.py new file mode 100644 index 0000000..4066ea1 --- /dev/null +++ b/Lib/test/test_concurrent_futures/test_future.py @@ -0,0 +1,291 @@ +import threading +import time +import unittest +from concurrent import futures +from concurrent.futures._base import ( + PENDING, RUNNING, CANCELLED, CANCELLED_AND_NOTIFIED, FINISHED, Future) + +from test import support + +from .util import ( + PENDING_FUTURE, RUNNING_FUTURE, CANCELLED_FUTURE, + CANCELLED_AND_NOTIFIED_FUTURE, EXCEPTION_FUTURE, SUCCESSFUL_FUTURE, + BaseTestCase, create_future, setup_module) + + +class FutureTests(BaseTestCase): + def test_done_callback_with_result(self): + callback_result = None + def fn(callback_future): + nonlocal callback_result + callback_result = callback_future.result() + + f = Future() + f.add_done_callback(fn) + f.set_result(5) + self.assertEqual(5, callback_result) + + def test_done_callback_with_exception(self): + callback_exception = None + def fn(callback_future): + nonlocal callback_exception + callback_exception = callback_future.exception() + + f = Future() + f.add_done_callback(fn) + f.set_exception(Exception('test')) + self.assertEqual(('test',), callback_exception.args) + + def test_done_callback_with_cancel(self): + was_cancelled = None + def fn(callback_future): + nonlocal was_cancelled + was_cancelled = callback_future.cancelled() + + f = Future() + f.add_done_callback(fn) + self.assertTrue(f.cancel()) + self.assertTrue(was_cancelled) + + def test_done_callback_raises(self): + with support.captured_stderr() as stderr: + raising_was_called = False + fn_was_called = False + + def raising_fn(callback_future): + nonlocal raising_was_called + raising_was_called = True + raise Exception('doh!') + + def fn(callback_future): + nonlocal fn_was_called + fn_was_called = True + + f = Future() + f.add_done_callback(raising_fn) + f.add_done_callback(fn) + f.set_result(5) + self.assertTrue(raising_was_called) + self.assertTrue(fn_was_called) + self.assertIn('Exception: doh!', stderr.getvalue()) + + def test_done_callback_already_successful(self): + callback_result = None + def fn(callback_future): + nonlocal callback_result + callback_result = callback_future.result() + + f = Future() + f.set_result(5) + f.add_done_callback(fn) + self.assertEqual(5, callback_result) + + def test_done_callback_already_failed(self): + callback_exception = None + def fn(callback_future): + nonlocal callback_exception + callback_exception = callback_future.exception() + + f = Future() + f.set_exception(Exception('test')) + f.add_done_callback(fn) + self.assertEqual(('test',), callback_exception.args) + + def test_done_callback_already_cancelled(self): + was_cancelled = None + def fn(callback_future): + nonlocal was_cancelled + was_cancelled = callback_future.cancelled() + + f = Future() + self.assertTrue(f.cancel()) + f.add_done_callback(fn) + self.assertTrue(was_cancelled) + + def test_done_callback_raises_already_succeeded(self): + with support.captured_stderr() as stderr: + def raising_fn(callback_future): + raise Exception('doh!') + + f = Future() + + # Set the result first to simulate a future that runs instantly, + # effectively allowing the callback to be run immediately. + f.set_result(5) + f.add_done_callback(raising_fn) + + self.assertIn('exception calling callback for', stderr.getvalue()) + self.assertIn('doh!', stderr.getvalue()) + + + def test_repr(self): + self.assertRegex(repr(PENDING_FUTURE), + '<Future at 0x[0-9a-f]+ state=pending>') + self.assertRegex(repr(RUNNING_FUTURE), + '<Future at 0x[0-9a-f]+ state=running>') + self.assertRegex(repr(CANCELLED_FUTURE), + '<Future at 0x[0-9a-f]+ state=cancelled>') + self.assertRegex(repr(CANCELLED_AND_NOTIFIED_FUTURE), + '<Future at 0x[0-9a-f]+ state=cancelled>') + self.assertRegex( + repr(EXCEPTION_FUTURE), + '<Future at 0x[0-9a-f]+ state=finished raised OSError>') + self.assertRegex( + repr(SUCCESSFUL_FUTURE), + '<Future at 0x[0-9a-f]+ state=finished returned int>') + + def test_cancel(self): + f1 = create_future(state=PENDING) + f2 = create_future(state=RUNNING) + f3 = create_future(state=CANCELLED) + f4 = create_future(state=CANCELLED_AND_NOTIFIED) + f5 = create_future(state=FINISHED, exception=OSError()) + f6 = create_future(state=FINISHED, result=5) + + self.assertTrue(f1.cancel()) + self.assertEqual(f1._state, CANCELLED) + + self.assertFalse(f2.cancel()) + self.assertEqual(f2._state, RUNNING) + + self.assertTrue(f3.cancel()) + self.assertEqual(f3._state, CANCELLED) + + self.assertTrue(f4.cancel()) + self.assertEqual(f4._state, CANCELLED_AND_NOTIFIED) + + self.assertFalse(f5.cancel()) + self.assertEqual(f5._state, FINISHED) + + self.assertFalse(f6.cancel()) + self.assertEqual(f6._state, FINISHED) + + def test_cancelled(self): + self.assertFalse(PENDING_FUTURE.cancelled()) + self.assertFalse(RUNNING_FUTURE.cancelled()) + self.assertTrue(CANCELLED_FUTURE.cancelled()) + self.assertTrue(CANCELLED_AND_NOTIFIED_FUTURE.cancelled()) + self.assertFalse(EXCEPTION_FUTURE.cancelled()) + self.assertFalse(SUCCESSFUL_FUTURE.cancelled()) + + def test_done(self): + self.assertFalse(PENDING_FUTURE.done()) + self.assertFalse(RUNNING_FUTURE.done()) + self.assertTrue(CANCELLED_FUTURE.done()) + self.assertTrue(CANCELLED_AND_NOTIFIED_FUTURE.done()) + self.assertTrue(EXCEPTION_FUTURE.done()) + self.assertTrue(SUCCESSFUL_FUTURE.done()) + + def test_running(self): + self.assertFalse(PENDING_FUTURE.running()) + self.assertTrue(RUNNING_FUTURE.running()) + self.assertFalse(CANCELLED_FUTURE.running()) + self.assertFalse(CANCELLED_AND_NOTIFIED_FUTURE.running()) + self.assertFalse(EXCEPTION_FUTURE.running()) + self.assertFalse(SUCCESSFUL_FUTURE.running()) + + def test_result_with_timeout(self): + self.assertRaises(futures.TimeoutError, + PENDING_FUTURE.result, timeout=0) + self.assertRaises(futures.TimeoutError, + RUNNING_FUTURE.result, timeout=0) + self.assertRaises(futures.CancelledError, + CANCELLED_FUTURE.result, timeout=0) + self.assertRaises(futures.CancelledError, + CANCELLED_AND_NOTIFIED_FUTURE.result, timeout=0) + self.assertRaises(OSError, EXCEPTION_FUTURE.result, timeout=0) + self.assertEqual(SUCCESSFUL_FUTURE.result(timeout=0), 42) + + def test_result_with_success(self): + # TODO(brian@sweetapp.com): This test is timing dependent. + def notification(): + # Wait until the main thread is waiting for the result. + time.sleep(1) + f1.set_result(42) + + f1 = create_future(state=PENDING) + t = threading.Thread(target=notification) + t.start() + + self.assertEqual(f1.result(timeout=5), 42) + t.join() + + def test_result_with_cancel(self): + # TODO(brian@sweetapp.com): This test is timing dependent. + def notification(): + # Wait until the main thread is waiting for the result. + time.sleep(1) + f1.cancel() + + f1 = create_future(state=PENDING) + t = threading.Thread(target=notification) + t.start() + + self.assertRaises(futures.CancelledError, + f1.result, timeout=support.SHORT_TIMEOUT) + t.join() + + def test_exception_with_timeout(self): + self.assertRaises(futures.TimeoutError, + PENDING_FUTURE.exception, timeout=0) + self.assertRaises(futures.TimeoutError, + RUNNING_FUTURE.exception, timeout=0) + self.assertRaises(futures.CancelledError, + CANCELLED_FUTURE.exception, timeout=0) + self.assertRaises(futures.CancelledError, + CANCELLED_AND_NOTIFIED_FUTURE.exception, timeout=0) + self.assertTrue(isinstance(EXCEPTION_FUTURE.exception(timeout=0), + OSError)) + self.assertEqual(SUCCESSFUL_FUTURE.exception(timeout=0), None) + + def test_exception_with_success(self): + def notification(): + # Wait until the main thread is waiting for the exception. + time.sleep(1) + with f1._condition: + f1._state = FINISHED + f1._exception = OSError() + f1._condition.notify_all() + + f1 = create_future(state=PENDING) + t = threading.Thread(target=notification) + t.start() + + self.assertTrue(isinstance(f1.exception(timeout=support.SHORT_TIMEOUT), OSError)) + t.join() + + def test_multiple_set_result(self): + f = create_future(state=PENDING) + f.set_result(1) + + with self.assertRaisesRegex( + futures.InvalidStateError, + 'FINISHED: <Future at 0x[0-9a-f]+ ' + 'state=finished returned int>' + ): + f.set_result(2) + + self.assertTrue(f.done()) + self.assertEqual(f.result(), 1) + + def test_multiple_set_exception(self): + f = create_future(state=PENDING) + e = ValueError() + f.set_exception(e) + + with self.assertRaisesRegex( + futures.InvalidStateError, + 'FINISHED: <Future at 0x[0-9a-f]+ ' + 'state=finished raised ValueError>' + ): + f.set_exception(Exception()) + + self.assertEqual(f.exception(), e) + + +def setUpModule(): + setup_module() + + +if __name__ == "__main__": + unittest.main() diff --git a/Lib/test/test_concurrent_futures/test_init.py b/Lib/test/test_concurrent_futures/test_init.py new file mode 100644 index 0000000..ce01e0f --- /dev/null +++ b/Lib/test/test_concurrent_futures/test_init.py @@ -0,0 +1,117 @@ +import contextlib +import logging +import queue +import time +import unittest +from concurrent.futures._base import BrokenExecutor +from logging.handlers import QueueHandler + +from test import support + +from .util import ExecutorMixin, create_executor_tests, setup_module + + +INITIALIZER_STATUS = 'uninitialized' + +def init(x): + global INITIALIZER_STATUS + INITIALIZER_STATUS = x + +def get_init_status(): + return INITIALIZER_STATUS + +def init_fail(log_queue=None): + if log_queue is not None: + logger = logging.getLogger('concurrent.futures') + logger.addHandler(QueueHandler(log_queue)) + logger.setLevel('CRITICAL') + logger.propagate = False + time.sleep(0.1) # let some futures be scheduled + raise ValueError('error in initializer') + + +class InitializerMixin(ExecutorMixin): + worker_count = 2 + + def setUp(self): + global INITIALIZER_STATUS + INITIALIZER_STATUS = 'uninitialized' + self.executor_kwargs = dict(initializer=init, + initargs=('initialized',)) + super().setUp() + + def test_initializer(self): + futures = [self.executor.submit(get_init_status) + for _ in range(self.worker_count)] + + for f in futures: + self.assertEqual(f.result(), 'initialized') + + +class FailingInitializerMixin(ExecutorMixin): + worker_count = 2 + + def setUp(self): + if hasattr(self, "ctx"): + # Pass a queue to redirect the child's logging output + self.mp_context = self.get_context() + self.log_queue = self.mp_context.Queue() + self.executor_kwargs = dict(initializer=init_fail, + initargs=(self.log_queue,)) + else: + # In a thread pool, the child shares our logging setup + # (see _assert_logged()) + self.mp_context = None + self.log_queue = None + self.executor_kwargs = dict(initializer=init_fail) + super().setUp() + + def test_initializer(self): + with self._assert_logged('ValueError: error in initializer'): + try: + future = self.executor.submit(get_init_status) + except BrokenExecutor: + # Perhaps the executor is already broken + pass + else: + with self.assertRaises(BrokenExecutor): + future.result() + + # At some point, the executor should break + for _ in support.sleeping_retry(support.SHORT_TIMEOUT, + "executor not broken"): + if self.executor._broken: + break + + # ... and from this point submit() is guaranteed to fail + with self.assertRaises(BrokenExecutor): + self.executor.submit(get_init_status) + + @contextlib.contextmanager + def _assert_logged(self, msg): + if self.log_queue is not None: + yield + output = [] + try: + while True: + output.append(self.log_queue.get_nowait().getMessage()) + except queue.Empty: + pass + else: + with self.assertLogs('concurrent.futures', 'CRITICAL') as cm: + yield + output = cm.output + self.assertTrue(any(msg in line for line in output), + output) + + +create_executor_tests(globals(), InitializerMixin) +create_executor_tests(globals(), FailingInitializerMixin) + + +def setUpModule(): + setup_module() + + +if __name__ == "__main__": + unittest.main() diff --git a/Lib/test/test_concurrent_futures/test_process_pool.py b/Lib/test/test_concurrent_futures/test_process_pool.py new file mode 100644 index 0000000..7763a49 --- /dev/null +++ b/Lib/test/test_concurrent_futures/test_process_pool.py @@ -0,0 +1,202 @@ +import os +import sys +import time +import unittest +from concurrent import futures +from concurrent.futures.process import BrokenProcessPool + +from test import support +from test.support import hashlib_helper + +from .executor import ExecutorTest, mul +from .util import ( + ProcessPoolForkMixin, ProcessPoolForkserverMixin, ProcessPoolSpawnMixin, + create_executor_tests, setup_module) + + +class EventfulGCObj(): + def __init__(self, mgr): + self.event = mgr.Event() + + def __del__(self): + self.event.set() + + +class ProcessPoolExecutorTest(ExecutorTest): + + @unittest.skipUnless(sys.platform=='win32', 'Windows-only process limit') + def test_max_workers_too_large(self): + with self.assertRaisesRegex(ValueError, + "max_workers must be <= 61"): + futures.ProcessPoolExecutor(max_workers=62) + + def test_killed_child(self): + # When a child process is abruptly terminated, the whole pool gets + # "broken". + futures = [self.executor.submit(time.sleep, 3)] + # Get one of the processes, and terminate (kill) it + p = next(iter(self.executor._processes.values())) + p.terminate() + for fut in futures: + self.assertRaises(BrokenProcessPool, fut.result) + # Submitting other jobs fails as well. + self.assertRaises(BrokenProcessPool, self.executor.submit, pow, 2, 8) + + def test_map_chunksize(self): + def bad_map(): + list(self.executor.map(pow, range(40), range(40), chunksize=-1)) + + ref = list(map(pow, range(40), range(40))) + self.assertEqual( + list(self.executor.map(pow, range(40), range(40), chunksize=6)), + ref) + self.assertEqual( + list(self.executor.map(pow, range(40), range(40), chunksize=50)), + ref) + self.assertEqual( + list(self.executor.map(pow, range(40), range(40), chunksize=40)), + ref) + self.assertRaises(ValueError, bad_map) + + @classmethod + def _test_traceback(cls): + raise RuntimeError(123) # some comment + + def test_traceback(self): + # We want ensure that the traceback from the child process is + # contained in the traceback raised in the main process. + future = self.executor.submit(self._test_traceback) + with self.assertRaises(Exception) as cm: + future.result() + + exc = cm.exception + self.assertIs(type(exc), RuntimeError) + self.assertEqual(exc.args, (123,)) + cause = exc.__cause__ + self.assertIs(type(cause), futures.process._RemoteTraceback) + self.assertIn('raise RuntimeError(123) # some comment', cause.tb) + + with support.captured_stderr() as f1: + try: + raise exc + except RuntimeError: + sys.excepthook(*sys.exc_info()) + self.assertIn('raise RuntimeError(123) # some comment', + f1.getvalue()) + + @hashlib_helper.requires_hashdigest('md5') + def test_ressources_gced_in_workers(self): + # Ensure that argument for a job are correctly gc-ed after the job + # is finished + mgr = self.get_context().Manager() + obj = EventfulGCObj(mgr) + future = self.executor.submit(id, obj) + future.result() + + self.assertTrue(obj.event.wait(timeout=1)) + + # explicitly destroy the object to ensure that EventfulGCObj.__del__() + # is called while manager is still running. + obj = None + support.gc_collect() + + mgr.shutdown() + mgr.join() + + def test_saturation(self): + executor = self.executor + mp_context = self.get_context() + sem = mp_context.Semaphore(0) + job_count = 15 * executor._max_workers + for _ in range(job_count): + executor.submit(sem.acquire) + self.assertEqual(len(executor._processes), executor._max_workers) + for _ in range(job_count): + sem.release() + + def test_idle_process_reuse_one(self): + executor = self.executor + assert executor._max_workers >= 4 + if self.get_context().get_start_method(allow_none=False) == "fork": + raise unittest.SkipTest("Incompatible with the fork start method.") + executor.submit(mul, 21, 2).result() + executor.submit(mul, 6, 7).result() + executor.submit(mul, 3, 14).result() + self.assertEqual(len(executor._processes), 1) + + def test_idle_process_reuse_multiple(self): + executor = self.executor + assert executor._max_workers <= 5 + if self.get_context().get_start_method(allow_none=False) == "fork": + raise unittest.SkipTest("Incompatible with the fork start method.") + executor.submit(mul, 12, 7).result() + executor.submit(mul, 33, 25) + executor.submit(mul, 25, 26).result() + executor.submit(mul, 18, 29) + executor.submit(mul, 1, 2).result() + executor.submit(mul, 0, 9) + self.assertLessEqual(len(executor._processes), 3) + executor.shutdown() + + def test_max_tasks_per_child(self): + context = self.get_context() + if context.get_start_method(allow_none=False) == "fork": + with self.assertRaises(ValueError): + self.executor_type(1, mp_context=context, max_tasks_per_child=3) + return + # not using self.executor as we need to control construction. + # arguably this could go in another class w/o that mixin. + executor = self.executor_type( + 1, mp_context=context, max_tasks_per_child=3) + f1 = executor.submit(os.getpid) + original_pid = f1.result() + # The worker pid remains the same as the worker could be reused + f2 = executor.submit(os.getpid) + self.assertEqual(f2.result(), original_pid) + self.assertEqual(len(executor._processes), 1) + f3 = executor.submit(os.getpid) + self.assertEqual(f3.result(), original_pid) + + # A new worker is spawned, with a statistically different pid, + # while the previous was reaped. + f4 = executor.submit(os.getpid) + new_pid = f4.result() + self.assertNotEqual(original_pid, new_pid) + self.assertEqual(len(executor._processes), 1) + + executor.shutdown() + + def test_max_tasks_per_child_defaults_to_spawn_context(self): + # not using self.executor as we need to control construction. + # arguably this could go in another class w/o that mixin. + executor = self.executor_type(1, max_tasks_per_child=3) + self.assertEqual(executor._mp_context.get_start_method(), "spawn") + + def test_max_tasks_early_shutdown(self): + context = self.get_context() + if context.get_start_method(allow_none=False) == "fork": + raise unittest.SkipTest("Incompatible with the fork start method.") + # not using self.executor as we need to control construction. + # arguably this could go in another class w/o that mixin. + executor = self.executor_type( + 3, mp_context=context, max_tasks_per_child=1) + futures = [] + for i in range(6): + futures.append(executor.submit(mul, i, i)) + executor.shutdown() + for i, future in enumerate(futures): + self.assertEqual(future.result(), mul(i, i)) + + +create_executor_tests(globals(), ProcessPoolExecutorTest, + executor_mixins=(ProcessPoolForkMixin, + ProcessPoolForkserverMixin, + ProcessPoolSpawnMixin)) + + +def setUpModule(): + setup_module() + + +if __name__ == "__main__": + unittest.main() diff --git a/Lib/test/test_concurrent_futures/test_shutdown.py b/Lib/test/test_concurrent_futures/test_shutdown.py new file mode 100644 index 0000000..45dab7a --- /dev/null +++ b/Lib/test/test_concurrent_futures/test_shutdown.py @@ -0,0 +1,343 @@ +import signal +import sys +import threading +import time +import unittest +from concurrent import futures + +from test import support +from test.support.script_helper import assert_python_ok + +from .util import ( + BaseTestCase, ThreadPoolMixin, ProcessPoolForkMixin, + ProcessPoolForkserverMixin, ProcessPoolSpawnMixin, + create_executor_tests, setup_module) + + +def sleep_and_print(t, msg): + time.sleep(t) + print(msg) + sys.stdout.flush() + + +class ExecutorShutdownTest: + def test_run_after_shutdown(self): + self.executor.shutdown() + self.assertRaises(RuntimeError, + self.executor.submit, + pow, 2, 5) + + def test_interpreter_shutdown(self): + # Test the atexit hook for shutdown of worker threads and processes + rc, out, err = assert_python_ok('-c', """if 1: + from concurrent.futures import {executor_type} + from time import sleep + from test.test_concurrent_futures.test_shutdown import sleep_and_print + if __name__ == "__main__": + context = '{context}' + if context == "": + t = {executor_type}(5) + else: + from multiprocessing import get_context + context = get_context(context) + t = {executor_type}(5, mp_context=context) + t.submit(sleep_and_print, 1.0, "apple") + """.format(executor_type=self.executor_type.__name__, + context=getattr(self, "ctx", ""))) + # Errors in atexit hooks don't change the process exit code, check + # stderr manually. + self.assertFalse(err) + self.assertEqual(out.strip(), b"apple") + + def test_submit_after_interpreter_shutdown(self): + # Test the atexit hook for shutdown of worker threads and processes + rc, out, err = assert_python_ok('-c', """if 1: + import atexit + @atexit.register + def run_last(): + try: + t.submit(id, None) + except RuntimeError: + print("runtime-error") + raise + from concurrent.futures import {executor_type} + if __name__ == "__main__": + context = '{context}' + if not context: + t = {executor_type}(5) + else: + from multiprocessing import get_context + context = get_context(context) + t = {executor_type}(5, mp_context=context) + t.submit(id, 42).result() + """.format(executor_type=self.executor_type.__name__, + context=getattr(self, "ctx", ""))) + # Errors in atexit hooks don't change the process exit code, check + # stderr manually. + self.assertIn("RuntimeError: cannot schedule new futures", err.decode()) + self.assertEqual(out.strip(), b"runtime-error") + + def test_hang_issue12364(self): + fs = [self.executor.submit(time.sleep, 0.1) for _ in range(50)] + self.executor.shutdown() + for f in fs: + f.result() + + def test_cancel_futures(self): + assert self.worker_count <= 5, "test needs few workers" + fs = [self.executor.submit(time.sleep, .1) for _ in range(50)] + self.executor.shutdown(cancel_futures=True) + # We can't guarantee the exact number of cancellations, but we can + # guarantee that *some* were cancelled. With few workers, many of + # the submitted futures should have been cancelled. + cancelled = [fut for fut in fs if fut.cancelled()] + self.assertGreater(len(cancelled), 20) + + # Ensure the other futures were able to finish. + # Use "not fut.cancelled()" instead of "fut.done()" to include futures + # that may have been left in a pending state. + others = [fut for fut in fs if not fut.cancelled()] + for fut in others: + self.assertTrue(fut.done(), msg=f"{fut._state=}") + self.assertIsNone(fut.exception()) + + # Similar to the number of cancelled futures, we can't guarantee the + # exact number that completed. But, we can guarantee that at least + # one finished. + self.assertGreater(len(others), 0) + + def test_hang_gh83386(self): + """shutdown(wait=False) doesn't hang at exit with running futures. + + See https://github.com/python/cpython/issues/83386. + """ + if self.executor_type == futures.ProcessPoolExecutor: + raise unittest.SkipTest( + "Hangs, see https://github.com/python/cpython/issues/83386") + + rc, out, err = assert_python_ok('-c', """if True: + from concurrent.futures import {executor_type} + from test.test_concurrent_futures.test_shutdown import sleep_and_print + if __name__ == "__main__": + if {context!r}: multiprocessing.set_start_method({context!r}) + t = {executor_type}(max_workers=3) + t.submit(sleep_and_print, 1.0, "apple") + t.shutdown(wait=False) + """.format(executor_type=self.executor_type.__name__, + context=getattr(self, 'ctx', None))) + self.assertFalse(err) + self.assertEqual(out.strip(), b"apple") + + def test_hang_gh94440(self): + """shutdown(wait=True) doesn't hang when a future was submitted and + quickly canceled right before shutdown. + + See https://github.com/python/cpython/issues/94440. + """ + if not hasattr(signal, 'alarm'): + raise unittest.SkipTest( + "Tested platform does not support the alarm signal") + + def timeout(_signum, _frame): + raise RuntimeError("timed out waiting for shutdown") + + kwargs = {} + if getattr(self, 'ctx', None): + kwargs['mp_context'] = self.get_context() + executor = self.executor_type(max_workers=1, **kwargs) + executor.submit(int).result() + old_handler = signal.signal(signal.SIGALRM, timeout) + try: + signal.alarm(5) + executor.submit(int).cancel() + executor.shutdown(wait=True) + finally: + signal.alarm(0) + signal.signal(signal.SIGALRM, old_handler) + + +class ThreadPoolShutdownTest(ThreadPoolMixin, ExecutorShutdownTest, BaseTestCase): + def test_threads_terminate(self): + def acquire_lock(lock): + lock.acquire() + + sem = threading.Semaphore(0) + for i in range(3): + self.executor.submit(acquire_lock, sem) + self.assertEqual(len(self.executor._threads), 3) + for i in range(3): + sem.release() + self.executor.shutdown() + for t in self.executor._threads: + t.join() + + def test_context_manager_shutdown(self): + with futures.ThreadPoolExecutor(max_workers=5) as e: + executor = e + self.assertEqual(list(e.map(abs, range(-5, 5))), + [5, 4, 3, 2, 1, 0, 1, 2, 3, 4]) + + for t in executor._threads: + t.join() + + def test_del_shutdown(self): + executor = futures.ThreadPoolExecutor(max_workers=5) + res = executor.map(abs, range(-5, 5)) + threads = executor._threads + del executor + + for t in threads: + t.join() + + # Make sure the results were all computed before the + # executor got shutdown. + assert all([r == abs(v) for r, v in zip(res, range(-5, 5))]) + + def test_shutdown_no_wait(self): + # Ensure that the executor cleans up the threads when calling + # shutdown with wait=False + executor = futures.ThreadPoolExecutor(max_workers=5) + res = executor.map(abs, range(-5, 5)) + threads = executor._threads + executor.shutdown(wait=False) + for t in threads: + t.join() + + # Make sure the results were all computed before the + # executor got shutdown. + assert all([r == abs(v) for r, v in zip(res, range(-5, 5))]) + + + def test_thread_names_assigned(self): + executor = futures.ThreadPoolExecutor( + max_workers=5, thread_name_prefix='SpecialPool') + executor.map(abs, range(-5, 5)) + threads = executor._threads + del executor + support.gc_collect() # For PyPy or other GCs. + + for t in threads: + self.assertRegex(t.name, r'^SpecialPool_[0-4]$') + t.join() + + def test_thread_names_default(self): + executor = futures.ThreadPoolExecutor(max_workers=5) + executor.map(abs, range(-5, 5)) + threads = executor._threads + del executor + support.gc_collect() # For PyPy or other GCs. + + for t in threads: + # Ensure that our default name is reasonably sane and unique when + # no thread_name_prefix was supplied. + self.assertRegex(t.name, r'ThreadPoolExecutor-\d+_[0-4]$') + t.join() + + def test_cancel_futures_wait_false(self): + # Can only be reliably tested for TPE, since PPE often hangs with + # `wait=False` (even without *cancel_futures*). + rc, out, err = assert_python_ok('-c', """if True: + from concurrent.futures import ThreadPoolExecutor + from test.test_concurrent_futures.test_shutdown import sleep_and_print + if __name__ == "__main__": + t = ThreadPoolExecutor() + t.submit(sleep_and_print, .1, "apple") + t.shutdown(wait=False, cancel_futures=True) + """) + # Errors in atexit hooks don't change the process exit code, check + # stderr manually. + self.assertFalse(err) + self.assertEqual(out.strip(), b"apple") + + +class ProcessPoolShutdownTest(ExecutorShutdownTest): + def test_processes_terminate(self): + def acquire_lock(lock): + lock.acquire() + + mp_context = self.get_context() + if mp_context.get_start_method(allow_none=False) == "fork": + # fork pre-spawns, not on demand. + expected_num_processes = self.worker_count + else: + expected_num_processes = 3 + + sem = mp_context.Semaphore(0) + for _ in range(3): + self.executor.submit(acquire_lock, sem) + self.assertEqual(len(self.executor._processes), expected_num_processes) + for _ in range(3): + sem.release() + processes = self.executor._processes + self.executor.shutdown() + + for p in processes.values(): + p.join() + + def test_context_manager_shutdown(self): + with futures.ProcessPoolExecutor( + max_workers=5, mp_context=self.get_context()) as e: + processes = e._processes + self.assertEqual(list(e.map(abs, range(-5, 5))), + [5, 4, 3, 2, 1, 0, 1, 2, 3, 4]) + + for p in processes.values(): + p.join() + + def test_del_shutdown(self): + executor = futures.ProcessPoolExecutor( + max_workers=5, mp_context=self.get_context()) + res = executor.map(abs, range(-5, 5)) + executor_manager_thread = executor._executor_manager_thread + processes = executor._processes + call_queue = executor._call_queue + executor_manager_thread = executor._executor_manager_thread + del executor + support.gc_collect() # For PyPy or other GCs. + + # Make sure that all the executor resources were properly cleaned by + # the shutdown process + executor_manager_thread.join() + for p in processes.values(): + p.join() + call_queue.join_thread() + + # Make sure the results were all computed before the + # executor got shutdown. + assert all([r == abs(v) for r, v in zip(res, range(-5, 5))]) + + def test_shutdown_no_wait(self): + # Ensure that the executor cleans up the processes when calling + # shutdown with wait=False + executor = futures.ProcessPoolExecutor( + max_workers=5, mp_context=self.get_context()) + res = executor.map(abs, range(-5, 5)) + processes = executor._processes + call_queue = executor._call_queue + executor_manager_thread = executor._executor_manager_thread + executor.shutdown(wait=False) + + # Make sure that all the executor resources were properly cleaned by + # the shutdown process + executor_manager_thread.join() + for p in processes.values(): + p.join() + call_queue.join_thread() + + # Make sure the results were all computed before the executor got + # shutdown. + assert all([r == abs(v) for r, v in zip(res, range(-5, 5))]) + + +create_executor_tests(globals(), ProcessPoolShutdownTest, + executor_mixins=(ProcessPoolForkMixin, + ProcessPoolForkserverMixin, + ProcessPoolSpawnMixin)) + + +def setUpModule(): + setup_module() + + +if __name__ == "__main__": + unittest.main() diff --git a/Lib/test/test_concurrent_futures/test_thread_pool.py b/Lib/test/test_concurrent_futures/test_thread_pool.py new file mode 100644 index 0000000..daef7b5 --- /dev/null +++ b/Lib/test/test_concurrent_futures/test_thread_pool.py @@ -0,0 +1,98 @@ +import contextlib +import multiprocessing as mp +import multiprocessing.process +import multiprocessing.util +import os +import threading +import unittest +from concurrent import futures + +from .executor import ExecutorTest, mul +from .util import BaseTestCase, ThreadPoolMixin, setup_module + + +class ThreadPoolExecutorTest(ThreadPoolMixin, ExecutorTest, BaseTestCase): + def test_map_submits_without_iteration(self): + """Tests verifying issue 11777.""" + finished = [] + def record_finished(n): + finished.append(n) + + self.executor.map(record_finished, range(10)) + self.executor.shutdown(wait=True) + self.assertCountEqual(finished, range(10)) + + def test_default_workers(self): + executor = self.executor_type() + expected = min(32, (os.cpu_count() or 1) + 4) + self.assertEqual(executor._max_workers, expected) + + def test_saturation(self): + executor = self.executor_type(4) + def acquire_lock(lock): + lock.acquire() + + sem = threading.Semaphore(0) + for i in range(15 * executor._max_workers): + executor.submit(acquire_lock, sem) + self.assertEqual(len(executor._threads), executor._max_workers) + for i in range(15 * executor._max_workers): + sem.release() + executor.shutdown(wait=True) + + def test_idle_thread_reuse(self): + executor = self.executor_type() + executor.submit(mul, 21, 2).result() + executor.submit(mul, 6, 7).result() + executor.submit(mul, 3, 14).result() + self.assertEqual(len(executor._threads), 1) + executor.shutdown(wait=True) + + @unittest.skipUnless(hasattr(os, 'register_at_fork'), 'need os.register_at_fork') + def test_hang_global_shutdown_lock(self): + # bpo-45021: _global_shutdown_lock should be reinitialized in the child + # process, otherwise it will never exit + def submit(pool): + pool.submit(submit, pool) + + with futures.ThreadPoolExecutor(1) as pool: + pool.submit(submit, pool) + + for _ in range(50): + with futures.ProcessPoolExecutor(1, mp_context=mp.get_context('fork')) as workers: + workers.submit(tuple) + + def test_executor_map_current_future_cancel(self): + stop_event = threading.Event() + log = [] + + def log_n_wait(ident): + log.append(f"{ident=} started") + try: + stop_event.wait() + finally: + log.append(f"{ident=} stopped") + + with self.executor_type(max_workers=1) as pool: + # submit work to saturate the pool + fut = pool.submit(log_n_wait, ident="first") + try: + with contextlib.closing( + pool.map(log_n_wait, ["second", "third"], timeout=0) + ) as gen: + with self.assertRaises(TimeoutError): + next(gen) + finally: + stop_event.set() + fut.result() + # ident='second' is cancelled as a result of raising a TimeoutError + # ident='third' is cancelled because it remained in the collection of futures + self.assertListEqual(log, ["ident='first' started", "ident='first' stopped"]) + + +def setUpModule(): + setup_module() + + +if __name__ == "__main__": + unittest.main() diff --git a/Lib/test/test_concurrent_futures/test_wait.py b/Lib/test/test_concurrent_futures/test_wait.py new file mode 100644 index 0000000..e4bea8b --- /dev/null +++ b/Lib/test/test_concurrent_futures/test_wait.py @@ -0,0 +1,161 @@ +import sys +import threading +import time +import unittest +from concurrent import futures + +from .util import ( + CANCELLED_FUTURE, CANCELLED_AND_NOTIFIED_FUTURE, EXCEPTION_FUTURE, + SUCCESSFUL_FUTURE, + create_executor_tests, setup_module, + BaseTestCase, ThreadPoolMixin, + ProcessPoolForkMixin, ProcessPoolForkserverMixin, ProcessPoolSpawnMixin) + + +def mul(x, y): + return x * y + +def sleep_and_raise(t): + time.sleep(t) + raise Exception('this is an exception') + + +class WaitTests: + def test_20369(self): + # See https://bugs.python.org/issue20369 + future = self.executor.submit(time.sleep, 1.5) + done, not_done = futures.wait([future, future], + return_when=futures.ALL_COMPLETED) + self.assertEqual({future}, done) + self.assertEqual(set(), not_done) + + + def test_first_completed(self): + future1 = self.executor.submit(mul, 21, 2) + future2 = self.executor.submit(time.sleep, 1.5) + + done, not_done = futures.wait( + [CANCELLED_FUTURE, future1, future2], + return_when=futures.FIRST_COMPLETED) + + self.assertEqual(set([future1]), done) + self.assertEqual(set([CANCELLED_FUTURE, future2]), not_done) + + def test_first_completed_some_already_completed(self): + future1 = self.executor.submit(time.sleep, 1.5) + + finished, pending = futures.wait( + [CANCELLED_AND_NOTIFIED_FUTURE, SUCCESSFUL_FUTURE, future1], + return_when=futures.FIRST_COMPLETED) + + self.assertEqual( + set([CANCELLED_AND_NOTIFIED_FUTURE, SUCCESSFUL_FUTURE]), + finished) + self.assertEqual(set([future1]), pending) + + def test_first_exception(self): + future1 = self.executor.submit(mul, 2, 21) + future2 = self.executor.submit(sleep_and_raise, 1.5) + future3 = self.executor.submit(time.sleep, 3) + + finished, pending = futures.wait( + [future1, future2, future3], + return_when=futures.FIRST_EXCEPTION) + + self.assertEqual(set([future1, future2]), finished) + self.assertEqual(set([future3]), pending) + + def test_first_exception_some_already_complete(self): + future1 = self.executor.submit(divmod, 21, 0) + future2 = self.executor.submit(time.sleep, 1.5) + + finished, pending = futures.wait( + [SUCCESSFUL_FUTURE, + CANCELLED_FUTURE, + CANCELLED_AND_NOTIFIED_FUTURE, + future1, future2], + return_when=futures.FIRST_EXCEPTION) + + self.assertEqual(set([SUCCESSFUL_FUTURE, + CANCELLED_AND_NOTIFIED_FUTURE, + future1]), finished) + self.assertEqual(set([CANCELLED_FUTURE, future2]), pending) + + def test_first_exception_one_already_failed(self): + future1 = self.executor.submit(time.sleep, 2) + + finished, pending = futures.wait( + [EXCEPTION_FUTURE, future1], + return_when=futures.FIRST_EXCEPTION) + + self.assertEqual(set([EXCEPTION_FUTURE]), finished) + self.assertEqual(set([future1]), pending) + + def test_all_completed(self): + future1 = self.executor.submit(divmod, 2, 0) + future2 = self.executor.submit(mul, 2, 21) + + finished, pending = futures.wait( + [SUCCESSFUL_FUTURE, + CANCELLED_AND_NOTIFIED_FUTURE, + EXCEPTION_FUTURE, + future1, + future2], + return_when=futures.ALL_COMPLETED) + + self.assertEqual(set([SUCCESSFUL_FUTURE, + CANCELLED_AND_NOTIFIED_FUTURE, + EXCEPTION_FUTURE, + future1, + future2]), finished) + self.assertEqual(set(), pending) + + def test_timeout(self): + future1 = self.executor.submit(mul, 6, 7) + future2 = self.executor.submit(time.sleep, 6) + + finished, pending = futures.wait( + [CANCELLED_AND_NOTIFIED_FUTURE, + EXCEPTION_FUTURE, + SUCCESSFUL_FUTURE, + future1, future2], + timeout=5, + return_when=futures.ALL_COMPLETED) + + self.assertEqual(set([CANCELLED_AND_NOTIFIED_FUTURE, + EXCEPTION_FUTURE, + SUCCESSFUL_FUTURE, + future1]), finished) + self.assertEqual(set([future2]), pending) + + +class ThreadPoolWaitTests(ThreadPoolMixin, WaitTests, BaseTestCase): + + def test_pending_calls_race(self): + # Issue #14406: multi-threaded race condition when waiting on all + # futures. + event = threading.Event() + def future_func(): + event.wait() + oldswitchinterval = sys.getswitchinterval() + sys.setswitchinterval(1e-6) + try: + fs = {self.executor.submit(future_func) for i in range(100)} + event.set() + futures.wait(fs, return_when=futures.ALL_COMPLETED) + finally: + sys.setswitchinterval(oldswitchinterval) + + +create_executor_tests(globals(), WaitTests, + executor_mixins=(ProcessPoolForkMixin, + ProcessPoolForkserverMixin, + ProcessPoolSpawnMixin)) + + +def setUpModule(): + setup_module() + + +if __name__ == "__main__": + unittest.main() diff --git a/Lib/test/test_concurrent_futures/util.py b/Lib/test/test_concurrent_futures/util.py new file mode 100644 index 0000000..dc48bec --- /dev/null +++ b/Lib/test/test_concurrent_futures/util.py @@ -0,0 +1,141 @@ +import multiprocessing +import sys +import time +import unittest +from concurrent import futures +from concurrent.futures._base import ( + PENDING, RUNNING, CANCELLED, CANCELLED_AND_NOTIFIED, FINISHED, Future, + ) +from concurrent.futures.process import _check_system_limits + +from test import support +from test.support import threading_helper + + +def create_future(state=PENDING, exception=None, result=None): + f = Future() + f._state = state + f._exception = exception + f._result = result + return f + + +PENDING_FUTURE = create_future(state=PENDING) +RUNNING_FUTURE = create_future(state=RUNNING) +CANCELLED_FUTURE = create_future(state=CANCELLED) +CANCELLED_AND_NOTIFIED_FUTURE = create_future(state=CANCELLED_AND_NOTIFIED) +EXCEPTION_FUTURE = create_future(state=FINISHED, exception=OSError()) +SUCCESSFUL_FUTURE = create_future(state=FINISHED, result=42) + + +class BaseTestCase(unittest.TestCase): + def setUp(self): + self._thread_key = threading_helper.threading_setup() + + def tearDown(self): + support.reap_children() + threading_helper.threading_cleanup(*self._thread_key) + + +class ExecutorMixin: + worker_count = 5 + executor_kwargs = {} + + def setUp(self): + super().setUp() + + self.t1 = time.monotonic() + if hasattr(self, "ctx"): + self.executor = self.executor_type( + max_workers=self.worker_count, + mp_context=self.get_context(), + **self.executor_kwargs) + else: + self.executor = self.executor_type( + max_workers=self.worker_count, + **self.executor_kwargs) + + def tearDown(self): + self.executor.shutdown(wait=True) + self.executor = None + + dt = time.monotonic() - self.t1 + if support.verbose: + print("%.2fs" % dt, end=' ') + self.assertLess(dt, 300, "synchronization issue: test lasted too long") + + super().tearDown() + + def get_context(self): + return multiprocessing.get_context(self.ctx) + + +class ThreadPoolMixin(ExecutorMixin): + executor_type = futures.ThreadPoolExecutor + + +class ProcessPoolForkMixin(ExecutorMixin): + executor_type = futures.ProcessPoolExecutor + ctx = "fork" + + def get_context(self): + try: + _check_system_limits() + except NotImplementedError: + self.skipTest("ProcessPoolExecutor unavailable on this system") + if sys.platform == "win32": + self.skipTest("require unix system") + return super().get_context() + + +class ProcessPoolSpawnMixin(ExecutorMixin): + executor_type = futures.ProcessPoolExecutor + ctx = "spawn" + + def get_context(self): + try: + _check_system_limits() + except NotImplementedError: + self.skipTest("ProcessPoolExecutor unavailable on this system") + return super().get_context() + + +class ProcessPoolForkserverMixin(ExecutorMixin): + executor_type = futures.ProcessPoolExecutor + ctx = "forkserver" + + def get_context(self): + try: + _check_system_limits() + except NotImplementedError: + self.skipTest("ProcessPoolExecutor unavailable on this system") + if sys.platform == "win32": + self.skipTest("require unix system") + return super().get_context() + + +def create_executor_tests(remote_globals, mixin, bases=(BaseTestCase,), + executor_mixins=(ThreadPoolMixin, + ProcessPoolForkMixin, + ProcessPoolForkserverMixin, + ProcessPoolSpawnMixin)): + def strip_mixin(name): + if name.endswith(('Mixin', 'Tests')): + return name[:-5] + elif name.endswith('Test'): + return name[:-4] + else: + return name + + module = remote_globals['__name__'] + for exe in executor_mixins: + name = ("%s%sTest" + % (strip_mixin(exe.__name__), strip_mixin(mixin.__name__))) + cls = type(name, (mixin,) + (exe,) + bases, {'__module__': module}) + remote_globals[name] = cls + + +def setup_module(): + unittest.addModuleCleanup(multiprocessing.util._cleanup_tests) + thread_info = threading_helper.threading_setup() + unittest.addModuleCleanup(threading_helper.threading_cleanup, *thread_info) |