summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorRichard Oudkerk <shibturn@gmail.com>2012-05-02 15:36:26 (GMT)
committerRichard Oudkerk <shibturn@gmail.com>2012-05-02 15:36:26 (GMT)
commit0c200c282b460d84e0f8ddd56ce10cc79dfb3600 (patch)
treebecc1fabd2d67b561396005d8f63094ae23b4dc0
parenta9e18cdd7f3de14c49fea7785a4f632f0c097fe4 (diff)
downloadcpython-0c200c282b460d84e0f8ddd56ce10cc79dfb3600.zip
cpython-0c200c282b460d84e0f8ddd56ce10cc79dfb3600.tar.gz
cpython-0c200c282b460d84e0f8ddd56ce10cc79dfb3600.tar.bz2
Issue #9400: Partial backport of fix for #9244
In multiprocessing, a pool worker process would die if the result/error could not be pickled. This could cause pool methods to hang. In 3.x this was fixed by 0aa8af79359d (which also added an error_callback argument to some methods), but the fix was not back ported.
-rw-r--r--Lib/multiprocessing/pool.py25
-rw-r--r--Lib/test/test_multiprocessing.py18
2 files changed, 42 insertions, 1 deletions
diff --git a/Lib/multiprocessing/pool.py b/Lib/multiprocessing/pool.py
index bcbf7e3..99b4df4 100644
--- a/Lib/multiprocessing/pool.py
+++ b/Lib/multiprocessing/pool.py
@@ -68,6 +68,23 @@ def mapstar(args):
# Code run by worker processes
#
+class MaybeEncodingError(Exception):
+ """Wraps possible unpickleable errors, so they can be
+ safely sent through the socket."""
+
+ def __init__(self, exc, value):
+ self.exc = repr(exc)
+ self.value = repr(value)
+ super(MaybeEncodingError, self).__init__(self.exc, self.value)
+
+ def __str__(self):
+ return "Error sending result: '%s'. Reason: '%s'" % (self.value,
+ self.exc)
+
+ def __repr__(self):
+ return "<MaybeEncodingError: %s>" % str(self)
+
+
def worker(inqueue, outqueue, initializer=None, initargs=(), maxtasks=None):
assert maxtasks is None or (type(maxtasks) == int and maxtasks > 0)
put = outqueue.put
@@ -96,7 +113,13 @@ def worker(inqueue, outqueue, initializer=None, initargs=(), maxtasks=None):
result = (True, func(*args, **kwds))
except Exception, e:
result = (False, e)
- put((job, i, result))
+ try:
+ put((job, i, result))
+ except Exception as e:
+ wrapped = MaybeEncodingError(e, result[1])
+ debug("Possible encoding error while sending result: %s" % (
+ wrapped))
+ put((job, i, (False, wrapped)))
completed += 1
debug('worker exiting after %d tasks' % completed)
diff --git a/Lib/test/test_multiprocessing.py b/Lib/test/test_multiprocessing.py
index e5258bb..eeb768f 100644
--- a/Lib/test/test_multiprocessing.py
+++ b/Lib/test/test_multiprocessing.py
@@ -1152,6 +1152,24 @@ class _TestPool(BaseTestCase):
join()
self.assertTrue(join.elapsed < 0.2)
+def unpickleable_result():
+ return lambda: 42
+
+class _TestPoolWorkerErrors(BaseTestCase):
+ ALLOWED_TYPES = ('processes', )
+
+ def test_unpickleable_result(self):
+ from multiprocessing.pool import MaybeEncodingError
+ p = multiprocessing.Pool(2)
+
+ # Make sure we don't lose pool processes because of encoding errors.
+ for iteration in range(20):
+ res = p.apply_async(unpickleable_result)
+ self.assertRaises(MaybeEncodingError, res.get)
+
+ p.close()
+ p.join()
+
class _TestPoolWorkerLifetime(BaseTestCase):
ALLOWED_TYPES = ('processes', )