summaryrefslogtreecommitdiffstats
path: root/Lib/asyncio
diff options
context:
space:
mode:
Diffstat (limited to 'Lib/asyncio')
-rw-r--r--Lib/asyncio/queues.py68
1 files changed, 65 insertions, 3 deletions
diff --git a/Lib/asyncio/queues.py b/Lib/asyncio/queues.py
index a9656a6..b815670 100644
--- a/Lib/asyncio/queues.py
+++ b/Lib/asyncio/queues.py
@@ -1,4 +1,11 @@
-__all__ = ('Queue', 'PriorityQueue', 'LifoQueue', 'QueueFull', 'QueueEmpty')
+__all__ = (
+ 'Queue',
+ 'PriorityQueue',
+ 'LifoQueue',
+ 'QueueFull',
+ 'QueueEmpty',
+ 'QueueShutDown',
+)
import collections
import heapq
@@ -18,6 +25,11 @@ class QueueFull(Exception):
pass
+class QueueShutDown(Exception):
+ """Raised when putting on to or getting from a shut-down Queue."""
+ pass
+
+
class Queue(mixins._LoopBoundMixin):
"""A queue, useful for coordinating producer and consumer coroutines.
@@ -41,6 +53,7 @@ class Queue(mixins._LoopBoundMixin):
self._finished = locks.Event()
self._finished.set()
self._init(maxsize)
+ self._is_shutdown = False
# These three are overridable in subclasses.
@@ -81,6 +94,8 @@ class Queue(mixins._LoopBoundMixin):
result += f' _putters[{len(self._putters)}]'
if self._unfinished_tasks:
result += f' tasks={self._unfinished_tasks}'
+ if self._is_shutdown:
+ result += ' shutdown'
return result
def qsize(self):
@@ -112,8 +127,12 @@ class Queue(mixins._LoopBoundMixin):
Put an item into the queue. If the queue is full, wait until a free
slot is available before adding item.
+
+ Raises QueueShutDown if the queue has been shut down.
"""
while self.full():
+ if self._is_shutdown:
+ raise QueueShutDown
putter = self._get_loop().create_future()
self._putters.append(putter)
try:
@@ -125,7 +144,7 @@ class Queue(mixins._LoopBoundMixin):
self._putters.remove(putter)
except ValueError:
# The putter could be removed from self._putters by a
- # previous get_nowait call.
+ # previous get_nowait call or a shutdown call.
pass
if not self.full() and not putter.cancelled():
# We were woken up by get_nowait(), but can't take
@@ -138,7 +157,11 @@ class Queue(mixins._LoopBoundMixin):
"""Put an item into the queue without blocking.
If no free slot is immediately available, raise QueueFull.
+
+ Raises QueueShutDown if the queue has been shut down.
"""
+ if self._is_shutdown:
+ raise QueueShutDown
if self.full():
raise QueueFull
self._put(item)
@@ -150,8 +173,13 @@ class Queue(mixins._LoopBoundMixin):
"""Remove and return an item from the queue.
If queue is empty, wait until an item is available.
+
+ Raises QueueShutDown if the queue has been shut down and is empty, or
+ if the queue has been shut down immediately.
"""
while self.empty():
+ if self._is_shutdown and self.empty():
+ raise QueueShutDown
getter = self._get_loop().create_future()
self._getters.append(getter)
try:
@@ -163,7 +191,7 @@ class Queue(mixins._LoopBoundMixin):
self._getters.remove(getter)
except ValueError:
# The getter could be removed from self._getters by a
- # previous put_nowait call.
+ # previous put_nowait call, or a shutdown call.
pass
if not self.empty() and not getter.cancelled():
# We were woken up by put_nowait(), but can't take
@@ -176,8 +204,13 @@ class Queue(mixins._LoopBoundMixin):
"""Remove and return an item from the queue.
Return an item if one is immediately available, else raise QueueEmpty.
+
+ Raises QueueShutDown if the queue has been shut down and is empty, or
+ if the queue has been shut down immediately.
"""
if self.empty():
+ if self._is_shutdown:
+ raise QueueShutDown
raise QueueEmpty
item = self._get()
self._wakeup_next(self._putters)
@@ -194,6 +227,9 @@ class Queue(mixins._LoopBoundMixin):
been processed (meaning that a task_done() call was received for every
item that had been put() into the queue).
+ shutdown(immediate=True) calls task_done() for each remaining item in
+ the queue.
+
Raises ValueError if called more times than there were items placed in
the queue.
"""
@@ -214,6 +250,32 @@ class Queue(mixins._LoopBoundMixin):
if self._unfinished_tasks > 0:
await self._finished.wait()
+ def shutdown(self, immediate=False):
+ """Shut-down the queue, making queue gets and puts raise QueueShutDown.
+
+ By default, gets will only raise once the queue is empty. Set
+ 'immediate' to True to make gets raise immediately instead.
+
+ All blocked callers of put() will be unblocked, and also get()
+ and join() if 'immediate'.
+ """
+ self._is_shutdown = True
+ if immediate:
+ while not self.empty():
+ self._get()
+ if self._unfinished_tasks > 0:
+ self._unfinished_tasks -= 1
+ if self._unfinished_tasks == 0:
+ self._finished.set()
+ while self._getters:
+ getter = self._getters.popleft()
+ if not getter.done():
+ getter.set_result(None)
+ while self._putters:
+ putter = self._putters.popleft()
+ if not putter.done():
+ putter.set_result(None)
+
class PriorityQueue(Queue):
"""A subclass of Queue; retrieves entries in priority order (lowest first).