diff options
Diffstat (limited to 'Lib/asyncio')
-rw-r--r-- | Lib/asyncio/queues.py | 47 |
1 files changed, 38 insertions, 9 deletions
diff --git a/Lib/asyncio/queues.py b/Lib/asyncio/queues.py index c55dd8b..b26edfb 100644 --- a/Lib/asyncio/queues.py +++ b/Lib/asyncio/queues.py @@ -47,7 +47,7 @@ class Queue: # Futures. self._getters = collections.deque() - # Pairs of (item, Future). + # Futures self._putters = collections.deque() self._unfinished_tasks = 0 self._finished = locks.Event(loop=self._loop) @@ -98,7 +98,7 @@ class Queue: def _consume_done_putters(self): # Delete waiters at the head of the put() queue who've timed out. - while self._putters and self._putters[0][1].done(): + while self._putters and self._putters[0].done(): self._putters.popleft() def qsize(self): @@ -148,8 +148,9 @@ class Queue: elif self._maxsize > 0 and self._maxsize <= self.qsize(): waiter = futures.Future(loop=self._loop) - self._putters.append((item, waiter)) + self._putters.append(waiter) yield from waiter + self._put(item) else: self.__put_internal(item) @@ -186,8 +187,7 @@ class Queue: self._consume_done_putters() if self._putters: assert self.full(), 'queue not full, why are putters waiting?' - item, putter = self._putters.popleft() - self.__put_internal(item) + putter = self._putters.popleft() # When a getter runs and frees up a slot so this putter can # run, we need to defer the put for a tick to ensure that @@ -201,9 +201,39 @@ class Queue: return self._get() else: waiter = futures.Future(loop=self._loop) - self._getters.append(waiter) - return (yield from waiter) + try: + return (yield from waiter) + except futures.CancelledError: + # if we get CancelledError, it means someone cancelled this + # get() coroutine. But there is a chance that the waiter + # already is ready and contains an item that has just been + # removed from the queue. In this case, we need to put the item + # back into the front of the queue. This get() must either + # succeed without fault or, if it gets cancelled, it must be as + # if it never happened. + if waiter.done(): + self._put_it_back(waiter.result()) + raise + + def _put_it_back(self, item): + """ + This is called when we have a waiter to get() an item and this waiter + gets cancelled. In this case, we put the item back: wake up another + waiter or put it in the _queue. + """ + self._consume_done_getters() + if self._getters: + assert not self._queue, ( + 'queue non-empty, why are getters waiting?') + + getter = self._getters.popleft() + self._put_internal(item) + + # getter cannot be cancelled, we just removed done getters + getter.set_result(item) + else: + self._queue.appendleft(item) def get_nowait(self): """Remove and return an item from the queue. @@ -213,8 +243,7 @@ class Queue: self._consume_done_putters() if self._putters: assert self.full(), 'queue not full, why are putters waiting?' - item, putter = self._putters.popleft() - self.__put_internal(item) + putter = self._putters.popleft() # Wake putter on next tick. # getter cannot be cancelled, we just removed done putters |