diff options
author | Serhiy Storchaka <storchaka@gmail.com> | 2015-03-13 06:31:34 (GMT) |
---|---|---|
committer | Serhiy Storchaka <storchaka@gmail.com> | 2015-03-13 06:31:34 (GMT) |
commit | 7c26be5b18d71c1c9863d81f1f478bb803e8bd5c (patch) | |
tree | 42e71c48092183531bbad9eee645ae1e108aa795 | |
parent | 59bdf6392de446de8a19bfa37cee52981612830e (diff) | |
download | cpython-7c26be5b18d71c1c9863d81f1f478bb803e8bd5c.zip cpython-7c26be5b18d71c1c9863d81f1f478bb803e8bd5c.tar.gz cpython-7c26be5b18d71c1c9863d81f1f478bb803e8bd5c.tar.bz2 |
Issue #23051: multiprocessing.Pool methods imap() and imap_unordered() now
handle exceptions raised by an iterator. Patch by Alon Diamant and Davin
Potts.
-rw-r--r-- | Lib/multiprocessing/pool.py | 37 | ||||
-rw-r--r-- | Lib/test/test_multiprocessing.py | 47 | ||||
-rw-r--r-- | Misc/ACKS | 1 | ||||
-rw-r--r-- | Misc/NEWS | 4 |
4 files changed, 75 insertions, 14 deletions
diff --git a/Lib/multiprocessing/pool.py b/Lib/multiprocessing/pool.py index 04531b9..991f87f 100644 --- a/Lib/multiprocessing/pool.py +++ b/Lib/multiprocessing/pool.py @@ -334,25 +334,34 @@ class Pool(object): thread = threading.current_thread() for taskseq, set_length in iter(taskqueue.get, None): + task = None i = -1 - for i, task in enumerate(taskseq): - if thread._state: - debug('task handler found thread._state != RUN') - break - try: - put(task) - except Exception as e: - job, ind = task[:2] + try: + for i, task in enumerate(taskseq): + if thread._state: + debug('task handler found thread._state != RUN') + break try: - cache[job]._set(ind, (False, e)) - except KeyError: - pass - else: + put(task) + except Exception as e: + job, ind = task[:2] + try: + cache[job]._set(ind, (False, e)) + except KeyError: + pass + else: + if set_length: + debug('doing set_length()') + set_length(i+1) + continue + break + except Exception as ex: + job, ind = task[:2] if task else (0, 0) + if job in cache: + cache[job]._set(ind + 1, (False, ex)) if set_length: debug('doing set_length()') set_length(i+1) - continue - break else: debug('task handler got sentinel') diff --git a/Lib/test/test_multiprocessing.py b/Lib/test/test_multiprocessing.py index b1e75b5..681529f 100644 --- a/Lib/test/test_multiprocessing.py +++ b/Lib/test/test_multiprocessing.py @@ -1122,6 +1122,15 @@ class _TestContainers(BaseTestCase): def sqr(x, wait=0.0): time.sleep(wait) return x*x + +class SayWhenError(ValueError): pass + +def exception_throwing_generator(total, when): + for i in range(total): + if i == when: + raise SayWhenError("Somebody said when") + yield i + class _TestPool(BaseTestCase): def test_apply(self): @@ -1177,6 +1186,25 @@ class _TestPool(BaseTestCase): self.assertEqual(it.next(), i*i) self.assertRaises(StopIteration, it.next) + def test_imap_handle_iterable_exception(self): + if self.TYPE == 'manager': + self.skipTest('test not appropriate for {}'.format(self.TYPE)) + + it = self.pool.imap(sqr, exception_throwing_generator(10, 3), 1) + for i in range(3): + self.assertEqual(next(it), i*i) + self.assertRaises(SayWhenError, it.next) + + # SayWhenError seen at start of problematic chunk's results + it = self.pool.imap(sqr, exception_throwing_generator(20, 7), 2) + for i in range(6): + self.assertEqual(next(it), i*i) + self.assertRaises(SayWhenError, it.next) + it = self.pool.imap(sqr, exception_throwing_generator(20, 7), 4) + for i in range(4): + self.assertEqual(next(it), i*i) + self.assertRaises(SayWhenError, it.next) + def test_imap_unordered(self): it = self.pool.imap_unordered(sqr, range(1000)) self.assertEqual(sorted(it), map(sqr, range(1000))) @@ -1184,6 +1212,25 @@ class _TestPool(BaseTestCase): it = self.pool.imap_unordered(sqr, range(1000), chunksize=53) self.assertEqual(sorted(it), map(sqr, range(1000))) + def test_imap_unordered_handle_iterable_exception(self): + if self.TYPE == 'manager': + self.skipTest('test not appropriate for {}'.format(self.TYPE)) + + it = self.pool.imap_unordered(sqr, + exception_throwing_generator(10, 3), + 1) + with self.assertRaises(SayWhenError): + # imap_unordered makes it difficult to anticipate the SayWhenError + for i in range(10): + self.assertEqual(next(it), i*i) + + it = self.pool.imap_unordered(sqr, + exception_throwing_generator(20, 7), + 2) + with self.assertRaises(SayWhenError): + for i in range(20): + self.assertEqual(next(it), i*i) + def test_make_pool(self): self.assertRaises(ValueError, multiprocessing.Pool, -1) self.assertRaises(ValueError, multiprocessing.Pool, 0) @@ -325,6 +325,7 @@ Raghuram Devarakonda Caleb Deveraux Catherine Devlin Scott Dial +Alon Diamant Toby Dickenson Mark Dickinson Jack Diederich @@ -21,6 +21,10 @@ Core and Builtins Library ------- +- Issue #23051: multiprocessing.Pool methods imap() and imap_unordered() now + handle exceptions raised by an iterator. Patch by Alon Diamant and Davin + Potts. + - Issue #22928: Disabled HTTP header injections in httplib. Original patch by Demian Brecht. |