From 78f55ffc63ba637ffa4a07ca869f451e680b002a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Charles-Fran=C3=A7ois=20Natali?= Date: Wed, 10 Feb 2016 22:58:18 +0000 Subject: Issue #23992: multiprocessing: make MapResult not fail-fast upon exception. --- Lib/multiprocessing/pool.py | 20 ++++++++++++-------- Lib/test/_test_multiprocessing.py | 24 ++++++++++++++++++++++++ Misc/NEWS | 2 ++ 3 files changed, 38 insertions(+), 8 deletions(-) diff --git a/Lib/multiprocessing/pool.py b/Lib/multiprocessing/pool.py index 6d25469..ffdf426 100644 --- a/Lib/multiprocessing/pool.py +++ b/Lib/multiprocessing/pool.py @@ -638,22 +638,26 @@ class MapResult(ApplyResult): self._number_left = length//chunksize + bool(length % chunksize) def _set(self, i, success_result): + self._number_left -= 1 success, result = success_result - if success: + if success and self._success: self._value[i*self._chunksize:(i+1)*self._chunksize] = result - self._number_left -= 1 if self._number_left == 0: if self._callback: self._callback(self._value) del self._cache[self._job] self._event.set() else: - self._success = False - self._value = result - if self._error_callback: - self._error_callback(self._value) - del self._cache[self._job] - self._event.set() + if not success and self._success: + # only store first exception + self._success = False + self._value = result + if self._number_left == 0: + # only consider the result ready once all jobs are done + if self._error_callback: + self._error_callback(self._value) + del self._cache[self._job] + self._event.set() # # Class whose instances are returned by `Pool.imap()` diff --git a/Lib/test/_test_multiprocessing.py b/Lib/test/_test_multiprocessing.py index e9120ab..a59d2ba 100644 --- a/Lib/test/_test_multiprocessing.py +++ b/Lib/test/_test_multiprocessing.py @@ -1660,6 +1660,10 @@ def sqr(x, wait=0.0): def mul(x, y): return x*y +def raise_large_valuerror(wait): + time.sleep(wait) + raise ValueError("x" * 1024**2) + class SayWhenError(ValueError): pass def exception_throwing_generator(total, when): @@ -1895,6 +1899,26 @@ class _TestPool(BaseTestCase): with self.assertRaises(RuntimeError): p.apply(self._test_wrapped_exception) + def test_map_no_failfast(self): + # Issue #23992: the fail-fast behaviour when an exception is raised + # during map() would make Pool.join() deadlock, because a worker + # process would fill the result queue (after the result handler thread + # terminated, hence not draining it anymore). + + t_start = time.time() + + with self.assertRaises(ValueError): + with self.Pool(2) as p: + try: + p.map(raise_large_valuerror, [0, 1]) + finally: + time.sleep(0.5) + p.close() + p.join() + + # check that we indeed waited for all jobs + self.assertGreater(time.time() - t_start, 0.9) + def raising(): raise KeyError("key") diff --git a/Misc/NEWS b/Misc/NEWS index 1ac4ad0..22a392e 100644 --- a/Misc/NEWS +++ b/Misc/NEWS @@ -179,6 +179,8 @@ Core and Builtins Library ------- +- Issue #23992: multiprocessing: make MapResult not fail-fast upon exception. + - Issue #26243: Support keyword arguments to zlib.compress(). Patch by Aviv Palivoda. -- cgit v0.12