import os
import sys
import threading
import time
import unittest
from concurrent import futures
from concurrent.futures.process import BrokenProcessPool

from test import support
from test.support import hashlib_helper

from .executor import ExecutorTest, mul
from .util import (
    ProcessPoolForkMixin, ProcessPoolForkserverMixin, ProcessPoolSpawnMixin,
    create_executor_tests, setup_module)


class EventfulGCObj():
    """Helper whose finalizer sets an event created by the given manager.

    Tests wait on ``event`` to observe that ``__del__`` has run.
    """

    def __init__(self, mgr):
        self.event = mgr.Event()

    def __del__(self):
        self.event.set()


class ProcessPoolExecutorTest(ExecutorTest):
    """ProcessPoolExecutor-specific tests.

    Concrete test classes (one per start-method mixin) are generated at the
    bottom of this module by create_executor_tests().
    """

    @unittest.skipUnless(sys.platform=='win32', 'Windows-only process limit')
    def test_max_workers_too_large(self):
        # Asking for 62 workers must be rejected with the documented message.
        with self.assertRaisesRegex(ValueError,
                                    "max_workers must be <= 61"):
            futures.ProcessPoolExecutor(max_workers=62)

    def test_killed_child(self):
        # When a child process is abruptly terminated, the whole pool gets
        # "broken".
        # NOTE: the local is deliberately not called "futures" so that it
        # does not shadow the imported concurrent.futures module.
        pending = [self.executor.submit(time.sleep, 3)]
        # Get one of the processes, and terminate (kill) it
        worker = next(iter(self.executor._processes.values()))
        worker.terminate()
        for fut in pending:
            self.assertRaises(BrokenProcessPool, fut.result)
        # Submitting other jobs fails as well.
        self.assertRaises(BrokenProcessPool, self.executor.submit, pow, 2, 8)

    def test_map_chunksize(self):
        # map() must return the same results as the builtin map() for chunk
        # sizes smaller than, larger than and equal to the input length.
        expected = list(map(pow, range(40), range(40)))
        for chunksize in (6, 50, 40):
            self.assertEqual(
                list(self.executor.map(pow, range(40), range(40),
                                       chunksize=chunksize)),
                expected,
                msg=f"chunksize={chunksize}")

        # A negative chunksize is rejected.
        with self.assertRaises(ValueError):
            list(self.executor.map(pow, range(40), range(40), chunksize=-1))

    @classmethod
    def _test_traceback(cls):
        # The exact text of the next line (including the trailing comment)
        # is searched for in the traceback by test_traceback(); keep it
        # byte-for-byte in sync with the assertions there.
        raise RuntimeError(123) # some comment

    def test_traceback(self):
        # We want to ensure that the traceback from the child process is
        # contained in the traceback raised in the main process.
        future = self.executor.submit(self._test_traceback)
        with self.assertRaises(Exception) as cm:
            future.result()

        exc = cm.exception
        self.assertIs(type(exc), RuntimeError)
        self.assertEqual(exc.args, (123,))
        cause = exc.__cause__
        self.assertIs(type(cause), futures.process._RemoteTraceback)
        self.assertIn('raise RuntimeError(123) # some comment', cause.tb)

        # The remote source line must also show up when the exception is
        # rendered through sys.excepthook.
        with support.captured_stderr() as f1:
            try:
                raise exc
            except RuntimeError:
                sys.excepthook(*sys.exc_info())
        self.assertIn('raise RuntimeError(123) # some comment',
                      f1.getvalue())

    @hashlib_helper.requires_hashdigest('md5')
    def test_ressources_gced_in_workers(self):
        # Ensure that arguments for a job are correctly gc-ed after the job
        # is finished
        mgr = self.get_context().Manager()
        obj = EventfulGCObj(mgr)
        future = self.executor.submit(id, obj)
        future.result()

        self.assertTrue(obj.event.wait(timeout=1))

        # explicitly destroy the object to ensure that EventfulGCObj.__del__()
        # is called while manager is still running.
        obj = None
        support.gc_collect()

        mgr.shutdown()
        mgr.join()

    def test_saturation(self):
        # Submitting many more blocking jobs than workers must start exactly
        # _max_workers processes.
        executor = self.executor
        mp_context = self.get_context()
        sem = mp_context.Semaphore(0)
        job_count = 15 * executor._max_workers
        for _ in range(job_count):
            executor.submit(sem.acquire)
        self.assertEqual(len(executor._processes), executor._max_workers)
        # Unblock every submitted job.
        for _ in range(job_count):
            sem.release()

    def test_idle_process_reuse_one(self):
        # Jobs run strictly one after the other must all be served by a
        # single worker process.
        executor = self.executor
        assert executor._max_workers >= 4
        if self.get_context().get_start_method(allow_none=False) == "fork":
            raise unittest.SkipTest("Incompatible with the fork start method.")
        executor.submit(mul, 21, 2).result()
        executor.submit(mul, 6, 7).result()
        executor.submit(mul, 3, 14).result()
        self.assertEqual(len(executor._processes), 1)

    def test_idle_process_reuse_multiple(self):
        # Alternating waited-for and fire-and-forget jobs must not need more
        # than three worker processes.
        executor = self.executor
        assert executor._max_workers <= 5
        if self.get_context().get_start_method(allow_none=False) == "fork":
            raise unittest.SkipTest("Incompatible with the fork start method.")
        executor.submit(mul, 12, 7).result()
        executor.submit(mul, 33, 25)
        executor.submit(mul, 25, 26).result()
        executor.submit(mul, 18, 29)
        executor.submit(mul, 1, 2).result()
        executor.submit(mul, 0, 9)
        self.assertLessEqual(len(executor._processes), 3)
        executor.shutdown()

    def test_max_tasks_per_child(self):
        context = self.get_context()
        if context.get_start_method(allow_none=False) == "fork":
            # max_tasks_per_child is rejected with the fork start method.
            with self.assertRaises(ValueError):
                self.executor_type(1, mp_context=context, max_tasks_per_child=3)
            return
        # not using self.executor as we need to control construction.
        # arguably this could go in another class w/o that mixin.
        executor = self.executor_type(
                1, mp_context=context, max_tasks_per_child=3)
        f1 = executor.submit(os.getpid)
        original_pid = f1.result()
        # The worker pid remains the same as the worker could be reused
        f2 = executor.submit(os.getpid)
        self.assertEqual(f2.result(), original_pid)
        self.assertEqual(len(executor._processes), 1)
        f3 = executor.submit(os.getpid)
        self.assertEqual(f3.result(), original_pid)

        # A new worker is spawned, with a statistically different pid,
        # while the previous was reaped.
        f4 = executor.submit(os.getpid)
        new_pid = f4.result()
        self.assertNotEqual(original_pid, new_pid)
        self.assertEqual(len(executor._processes), 1)

        executor.shutdown()

    def test_max_tasks_per_child_defaults_to_spawn_context(self):
        # not using self.executor as we need to control construction.
        # arguably this could go in another class w/o that mixin.
        executor = self.executor_type(1, max_tasks_per_child=3)
        self.assertEqual(executor._mp_context.get_start_method(), "spawn")

    def test_max_tasks_early_shutdown(self):
        context = self.get_context()
        if context.get_start_method(allow_none=False) == "fork":
            raise unittest.SkipTest("Incompatible with the fork start method.")
        # not using self.executor as we need to control construction.
        # arguably this could go in another class w/o that mixin.
        executor = self.executor_type(
                3, mp_context=context, max_tasks_per_child=1)
        # Named "submitted" rather than "futures" to avoid shadowing the
        # imported concurrent.futures module.
        submitted = [executor.submit(mul, i, i) for i in range(6)]
        # Shut down right after submitting; every job must still complete
        # with the right result.
        executor.shutdown()
        for i, future in enumerate(submitted):
            self.assertEqual(future.result(), mul(i, i))

    def test_python_finalization_error(self):
        # gh-109047: Catch RuntimeError on thread creation
        # during Python finalization.

        context = self.get_context()

        # gh-109047: Mock threading._start_joinable_thread() to inject
        # RuntimeError: simulate the error raised during Python finalization.
        # Block the second creation: create _ExecutorManagerThread, but block
        # QueueFeederThread.
        orig_start_new_thread = threading._start_joinable_thread
        nthread = 0

        # Accept and forward keyword arguments too, so the mock stays a
        # transparent wrapper however the real function is called.
        # NOTE(review): newer threading.Thread.start() is believed to pass
        # keyword arguments (handle=/daemon=) here -- confirm against the
        # targeted interpreter version.
        def mock_start_new_thread(func, *args, **kwargs):
            nonlocal nthread
            if nthread >= 1:
                raise RuntimeError("can't create new thread at "
                                   "interpreter shutdown")
            nthread += 1
            return orig_start_new_thread(func, *args, **kwargs)

        with support.swap_attr(threading, '_start_joinable_thread',
                               mock_start_new_thread):
            executor = self.executor_type(max_workers=2, mp_context=context)
            with executor:
                with self.assertRaises(BrokenProcessPool):
                    list(executor.map(mul, [(2, 3)] * 10))
            executor.shutdown()


# Generate one concrete TestCase class per process start-method mixin.
create_executor_tests(globals(), ProcessPoolExecutorTest,
                      executor_mixins=(ProcessPoolForkMixin,
                                       ProcessPoolForkserverMixin,
                                       ProcessPoolSpawnMixin))


def setUpModule():
    setup_module()


if __name__ == "__main__":
    unittest.main()