diff options
author | Martin v. Löwis <martin@v.loewis.de> | 2011-01-03 00:07:01 (GMT) |
---|---|---|
committer | Martin v. Löwis <martin@v.loewis.de> | 2011-01-03 00:07:01 (GMT) |
commit | 9f6d48ba4e43b9cadd0b842e6ed1abbfebf759ed (patch) | |
tree | 51276793e8a55e3b9a4ebbbee8de929e3f3e78c0 /Lib | |
parent | e10608cf5de214985308a8d9eb6ace07bc9c590e (diff) | |
download | cpython-9f6d48ba4e43b9cadd0b842e6ed1abbfebf759ed.zip cpython-9f6d48ba4e43b9cadd0b842e6ed1abbfebf759ed.tar.gz cpython-9f6d48ba4e43b9cadd0b842e6ed1abbfebf759ed.tar.bz2 |
Issue #10798: Reject supporting concurrent.futures if the system has
too few POSIX semaphores.
Diffstat (limited to 'Lib')
-rw-r--r-- | Lib/concurrent/futures/process.py | 26 | ||||
-rw-r--r-- | Lib/test/test_concurrent_futures.py | 130 |
2 files changed, 84 insertions, 72 deletions
diff --git a/Lib/concurrent/futures/process.py b/Lib/concurrent/futures/process.py index f461b77..79c60c3 100644 --- a/Lib/concurrent/futures/process.py +++ b/Lib/concurrent/futures/process.py @@ -244,6 +244,31 @@ def _queue_manangement_worker(executor_reference, else: work_item.future.set_result(result_item.result) +_system_limits_checked = False +_system_limited = None +def _check_system_limits(): + global _system_limits_checked, _system_limited + if _system_limits_checked: + if _system_limited: + raise NotImplementedError(_system_limited) + _system_limits_checked = True + try: + import os + nsems_max = os.sysconf("SC_SEM_NSEMS_MAX") + except (AttributeError, ValueError): + # sysconf not available or setting not available + return + if nsems_max == -1: + # indetermine limit, assume that limit is determined + # by available memory only + return + if nsems_max >= 256: + # minimum number of semaphores available + # according to POSIX + return + _system_limited = "system provides too few semaphores (%d available, 256 necessary)" % nsems_max + raise NotImplementedError(_system_limited) + class ProcessPoolExecutor(_base.Executor): def __init__(self, max_workers=None): """Initializes a new ProcessPoolExecutor instance. @@ -253,6 +278,7 @@ class ProcessPoolExecutor(_base.Executor): execute the given calls. If None or not given then as many worker processes will be created as the machine has processors. """ + _check_system_limits() _remove_dead_thread_references() if max_workers is None: diff --git a/Lib/test/test_concurrent_futures.py b/Lib/test/test_concurrent_futures.py index 6a95a36..48f2e7e 100644 --- a/Lib/test/test_concurrent_futures.py +++ b/Lib/test/test_concurrent_futures.py @@ -69,7 +69,7 @@ class Call(object): assert handle is not None return handle else: - event = multiprocessing.Event() + event = self.Event[0]() self.CALL_LOCKS[id(event)] = event return id(event) @@ -99,7 +99,8 @@ class Call(object): else: self.CALL_LOCKS[handle].set() - def __init__(self, manual_finish=False, result=42): + def __init__(self, Event, manual_finish=False, result=42): + self.Event = Event self._called_event = self._create_event() self._can_finish = self._create_event() @@ -138,8 +139,8 @@ class ExceptionCall(Call): raise ZeroDivisionError() class MapCall(Call): - def __init__(self, result=42): - super().__init__(manual_finish=True, result=result) + def __init__(self, Event, result=42): + super().__init__(Event, manual_finish=True, result=result) def __call__(self, manual_finish): if manual_finish: @@ -155,9 +156,9 @@ class ExecutorShutdownTest(unittest.TestCase): def _start_some_futures(self): - call1 = Call(manual_finish=True) - call2 = Call(manual_finish=True) - call3 = Call(manual_finish=True) + call1 = Call(self.Event, manual_finish=True) + call2 = Call(self.Event, manual_finish=True) + call3 = Call(self.Event, manual_finish=True) try: self.executor.submit(call1) @@ -176,13 +177,28 @@ class ExecutorShutdownTest(unittest.TestCase): call2.close() call3.close() -class ThreadPoolShutdownTest(ExecutorShutdownTest): +class ThreadPoolMixin: + # wrap in tuple to prevent creation of instance methods + Event = (threading.Event,) def setUp(self): self.executor = futures.ThreadPoolExecutor(max_workers=5) def tearDown(self): self.executor.shutdown(wait=True) +class ProcessPoolMixin: + # wrap in tuple to prevent creation of instance methods + Event = (multiprocessing.Event,) + def setUp(self): + try: + self.executor = futures.ProcessPoolExecutor(max_workers=5) + except NotImplementedError as e: + self.skipTest(str(e)) + + def tearDown(self): + self.executor.shutdown(wait=True) + +class ThreadPoolShutdownTest(ThreadPoolMixin, ExecutorShutdownTest): def test_threads_terminate(self): self._start_some_futures() self.assertEqual(len(self.executor._threads), 3) @@ -208,13 +224,7 @@ class ThreadPoolShutdownTest(ExecutorShutdownTest): for t in threads: t.join() -class ProcessPoolShutdownTest(ExecutorShutdownTest): - def setUp(self): - self.executor = futures.ProcessPoolExecutor(max_workers=5) - - def tearDown(self): - self.executor.shutdown(wait=True) - +class ProcessPoolShutdownTest(ProcessPoolMixin, ExecutorShutdownTest): def test_processes_terminate(self): self._start_some_futures() self.assertEqual(len(self.executor._processes), 5) @@ -251,8 +261,8 @@ class WaitTests(unittest.TestCase): pass call1.set_can() - call1 = Call(manual_finish=True) - call2 = Call(manual_finish=True) + call1 = Call(self.Event, manual_finish=True) + call2 = Call(self.Event, manual_finish=True) try: future1 = self.executor.submit(call1) future2 = self.executor.submit(call2) @@ -270,7 +280,7 @@ class WaitTests(unittest.TestCase): call2.close() def test_first_completed_one_already_completed(self): - call1 = Call(manual_finish=True) + call1 = Call(self.Event, manual_finish=True) try: future1 = self.executor.submit(call1) @@ -290,9 +300,9 @@ class WaitTests(unittest.TestCase): call1.set_can() call2.set_can() - call1 = Call(manual_finish=True) - call2 = ExceptionCall(manual_finish=True) - call3 = Call(manual_finish=True) + call1 = Call(self.Event, manual_finish=True) + call2 = ExceptionCall(self.Event, manual_finish=True) + call3 = Call(self.Event, manual_finish=True) try: future1 = self.executor.submit(call1) future2 = self.executor.submit(call2) @@ -317,8 +327,8 @@ class WaitTests(unittest.TestCase): pass call1.set_can() - call1 = ExceptionCall(manual_finish=True) - call2 = Call(manual_finish=True) + call1 = ExceptionCall(self.Event, manual_finish=True) + call2 = Call(self.Event, manual_finish=True) try: future1 = self.executor.submit(call1) future2 = self.executor.submit(call2) @@ -343,7 +353,7 @@ class WaitTests(unittest.TestCase): call2.close() def test_first_exception_one_already_failed(self): - call1 = Call(manual_finish=True) + call1 = Call(self.Event, manual_finish=True) try: future1 = self.executor.submit(call1) @@ -363,8 +373,8 @@ class WaitTests(unittest.TestCase): call1.set_can() call2.set_can() - call1 = Call(manual_finish=True) - call2 = Call(manual_finish=True) + call1 = Call(self.Event, manual_finish=True) + call2 = Call(self.Event, manual_finish=True) try: future1 = self.executor.submit(call1) future2 = self.executor.submit(call2) @@ -397,10 +407,10 @@ class WaitTests(unittest.TestCase): 'this test assumes that future4 will be cancelled before it is ' 'queued to run - which might not be the case if ' 'ProcessPoolExecutor is too aggresive in scheduling futures') - call1 = Call(manual_finish=True) - call2 = Call(manual_finish=True) - call3 = Call(manual_finish=True) - call4 = Call(manual_finish=True) + call1 = Call(self.Event, manual_finish=True) + call2 = Call(self.Event, manual_finish=True) + call3 = Call(self.Event, manual_finish=True) + call4 = Call(self.Event, manual_finish=True) try: future1 = self.executor.submit(call1) future2 = self.executor.submit(call2) @@ -432,8 +442,8 @@ class WaitTests(unittest.TestCase): pass call1.set_can() - call1 = Call(manual_finish=True) - call2 = Call(manual_finish=True) + call1 = Call(self.Event, manual_finish=True) + call2 = Call(self.Event, manual_finish=True) try: future1 = self.executor.submit(call1) future2 = self.executor.submit(call2) @@ -460,19 +470,11 @@ class WaitTests(unittest.TestCase): call2.close() -class ThreadPoolWaitTests(WaitTests): - def setUp(self): - self.executor = futures.ThreadPoolExecutor(max_workers=1) +class ThreadPoolWaitTests(ThreadPoolMixin, WaitTests): + pass - def tearDown(self): - self.executor.shutdown(wait=True) - -class ProcessPoolWaitTests(WaitTests): - def setUp(self): - self.executor = futures.ProcessPoolExecutor(max_workers=1) - - def tearDown(self): - self.executor.shutdown(wait=True) +class ProcessPoolWaitTests(ProcessPoolMixin, WaitTests): + pass class AsCompletedTests(unittest.TestCase): # TODO(brian@sweetapp.com): Should have a test with a non-zero timeout. @@ -483,8 +485,8 @@ class AsCompletedTests(unittest.TestCase): call1.set_can() call2.set_can() - call1 = Call(manual_finish=True) - call2 = Call(manual_finish=True) + call1 = Call(self.Event, manual_finish=True) + call2 = Call(self.Event, manual_finish=True) try: future1 = self.executor.submit(call1) future2 = self.executor.submit(call2) @@ -507,7 +509,7 @@ class AsCompletedTests(unittest.TestCase): call2.close() def test_zero_timeout(self): - call1 = Call(manual_finish=True) + call1 = Call(self.Event, manual_finish=True) try: future1 = self.executor.submit(call1) completed_futures = set() @@ -529,19 +531,11 @@ class AsCompletedTests(unittest.TestCase): finally: call1.close() -class ThreadPoolAsCompletedTests(AsCompletedTests): - def setUp(self): - self.executor = futures.ThreadPoolExecutor(max_workers=1) - - def tearDown(self): - self.executor.shutdown(wait=True) +class ThreadPoolAsCompletedTests(ThreadPoolMixin, AsCompletedTests): + pass -class ProcessPoolAsCompletedTests(AsCompletedTests): - def setUp(self): - self.executor = futures.ProcessPoolExecutor(max_workers=1) - - def tearDown(self): - self.executor.shutdown(wait=True) +class ProcessPoolAsCompletedTests(ProcessPoolMixin, AsCompletedTests): + pass class ExecutorTest(unittest.TestCase): # Executor.shutdown() and context manager usage is tested by @@ -567,7 +561,7 @@ class ExecutorTest(unittest.TestCase): def test_map_timeout(self): results = [] - timeout_call = MapCall() + timeout_call = MapCall(self.Event) try: try: for i in self.executor.map(timeout_call, @@ -583,19 +577,11 @@ class ExecutorTest(unittest.TestCase): self.assertEqual([42, 42], results) -class ThreadPoolExecutorTest(ExecutorTest): - def setUp(self): - self.executor = futures.ThreadPoolExecutor(max_workers=1) - - def tearDown(self): - self.executor.shutdown(wait=True) +class ThreadPoolExecutorTest(ThreadPoolMixin, ExecutorTest): + pass -class ProcessPoolExecutorTest(ExecutorTest): - def setUp(self): - self.executor = futures.ProcessPoolExecutor(max_workers=1) - - def tearDown(self): - self.executor.shutdown(wait=True) +class ProcessPoolExecutorTest(ProcessPoolMixin, ExecutorTest): + pass class FutureTests(unittest.TestCase): def test_done_callback_with_result(self): |