summaryrefslogtreecommitdiffstats
path: root/Lib/multiprocessing
diff options
context:
space:
mode:
authorSerhiy Storchaka <storchaka@gmail.com>2015-03-13 06:30:33 (GMT)
committerSerhiy Storchaka <storchaka@gmail.com>2015-03-13 06:30:33 (GMT)
commit63623ac2529ce9f293784c06215b018664c74491 (patch)
treeb0420f59515afa599c4597b7abe3dee2f28f39e6 /Lib/multiprocessing
parent38dae173d11c0f87cec46d137df9180e1f43d01c (diff)
parent79fbeee2378dc31a5edebc9a5aa8f3fe9726933e (diff)
downloadcpython-63623ac2529ce9f293784c06215b018664c74491.zip
cpython-63623ac2529ce9f293784c06215b018664c74491.tar.gz
cpython-63623ac2529ce9f293784c06215b018664c74491.tar.bz2
Issue #23051: multiprocessing.Pool methods imap() and imap_unordered() now
handle exceptions raised by an iterator. Patch by Alon Diamant and Davin Potts.
Diffstat (limited to 'Lib/multiprocessing')
-rw-r--r--Lib/multiprocessing/pool.py37
1 files changed, 23 insertions, 14 deletions
diff --git a/Lib/multiprocessing/pool.py b/Lib/multiprocessing/pool.py
index 75a76a4..6d25469 100644
--- a/Lib/multiprocessing/pool.py
+++ b/Lib/multiprocessing/pool.py
@@ -374,25 +374,34 @@ class Pool(object):
thread = threading.current_thread()
for taskseq, set_length in iter(taskqueue.get, None):
+ task = None
i = -1
- for i, task in enumerate(taskseq):
- if thread._state:
- util.debug('task handler found thread._state != RUN')
- break
- try:
- put(task)
- except Exception as e:
- job, ind = task[:2]
+ try:
+ for i, task in enumerate(taskseq):
+ if thread._state:
+ util.debug('task handler found thread._state != RUN')
+ break
try:
- cache[job]._set(ind, (False, e))
- except KeyError:
- pass
- else:
+ put(task)
+ except Exception as e:
+ job, ind = task[:2]
+ try:
+ cache[job]._set(ind, (False, e))
+ except KeyError:
+ pass
+ else:
+ if set_length:
+ util.debug('doing set_length()')
+ set_length(i+1)
+ continue
+ break
+ except Exception as ex:
+ job, ind = task[:2] if task else (0, 0)
+ if job in cache:
+ cache[job]._set(ind + 1, (False, ex))
if set_length:
util.debug('doing set_length()')
set_length(i+1)
- continue
- break
else:
util.debug('task handler got sentinel')