"""Tests for the shutdown behaviour of concurrent.futures executors."""

import signal
import sys
import threading
import time
import unittest
from concurrent import futures

from test import support
from test.support.script_helper import assert_python_ok

from .util import (
    BaseTestCase, ThreadPoolMixin, ProcessPoolForkMixin,
    ProcessPoolForkserverMixin, ProcessPoolSpawnMixin,
    create_executor_tests, setup_module)


def sleep_and_print(t, msg):
    """Sleep for *t* seconds, then print *msg* and flush stdout.

    The child interpreters started by the tests below import this function
    by name, so it has to stay at module level.
    """
    time.sleep(t)
    print(msg)
    sys.stdout.flush()


class ExecutorShutdownTest:
    """Shutdown tests shared by every executor flavour.

    The class is combined with an executor mixin and BaseTestCase; it reads
    ``self.executor``, ``self.executor_type``, ``self.worker_count`` and,
    when present, ``self.ctx`` / ``self.get_context()``.
    """

    def test_run_after_shutdown(self):
        """submit() raises RuntimeError once the executor was shut down."""
        self.executor.shutdown()
        self.assertRaises(RuntimeError,
                          self.executor.submit,
                          pow, 2, 5)

    def test_interpreter_shutdown(self):
        """A pending task still completes when the interpreter exits."""
        # Test the atexit hook for shutdown of worker threads and processes
        rc, out, err = assert_python_ok('-c', """if 1:
            from concurrent.futures import {executor_type}
            from time import sleep
            from test.test_concurrent_futures.test_shutdown import sleep_and_print
            if __name__ == "__main__":
                context = '{context}'
                if context == "":
                    t = {executor_type}(5)
                else:
                    from multiprocessing import get_context
                    context = get_context(context)
                    t = {executor_type}(5, mp_context=context)
                t.submit(sleep_and_print, 1.0, "apple")
            """.format(executor_type=self.executor_type.__name__,
                       context=getattr(self, "ctx", "")))
        # Errors in atexit hooks don't change the process exit code, check
        # stderr manually.
        self.assertFalse(err)
        self.assertEqual(out.strip(), b"apple")

    def test_submit_after_interpreter_shutdown(self):
        """submit() from a late atexit hook fails with RuntimeError."""
        # The child script registers its own atexit hook first and expects
        # the submit() made from that hook to be rejected.
        rc, out, err = assert_python_ok('-c', """if 1:
            import atexit
            @atexit.register
            def run_last():
                try:
                    t.submit(id, None)
                except RuntimeError:
                    print("runtime-error")
                    raise
            from concurrent.futures import {executor_type}
            if __name__ == "__main__":
                context = '{context}'
                if not context:
                    t = {executor_type}(5)
                else:
                    from multiprocessing import get_context
                    context = get_context(context)
                    t = {executor_type}(5, mp_context=context)
                    t.submit(id, 42).result()
            """.format(executor_type=self.executor_type.__name__,
                       context=getattr(self, "ctx", "")))
        # Errors in atexit hooks don't change the process exit code, check
        # stderr manually.
        self.assertIn("RuntimeError: cannot schedule new futures",
                      err.decode())
        self.assertEqual(out.strip(), b"runtime-error")

    def test_hang_issue12364(self):
        """Futures submitted before shutdown() can still be waited on."""
        fs = [self.executor.submit(time.sleep, 0.1) for _ in range(50)]
        self.executor.shutdown()
        for f in fs:
            f.result()

    def test_cancel_futures(self):
        """shutdown(cancel_futures=True) cancels pending work only."""
        # Use an assert method rather than a bare ``assert`` so that the
        # precondition is still checked when running under -O.
        self.assertLessEqual(self.worker_count, 5, "test needs few workers")
        fs = [self.executor.submit(time.sleep, .1) for _ in range(50)]
        self.executor.shutdown(cancel_futures=True)
        # We can't guarantee the exact number of cancellations, but we can
        # guarantee that *some* were cancelled. With few workers, many of
        # the submitted futures should have been cancelled.
        cancelled = [fut for fut in fs if fut.cancelled()]
        self.assertGreater(len(cancelled), 20)

        # Ensure the other futures were able to finish.
        # Use "not fut.cancelled()" instead of "fut.done()" to include futures
        # that may have been left in a pending state.
        others = [fut for fut in fs if not fut.cancelled()]
        for fut in others:
            self.assertTrue(fut.done(), msg=f"{fut._state=}")
            self.assertIsNone(fut.exception())

        # Similar to the number of cancelled futures, we can't guarantee the
        # exact number that completed. But, we can guarantee that at least
        # one finished.
        self.assertGreater(len(others), 0)

    def test_hang_gh83386(self):
        """shutdown(wait=False) doesn't hang at exit with running futures.

        See https://github.com/python/cpython/issues/83386.
        """
        if self.executor_type == futures.ProcessPoolExecutor:
            raise unittest.SkipTest(
                "Hangs, see https://github.com/python/cpython/issues/83386")

        # The script imports multiprocessing inside the guarded branch: the
        # name is only needed when a start method is configured, and
        # without the import that branch would fail with NameError.
        rc, out, err = assert_python_ok('-c', """if True:
            from concurrent.futures import {executor_type}
            from test.test_concurrent_futures.test_shutdown import sleep_and_print
            if __name__ == "__main__":
                if {context!r}:
                    import multiprocessing
                    multiprocessing.set_start_method({context!r})
                t = {executor_type}(max_workers=3)
                t.submit(sleep_and_print, 1.0, "apple")
                t.shutdown(wait=False)
            """.format(executor_type=self.executor_type.__name__,
                       context=getattr(self, 'ctx', None)))
        self.assertFalse(err)
        self.assertEqual(out.strip(), b"apple")

    def test_hang_gh94440(self):
        """shutdown(wait=True) doesn't hang when a future was submitted and
        quickly canceled right before shutdown.

        See https://github.com/python/cpython/issues/94440.
        """
        if not hasattr(signal, 'alarm'):
            raise unittest.SkipTest(
                "Tested platform does not support the alarm signal")

        def timeout(_signum, _frame):
            raise RuntimeError("timed out waiting for shutdown")

        kwargs = {}
        if getattr(self, 'ctx', None):
            kwargs['mp_context'] = self.get_context()
        executor = self.executor_type(max_workers=1, **kwargs)
        executor.submit(int).result()
        old_handler = signal.signal(signal.SIGALRM, timeout)
        try:
            # Turn a hang into a RuntimeError after five seconds.
            signal.alarm(5)
            executor.submit(int).cancel()
            executor.shutdown(wait=True)
        finally:
            # Always disarm the alarm and restore the previous handler.
            signal.alarm(0)
            signal.signal(signal.SIGALRM, old_handler)


class ThreadPoolShutdownTest(ThreadPoolMixin, ExecutorShutdownTest, BaseTestCase):
    """Shutdown tests that are specific to ThreadPoolExecutor."""

    def test_threads_terminate(self):
        """Worker threads can be joined after shutdown()."""
        def acquire_lock(lock):
            lock.acquire()

        sem = threading.Semaphore(0)
        for _ in range(3):
            self.executor.submit(acquire_lock, sem)
        self.assertEqual(len(self.executor._threads), 3)
        for _ in range(3):
            sem.release()
        self.executor.shutdown()
        for t in self.executor._threads:
            t.join()

    def test_context_manager_shutdown(self):
        """Worker threads can be joined after leaving the ``with`` block."""
        with futures.ThreadPoolExecutor(max_workers=5) as e:
            executor = e
            self.assertEqual(list(e.map(abs, range(-5, 5))),
                             [5, 4, 3, 2, 1, 0, 1, 2, 3, 4])

        for t in executor._threads:
            t.join()

    def test_del_shutdown(self):
        """Deleting the executor lets its threads finish the mapped work."""
        executor = futures.ThreadPoolExecutor(max_workers=5)
        res = executor.map(abs, range(-5, 5))
        threads = executor._threads
        del executor

        for t in threads:
            t.join()

        # Make sure the results were all computed before the
        # executor got shutdown.  Comparing full lists also catches a
        # result iterator that yields too few items, which zip() would hide.
        self.assertEqual(list(res), [abs(v) for v in range(-5, 5)])

    def test_shutdown_no_wait(self):
        """shutdown(wait=False) still lets the threads finish and exit."""
        # Ensure that the executor cleans up the threads when calling
        # shutdown with wait=False
        executor = futures.ThreadPoolExecutor(max_workers=5)
        res = executor.map(abs, range(-5, 5))
        threads = executor._threads
        executor.shutdown(wait=False)
        for t in threads:
            t.join()

        # Make sure the results were all computed before the
        # executor got shutdown.  Comparing full lists also catches a
        # result iterator that yields too few items, which zip() would hide.
        self.assertEqual(list(res), [abs(v) for v in range(-5, 5)])

    def test_thread_names_assigned(self):
        """Threads are named ``<thread_name_prefix>_<index>``."""
        executor = futures.ThreadPoolExecutor(
            max_workers=5, thread_name_prefix='SpecialPool')
        executor.map(abs, range(-5, 5))
        threads = executor._threads
        del executor
        support.gc_collect()  # For PyPy or other GCs.

        for t in threads:
            self.assertRegex(t.name, r'^SpecialPool_[0-4]$')
            t.join()

    def test_thread_names_default(self):
        """Threads get a ``ThreadPoolExecutor-N_<index>`` name by default."""
        executor = futures.ThreadPoolExecutor(max_workers=5)
        executor.map(abs, range(-5, 5))
        threads = executor._threads
        del executor
        support.gc_collect()  # For PyPy or other GCs.

        for t in threads:
            # Ensure that our default name is reasonably sane and unique when
            # no thread_name_prefix was supplied.
            self.assertRegex(t.name, r'ThreadPoolExecutor-\d+_[0-4]$')
            t.join()

    def test_cancel_futures_wait_false(self):
        """shutdown(wait=False, cancel_futures=True) keeps running tasks."""
        # Can only be reliably tested for TPE, since PPE often hangs with
        # `wait=False` (even without *cancel_futures*).
        rc, out, err = assert_python_ok('-c', """if True:
            from concurrent.futures import ThreadPoolExecutor
            from test.test_concurrent_futures.test_shutdown import sleep_and_print
            if __name__ == "__main__":
                t = ThreadPoolExecutor()
                t.submit(sleep_and_print, .1, "apple")
                t.shutdown(wait=False, cancel_futures=True)
            """)
        # Errors in atexit hooks don't change the process exit code, check
        # stderr manually.
        self.assertFalse(err)
        self.assertEqual(out.strip(), b"apple")


class ProcessPoolShutdownTest(ExecutorShutdownTest):
    """Shutdown tests that are specific to ProcessPoolExecutor.

    Concrete test cases are generated at the bottom of the module by
    create_executor_tests(), one per start-method mixin.
    """

    def test_processes_terminate(self):
        """Worker processes can be joined after shutdown()."""
        def acquire_lock(lock):
            lock.acquire()

        mp_context = self.get_context()
        if mp_context.get_start_method(allow_none=False) == "fork":
            # fork pre-spawns, not on demand.
            expected_num_processes = self.worker_count
        else:
            expected_num_processes = 3

        sem = mp_context.Semaphore(0)
        for _ in range(3):
            self.executor.submit(acquire_lock, sem)
        self.assertEqual(len(self.executor._processes), expected_num_processes)
        for _ in range(3):
            sem.release()
        # Keep a reference to the mapping before shutting down so the
        # processes can still be joined afterwards.
        processes = self.executor._processes
        self.executor.shutdown()

        for p in processes.values():
            p.join()

    def test_context_manager_shutdown(self):
        """Worker processes can be joined after leaving the ``with`` block."""
        with futures.ProcessPoolExecutor(
                max_workers=5, mp_context=self.get_context()) as e:
            processes = e._processes
            self.assertEqual(list(e.map(abs, range(-5, 5))),
                             [5, 4, 3, 2, 1, 0, 1, 2, 3, 4])

        for p in processes.values():
            p.join()

    def test_del_shutdown(self):
        """Deleting the executor releases its thread, processes and queue."""
        executor = futures.ProcessPoolExecutor(
            max_workers=5, mp_context=self.get_context())
        res = executor.map(abs, range(-5, 5))
        # Grab the internals before dropping the last executor reference.
        executor_manager_thread = executor._executor_manager_thread
        processes = executor._processes
        call_queue = executor._call_queue
        del executor
        support.gc_collect()  # For PyPy or other GCs.

        # Make sure that all the executor resources were properly cleaned by
        # the shutdown process
        executor_manager_thread.join()
        for p in processes.values():
            p.join()
        call_queue.join_thread()

        # Make sure the results were all computed before the
        # executor got shutdown.  Comparing full lists also catches a
        # result iterator that yields too few items, which zip() would hide.
        self.assertEqual(list(res), [abs(v) for v in range(-5, 5)])

    def test_shutdown_no_wait(self):
        """shutdown(wait=False) still releases every executor resource."""
        # Ensure that the executor cleans up the processes when calling
        # shutdown with wait=False
        executor = futures.ProcessPoolExecutor(
            max_workers=5, mp_context=self.get_context())
        res = executor.map(abs, range(-5, 5))
        processes = executor._processes
        call_queue = executor._call_queue
        executor_manager_thread = executor._executor_manager_thread
        executor.shutdown(wait=False)

        # Make sure that all the executor resources were properly cleaned by
        # the shutdown process
        executor_manager_thread.join()
        for p in processes.values():
            p.join()
        call_queue.join_thread()

        # Make sure the results were all computed before the executor got
        # shutdown.  Comparing full lists also catches a result iterator
        # that yields too few items, which zip() would hide.
        self.assertEqual(list(res), [abs(v) for v in range(-5, 5)])


create_executor_tests(globals(), ProcessPoolShutdownTest,
                      executor_mixins=(ProcessPoolForkMixin,
                                       ProcessPoolForkserverMixin,
                                       ProcessPoolSpawnMixin))


def setUpModule():
    setup_module()


if __name__ == "__main__":
    unittest.main()