summaryrefslogtreecommitdiffstats
path: root/Lib/queue.py
diff options
context:
space:
mode:
authorLaurie O <laurie_opperman@hotmail.com>2024-02-10 04:58:30 (GMT)
committerGitHub <noreply@github.com>2024-02-10 04:58:30 (GMT)
commitb2d9d134dcb5633deebebf2b0118cd4f7ca598a2 (patch)
treeb683686f4a42bdec2fe0540c585005dc4e5db8c2 /Lib/queue.py
parentd4d5bae1471788b345155e8e93a2fe4ab92d09dc (diff)
downloadcpython-b2d9d134dcb5633deebebf2b0118cd4f7ca598a2.zip
cpython-b2d9d134dcb5633deebebf2b0118cd4f7ca598a2.tar.gz
cpython-b2d9d134dcb5633deebebf2b0118cd4f7ca598a2.tar.bz2
gh-96471: Add shutdown() method to queue.Queue (#104750)
Co-authored-by: Duprat <yduprat@gmail.com>
Diffstat (limited to 'Lib/queue.py')
-rw-r--r--Lib/queue.py50
1 files changed, 50 insertions, 0 deletions
diff --git a/Lib/queue.py b/Lib/queue.py
index 55f5008..467ff4f 100644
--- a/Lib/queue.py
+++ b/Lib/queue.py
@@ -25,6 +25,10 @@ class Full(Exception):
pass
+class ShutDown(Exception):
+ '''Raised when put/get with shut-down queue.'''
+
+
class Queue:
'''Create a queue object with a given maximum size.
@@ -54,6 +58,9 @@ class Queue:
self.all_tasks_done = threading.Condition(self.mutex)
self.unfinished_tasks = 0
+ # Queue shutdown state
+ self.is_shutdown = False
+
def task_done(self):
'''Indicate that a formerly enqueued task is complete.
@@ -67,6 +74,8 @@ class Queue:
Raises a ValueError if called more times than there were items
placed in the queue.
+
+ Raises ShutDown if the queue has been shut down immediately.
'''
with self.all_tasks_done:
unfinished = self.unfinished_tasks - 1
@@ -84,6 +93,8 @@ class Queue:
to indicate the item was retrieved and all work on it is complete.
When the count of unfinished tasks drops to zero, join() unblocks.
+
+ Raises ShutDown if the queue has been shut down immediately.
'''
with self.all_tasks_done:
while self.unfinished_tasks:
@@ -129,8 +140,12 @@ class Queue:
Otherwise ('block' is false), put an item on the queue if a free slot
is immediately available, else raise the Full exception ('timeout'
is ignored in that case).
+
+ Raises ShutDown if the queue has been shut down.
'''
with self.not_full:
+ if self.is_shutdown:
+ raise ShutDown
if self.maxsize > 0:
if not block:
if self._qsize() >= self.maxsize:
@@ -138,6 +153,8 @@ class Queue:
elif timeout is None:
while self._qsize() >= self.maxsize:
self.not_full.wait()
+ if self.is_shutdown:
+ raise ShutDown
elif timeout < 0:
raise ValueError("'timeout' must be a non-negative number")
else:
@@ -147,6 +164,8 @@ class Queue:
if remaining <= 0.0:
raise Full
self.not_full.wait(remaining)
+ if self.is_shutdown:
+ raise ShutDown
self._put(item)
self.unfinished_tasks += 1
self.not_empty.notify()
@@ -161,14 +180,21 @@ class Queue:
Otherwise ('block' is false), return an item if one is immediately
available, else raise the Empty exception ('timeout' is ignored
in that case).
+
+ Raises ShutDown if the queue has been shut down and is empty,
+ or if the queue has been shut down immediately.
'''
with self.not_empty:
+ if self.is_shutdown and not self._qsize():
+ raise ShutDown
if not block:
if not self._qsize():
raise Empty
elif timeout is None:
while not self._qsize():
self.not_empty.wait()
+ if self.is_shutdown and not self._qsize():
+ raise ShutDown
elif timeout < 0:
raise ValueError("'timeout' must be a non-negative number")
else:
@@ -178,6 +204,8 @@ class Queue:
if remaining <= 0.0:
raise Empty
self.not_empty.wait(remaining)
+ if self.is_shutdown and not self._qsize():
+ raise ShutDown
item = self._get()
self.not_full.notify()
return item
@@ -198,6 +226,28 @@ class Queue:
'''
return self.get(block=False)
+ def shutdown(self, immediate=False):
+ '''Shut-down the queue, making queue gets and puts raise.
+
+ By default, gets will only raise once the queue is empty. Set
+ 'immediate' to True to make gets raise immediately instead.
+
+ All blocked callers of put() will be unblocked, and also get()
+ and join() if 'immediate'. The ShutDown exception is raised.
+ '''
+ with self.mutex:
+ self.is_shutdown = True
+ if immediate:
+ n_items = self._qsize()
+ while self._qsize():
+ self._get()
+ if self.unfinished_tasks > 0:
+ self.unfinished_tasks -= 1
+ self.not_empty.notify_all()
+ # release all blocked threads in `join()`
+ self.all_tasks_done.notify_all()
+ self.not_full.notify_all()
+
# Override these methods to implement other queue organizations
# (e.g. stack or priority queue).
# These will only be called with appropriate locks held