diff options
author | Serhiy Storchaka <storchaka@gmail.com> | 2015-03-13 06:25:26 (GMT) |
---|---|---|
committer | Serhiy Storchaka <storchaka@gmail.com> | 2015-03-13 06:25:26 (GMT) |
commit | 79fbeee2378dc31a5edebc9a5aa8f3fe9726933e (patch) | |
tree | c6e548a0204d89160067d496de135893cbfbf06b /Lib/multiprocessing | |
parent | f0f14f72bb8aba4955749c7993f72083c2680fbe (diff) | |
download | cpython-79fbeee2378dc31a5edebc9a5aa8f3fe9726933e.zip cpython-79fbeee2378dc31a5edebc9a5aa8f3fe9726933e.tar.gz cpython-79fbeee2378dc31a5edebc9a5aa8f3fe9726933e.tar.bz2 |
Issue #23051: multiprocessing.Pool methods imap() and imap_unordered() now
handle exceptions raised by an iterator. Patch by Alon Diamant and Davin
Potts.
Diffstat (limited to 'Lib/multiprocessing')
-rw-r--r-- | Lib/multiprocessing/pool.py | 37 |
1 files changed, 23 insertions, 14 deletions
diff --git a/Lib/multiprocessing/pool.py b/Lib/multiprocessing/pool.py index 8832a5c..db6e3e1 100644 --- a/Lib/multiprocessing/pool.py +++ b/Lib/multiprocessing/pool.py @@ -374,25 +374,34 @@ class Pool(object): thread = threading.current_thread() for taskseq, set_length in iter(taskqueue.get, None): + task = None i = -1 - for i, task in enumerate(taskseq): - if thread._state: - util.debug('task handler found thread._state != RUN') - break - try: - put(task) - except Exception as e: - job, ind = task[:2] + try: + for i, task in enumerate(taskseq): + if thread._state: + util.debug('task handler found thread._state != RUN') + break try: - cache[job]._set(ind, (False, e)) - except KeyError: - pass - else: + put(task) + except Exception as e: + job, ind = task[:2] + try: + cache[job]._set(ind, (False, e)) + except KeyError: + pass + else: + if set_length: + util.debug('doing set_length()') + set_length(i+1) + continue + break + except Exception as ex: + job, ind = task[:2] if task else (0, 0) + if job in cache: + cache[job]._set(ind + 1, (False, ex)) if set_length: util.debug('doing set_length()') set_length(i+1) - continue - break else: util.debug('task handler got sentinel') |