diff options
Diffstat (limited to 'Lib/test/test_concurrent_futures.py')
-rw-r--r-- | Lib/test/test_concurrent_futures.py | 567 |
1 files changed, 167 insertions, 400 deletions
diff --git a/Lib/test/test_concurrent_futures.py b/Lib/test/test_concurrent_futures.py index 82b8d09..06cdf0b 100644 --- a/Lib/test/test_concurrent_futures.py +++ b/Lib/test/test_concurrent_futures.py @@ -9,24 +9,16 @@ test.support.import_module('multiprocessing.synchronize') # without thread support. test.support.import_module('threading') -import io -import logging -import multiprocessing -import sys import threading import time import unittest -if sys.platform.startswith('win'): - import ctypes - import ctypes.wintypes - from concurrent import futures from concurrent.futures._base import ( - PENDING, RUNNING, CANCELLED, CANCELLED_AND_NOTIFIED, FINISHED, Future, - LOGGER, wait) + PENDING, RUNNING, CANCELLED, CANCELLED_AND_NOTIFIED, FINISHED, Future) import concurrent.futures.process + def create_future(state=PENDING, exception=None, result=None): f = Future() f._state = state @@ -34,6 +26,7 @@ def create_future(state=PENDING, exception=None, result=None): f._result = result return f + PENDING_FUTURE = create_future(state=PENDING) RUNNING_FUTURE = create_future(state=RUNNING) CANCELLED_FUTURE = create_future(state=CANCELLED) @@ -41,166 +34,65 @@ CANCELLED_AND_NOTIFIED_FUTURE = create_future(state=CANCELLED_AND_NOTIFIED) EXCEPTION_FUTURE = create_future(state=FINISHED, exception=IOError()) SUCCESSFUL_FUTURE = create_future(state=FINISHED, result=42) + def mul(x, y): return x * y -class Call(object): - """A call that can be submitted to a future.Executor for testing. - - The call signals when it is called and waits for an event before finishing. - """ - CALL_LOCKS = {} - def _create_event(self): - if sys.platform.startswith('win'): - class SECURITY_ATTRIBUTES(ctypes.Structure): - _fields_ = [("nLength", ctypes.wintypes.DWORD), - ("lpSecurityDescriptor", ctypes.wintypes.LPVOID), - ("bInheritHandle", ctypes.wintypes.BOOL)] - - s = SECURITY_ATTRIBUTES() - s.nLength = ctypes.sizeof(s) - s.lpSecurityDescriptor = None - s.bInheritHandle = True - - handle = ctypes.windll.kernel32.CreateEventA(ctypes.pointer(s), - True, - False, - None) - assert handle is not None - return handle - else: - event = self.Event[0]() - self.CALL_LOCKS[id(event)] = event - return id(event) - - def _wait_on_event(self, handle): - if sys.platform.startswith('win'): - # WaitForSingleObject returns 0 if handle is signaled. - r = ctypes.windll.kernel32.WaitForSingleObject(handle, 60 * 1000) - if r != 0: - message = ( - 'WaitForSingleObject({}, ...) failed with {}, ' - 'GetLastError() = {}'.format( - handle, r, ctypes.GetLastError())) - logging.critical(message) - assert False, message - else: - self.CALL_LOCKS[handle].wait() - - def _signal_event(self, handle): - if sys.platform.startswith('win'): - r = ctypes.windll.kernel32.SetEvent(handle) # Returns 0 on failure. - if r == 0: - message = ( - 'SetEvent({}) failed with {}, GetLastError() = {}'.format( - handle, r, ctypes.GetLastError())) - logging.critical(message) - assert False, message - else: - self.CALL_LOCKS[handle].set() - - def __init__(self, Event, manual_finish=False, result=42): - self.Event = Event - self._called_event = self._create_event() - self._can_finish = self._create_event() - - self._result = result - - if not manual_finish: - self._signal_event(self._can_finish) - def wait_on_called(self): - self._wait_on_event(self._called_event) +def sleep_and_raise(t): + time.sleep(t) + raise Exception('this is an exception') - def set_can(self): - self._signal_event(self._can_finish) - def __call__(self): - self._signal_event(self._called_event) - self._wait_on_event(self._can_finish) +class ExecutorMixin: + worker_count = 5 + def _prime_executor(self): + # Make sure that the executor is ready to do work before running the + # tests. This should reduce the probability of timeouts in the tests. + futures = [self.executor.submit(time.sleep, 0.1) + for _ in range(self.worker_count)] - return self._result + for f in futures: + f.result() - def close(self): - self.set_can() - if sys.platform.startswith('win'): - ctypes.windll.kernel32.CloseHandle(self._called_event) - ctypes.windll.kernel32.CloseHandle(self._can_finish) - self._called_event = None - self._can_finish = None - else: - del self.CALL_LOCKS[self._called_event] - del self.CALL_LOCKS[self._can_finish] - -class ExceptionCall(Call): - def __call__(self): - self._signal_event(self._called_event) - self._wait_on_event(self._can_finish) - raise ZeroDivisionError() - -class MapCall(Call): - def __init__(self, Event, result=42): - super().__init__(Event, manual_finish=True, result=result) - def __call__(self, manual_finish): - if manual_finish: - super().__call__() - return self._result - -class ExecutorShutdownTest(unittest.TestCase): - def test_run_after_shutdown(self): - self.executor.shutdown() - self.assertRaises(RuntimeError, - self.executor.submit, - pow, 2, 5) - - - def _start_some_futures(self): - call1 = Call(self.Event, manual_finish=True) - call2 = Call(self.Event, manual_finish=True) - call3 = Call(self.Event, manual_finish=True) - - try: - self.executor.submit(call1) - self.executor.submit(call2) - self.executor.submit(call3) - - call1.wait_on_called() - call2.wait_on_called() - call3.wait_on_called() - - call1.set_can() - call2.set_can() - call3.set_can() - finally: - call1.close() - call2.close() - call3.close() - -class ThreadPoolMixin: - # wrap in tuple to prevent creation of instance methods - Event = (threading.Event,) +class ThreadPoolMixin(ExecutorMixin): def setUp(self): self.executor = futures.ThreadPoolExecutor(max_workers=5) + self._prime_executor() def tearDown(self): self.executor.shutdown(wait=True) -class ProcessPoolMixin: - # wrap in tuple to prevent creation of instance methods - Event = (multiprocessing.Event,) + +class ProcessPoolMixin(ExecutorMixin): def setUp(self): try: self.executor = futures.ProcessPoolExecutor(max_workers=5) except NotImplementedError as e: self.skipTest(str(e)) + self._prime_executor() def tearDown(self): self.executor.shutdown(wait=True) + +class ExecutorShutdownTest(unittest.TestCase): + def test_run_after_shutdown(self): + self.executor.shutdown() + self.assertRaises(RuntimeError, + self.executor.submit, + pow, 2, 5) + + class ThreadPoolShutdownTest(ThreadPoolMixin, ExecutorShutdownTest): + def _prime_executor(self): + pass + def test_threads_terminate(self): - self._start_some_futures() + self.executor.submit(mul, 21, 2) + self.executor.submit(mul, 6, 7) + self.executor.submit(mul, 3, 14) self.assertEqual(len(self.executor._threads), 3) self.executor.shutdown() for t in self.executor._threads: @@ -224,9 +116,15 @@ class ThreadPoolShutdownTest(ThreadPoolMixin, ExecutorShutdownTest): for t in threads: t.join() + class ProcessPoolShutdownTest(ProcessPoolMixin, ExecutorShutdownTest): + def _prime_executor(self): + pass + def test_processes_terminate(self): - self._start_some_futures() + self.executor.submit(mul, 21, 2) + self.executor.submit(mul, 6, 7) + self.executor.submit(mul, 3, 14) self.assertEqual(len(self.executor._processes), 5) processes = self.executor._processes self.executor.shutdown() @@ -236,11 +134,11 @@ class ProcessPoolShutdownTest(ProcessPoolMixin, ExecutorShutdownTest): def test_context_manager_shutdown(self): with futures.ProcessPoolExecutor(max_workers=5) as e: - executor = e + processes = e._processes self.assertEqual(list(e.map(abs, range(-5, 5))), [5, 4, 3, 2, 1, 0, 1, 2, 3, 4]) - for p in self.executor._processes: + for p in processes: p.join() def test_del_shutdown(self): @@ -256,288 +154,158 @@ class ProcessPoolShutdownTest(ProcessPoolMixin, ExecutorShutdownTest): class WaitTests(unittest.TestCase): def test_first_completed(self): - def wait_test(): - while not future1._waiters: - pass - call1.set_can() + future1 = self.executor.submit(mul, 21, 2) + future2 = self.executor.submit(time.sleep, 5) - call1 = Call(self.Event, manual_finish=True) - call2 = Call(self.Event, manual_finish=True) - try: - future1 = self.executor.submit(call1) - future2 = self.executor.submit(call2) - - t = threading.Thread(target=wait_test) - t.start() - done, not_done = futures.wait( - [CANCELLED_FUTURE, future1, future2], - return_when=futures.FIRST_COMPLETED) - - self.assertEqual(set([future1]), done) - self.assertEqual(set([CANCELLED_FUTURE, future2]), not_done) - finally: - call1.close() - call2.close() - - def test_first_completed_one_already_completed(self): - call1 = Call(self.Event, manual_finish=True) - try: - future1 = self.executor.submit(call1) + done, not_done = futures.wait( + [CANCELLED_FUTURE, future1, future2], + return_when=futures.FIRST_COMPLETED) - finished, pending = futures.wait( - [SUCCESSFUL_FUTURE, future1], - return_when=futures.FIRST_COMPLETED) + self.assertEqual(set([future1]), done) + self.assertEqual(set([CANCELLED_FUTURE, future2]), not_done) - self.assertEqual(set([SUCCESSFUL_FUTURE]), finished) - self.assertEqual(set([future1]), pending) - finally: - call1.close() + def test_first_completed_some_already_completed(self): + future1 = self.executor.submit(time.sleep, 2) - def test_first_exception(self): - def wait_test(): - while not future1._waiters: - pass - call1.set_can() - call2.set_can() - - call1 = Call(self.Event, manual_finish=True) - call2 = ExceptionCall(self.Event, manual_finish=True) - call3 = Call(self.Event, manual_finish=True) - try: - future1 = self.executor.submit(call1) - future2 = self.executor.submit(call2) - future3 = self.executor.submit(call3) - - t = threading.Thread(target=wait_test) - t.start() - finished, pending = futures.wait( - [future1, future2, future3], - return_when=futures.FIRST_EXCEPTION) - - self.assertEqual(set([future1, future2]), finished) - self.assertEqual(set([future3]), pending) - finally: - call1.close() - call2.close() - call3.close() + finished, pending = futures.wait( + [CANCELLED_AND_NOTIFIED_FUTURE, SUCCESSFUL_FUTURE, future1], + return_when=futures.FIRST_COMPLETED) - def test_first_exception_some_already_complete(self): - def wait_test(): - while not future1._waiters: - pass - call1.set_can() + self.assertEqual( + set([CANCELLED_AND_NOTIFIED_FUTURE, SUCCESSFUL_FUTURE]), + finished) + self.assertEqual(set([future1]), pending) - call1 = ExceptionCall(self.Event, manual_finish=True) - call2 = Call(self.Event, manual_finish=True) - try: - future1 = self.executor.submit(call1) - future2 = self.executor.submit(call2) + def test_first_exception(self): + future1 = self.executor.submit(mul, 2, 21) + future2 = self.executor.submit(sleep_and_raise, 5) + future3 = self.executor.submit(time.sleep, 10) - t = threading.Thread(target=wait_test) - t.start() - finished, pending = futures.wait( - [SUCCESSFUL_FUTURE, - CANCELLED_FUTURE, - CANCELLED_AND_NOTIFIED_FUTURE, - future1, future2], - return_when=futures.FIRST_EXCEPTION) + finished, pending = futures.wait( + [future1, future2, future3], + return_when=futures.FIRST_EXCEPTION) - self.assertEqual(set([SUCCESSFUL_FUTURE, - CANCELLED_AND_NOTIFIED_FUTURE, - future1]), finished) - self.assertEqual(set([CANCELLED_FUTURE, future2]), pending) + self.assertEqual(set([future1, future2]), finished) + self.assertEqual(set([future3]), pending) + + def test_first_exception_some_already_complete(self): + future1 = self.executor.submit(divmod, 21, 0) + future2 = self.executor.submit(time.sleep, 5) + finished, pending = futures.wait( + [SUCCESSFUL_FUTURE, + CANCELLED_FUTURE, + CANCELLED_AND_NOTIFIED_FUTURE, + future1, future2], + return_when=futures.FIRST_EXCEPTION) - finally: - call1.close() - call2.close() + self.assertEqual(set([SUCCESSFUL_FUTURE, + CANCELLED_AND_NOTIFIED_FUTURE, + future1]), finished) + self.assertEqual(set([CANCELLED_FUTURE, future2]), pending) def test_first_exception_one_already_failed(self): - call1 = Call(self.Event, manual_finish=True) - try: - future1 = self.executor.submit(call1) + future1 = self.executor.submit(time.sleep, 2) - finished, pending = futures.wait( - [EXCEPTION_FUTURE, future1], - return_when=futures.FIRST_EXCEPTION) + finished, pending = futures.wait( + [EXCEPTION_FUTURE, future1], + return_when=futures.FIRST_EXCEPTION) - self.assertEqual(set([EXCEPTION_FUTURE]), finished) - self.assertEqual(set([future1]), pending) - finally: - call1.close() + self.assertEqual(set([EXCEPTION_FUTURE]), finished) + self.assertEqual(set([future1]), pending) def test_all_completed(self): - def wait_test(): - while not future1._waiters: - pass - call1.set_can() - call2.set_can() - - call1 = Call(self.Event, manual_finish=True) - call2 = Call(self.Event, manual_finish=True) - try: - future1 = self.executor.submit(call1) - future2 = self.executor.submit(call2) - - t = threading.Thread(target=wait_test) - t.start() - finished, pending = futures.wait( - [future1, future2], - return_when=futures.ALL_COMPLETED) - - self.assertEqual(set([future1, future2]), finished) - self.assertEqual(set(), pending) - finally: - call1.close() - call2.close() - - @unittest.skip # XXX skip the test for now as it hangs - def test_all_completed_some_already_completed(self): - def wait_test(): - while not future1._waiters: - pass - - future4.cancel() - call1.set_can() - call2.set_can() - call3.set_can() - - self.assertLessEqual( - futures.process.EXTRA_QUEUED_CALLS, - 1, - 'this test assumes that future4 will be cancelled before it is ' - 'queued to run - which might not be the case if ' - 'ProcessPoolExecutor is too aggresive in scheduling futures') - call1 = Call(self.Event, manual_finish=True) - call2 = Call(self.Event, manual_finish=True) - call3 = Call(self.Event, manual_finish=True) - call4 = Call(self.Event, manual_finish=True) - try: - future1 = self.executor.submit(call1) - future2 = self.executor.submit(call2) - future3 = self.executor.submit(call3) - future4 = self.executor.submit(call4) - - t = threading.Thread(target=wait_test) - t.start() - finished, pending = futures.wait( - [SUCCESSFUL_FUTURE, - CANCELLED_AND_NOTIFIED_FUTURE, - future1, future2, future3, future4], - return_when=futures.ALL_COMPLETED) - - self.assertEqual(set([SUCCESSFUL_FUTURE, - CANCELLED_AND_NOTIFIED_FUTURE, - future1, future2, future3, future4]), - finished) - self.assertEqual(set(), pending) - finally: - call1.close() - call2.close() - call3.close() - call4.close() + future1 = self.executor.submit(divmod, 2, 0) + future2 = self.executor.submit(mul, 2, 21) + + finished, pending = futures.wait( + [SUCCESSFUL_FUTURE, + CANCELLED_AND_NOTIFIED_FUTURE, + EXCEPTION_FUTURE, + future1, + future2], + return_when=futures.ALL_COMPLETED) + + self.assertEqual(set([SUCCESSFUL_FUTURE, + CANCELLED_AND_NOTIFIED_FUTURE, + EXCEPTION_FUTURE, + future1, + future2]), finished) + self.assertEqual(set(), pending) def test_timeout(self): - def wait_test(): - while not future1._waiters: - pass - call1.set_can() - - call1 = Call(self.Event, manual_finish=True) - call2 = Call(self.Event, manual_finish=True) - try: - future1 = self.executor.submit(call1) - future2 = self.executor.submit(call2) - - t = threading.Thread(target=wait_test) - t.start() - finished, pending = futures.wait( - [CANCELLED_AND_NOTIFIED_FUTURE, - EXCEPTION_FUTURE, - SUCCESSFUL_FUTURE, - future1, future2], - timeout=5, - return_when=futures.ALL_COMPLETED) + future1 = self.executor.submit(mul, 6, 7) + future2 = self.executor.submit(time.sleep, 10) - self.assertEqual(set([CANCELLED_AND_NOTIFIED_FUTURE, - EXCEPTION_FUTURE, - SUCCESSFUL_FUTURE, - future1]), finished) - self.assertEqual(set([future2]), pending) + finished, pending = futures.wait( + [CANCELLED_AND_NOTIFIED_FUTURE, + EXCEPTION_FUTURE, + SUCCESSFUL_FUTURE, + future1, future2], + timeout=5, + return_when=futures.ALL_COMPLETED) - - finally: - call1.close() - call2.close() + self.assertEqual(set([CANCELLED_AND_NOTIFIED_FUTURE, + EXCEPTION_FUTURE, + SUCCESSFUL_FUTURE, + future1]), finished) + self.assertEqual(set([future2]), pending) class ThreadPoolWaitTests(ThreadPoolMixin, WaitTests): pass + class ProcessPoolWaitTests(ProcessPoolMixin, WaitTests): pass + class AsCompletedTests(unittest.TestCase): # TODO(brian@sweetapp.com): Should have a test with a non-zero timeout. def test_no_timeout(self): - def wait_test(): - while not future1._waiters: - pass - call1.set_can() - call2.set_can() - - call1 = Call(self.Event, manual_finish=True) - call2 = Call(self.Event, manual_finish=True) - try: - future1 = self.executor.submit(call1) - future2 = self.executor.submit(call2) + future1 = self.executor.submit(mul, 2, 21) + future2 = self.executor.submit(mul, 7, 6) + + completed = set(futures.as_completed( + [CANCELLED_AND_NOTIFIED_FUTURE, + EXCEPTION_FUTURE, + SUCCESSFUL_FUTURE, + future1, future2])) + self.assertEqual(set( + [CANCELLED_AND_NOTIFIED_FUTURE, + EXCEPTION_FUTURE, + SUCCESSFUL_FUTURE, + future1, future2]), + completed) - t = threading.Thread(target=wait_test) - t.start() - completed = set(futures.as_completed( - [CANCELLED_AND_NOTIFIED_FUTURE, - EXCEPTION_FUTURE, - SUCCESSFUL_FUTURE, - future1, future2])) - self.assertEqual(set( + def test_zero_timeout(self): + future1 = self.executor.submit(time.sleep, 2) + completed_futures = set() + try: + for future in futures.as_completed( [CANCELLED_AND_NOTIFIED_FUTURE, EXCEPTION_FUTURE, SUCCESSFUL_FUTURE, - future1, future2]), - completed) - finally: - call1.close() - call2.close() + future1], + timeout=0): + completed_futures.add(future) + except futures.TimeoutError: + pass + + self.assertEqual(set([CANCELLED_AND_NOTIFIED_FUTURE, + EXCEPTION_FUTURE, + SUCCESSFUL_FUTURE]), + completed_futures) - def test_zero_timeout(self): - call1 = Call(self.Event, manual_finish=True) - try: - future1 = self.executor.submit(call1) - completed_futures = set() - try: - for future in futures.as_completed( - [CANCELLED_AND_NOTIFIED_FUTURE, - EXCEPTION_FUTURE, - SUCCESSFUL_FUTURE, - future1], - timeout=0): - completed_futures.add(future) - except futures.TimeoutError: - pass - - self.assertEqual(set([CANCELLED_AND_NOTIFIED_FUTURE, - EXCEPTION_FUTURE, - SUCCESSFUL_FUTURE]), - completed_futures) - finally: - call1.close() class ThreadPoolAsCompletedTests(ThreadPoolMixin, AsCompletedTests): pass + class ProcessPoolAsCompletedTests(ProcessPoolMixin, AsCompletedTests): pass + class ExecutorTest(unittest.TestCase): # Executor.shutdown() and context manager usage is tested by # ExecutorShutdownTest. @@ -562,28 +330,27 @@ class ExecutorTest(unittest.TestCase): def test_map_timeout(self): results = [] - timeout_call = MapCall(self.Event) try: - try: - for i in self.executor.map(timeout_call, - [False, False, True], - timeout=5): - results.append(i) - except futures.TimeoutError: - pass - else: - self.fail('expected TimeoutError') - finally: - timeout_call.close() - - self.assertEqual([42, 42], results) + for i in self.executor.map(time.sleep, + [0, 0, 10], + timeout=5): + results.append(i) + except futures.TimeoutError: + pass + else: + self.fail('expected TimeoutError') + + self.assertEqual([None, None], results) + class ThreadPoolExecutorTest(ThreadPoolMixin, ExecutorTest): pass + class ProcessPoolExecutorTest(ProcessPoolMixin, ExecutorTest): pass + class FutureTests(unittest.TestCase): def test_done_callback_with_result(self): callback_result = None |