diff options
author | Guido van Rossum <guido@python.org> | 2015-09-28 14:42:34 (GMT) |
---|---|---|
committer | Guido van Rossum <guido@python.org> | 2015-09-28 14:42:34 (GMT) |
commit | 99f96c54517b8a130b2c7080e9b7ef93e166f77e (patch) | |
tree | ef3478a406121737be2f811d350578043436325d /Lib/asyncio | |
parent | 16a1f281942a5bd1f00d038187cad970c1d173e1 (diff) | |
download | cpython-99f96c54517b8a130b2c7080e9b7ef93e166f77e.zip cpython-99f96c54517b8a130b2c7080e9b7ef93e166f77e.tar.gz cpython-99f96c54517b8a130b2c7080e9b7ef93e166f77e.tar.bz2 |
Issue #25233: Rewrite the guts of Queue to be more understandable and correct.
Diffstat (limited to 'Lib/asyncio')
-rw-r--r-- | Lib/asyncio/queues.py | 152 |
1 files changed, 41 insertions, 111 deletions
diff --git a/Lib/asyncio/queues.py b/Lib/asyncio/queues.py index 021043d..e3a1d5e 100644 --- a/Lib/asyncio/queues.py +++ b/Lib/asyncio/queues.py @@ -47,7 +47,7 @@ class Queue: # Futures. self._getters = collections.deque() - # Futures + # Futures. self._putters = collections.deque() self._unfinished_tasks = 0 self._finished = locks.Event(loop=self._loop) @@ -67,10 +67,13 @@ class Queue: # End of the overridable methods. - def __put_internal(self, item): - self._put(item) - self._unfinished_tasks += 1 - self._finished.clear() + def _wakeup_next(self, waiters): + # Wake up the next waiter (if any) that isn't cancelled. + while waiters: + waiter = waiters.popleft() + if not waiter.done(): + waiter.set_result(None) + break def __repr__(self): return '<{} at {:#x} {}>'.format( @@ -91,16 +94,6 @@ class Queue: result += ' tasks={}'.format(self._unfinished_tasks) return result - def _consume_done_getters(self): - # Delete waiters at the head of the get() queue who've timed out. - while self._getters and self._getters[0].done(): - self._getters.popleft() - - def _consume_done_putters(self): - # Delete waiters at the head of the put() queue who've timed out. - while self._putters and self._putters[0].done(): - self._putters.popleft() - def qsize(self): """Number of items in the queue.""" return len(self._queue) @@ -134,47 +127,31 @@ class Queue: This method is a coroutine. """ - self._consume_done_getters() - if self._getters: - assert not self._queue, ( - 'queue non-empty, why are getters waiting?') - - getter = self._getters.popleft() - self.__put_internal(item) - - # getter cannot be cancelled, we just removed done getters - getter.set_result(self._get()) - - elif self._maxsize > 0 and self._maxsize <= self.qsize(): - waiter = futures.Future(loop=self._loop) - - self._putters.append(waiter) - yield from waiter - self._put(item) - - else: - self.__put_internal(item) + while self.full(): + putter = futures.Future(loop=self._loop) + self._putters.append(putter) + try: + yield from putter + except: + putter.cancel() # Just in case putter is not done yet. + if not self.full() and not putter.cancelled(): + # We were woken up by get_nowait(), but can't take + # the call. Wake up the next in line. + self._wakeup_next(self._putters) + raise + return self.put_nowait(item) def put_nowait(self, item): """Put an item into the queue without blocking. If no free slot is immediately available, raise QueueFull. """ - self._consume_done_getters() - if self._getters: - assert not self._queue, ( - 'queue non-empty, why are getters waiting?') - - getter = self._getters.popleft() - self.__put_internal(item) - - # getter cannot be cancelled, we just removed done getters - getter.set_result(self._get()) - - elif self._maxsize > 0 and self._maxsize <= self.qsize(): + if self.full(): raise QueueFull - else: - self.__put_internal(item) + self._put(item) + self._unfinished_tasks += 1 + self._finished.clear() + self._wakeup_next(self._getters) @coroutine def get(self): @@ -184,77 +161,30 @@ class Queue: This method is a coroutine. """ - self._consume_done_putters() - if self._putters: - assert self.full(), 'queue not full, why are putters waiting?' - putter = self._putters.popleft() - - # When a getter runs and frees up a slot so this putter can - # run, we need to defer the put for a tick to ensure that - # getters and putters alternate perfectly. See - # ChannelTest.test_wait. - self._loop.call_soon(putter._set_result_unless_cancelled, None) - - return self._get() - - elif self.qsize(): - return self._get() - else: - waiter = futures.Future(loop=self._loop) - self._getters.append(waiter) + while self.empty(): + getter = futures.Future(loop=self._loop) + self._getters.append(getter) try: - return (yield from waiter) - except futures.CancelledError: - # if we get CancelledError, it means someone cancelled this - # get() coroutine. But there is a chance that the waiter - # already is ready and contains an item that has just been - # removed from the queue. In this case, we need to put the item - # back into the front of the queue. This get() must either - # succeed without fault or, if it gets cancelled, it must be as - # if it never happened. - if waiter.done(): - self._put_it_back(waiter.result()) + yield from getter + except: + getter.cancel() # Just in case getter is not done yet. + if not self.empty() and not getter.cancelled(): + # We were woken up by put_nowait(), but can't take + # the call. Wake up the next in line. + self._wakeup_next(self._getters) raise - - def _put_it_back(self, item): - """ - This is called when we have a waiter to get() an item and this waiter - gets cancelled. In this case, we put the item back: wake up another - waiter or put it in the _queue. - """ - self._consume_done_getters() - if self._getters: - assert not self._queue, ( - 'queue non-empty, why are getters waiting?') - - getter = self._getters.popleft() - self.__put_internal(item) - - # getter cannot be cancelled, we just removed done getters - getter.set_result(item) - else: - self._queue.appendleft(item) + return self.get_nowait() def get_nowait(self): """Remove and return an item from the queue. Return an item if one is immediately available, else raise QueueEmpty. """ - self._consume_done_putters() - if self._putters: - assert self.full(), 'queue not full, why are putters waiting?' - putter = self._putters.popleft() - # Wake putter on next tick. - - # getter cannot be cancelled, we just removed done putters - putter.set_result(None) - - return self._get() - - elif self.qsize(): - return self._get() - else: + if self.empty(): raise QueueEmpty + item = self._get() + self._wakeup_next(self._putters) + return item def task_done(self): """Indicate that a formerly enqueued task is complete. |