summaryrefslogtreecommitdiffstats
path: root/Lib/asyncio
diff options
context:
space:
mode:
authorGuido van Rossum <guido@python.org>2015-09-28 14:42:34 (GMT)
committerGuido van Rossum <guido@python.org>2015-09-28 14:42:34 (GMT)
commit99f96c54517b8a130b2c7080e9b7ef93e166f77e (patch)
treeef3478a406121737be2f811d350578043436325d /Lib/asyncio
parent16a1f281942a5bd1f00d038187cad970c1d173e1 (diff)
downloadcpython-99f96c54517b8a130b2c7080e9b7ef93e166f77e.zip
cpython-99f96c54517b8a130b2c7080e9b7ef93e166f77e.tar.gz
cpython-99f96c54517b8a130b2c7080e9b7ef93e166f77e.tar.bz2
Issue #25233: Rewrite the guts of Queue to be more understandable and correct.
Diffstat (limited to 'Lib/asyncio')
-rw-r--r--Lib/asyncio/queues.py152
1 files changed, 41 insertions, 111 deletions
diff --git a/Lib/asyncio/queues.py b/Lib/asyncio/queues.py
index 021043d..e3a1d5e 100644
--- a/Lib/asyncio/queues.py
+++ b/Lib/asyncio/queues.py
@@ -47,7 +47,7 @@ class Queue:
# Futures.
self._getters = collections.deque()
- # Futures
+ # Futures.
self._putters = collections.deque()
self._unfinished_tasks = 0
self._finished = locks.Event(loop=self._loop)
@@ -67,10 +67,13 @@ class Queue:
# End of the overridable methods.
- def __put_internal(self, item):
- self._put(item)
- self._unfinished_tasks += 1
- self._finished.clear()
+ def _wakeup_next(self, waiters):
+ # Wake up the next waiter (if any) that isn't cancelled.
+ while waiters:
+ waiter = waiters.popleft()
+ if not waiter.done():
+ waiter.set_result(None)
+ break
def __repr__(self):
return '<{} at {:#x} {}>'.format(
@@ -91,16 +94,6 @@ class Queue:
result += ' tasks={}'.format(self._unfinished_tasks)
return result
- def _consume_done_getters(self):
- # Delete waiters at the head of the get() queue who've timed out.
- while self._getters and self._getters[0].done():
- self._getters.popleft()
-
- def _consume_done_putters(self):
- # Delete waiters at the head of the put() queue who've timed out.
- while self._putters and self._putters[0].done():
- self._putters.popleft()
-
def qsize(self):
"""Number of items in the queue."""
return len(self._queue)
@@ -134,47 +127,31 @@ class Queue:
This method is a coroutine.
"""
- self._consume_done_getters()
- if self._getters:
- assert not self._queue, (
- 'queue non-empty, why are getters waiting?')
-
- getter = self._getters.popleft()
- self.__put_internal(item)
-
- # getter cannot be cancelled, we just removed done getters
- getter.set_result(self._get())
-
- elif self._maxsize > 0 and self._maxsize <= self.qsize():
- waiter = futures.Future(loop=self._loop)
-
- self._putters.append(waiter)
- yield from waiter
- self._put(item)
-
- else:
- self.__put_internal(item)
+ while self.full():
+ putter = futures.Future(loop=self._loop)
+ self._putters.append(putter)
+ try:
+ yield from putter
+ except:
+ putter.cancel() # Just in case putter is not done yet.
+ if not self.full() and not putter.cancelled():
+ # We were woken up by get_nowait(), but can't take
+ # the call. Wake up the next in line.
+ self._wakeup_next(self._putters)
+ raise
+ return self.put_nowait(item)
def put_nowait(self, item):
"""Put an item into the queue without blocking.
If no free slot is immediately available, raise QueueFull.
"""
- self._consume_done_getters()
- if self._getters:
- assert not self._queue, (
- 'queue non-empty, why are getters waiting?')
-
- getter = self._getters.popleft()
- self.__put_internal(item)
-
- # getter cannot be cancelled, we just removed done getters
- getter.set_result(self._get())
-
- elif self._maxsize > 0 and self._maxsize <= self.qsize():
+ if self.full():
raise QueueFull
- else:
- self.__put_internal(item)
+ self._put(item)
+ self._unfinished_tasks += 1
+ self._finished.clear()
+ self._wakeup_next(self._getters)
@coroutine
def get(self):
@@ -184,77 +161,30 @@ class Queue:
This method is a coroutine.
"""
- self._consume_done_putters()
- if self._putters:
- assert self.full(), 'queue not full, why are putters waiting?'
- putter = self._putters.popleft()
-
- # When a getter runs and frees up a slot so this putter can
- # run, we need to defer the put for a tick to ensure that
- # getters and putters alternate perfectly. See
- # ChannelTest.test_wait.
- self._loop.call_soon(putter._set_result_unless_cancelled, None)
-
- return self._get()
-
- elif self.qsize():
- return self._get()
- else:
- waiter = futures.Future(loop=self._loop)
- self._getters.append(waiter)
+ while self.empty():
+ getter = futures.Future(loop=self._loop)
+ self._getters.append(getter)
try:
- return (yield from waiter)
- except futures.CancelledError:
- # if we get CancelledError, it means someone cancelled this
- # get() coroutine. But there is a chance that the waiter
- # already is ready and contains an item that has just been
- # removed from the queue. In this case, we need to put the item
- # back into the front of the queue. This get() must either
- # succeed without fault or, if it gets cancelled, it must be as
- # if it never happened.
- if waiter.done():
- self._put_it_back(waiter.result())
+ yield from getter
+ except:
+ getter.cancel() # Just in case getter is not done yet.
+ if not self.empty() and not getter.cancelled():
+ # We were woken up by put_nowait(), but can't take
+ # the call. Wake up the next in line.
+ self._wakeup_next(self._getters)
raise
-
- def _put_it_back(self, item):
- """
- This is called when we have a waiter to get() an item and this waiter
- gets cancelled. In this case, we put the item back: wake up another
- waiter or put it in the _queue.
- """
- self._consume_done_getters()
- if self._getters:
- assert not self._queue, (
- 'queue non-empty, why are getters waiting?')
-
- getter = self._getters.popleft()
- self.__put_internal(item)
-
- # getter cannot be cancelled, we just removed done getters
- getter.set_result(item)
- else:
- self._queue.appendleft(item)
+ return self.get_nowait()
def get_nowait(self):
"""Remove and return an item from the queue.
Return an item if one is immediately available, else raise QueueEmpty.
"""
- self._consume_done_putters()
- if self._putters:
- assert self.full(), 'queue not full, why are putters waiting?'
- putter = self._putters.popleft()
- # Wake putter on next tick.
-
- # getter cannot be cancelled, we just removed done putters
- putter.set_result(None)
-
- return self._get()
-
- elif self.qsize():
- return self._get()
- else:
+ if self.empty():
raise QueueEmpty
+ item = self._get()
+ self._wakeup_next(self._putters)
+ return item
def task_done(self):
"""Indicate that a formerly enqueued task is complete.