from test import support from test.support import import_helper from test.support import threading_helper # Skip tests if _multiprocessing wasn't built. import_helper.import_module('_multiprocessing') from test.support import hashlib_helper from test.support.script_helper import assert_python_ok import contextlib import itertools import logging from logging.handlers import QueueHandler import os import queue import signal import sys import threading import time import unittest import weakref from pickle import PicklingError from concurrent import futures from concurrent.futures._base import ( PENDING, RUNNING, CANCELLED, CANCELLED_AND_NOTIFIED, FINISHED, Future, BrokenExecutor) from concurrent.futures.process import BrokenProcessPool, _check_system_limits import multiprocessing.process import multiprocessing.util import multiprocessing as mp if support.check_sanitizer(address=True, memory=True): # bpo-46633: Skip the test because it is too slow when Python is built # with ASAN/MSAN: between 5 and 20 minutes on GitHub Actions. raise unittest.SkipTest("test too slow on ASAN/MSAN build") def create_future(state=PENDING, exception=None, result=None): f = Future() f._state = state f._exception = exception f._result = result return f PENDING_FUTURE = create_future(state=PENDING) RUNNING_FUTURE = create_future(state=RUNNING) CANCELLED_FUTURE = create_future(state=CANCELLED) CANCELLED_AND_NOTIFIED_FUTURE = create_future(state=CANCELLED_AND_NOTIFIED) EXCEPTION_FUTURE = create_future(state=FINISHED, exception=OSError()) SUCCESSFUL_FUTURE = create_future(state=FINISHED, result=42) INITIALIZER_STATUS = 'uninitialized' def mul(x, y): return x * y def capture(*args, **kwargs): return args, kwargs def sleep_and_raise(t): time.sleep(t) raise Exception('this is an exception') def sleep_and_print(t, msg): time.sleep(t) print(msg) sys.stdout.flush() def init(x): global INITIALIZER_STATUS INITIALIZER_STATUS = x def get_init_status(): return INITIALIZER_STATUS def init_fail(log_queue=None): if log_queue is not None: logger = logging.getLogger('concurrent.futures') logger.addHandler(QueueHandler(log_queue)) logger.setLevel('CRITICAL') logger.propagate = False time.sleep(0.1) # let some futures be scheduled raise ValueError('error in initializer') class MyObject(object): def my_method(self): pass class EventfulGCObj(): def __init__(self, mgr): self.event = mgr.Event() def __del__(self): self.event.set() def make_dummy_object(_): return MyObject() class BaseTestCase(unittest.TestCase): def setUp(self): self._thread_key = threading_helper.threading_setup() def tearDown(self): support.reap_children() threading_helper.threading_cleanup(*self._thread_key) class ExecutorMixin: worker_count = 5 executor_kwargs = {} def setUp(self): super().setUp() self.t1 = time.monotonic() if hasattr(self, "ctx"): self.executor = self.executor_type( max_workers=self.worker_count, mp_context=self.get_context(), **self.executor_kwargs) else: self.executor = self.executor_type( max_workers=self.worker_count, **self.executor_kwargs) def tearDown(self): self.executor.shutdown(wait=True) self.executor = None dt = time.monotonic() - self.t1 if support.verbose: print("%.2fs" % dt, end=' ') self.assertLess(dt, 300, "synchronization issue: test lasted too long") super().tearDown() def get_context(self): return mp.get_context(self.ctx) class ThreadPoolMixin(ExecutorMixin): executor_type = futures.ThreadPoolExecutor class ProcessPoolForkMixin(ExecutorMixin): executor_type = futures.ProcessPoolExecutor ctx = "fork" def get_context(self): try: _check_system_limits() except NotImplementedError: self.skipTest("ProcessPoolExecutor unavailable on this system") if sys.platform == "win32": self.skipTest("require unix system") return super().get_context() class ProcessPoolSpawnMixin(ExecutorMixin): executor_type = futures.ProcessPoolExecutor ctx = "spawn" def get_context(self): try: _check_system_limits() except NotImplementedError: self.skipTest("ProcessPoolExecutor unavailable on this system") return super().get_context() class ProcessPoolForkserverMixin(ExecutorMixin): executor_type = futures.ProcessPoolExecutor ctx = "forkserver" def get_context(self): try: _check_system_limits() except NotImplementedError: self.skipTest("ProcessPoolExecutor unavailable on this system") if sys.platform == "win32": self.skipTest("require unix system") return super().get_context() def create_executor_tests(mixin, bases=(BaseTestCase,), executor_mixins=(ThreadPoolMixin, ProcessPoolForkMixin, ProcessPoolForkserverMixin, ProcessPoolSpawnMixin)): def strip_mixin(name): if name.endswith(('Mixin', 'Tests')): return name[:-5] elif name.endswith('Test'): return name[:-4] else: return name for exe in executor_mixins: name = ("%s%sTest" % (strip_mixin(exe.__name__), strip_mixin(mixin.__name__))) cls = type(name, (mixin,) + (exe,) + bases, {}) globals()[name] = cls class InitializerMixin(ExecutorMixin): worker_count = 2 def setUp(self): global INITIALIZER_STATUS INITIALIZER_STATUS = 'uninitialized' self.executor_kwargs = dict(initializer=init, initargs=('initialized',)) super().setUp() def test_initializer(self): futures = [self.executor.submit(get_init_status) for _ in range(self.worker_count)] for f in futures: self.assertEqual(f.result(), 'initialized') class FailingInitializerMixin(ExecutorMixin): worker_count = 2 def setUp(self): if hasattr(self, "ctx"): # Pass a queue to redirect the child's logging output self.mp_context = self.get_context() self.log_queue = self.mp_context.Queue() self.executor_kwargs = dict(initializer=init_fail, initargs=(self.log_queue,)) else: # In a thread pool, the child shares our logging setup # (see _assert_logged()) self.mp_context = None self.log_queue = None self.executor_kwargs = dict(initializer=init_fail) super().setUp() def test_initializer(self): with self._assert_logged('ValueError: error in initializer'): try: future = self.executor.submit(get_init_status) except BrokenExecutor: # Perhaps the executor is already broken pass else: with self.assertRaises(BrokenExecutor): future.result() # At some point, the executor should break for _ in support.sleeping_retry(support.SHORT_TIMEOUT, "executor not broken"): if self.executor._broken: break # ... and from this point submit() is guaranteed to fail with self.assertRaises(BrokenExecutor): self.executor.submit(get_init_status) @contextlib.contextmanager def _assert_logged(self, msg): if self.log_queue is not None: yield output = [] try: while True: output.append(self.log_queue.get_nowait().getMessage()) except queue.Empty: pass else: with self.assertLogs('concurrent.futures', 'CRITICAL') as cm: yield output = cm.output self.assertTrue(any(msg in line for line in output), output) create_executor_tests(InitializerMixin) create_executor_tests(FailingInitializerMixin) class ExecutorShutdownTest: def test_run_after_shutdown(self): self.executor.shutdown() self.assertRaises(RuntimeError, self.executor.submit, pow, 2, 5) def test_interpreter_shutdown(self): # Test the atexit hook for shutdown of worker threads and processes rc, out, err = assert_python_ok('-c', """if 1: from concurrent.futures import {executor_type} from time import sleep from test.test_concurrent_futures import sleep_and_print if __name__ == "__main__": context = '{context}' if context == "": t = {executor_type}(5) else: from multiprocessing import get_context context = get_context(context) t = {executor_type}(5, mp_context=context) t.submit(sleep_and_print, 1.0, "apple") """.format(executor_type=self.executor_type.__name__, context=getattr(self, "ctx", ""))) # Errors in atexit hooks don't change the process exit code, check # stderr manually. self.assertFalse(err) self.assertEqual(out.strip(), b"apple") def test_submit_after_interpreter_shutdown(self): # Test the atexit hook for shutdown of worker threads and processes rc, out, err = assert_python_ok('-c', """if 1: import atexit @atexit.register def run_last(): try: t.submit(id, None) except RuntimeError: print("runtime-error") raise from concurrent.futures import {executor_type} if __name__ == "__main__": context = '{context}' if not context: t = {executor_type}(5) else: from multiprocessing import get_context context = get_context(context) t = {executor_type}(5, mp_context=context) t.submit(id, 42).result() """.format(executor_type=self.executor_type.__name__, context=getattr(self, "ctx", ""))) # Errors in atexit hooks don't change the process exit code, check # stderr manually. self.assertIn("RuntimeError: cannot schedule new futures", err.decode()) self.assertEqual(out.strip(), b"runtime-error") def test_hang_issue12364(self): fs = [self.executor.submit(time.sleep, 0.1) for _ in range(50)] self.executor.shutdown() for f in fs: f.result() def test_cancel_futures(self): assert self.worker_count <= 5, "test needs few workers" fs = [self.executor.submit(time.sleep, .1) for _ in range(50)] self.executor.shutdown(cancel_futures=True) # We can't guarantee the exact number of cancellations, but we can # guarantee that *some* were cancelled. With few workers, many of # the submitted futures should have been cancelled. cancelled = [fut for fut in fs if fut.cancelled()] self.assertGreater(len(cancelled), 20) # Ensure the other futures were able to finish. # Use "not fut.cancelled()" instead of "fut.done()" to include futures # that may have been left in a pending state. others = [fut for fut in fs if not fut.cancelled()] for fut in others: self.assertTrue(fut.done(), msg=f"{fut._state=}") self.assertIsNone(fut.exception()) # Similar to the number of cancelled futures, we can't guarantee the # exact number that completed. But, we can guarantee that at least # one finished. self.assertGreater(len(others), 0) def test_hang_gh83386(self): """shutdown(wait=False) doesn't hang at exit with running futures. See https://github.com/python/cpython/issues/83386. """ if self.executor_type == futures.ProcessPoolExecutor: raise unittest.SkipTest( "Hangs, see https://github.com/python/cpython/issues/83386") rc, out, err = assert_python_ok('-c', """if True: from concurrent.futures import {executor_type} from test.test_concurrent_futures import sleep_and_print if __name__ == "__main__": if {context!r}: multiprocessing.set_start_method({context!r}) t = {executor_type}(max_workers=3) t.submit(sleep_and_print, 1.0, "apple") t.shutdown(wait=False) """.format(executor_type=self.executor_type.__name__, context=getattr(self, 'ctx', None))) self.assertFalse(err) self.assertEqual(out.strip(), b"apple") def test_hang_gh94440(self): """shutdown(wait=True) doesn't hang when a future was submitted and quickly canceled right before shutdown. See https://github.com/python/cpython/issues/94440. """ if not hasattr(signal, 'alarm'): raise unittest.SkipTest( "Tested platform does not support the alarm signal") def timeout(_signum, _frame): raise RuntimeError("timed out waiting for shutdown") kwargs = {} if getattr(self, 'ctx', None): kwargs['mp_context'] = self.get_context() executor = self.executor_type(max_workers=1, **kwargs) executor.submit(int).result() old_handler = signal.signal(signal.SIGALRM, timeout) try: signal.alarm(5) executor.submit(int).cancel() executor.shutdown(wait=True) finally: signal.alarm(0) signal.signal(signal.SIGALRM, old_handler) class ThreadPoolShutdownTest(ThreadPoolMixin, ExecutorShutdownTest, BaseTestCase): def test_threads_terminate(self): def acquire_lock(lock): lock.acquire() sem = threading.Semaphore(0) for i in range(3): self.executor.submit(acquire_lock, sem) self.assertEqual(len(self.executor._threads), 3) for i in range(3): sem.release() self.executor.shutdown() for t in self.executor._threads: t.join() def test_context_manager_shutdown(self): with futures.ThreadPoolExecutor(max_workers=5) as e: executor = e self.assertEqual(list(e.map(abs, range(-5, 5))), [5, 4, 3, 2, 1, 0, 1, 2, 3, 4]) for t in executor._threads: t.join() def test_del_shutdown(self): executor = futures.ThreadPoolExecutor(max_workers=5) res = executor.map(abs, range(-5, 5)) threads = executor._threads del executor for t in threads: t.join() # Make sure the results were all computed before the # executor got shutdown. assert all([r == abs(v) for r, v in zip(res, range(-5, 5))]) def test_shutdown_no_wait(self): # Ensure that the executor cleans up the threads when calling # shutdown with wait=False executor = futures.ThreadPoolExecutor(max_workers=5) res = executor.map(abs, range(-5, 5)) threads = executor._threads executor.shutdown(wait=False) for t in threads: t.join() # Make sure the results were all computed before the # executor got shutdown. assert all([r == abs(v) for r, v in zip(res, range(-5, 5))]) def test_thread_names_assigned(self): executor = futures.ThreadPoolExecutor( max_workers=5, thread_name_prefix='SpecialPool') executor.map(abs, range(-5, 5)) threads = executor._threads del executor support.gc_collect() # For PyPy or other GCs. for t in threads: self.assertRegex(t.name, r'^SpecialPool_[0-4]$') t.join() def test_thread_names_default(self): executor = futures.ThreadPoolExecutor(max_workers=5) executor.map(abs, range(-5, 5)) threads = executor._threads del executor support.gc_collect() # For PyPy or other GCs. for t in threads: # Ensure that our default name is reasonably sane and unique when # no thread_name_prefix was supplied. self.assertRegex(t.name, r'ThreadPoolExecutor-\d+_[0-4]$') t.join() def test_cancel_futures_wait_false(self): # Can only be reliably tested for TPE, since PPE often hangs with # `wait=False` (even without *cancel_futures*). rc, out, err = assert_python_ok('-c', """if True: from concurrent.futures import ThreadPoolExecutor from test.test_concurrent_futures import sleep_and_print if __name__ == "__main__": t = ThreadPoolExecutor() t.submit(sleep_and_print, .1, "apple") t.shutdown(wait=False, cancel_futures=True) """) # Errors in atexit hooks don't change the process exit code, check # stderr manually. self.assertFalse(err) self.assertEqual(out.strip(), b"apple") class ProcessPoolShutdownTest(ExecutorShutdownTest): def test_processes_terminate(self): def acquire_lock(lock): lock.acquire() mp_context = self.get_context() if mp_context.get_start_method(allow_none=False) == "fork": # fork pre-spawns, not on demand. expected_num_processes = self.worker_count else: expected_num_processes = 3 sem = mp_context.Semaphore(0) for _ in range(3): self.executor.submit(acquire_lock, sem) self.assertEqual(len(self.executor._processes), expected_num_processes) for _ in range(3): sem.release() processes = self.executor._processes self.executor.shutdown() for p in processes.values(): p.join() def test_context_manager_shutdown(self): with futures.ProcessPoolExecutor( max_workers=5, mp_context=self.get_context()) as e: processes = e._processes self.assertEqual(list(e.map(abs, range(-5, 5))), [5, 4, 3, 2, 1, 0, 1, 2, 3, 4]) for p in processes.values(): p.join() def test_del_shutdown(self): executor = futures.ProcessPoolExecutor( max_workers=5, mp_context=self.get_context()) res = executor.map(abs, range(-5, 5)) executor_manager_thread = executor._executor_manager_thread processes = executor._processes call_queue = executor._call_queue executor_manager_thread = executor._executor_manager_thread del executor support.gc_collect() # For PyPy or other GCs. # Make sure that all the executor resources were properly cleaned by # the shutdown process executor_manager_thread.join() for p in processes.values(): p.join() call_queue.join_thread() # Make sure the results were all computed before the # executor got shutdown. assert all([r == abs(v) for r, v in zip(res, range(-5, 5))]) def test_shutdown_no_wait(self): # Ensure that the executor cleans up the processes when calling # shutdown with wait=False executor = futures.ProcessPoolExecutor( max_workers=5, mp_context=self.get_context()) res = executor.map(abs, range(-5, 5)) processes = executor._processes call_queue = executor._call_queue executor_manager_thread = executor._executor_manager_thread executor.shutdown(wait=False) # Make sure that all the executor resources were properly cleaned by # the shutdown process executor_manager_thread.join() for p in processes.values(): p.join() call_queue.join_thread() # Make sure the results were all computed before the executor got # shutdown. assert all([r == abs(v) for r, v in zip(res, range(-5, 5))]) create_executor_tests(ProcessPoolShutdownTest, executor_mixins=(ProcessPoolForkMixin, ProcessPoolForkserverMixin, ProcessPoolSpawnMixin)) class WaitTests: def test_20369(self): # See https://bugs.python.org/issue20369 future = self.executor.submit(time.sleep, 1.5) done, not_done = futures.wait([future, future], return_when=futures.ALL_COMPLETED) self.assertEqual({future}, done) self.assertEqual(set(), not_done) def test_first_completed(self): future1 = self.executor.submit(mul, 21, 2) future2 = self.executor.submit(time.sleep, 1.5) done, not_done = futures.wait( [CANCELLED_FUTURE, future1, future2], return_when=futures.FIRST_COMPLETED) self.assertEqual(set([future1]), done) self.assertEqual(set([CANCELLED_FUTURE, future2]), not_done) def test_first_completed_some_already_completed(self): future1 = self.executor.submit(time.sleep, 1.5) finished, pending = futures.wait( [CANCELLED_AND_NOTIFIED_FUTURE, SUCCESSFUL_FUTURE, future1], return_when=futures.FIRST_COMPLETED) self.assertEqual( set([CANCELLED_AND_NOTIFIED_FUTURE, SUCCESSFUL_FUTURE]), finished) self.assertEqual(set([future1]), pending) def test_first_exception(self): future1 = self.executor.submit(mul, 2, 21) future2 = self.executor.submit(sleep_and_raise, 1.5) future3 = self.executor.submit(time.sleep, 3) finished, pending = futures.wait( [future1, future2, future3], return_when=futures.FIRST_EXCEPTION) self.assertEqual(set([future1, future2]), finished) self.assertEqual(set([future3]), pending) def test_first_exception_some_already_complete(self): future1 = self.executor.submit(divmod, 21, 0) future2 = self.executor.submit(time.sleep, 1.5) finished, pending = futures.wait( [SUCCESSFUL_FUTURE, CANCELLED_FUTURE, CANCELLED_AND_NOTIFIED_FUTURE, future1, future2], return_when=futures.FIRST_EXCEPTION) self.assertEqual(set([SUCCESSFUL_FUTURE, CANCELLED_AND_NOTIFIED_FUTURE, future1]), finished) self.assertEqual(set([CANCELLED_FUTURE, future2]), pending) def test_first_exception_one_already_failed(self): future1 = self.executor.submit(time.sleep, 2) finished, pending = futures.wait( [EXCEPTION_FUTURE, future1], return_when=futures.FIRST_EXCEPTION) self.assertEqual(set([EXCEPTION_FUTURE]), finished) self.assertEqual(set([future1]), pending) def test_all_completed(self): future1 = self.executor.submit(divmod, 2, 0) future2 = self.executor.submit(mul, 2, 21) finished, pending = futures.wait( [SUCCESSFUL_FUTURE, CANCELLED_AND_NOTIFIED_FUTURE, EXCEPTION_FUTURE, future1, future2], return_when=futures.ALL_COMPLETED) self.assertEqual(set([SUCCESSFUL_FUTURE, CANCELLED_AND_NOTIFIED_FUTURE, EXCEPTION_FUTURE, future1, future2]), finished) self.assertEqual(set(), pending) def test_timeout(self): future1 = self.executor.submit(mul, 6, 7) future2 = self.executor.submit(time.sleep, 6) finished, pending = futures.wait( [CANCELLED_AND_NOTIFIED_FUTURE, EXCEPTION_FUTURE, SUCCESSFUL_FUTURE, future1, future2], timeout=5, return_when=futures.ALL_COMPLETED) self.assertEqual(set([CANCELLED_AND_NOTIFIED_FUTURE, EXCEPTION_FUTURE, SUCCESSFUL_FUTURE, future1]), finished) self.assertEqual(set([future2]), pending) class ThreadPoolWaitTests(ThreadPoolMixin, WaitTests, BaseTestCase): def test_pending_calls_race(self): # Issue #14406: multi-threaded race condition when waiting on all # futures. event = threading.Event() def future_func(): event.wait() oldswitchinterval = sys.getswitchinterval() sys.setswitchinterval(1e-6) try: fs = {self.executor.submit(future_func) for i in range(100)} event.set() futures.wait(fs, return_when=futures.ALL_COMPLETED) finally: sys.setswitchinterval(oldswitchinterval) create_executor_tests(WaitTests, executor_mixins=(ProcessPoolForkMixin, ProcessPoolForkserverMixin, ProcessPoolSpawnMixin)) class AsCompletedTests: def test_no_timeout(self): future1 = self.executor.submit(mul, 2, 21) future2 = self.executor.submit(mul, 7, 6) completed = set(futures.as_completed( [CANCELLED_AND_NOTIFIED_FUTURE, EXCEPTION_FUTURE, SUCCESSFUL_FUTURE, future1, future2])) self.assertEqual(set( [CANCELLED_AND_NOTIFIED_FUTURE, EXCEPTION_FUTURE, SUCCESSFUL_FUTURE, future1, future2]), completed) def test_future_times_out(self): """Test ``futures.as_completed`` timing out before completing it's final future.""" already_completed = {CANCELLED_AND_NOTIFIED_FUTURE, EXCEPTION_FUTURE, SUCCESSFUL_FUTURE} for timeout in (0, 0.01): with self.subTest(timeout): future = self.executor.submit(time.sleep, 0.1) completed_futures = set() try: for f in futures.as_completed( already_completed | {future}, timeout ): completed_futures.add(f) except futures.TimeoutError: pass # Check that ``future`` wasn't completed. self.assertEqual(completed_futures, already_completed) def test_duplicate_futures(self): # Issue 20367. Duplicate futures should not raise exceptions or give # duplicate responses. # Issue #31641: accept arbitrary iterables. future1 = self.executor.submit(time.sleep, 2) completed = [ f for f in futures.as_completed(itertools.repeat(future1, 3)) ] self.assertEqual(len(completed), 1) def test_free_reference_yielded_future(self): # Issue #14406: Generator should not keep references # to finished futures. futures_list = [Future() for _ in range(8)] futures_list.append(create_future(state=CANCELLED_AND_NOTIFIED)) futures_list.append(create_future(state=FINISHED, result=42)) with self.assertRaises(futures.TimeoutError): for future in futures.as_completed(futures_list, timeout=0): futures_list.remove(future) wr = weakref.ref(future) del future support.gc_collect() # For PyPy or other GCs. self.assertIsNone(wr()) futures_list[0].set_result("test") for future in futures.as_completed(futures_list): futures_list.remove(future) wr = weakref.ref(future) del future support.gc_collect() # For PyPy or other GCs. self.assertIsNone(wr()) if futures_list: futures_list[0].set_result("test") def test_correct_timeout_exception_msg(self): futures_list = [CANCELLED_AND_NOTIFIED_FUTURE, PENDING_FUTURE, RUNNING_FUTURE, SUCCESSFUL_FUTURE] with self.assertRaises(futures.TimeoutError) as cm: list(futures.as_completed(futures_list, timeout=0)) self.assertEqual(str(cm.exception), '2 (of 4) futures unfinished') create_executor_tests(AsCompletedTests) class ExecutorTest: # Executor.shutdown() and context manager usage is tested by # ExecutorShutdownTest. def test_submit(self): future = self.executor.submit(pow, 2, 8) self.assertEqual(256, future.result()) def test_submit_keyword(self): future = self.executor.submit(mul, 2, y=8) self.assertEqual(16, future.result()) future = self.executor.submit(capture, 1, self=2, fn=3) self.assertEqual(future.result(), ((1,), {'self': 2, 'fn': 3})) with self.assertRaises(TypeError): self.executor.submit(fn=capture, arg=1) with self.assertRaises(TypeError): self.executor.submit(arg=1) def test_map(self): self.assertEqual( list(self.executor.map(pow, range(10), range(10))), list(map(pow, range(10), range(10)))) self.assertEqual( list(self.executor.map(pow, range(10), range(10), chunksize=3)), list(map(pow, range(10), range(10)))) def test_map_exception(self): i = self.executor.map(divmod, [1, 1, 1, 1], [2, 3, 0, 5]) self.assertEqual(i.__next__(), (0, 1)) self.assertEqual(i.__next__(), (0, 1)) self.assertRaises(ZeroDivisionError, i.__next__) def test_map_timeout(self): results = [] try: for i in self.executor.map(time.sleep, [0, 0, 6], timeout=5): results.append(i) except futures.TimeoutError: pass else: self.fail('expected TimeoutError') self.assertEqual([None, None], results) def test_shutdown_race_issue12456(self): # Issue #12456: race condition at shutdown where trying to post a # sentinel in the call queue blocks (the queue is full while processes # have exited). self.executor.map(str, [2] * (self.worker_count + 1)) self.executor.shutdown() @support.cpython_only def test_no_stale_references(self): # Issue #16284: check that the executors don't unnecessarily hang onto # references. my_object = MyObject() my_object_collected = threading.Event() my_object_callback = weakref.ref( my_object, lambda obj: my_object_collected.set()) # Deliberately discarding the future. self.executor.submit(my_object.my_method) del my_object collected = my_object_collected.wait(timeout=support.SHORT_TIMEOUT) self.assertTrue(collected, "Stale reference not collected within timeout.") def test_max_workers_negative(self): for number in (0, -1): with self.assertRaisesRegex(ValueError, "max_workers must be greater " "than 0"): self.executor_type(max_workers=number) def test_free_reference(self): # Issue #14406: Result iterator should not keep an internal # reference to result objects. for obj in self.executor.map(make_dummy_object, range(10)): wr = weakref.ref(obj) del obj support.gc_collect() # For PyPy or other GCs. self.assertIsNone(wr()) class ThreadPoolExecutorTest(ThreadPoolMixin, ExecutorTest, BaseTestCase): def test_map_submits_without_iteration(self): """Tests verifying issue 11777.""" finished = [] def record_finished(n): finished.append(n) self.executor.map(record_finished, range(10)) self.executor.shutdown(wait=True) self.assertCountEqual(finished, range(10)) def test_default_workers(self): executor = self.executor_type() expected = min(32, (os.cpu_count() or 1) + 4) self.assertEqual(executor._max_workers, expected) def test_saturation(self): executor = self.executor_type(4) def acquire_lock(lock): lock.acquire() sem = threading.Semaphore(0) for i in range(15 * executor._max_workers): executor.submit(acquire_lock, sem) self.assertEqual(len(executor._threads), executor._max_workers) for i in range(15 * executor._max_workers): sem.release() executor.shutdown(wait=True) def test_idle_thread_reuse(self): executor = self.executor_type() executor.submit(mul, 21, 2).result() executor.submit(mul, 6, 7).result() executor.submit(mul, 3, 14).result() self.assertEqual(len(executor._threads), 1) executor.shutdown(wait=True) @unittest.skipUnless(hasattr(os, 'register_at_fork'), 'need os.register_at_fork') def test_hang_global_shutdown_lock(self): # bpo-45021: _global_shutdown_lock should be reinitialized in the child # process, otherwise it will never exit def submit(pool): pool.submit(submit, pool) with futures.ThreadPoolExecutor(1) as pool: pool.submit(submit, pool) for _ in range(50): with futures.ProcessPoolExecutor(1, mp_context=mp.get_context('fork')) as workers: workers.submit(tuple) def test_executor_map_current_future_cancel(self): stop_event = threading.Event() log = [] def log_n_wait(ident): log.append(f"{ident=} started") try: stop_event.wait() finally: log.append(f"{ident=} stopped") with self.executor_type(max_workers=1) as pool: # submit work to saturate the pool fut = pool.submit(log_n_wait, ident="first") try: with contextlib.closing( pool.map(log_n_wait, ["second", "third"], timeout=0) ) as gen: with self.assertRaises(TimeoutError): next(gen) finally: stop_event.set() fut.result() # ident='second' is cancelled as a result of raising a TimeoutError # ident='third' is cancelled because it remained in the collection of futures self.assertListEqual(log, ["ident='first' started", "ident='first' stopped"]) class ProcessPoolExecutorTest(ExecutorTest): @unittest.skipUnless(sys.platform=='win32', 'Windows-only process limit') def test_max_workers_too_large(self): with self.assertRaisesRegex(ValueError, "max_workers must be <= 61"): futures.ProcessPoolExecutor(max_workers=62) def test_killed_child(self): # When a child process is abruptly terminated, the whole pool gets # "broken". futures = [self.executor.submit(time.sleep, 3)] # Get one of the processes, and terminate (kill) it p = next(iter(self.executor._processes.values())) p.terminate() for fut in futures: self.assertRaises(BrokenProcessPool, fut.result) # Submitting other jobs fails as well. self.assertRaises(BrokenProcessPool, self.executor.submit, pow, 2, 8) def test_map_chunksize(self): def bad_map(): list(self.executor.map(pow, range(40), range(40), chunksize=-1)) ref = list(map(pow, range(40), range(40))) self.assertEqual( list(self.executor.map(pow, range(40), range(40), chunksize=6)), ref) self.assertEqual( list(self.executor.map(pow, range(40), range(40), chunksize=50)), ref) self.assertEqual( list(self.executor.map(pow, range(40), range(40), chunksize=40)), ref) self.assertRaises(ValueError, bad_map) @classmethod def _test_traceback(cls): raise RuntimeError(123) # some comment def test_traceback(self): # We want ensure that the traceback from the child process is # contained in the traceback raised in the main process. future = self.executor.submit(self._test_traceback) with self.assertRaises(Exception) as cm: future.result() exc = cm.exception self.assertIs(type(exc), RuntimeError) self.assertEqual(exc.args, (123,)) cause = exc.__cause__ self.assertIs(type(cause), futures.process._RemoteTraceback) self.assertIn('raise RuntimeError(123) # some comment', cause.tb) with support.captured_stderr() as f1: try: raise exc except RuntimeError: sys.excepthook(*sys.exc_info()) self.assertIn('raise RuntimeError(123) # some comment', f1.getvalue()) @hashlib_helper.requires_hashdigest('md5') def test_ressources_gced_in_workers(self): # Ensure that argument for a job are correctly gc-ed after the job # is finished mgr = self.get_context().Manager() obj = EventfulGCObj(mgr) future = self.executor.submit(id, obj) future.result() self.assertTrue(obj.event.wait(timeout=1)) # explicitly destroy the object to ensure that EventfulGCObj.__del__() # is called while manager is still running. obj = None support.gc_collect() mgr.shutdown() mgr.join() def test_saturation(self): executor = self.executor mp_context = self.get_context() sem = mp_context.Semaphore(0) job_count = 15 * executor._max_workers for _ in range(job_count): executor.submit(sem.acquire) self.assertEqual(len(executor._processes), executor._max_workers) for _ in range(job_count): sem.release() def test_idle_process_reuse_one(self): executor = self.executor assert executor._max_workers >= 4 if self.get_context().get_start_method(allow_none=False) == "fork": raise unittest.SkipTest("Incompatible with the fork start method.") executor.submit(mul, 21, 2).result() executor.submit(mul, 6, 7).result() executor.submit(mul, 3, 14).result() self.assertEqual(len(executor._processes), 1) def test_idle_process_reuse_multiple(self): executor = self.executor assert executor._max_workers <= 5 if self.get_context().get_start_method(allow_none=False) == "fork": raise unittest.SkipTest("Incompatible with the fork start method.") executor.submit(mul, 12, 7).result() executor.submit(mul, 33, 25) executor.submit(mul, 25, 26).result() executor.submit(mul, 18, 29) executor.submit(mul, 1, 2).result() executor.submit(mul, 0, 9) self.assertLessEqual(len(executor._processes), 3) executor.shutdown() def test_max_tasks_per_child(self): context = self.get_context() if context.get_start_method(allow_none=False) == "fork": with self.assertRaises(ValueError): self.executor_type(1, mp_context=context, max_tasks_per_child=3) return # not using self.executor as we need to control construction. # arguably this could go in another class w/o that mixin. executor = self.executor_type( 1, mp_context=context, max_tasks_per_child=3) f1 = executor.submit(os.getpid) original_pid = f1.result() # The worker pid remains the same as the worker could be reused f2 = executor.submit(os.getpid) self.assertEqual(f2.result(), original_pid) self.assertEqual(len(executor._processes), 1) f3 = executor.submit(os.getpid) self.assertEqual(f3.result(), original_pid) # A new worker is spawned, with a statistically different pid, # while the previous was reaped. f4 = executor.submit(os.getpid) new_pid = f4.result() self.assertNotEqual(original_pid, new_pid) self.assertEqual(len(executor._processes), 1) executor.shutdown() def test_max_tasks_per_child_defaults_to_spawn_context(self): # not using self.executor as we need to control construction. # arguably this could go in another class w/o that mixin. executor = self.executor_type(1, max_tasks_per_child=3) self.assertEqual(executor._mp_context.get_start_method(), "spawn") def test_max_tasks_early_shutdown(self): context = self.get_context() if context.get_start_method(allow_none=False) == "fork": raise unittest.SkipTest("Incompatible with the fork start method.") # not using self.executor as we need to control construction. # arguably this could go in another class w/o that mixin. executor = self.executor_type( 3, mp_context=context, max_tasks_per_child=1) futures = [] for i in range(6): futures.append(executor.submit(mul, i, i)) executor.shutdown() for i, future in enumerate(futures): self.assertEqual(future.result(), mul(i, i)) create_executor_tests(ProcessPoolExecutorTest, executor_mixins=(ProcessPoolForkMixin, ProcessPoolForkserverMixin, ProcessPoolSpawnMixin)) def _crash(delay=None): """Induces a segfault.""" if delay: time.sleep(delay) import faulthandler faulthandler.disable() faulthandler._sigsegv() def _exit(): """Induces a sys exit with exitcode 1.""" sys.exit(1) def _raise_error(Err): """Function that raises an Exception in process.""" raise Err() def _raise_error_ignore_stderr(Err): """Function that raises an Exception in process and ignores stderr.""" import io sys.stderr = io.StringIO() raise Err() def _return_instance(cls): """Function that returns a instance of cls.""" return cls() class CrashAtPickle(object): """Bad object that triggers a segfault at pickling time.""" def __reduce__(self): _crash() class CrashAtUnpickle(object): """Bad object that triggers a segfault at unpickling time.""" def __reduce__(self): return _crash, () class ExitAtPickle(object): """Bad object that triggers a process exit at pickling time.""" def __reduce__(self): _exit() class ExitAtUnpickle(object): """Bad object that triggers a process exit at unpickling time.""" def __reduce__(self): return _exit, () class ErrorAtPickle(object): """Bad object that triggers an error at pickling time.""" def __reduce__(self): from pickle import PicklingError raise PicklingError("Error in pickle") class ErrorAtUnpickle(object): """Bad object that triggers an error at unpickling time.""" def __reduce__(self): from pickle import UnpicklingError return _raise_error_ignore_stderr, (UnpicklingError, ) class ExecutorDeadlockTest: TIMEOUT = support.SHORT_TIMEOUT def _fail_on_deadlock(self, executor): # If we did not recover before TIMEOUT seconds, consider that the # executor is in a deadlock state and forcefully clean all its # composants. import faulthandler from tempfile import TemporaryFile with TemporaryFile(mode="w+") as f: faulthandler.dump_traceback(file=f) f.seek(0) tb = f.read() for p in executor._processes.values(): p.terminate() # This should be safe to call executor.shutdown here as all possible # deadlocks should have been broken. executor.shutdown(wait=True) print(f"\nTraceback:\n {tb}", file=sys.__stderr__) self.fail(f"Executor deadlock:\n\n{tb}") def _check_crash(self, error, func, *args, ignore_stderr=False): # test for deadlock caused by crashes in a pool self.executor.shutdown(wait=True) executor = self.executor_type( max_workers=2, mp_context=self.get_context()) res = executor.submit(func, *args) if ignore_stderr: cm = support.captured_stderr() else: cm = contextlib.nullcontext() try: with self.assertRaises(error): with cm: res.result(timeout=self.TIMEOUT) except futures.TimeoutError: # If we did not recover before TIMEOUT seconds, # consider that the executor is in a deadlock state self._fail_on_deadlock(executor) executor.shutdown(wait=True) def test_error_at_task_pickle(self): # Check problem occurring while pickling a task in # the task_handler thread self._check_crash(PicklingError, id, ErrorAtPickle()) def test_exit_at_task_unpickle(self): # Check problem occurring while unpickling a task on workers self._check_crash(BrokenProcessPool, id, ExitAtUnpickle()) def test_error_at_task_unpickle(self): # Check problem occurring while unpickling a task on workers self._check_crash(BrokenProcessPool, id, ErrorAtUnpickle()) def test_crash_at_task_unpickle(self): # Check problem occurring while unpickling a task on workers self._check_crash(BrokenProcessPool, id, CrashAtUnpickle()) def test_crash_during_func_exec_on_worker(self): # Check problem occurring during func execution on workers self._check_crash(BrokenProcessPool, _crash) def test_exit_during_func_exec_on_worker(self): # Check problem occurring during func execution on workers self._check_crash(SystemExit, _exit) def test_error_during_func_exec_on_worker(self): # Check problem occurring during func execution on workers self._check_crash(RuntimeError, _raise_error, RuntimeError) def test_crash_during_result_pickle_on_worker(self): # Check problem occurring while pickling a task result # on workers self._check_crash(BrokenProcessPool, _return_instance, CrashAtPickle) def test_exit_during_result_pickle_on_worker(self): # Check problem occurring while pickling a task result # on workers self._check_crash(SystemExit, _return_instance, ExitAtPickle) def test_error_during_result_pickle_on_worker(self): # Check problem occurring while pickling a task result # on workers self._check_crash(PicklingError, _return_instance, ErrorAtPickle) def test_error_during_result_unpickle_in_result_handler(self): # Check problem occurring while unpickling a task in # the result_handler thread self._check_crash(BrokenProcessPool, _return_instance, ErrorAtUnpickle, ignore_stderr=True) def test_exit_during_result_unpickle_in_result_handler(self): # Check problem occurring while unpickling a task in # the result_handler thread self._check_crash(BrokenProcessPool, _return_instance, ExitAtUnpickle) def test_shutdown_deadlock(self): # Test that the pool calling shutdown do not cause deadlock # if a worker fails after the shutdown call. self.executor.shutdown(wait=True) with self.executor_type(max_workers=2, mp_context=self.get_context()) as executor: self.executor = executor # Allow clean up in fail_on_deadlock f = executor.submit(_crash, delay=.1) executor.shutdown(wait=True) with self.assertRaises(BrokenProcessPool): f.result() def test_shutdown_deadlock_pickle(self): # Test that the pool calling shutdown with wait=False does not cause # a deadlock if a task fails at pickle after the shutdown call. # Reported in bpo-39104. self.executor.shutdown(wait=True) with self.executor_type(max_workers=2, mp_context=self.get_context()) as executor: self.executor = executor # Allow clean up in fail_on_deadlock # Start the executor and get the executor_manager_thread to collect # the threads and avoid dangling thread that should be cleaned up # asynchronously. executor.submit(id, 42).result() executor_manager = executor._executor_manager_thread # Submit a task that fails at pickle and shutdown the executor # without waiting f = executor.submit(id, ErrorAtPickle()) executor.shutdown(wait=False) with self.assertRaises(PicklingError): f.result() # Make sure the executor is eventually shutdown and do not leave # dangling threads executor_manager.join() create_executor_tests(ExecutorDeadlockTest, executor_mixins=(ProcessPoolForkMixin, ProcessPoolForkserverMixin, ProcessPoolSpawnMixin)) class FutureTests(BaseTestCase): def test_done_callback_with_result(self): callback_result = None def fn(callback_future): nonlocal callback_result callback_result = callback_future.result() f = Future() f.add_done_callback(fn) f.set_result(5) self.assertEqual(5, callback_result) def test_done_callback_with_exception(self): callback_exception = None def fn(callback_future): nonlocal callback_exception callback_exception = callback_future.exception() f = Future() f.add_done_callback(fn) f.set_exception(Exception('test')) self.assertEqual(('test',), callback_exception.args) def test_done_callback_with_cancel(self): was_cancelled = None def fn(callback_future): nonlocal was_cancelled was_cancelled = callback_future.cancelled() f = Future() f.add_done_callback(fn) self.assertTrue(f.cancel()) self.assertTrue(was_cancelled) def test_done_callback_raises(self): with support.captured_stderr() as stderr: raising_was_called = False fn_was_called = False def raising_fn(callback_future): nonlocal raising_was_called raising_was_called = True raise Exception('doh!') def fn(callback_future): nonlocal fn_was_called fn_was_called = True f = Future() f.add_done_callback(raising_fn) f.add_done_callback(fn) f.set_result(5) self.assertTrue(raising_was_called) self.assertTrue(fn_was_called) self.assertIn('Exception: doh!', stderr.getvalue()) def test_done_callback_already_successful(self): callback_result = None def fn(callback_future): nonlocal callback_result callback_result = callback_future.result() f = Future() f.set_result(5) f.add_done_callback(fn) self.assertEqual(5, callback_result) def test_done_callback_already_failed(self): callback_exception = None def fn(callback_future): nonlocal callback_exception callback_exception = callback_future.exception() f = Future() f.set_exception(Exception('test')) f.add_done_callback(fn) self.assertEqual(('test',), callback_exception.args) def test_done_callback_already_cancelled(self): was_cancelled = None def fn(callback_future): nonlocal was_cancelled was_cancelled = callback_future.cancelled() f = Future() self.assertTrue(f.cancel()) f.add_done_callback(fn) self.assertTrue(was_cancelled) def test_done_callback_raises_already_succeeded(self): with support.captured_stderr() as stderr: def raising_fn(callback_future): raise Exception('doh!') f = Future() # Set the result first to simulate a future that runs instantly, # effectively allowing the callback to be run immediately. f.set_result(5) f.add_done_callback(raising_fn) self.assertIn('exception calling callback for', stderr.getvalue()) self.assertIn('doh!', stderr.getvalue()) def test_repr(self): self.assertRegex(repr(PENDING_FUTURE), '') self.assertRegex(repr(RUNNING_FUTURE), '') self.assertRegex(repr(CANCELLED_FUTURE), '') self.assertRegex(repr(CANCELLED_AND_NOTIFIED_FUTURE), '') self.assertRegex( repr(EXCEPTION_FUTURE), '') self.assertRegex( repr(SUCCESSFUL_FUTURE), '') def test_cancel(self): f1 = create_future(state=PENDING) f2 = create_future(state=RUNNING) f3 = create_future(state=CANCELLED) f4 = create_future(state=CANCELLED_AND_NOTIFIED) f5 = create_future(state=FINISHED, exception=OSError()) f6 = create_future(state=FINISHED, result=5) self.assertTrue(f1.cancel()) self.assertEqual(f1._state, CANCELLED) self.assertFalse(f2.cancel()) self.assertEqual(f2._state, RUNNING) self.assertTrue(f3.cancel()) self.assertEqual(f3._state, CANCELLED) self.assertTrue(f4.cancel()) self.assertEqual(f4._state, CANCELLED_AND_NOTIFIED) self.assertFalse(f5.cancel()) self.assertEqual(f5._state, FINISHED) self.assertFalse(f6.cancel()) self.assertEqual(f6._state, FINISHED) def test_cancelled(self): self.assertFalse(PENDING_FUTURE.cancelled()) self.assertFalse(RUNNING_FUTURE.cancelled()) self.assertTrue(CANCELLED_FUTURE.cancelled()) self.assertTrue(CANCELLED_AND_NOTIFIED_FUTURE.cancelled()) self.assertFalse(EXCEPTION_FUTURE.cancelled()) self.assertFalse(SUCCESSFUL_FUTURE.cancelled()) def test_done(self): self.assertFalse(PENDING_FUTURE.done()) self.assertFalse(RUNNING_FUTURE.done()) self.assertTrue(CANCELLED_FUTURE.done()) self.assertTrue(CANCELLED_AND_NOTIFIED_FUTURE.done()) self.assertTrue(EXCEPTION_FUTURE.done()) self.assertTrue(SUCCESSFUL_FUTURE.done()) def test_running(self): self.assertFalse(PENDING_FUTURE.running()) self.assertTrue(RUNNING_FUTURE.running()) self.assertFalse(CANCELLED_FUTURE.running()) self.assertFalse(CANCELLED_AND_NOTIFIED_FUTURE.running()) self.assertFalse(EXCEPTION_FUTURE.running()) self.assertFalse(SUCCESSFUL_FUTURE.running()) def test_result_with_timeout(self): self.assertRaises(futures.TimeoutError, PENDING_FUTURE.result, timeout=0) self.assertRaises(futures.TimeoutError, RUNNING_FUTURE.result, timeout=0) self.assertRaises(futures.CancelledError, CANCELLED_FUTURE.result, timeout=0) self.assertRaises(futures.CancelledError, CANCELLED_AND_NOTIFIED_FUTURE.result, timeout=0) self.assertRaises(OSError, EXCEPTION_FUTURE.result, timeout=0) self.assertEqual(SUCCESSFUL_FUTURE.result(timeout=0), 42) def test_result_with_success(self): # TODO(brian@sweetapp.com): This test is timing dependent. def notification(): # Wait until the main thread is waiting for the result. time.sleep(1) f1.set_result(42) f1 = create_future(state=PENDING) t = threading.Thread(target=notification) t.start() self.assertEqual(f1.result(timeout=5), 42) t.join() def test_result_with_cancel(self): # TODO(brian@sweetapp.com): This test is timing dependent. def notification(): # Wait until the main thread is waiting for the result. time.sleep(1) f1.cancel() f1 = create_future(state=PENDING) t = threading.Thread(target=notification) t.start() self.assertRaises(futures.CancelledError, f1.result, timeout=support.SHORT_TIMEOUT) t.join() def test_exception_with_timeout(self): self.assertRaises(futures.TimeoutError, PENDING_FUTURE.exception, timeout=0) self.assertRaises(futures.TimeoutError, RUNNING_FUTURE.exception, timeout=0) self.assertRaises(futures.CancelledError, CANCELLED_FUTURE.exception, timeout=0) self.assertRaises(futures.CancelledError, CANCELLED_AND_NOTIFIED_FUTURE.exception, timeout=0) self.assertTrue(isinstance(EXCEPTION_FUTURE.exception(timeout=0), OSError)) self.assertEqual(SUCCESSFUL_FUTURE.exception(timeout=0), None) def test_exception_with_success(self): def notification(): # Wait until the main thread is waiting for the exception. time.sleep(1) with f1._condition: f1._state = FINISHED f1._exception = OSError() f1._condition.notify_all() f1 = create_future(state=PENDING) t = threading.Thread(target=notification) t.start() self.assertTrue(isinstance(f1.exception(timeout=support.SHORT_TIMEOUT), OSError)) t.join() def test_multiple_set_result(self): f = create_future(state=PENDING) f.set_result(1) with self.assertRaisesRegex( futures.InvalidStateError, 'FINISHED: ' ): f.set_result(2) self.assertTrue(f.done()) self.assertEqual(f.result(), 1) def test_multiple_set_exception(self): f = create_future(state=PENDING) e = ValueError() f.set_exception(e) with self.assertRaisesRegex( futures.InvalidStateError, 'FINISHED: ' ): f.set_exception(Exception()) self.assertEqual(f.exception(), e) def setUpModule(): unittest.addModuleCleanup(multiprocessing.util._cleanup_tests) thread_info = threading_helper.threading_setup() unittest.addModuleCleanup(threading_helper.threading_cleanup, *thread_info) if __name__ == "__main__": unittest.main()