diff options
Diffstat (limited to 'Lib/multiprocessing/pool.py')
-rw-r--r-- | Lib/multiprocessing/pool.py | 53 |
1 files changed, 38 insertions, 15 deletions
diff --git a/Lib/multiprocessing/pool.py b/Lib/multiprocessing/pool.py index fc9d904..8082ad6 100644 --- a/Lib/multiprocessing/pool.py +++ b/Lib/multiprocessing/pool.py @@ -17,9 +17,11 @@ import threading import queue import itertools import collections +import os import time +import traceback -from multiprocessing import Process, cpu_count, TimeoutError +from multiprocessing import Process, TimeoutError from multiprocessing.util import Finalize, debug # @@ -43,6 +45,29 @@ def starmapstar(args): return list(itertools.starmap(args[0], args[1])) # +# Hack to embed stringification of remote traceback in local traceback +# + +class RemoteTraceback(Exception): + def __init__(self, tb): + self.tb = tb + def __str__(self): + return self.tb + +class ExceptionWithTraceback: + def __init__(self, exc, tb): + tb = traceback.format_exception(type(exc), exc, tb) + tb = ''.join(tb) + self.exc = exc + self.tb = '\n"""\n%s"""' % tb + def __reduce__(self): + return rebuild_exc, (self.exc, self.tb) + +def rebuild_exc(exc, tb): + exc.__cause__ = RemoteTraceback(tb) + return exc + +# # Code run by worker processes # @@ -78,8 +103,8 @@ def worker(inqueue, outqueue, initializer=None, initargs=(), maxtasks=None): while maxtasks is None or (maxtasks and completed < maxtasks): try: task = get() - except (EOFError, IOError): - debug('worker got EOFError or IOError -- exiting') + except (EOFError, OSError): + debug('worker got EOFError or OSError -- exiting') break if task is None: @@ -90,6 +115,7 @@ def worker(inqueue, outqueue, initializer=None, initargs=(), maxtasks=None): try: result = (True, func(*args, **kwds)) except Exception as e: + e = ExceptionWithTraceback(e, e.__traceback__) result = (False, e) try: put((job, i, result)) @@ -122,10 +148,7 @@ class Pool(object): self._initargs = initargs if processes is None: - try: - processes = cpu_count() - except NotImplementedError: - processes = 1 + processes = os.cpu_count() or 1 if processes < 1: raise ValueError("Number of processes must be at least 1") @@ -349,7 +372,7 @@ class Pool(object): break try: put(task) - except IOError: + except OSError: debug('could not put task on queue') break else: @@ -371,8 +394,8 @@ class Pool(object): debug('task handler sending sentinel to workers') for p in pool: put(None) - except IOError: - debug('task handler got IOError when sending sentinels') + except OSError: + debug('task handler got OSError when sending sentinels') debug('task handler exiting') @@ -383,8 +406,8 @@ class Pool(object): while 1: try: task = get() - except (IOError, EOFError): - debug('result handler got EOFError/IOError -- exiting') + except (OSError, EOFError): + debug('result handler got EOFError/OSError -- exiting') return if thread._state: @@ -405,8 +428,8 @@ class Pool(object): while cache and thread._state != TERMINATE: try: task = get() - except (IOError, EOFError): - debug('result handler got EOFError/IOError -- exiting') + except (OSError, EOFError): + debug('result handler got EOFError/OSError -- exiting') return if task is None: @@ -428,7 +451,7 @@ class Pool(object): if not outqueue._reader.poll(): break get() - except (IOError, EOFError): + except (OSError, EOFError): pass debug('result handler exiting: len(cache)=%s, thread._state=%s', |