summaryrefslogtreecommitdiffstats
path: root/Lib
diff options
context:
space:
mode:
authorThomas Moreau <thomas.moreau.2010@gmail.com>2018-03-21 15:50:28 (GMT)
committerAntoine Pitrou <pitrou@free.fr>2018-03-21 15:50:28 (GMT)
commite2f33add635df4fde81be9960bab367e010c19bf (patch)
tree26f61daafe01350703a9f0d40f962e5789b086a7 /Lib
parent9308dea3e1fd565d50a76a667e4e8ef0568b7053 (diff)
downloadcpython-e2f33add635df4fde81be9960bab367e010c19bf.zip
cpython-e2f33add635df4fde81be9960bab367e010c19bf.tar.gz
cpython-e2f33add635df4fde81be9960bab367e010c19bf.tar.bz2
bpo-33078 - Fix queue size on pickling error (GH-6119)
Diffstat (limited to 'Lib')
-rw-r--r--Lib/multiprocessing/queues.py10
-rw-r--r--Lib/test/_test_multiprocessing.py13
2 files changed, 21 insertions, 2 deletions
diff --git a/Lib/multiprocessing/queues.py b/Lib/multiprocessing/queues.py
index d66d37a..715a9b0 100644
--- a/Lib/multiprocessing/queues.py
+++ b/Lib/multiprocessing/queues.py
@@ -161,7 +161,7 @@ class Queue(object):
target=Queue._feed,
args=(self._buffer, self._notempty, self._send_bytes,
self._wlock, self._writer.close, self._ignore_epipe,
- self._on_queue_feeder_error),
+ self._on_queue_feeder_error, self._sem),
name='QueueFeederThread'
)
self._thread.daemon = True
@@ -203,7 +203,7 @@ class Queue(object):
@staticmethod
def _feed(buffer, notempty, send_bytes, writelock, close, ignore_epipe,
- onerror):
+ onerror, queue_sem):
debug('starting thread to feed data to pipe')
nacquire = notempty.acquire
nrelease = notempty.release
@@ -255,6 +255,12 @@ class Queue(object):
info('error in queue thread: %s', e)
return
else:
+ # Since the object has not been sent in the queue, we need
+ # to decrease the size of the queue. The error acts as
+ # if the object had been silently removed from the queue
+ # and this step is necessary to have a properly working
+ # queue.
+ queue_sem.release()
onerror(e, obj)
@staticmethod
diff --git a/Lib/test/_test_multiprocessing.py b/Lib/test/_test_multiprocessing.py
index 940fe58..c787702 100644
--- a/Lib/test/_test_multiprocessing.py
+++ b/Lib/test/_test_multiprocessing.py
@@ -1056,6 +1056,19 @@ class _TestQueue(BaseTestCase):
self.assertTrue(q.get(timeout=1.0))
close_queue(q)
+ with test.support.captured_stderr():
+ # bpo-33078: verify that the queue size is correctly handled
+ # on errors.
+ q = self.Queue(maxsize=1)
+ q.put(NotSerializable())
+ q.put(True)
+ self.assertEqual(q.qsize(), 1)
+ # bpo-30595: use a timeout of 1 second for slow buildbots
+ self.assertTrue(q.get(timeout=1.0))
+ # Check that the size of the queue is correct
+ self.assertEqual(q.qsize(), 0)
+ close_queue(q)
+
def test_queue_feeder_on_queue_feeder_error(self):
# bpo-30006: verify feeder handles exceptions using the
# _on_queue_feeder_error hook.