diff options
-rw-r--r-- | Lib/multiprocessing/managers.py | 11 | ||||
-rw-r--r-- | Lib/multiprocessing/pool.py | 12 | ||||
-rw-r--r-- | Lib/test/_test_multiprocessing.py | 11 | ||||
-rw-r--r-- | Misc/NEWS | 2 |
4 files changed, 30 insertions, 6 deletions
diff --git a/Lib/multiprocessing/managers.py b/Lib/multiprocessing/managers.py index cc87d36..66d46fc 100644 --- a/Lib/multiprocessing/managers.py +++ b/Lib/multiprocessing/managers.py @@ -1077,17 +1077,22 @@ ArrayProxy = MakeProxyType('ArrayProxy', ( )) -PoolProxy = MakeProxyType('PoolProxy', ( +BasePoolProxy = MakeProxyType('PoolProxy', ( 'apply', 'apply_async', 'close', 'imap', 'imap_unordered', 'join', - 'map', 'map_async', 'starmap', 'starmap_async', 'terminate' + 'map', 'map_async', 'starmap', 'starmap_async', 'terminate', )) -PoolProxy._method_to_typeid_ = { +BasePoolProxy._method_to_typeid_ = { 'apply_async': 'AsyncResult', 'map_async': 'AsyncResult', 'starmap_async': 'AsyncResult', 'imap': 'Iterator', 'imap_unordered': 'Iterator' } +class PoolProxy(BasePoolProxy): + def __enter__(self): + return self + def __exit__(self, exc_type, exc_val, exc_tb): + self.terminate() # # Definition of SyncManager diff --git a/Lib/multiprocessing/pool.py b/Lib/multiprocessing/pool.py index 74d0875..8832a5c 100644 --- a/Lib/multiprocessing/pool.py +++ b/Lib/multiprocessing/pool.py @@ -90,7 +90,8 @@ class MaybeEncodingError(Exception): return "<MaybeEncodingError: %s>" % str(self) -def worker(inqueue, outqueue, initializer=None, initargs=(), maxtasks=None): +def worker(inqueue, outqueue, initializer=None, initargs=(), maxtasks=None, + wrap_exception=False): assert maxtasks is None or (type(maxtasks) == int and maxtasks > 0) put = outqueue.put get = inqueue.get @@ -117,7 +118,8 @@ def worker(inqueue, outqueue, initializer=None, initargs=(), maxtasks=None): try: result = (True, func(*args, **kwds)) except Exception as e: - e = ExceptionWithTraceback(e, e.__traceback__) + if wrap_exception: + e = ExceptionWithTraceback(e, e.__traceback__) result = (False, e) try: put((job, i, result)) @@ -137,6 +139,8 @@ class Pool(object): ''' Class which supports an async version of applying functions to arguments. ''' + _wrap_exception = True + def Process(self, *args, **kwds): return self._ctx.Process(*args, **kwds) @@ -220,7 +224,8 @@ class Pool(object): w = self.Process(target=worker, args=(self._inqueue, self._outqueue, self._initializer, - self._initargs, self._maxtasksperchild) + self._initargs, self._maxtasksperchild, + self._wrap_exception) ) self._pool.append(w) w.name = w.name.replace('Process', 'PoolWorker') @@ -736,6 +741,7 @@ class IMapUnorderedIterator(IMapIterator): # class ThreadPool(Pool): + _wrap_exception = False @staticmethod def Process(*args, **kwds): diff --git a/Lib/test/_test_multiprocessing.py b/Lib/test/_test_multiprocessing.py index 8eb57fe..44d6c71 100644 --- a/Lib/test/_test_multiprocessing.py +++ b/Lib/test/_test_multiprocessing.py @@ -1810,6 +1810,17 @@ class _TestPool(BaseTestCase): self.assertIn('raise RuntimeError(123) # some comment', f1.getvalue()) + @classmethod + def _test_wrapped_exception(cls): + raise RuntimeError('foo') + + def test_wrapped_exception(self): + # Issue #20980: Should not wrap exception when using thread pool + with self.Pool(1) as p: + with self.assertRaises(RuntimeError): + p.apply(self._test_wrapped_exception) + + def raising(): raise KeyError("key") @@ -21,6 +21,8 @@ Core and Builtins Library ------- +- Issue #20980: Stop wrapping exception when using ThreadPool. + - Issue #20990: Fix issues found by pyflakes for multiprocessing. - Issue #21015: SSL contexts will now automatically select an elliptic |