diff options
author | Yury Selivanov <yselivanov@sprymix.com> | 2015-08-05 17:55:54 (GMT) |
---|---|---|
committer | Yury Selivanov <yselivanov@sprymix.com> | 2015-08-05 17:55:54 (GMT) |
commit | 22506d24ee1a82fc9e5d268645113d1256114072 (patch) | |
tree | cca9466c1faee452b646fcf22f7f4033850b554c /Lib | |
parent | 40c0ce472e560ba9514300e800e4be020931a45f (diff) | |
parent | 3fc0f2d288cad93d374d98ca67aa8648e22e9d2f (diff) | |
download | cpython-22506d24ee1a82fc9e5d268645113d1256114072.zip cpython-22506d24ee1a82fc9e5d268645113d1256114072.tar.gz cpython-22506d24ee1a82fc9e5d268645113d1256114072.tar.bz2 |
Issue #23812: Fix asyncio.Queue.get() to avoid loosing items on cancellation.
Patch by Gustavo J. A. M. Carneiro.
Diffstat (limited to 'Lib')
-rw-r--r-- | Lib/asyncio/queues.py | 47 | ||||
-rw-r--r-- | Lib/test/test_asyncio/test_queues.py | 61 |
2 files changed, 98 insertions, 10 deletions
diff --git a/Lib/asyncio/queues.py b/Lib/asyncio/queues.py index c55dd8b..b26edfb 100644 --- a/Lib/asyncio/queues.py +++ b/Lib/asyncio/queues.py @@ -47,7 +47,7 @@ class Queue: # Futures. self._getters = collections.deque() - # Pairs of (item, Future). + # Futures self._putters = collections.deque() self._unfinished_tasks = 0 self._finished = locks.Event(loop=self._loop) @@ -98,7 +98,7 @@ class Queue: def _consume_done_putters(self): # Delete waiters at the head of the put() queue who've timed out. - while self._putters and self._putters[0][1].done(): + while self._putters and self._putters[0].done(): self._putters.popleft() def qsize(self): @@ -148,8 +148,9 @@ class Queue: elif self._maxsize > 0 and self._maxsize <= self.qsize(): waiter = futures.Future(loop=self._loop) - self._putters.append((item, waiter)) + self._putters.append(waiter) yield from waiter + self._put(item) else: self.__put_internal(item) @@ -186,8 +187,7 @@ class Queue: self._consume_done_putters() if self._putters: assert self.full(), 'queue not full, why are putters waiting?' - item, putter = self._putters.popleft() - self.__put_internal(item) + putter = self._putters.popleft() # When a getter runs and frees up a slot so this putter can # run, we need to defer the put for a tick to ensure that @@ -201,9 +201,39 @@ class Queue: return self._get() else: waiter = futures.Future(loop=self._loop) - self._getters.append(waiter) - return (yield from waiter) + try: + return (yield from waiter) + except futures.CancelledError: + # if we get CancelledError, it means someone cancelled this + # get() coroutine. But there is a chance that the waiter + # already is ready and contains an item that has just been + # removed from the queue. In this case, we need to put the item + # back into the front of the queue. This get() must either + # succeed without fault or, if it gets cancelled, it must be as + # if it never happened. + if waiter.done(): + self._put_it_back(waiter.result()) + raise + + def _put_it_back(self, item): + """ + This is called when we have a waiter to get() an item and this waiter + gets cancelled. In this case, we put the item back: wake up another + waiter or put it in the _queue. + """ + self._consume_done_getters() + if self._getters: + assert not self._queue, ( + 'queue non-empty, why are getters waiting?') + + getter = self._getters.popleft() + self._put_internal(item) + + # getter cannot be cancelled, we just removed done getters + getter.set_result(item) + else: + self._queue.appendleft(item) def get_nowait(self): """Remove and return an item from the queue. @@ -213,8 +243,7 @@ class Queue: self._consume_done_putters() if self._putters: assert self.full(), 'queue not full, why are putters waiting?' - item, putter = self._putters.popleft() - self.__put_internal(item) + putter = self._putters.popleft() # Wake putter on next tick. # getter cannot be cancelled, we just removed done putters diff --git a/Lib/test/test_asyncio/test_queues.py b/Lib/test/test_asyncio/test_queues.py index 88b4f07..7c7d0ea 100644 --- a/Lib/test/test_asyncio/test_queues.py +++ b/Lib/test/test_asyncio/test_queues.py @@ -171,7 +171,7 @@ class QueueGetTests(_QueueTestBase): q.put_nowait(1) waiter = asyncio.Future(loop=self.loop) - q._putters.append((2, waiter)) + q._putters.append(waiter) res = self.loop.run_until_complete(q.get()) self.assertEqual(1, res) @@ -322,6 +322,64 @@ class QueuePutTests(_QueueTestBase): q.put_nowait(1) self.assertEqual(1, q.get_nowait()) + def test_get_cancel_drop(self): + def gen(): + yield 0.01 + yield 0.1 + + loop = self.new_test_loop(gen) + + q = asyncio.Queue(loop=loop) + + reader = loop.create_task(q.get()) + + loop.run_until_complete(asyncio.sleep(0.01, loop=loop)) + + q.put_nowait(1) + q.put_nowait(2) + reader.cancel() + + try: + loop.run_until_complete(reader) + except asyncio.CancelledError: + # try again + reader = loop.create_task(q.get()) + loop.run_until_complete(reader) + + result = reader.result() + # if we get 2, it means 1 got dropped! + self.assertEqual(1, result) + + def test_put_cancel_drop(self): + + def gen(): + yield 0.01 + yield 0.1 + + loop = self.new_test_loop(gen) + q = asyncio.Queue(1, loop=loop) + + q.put_nowait(1) + + # putting a second item in the queue has to block (qsize=1) + writer = loop.create_task(q.put(2)) + loop.run_until_complete(asyncio.sleep(0.01, loop=loop)) + + value1 = q.get_nowait() + self.assertEqual(value1, 1) + + writer.cancel() + try: + loop.run_until_complete(writer) + except asyncio.CancelledError: + # try again + writer = loop.create_task(q.put(2)) + loop.run_until_complete(writer) + + value2 = q.get_nowait() + self.assertEqual(value2, 2) + self.assertEqual(q.qsize(), 0) + def test_nonblocking_put_exception(self): q = asyncio.Queue(maxsize=1, loop=self.loop) q.put_nowait(1) @@ -374,6 +432,7 @@ class QueuePutTests(_QueueTestBase): test_utils.run_briefly(self.loop) self.assertTrue(put_c.done()) self.assertEqual(q.get_nowait(), 'a') + test_utils.run_briefly(self.loop) self.assertEqual(q.get_nowait(), 'b') self.loop.run_until_complete(put_b) |