From fd3fcf0b35a479c3df4999d9bad2337a5e3af140 Mon Sep 17 00:00:00 2001 From: Raymond Hettinger Date: Fri, 24 Mar 2006 20:43:29 +0000 Subject: SF Patch #1455676: Simplify using Queues with daemon consumer threads Adds join() and task_done() methods to track when all enqueued tasks have been gotten and fully processed by daemon consumer threads. --- Doc/lib/libqueue.tex | 49 +++++++++++++++++++++++++++++++++++++++++++++++++ Lib/Queue.py | 45 +++++++++++++++++++++++++++++++++++++++++++++ Lib/test/test_queue.py | 30 ++++++++++++++++++++++++++++++ Misc/NEWS | 4 ++++ 4 files changed, 128 insertions(+) diff --git a/Doc/lib/libqueue.tex b/Doc/lib/libqueue.tex index f1d892a..95ad47f 100644 --- a/Doc/lib/libqueue.tex +++ b/Doc/lib/libqueue.tex @@ -1,3 +1,4 @@ + \section{\module{Queue} --- A synchronized queue class} @@ -94,3 +95,51 @@ immediately available, else raise the \exception{Empty} exception \begin{methoddesc}{get_nowait}{} Equivalent to \code{get(False)}. \end{methoddesc} + +Two methods are offered to support tracking whether enqueued tasks have +been fully processed by daemon consumer threads. + +\begin{methoddesc}{task_done}{} +Indicate that a formerly enqueued task is complete. Used by queue consumer +threads. For each \method{get()} used to fetch a task, a subsequent call to +\method{task_done()} tells the queue that the processing on the task is complete. + +If a \method{join()} is currently blocking, it will resume when all items +have been processed (meaning that a \method{task_done()} call was received +for every item that had been \method{put()} into the queue). + +Raises a \exception{ValueError} if called more times than there were items +placed in the queue. +\versionadded{2.5} +\end{methoddesc} + +\begin{methoddesc}{join}{} +Blocks until all items in the queue have been gotten and processed. + +The count of unfinished tasks goes up whenever an item is added to the +queue. The count goes down whenever a consumer thread calls \method{task_done()} +to indicate that the item was retrieved and all work on it is complete. +When the count of unfinished tasks drops to zero, join() unblocks. +\versionadded{2.5} +\end{methoddesc} + +Example of how to wait for enqueued tasks to be completed: + +\begin{verbatim} + def worker(): + while True: + item = q.get() + do_work(item) + q.task_done() + + q = Queue() + for i in range(num_worker_threads): + t = Thread(target=worker) + t.setDaemon(True) + t.start() + + for item in source(): + q.put(item) + + q.join() # block until all tasks are done +\end{verbatim} diff --git a/Lib/Queue.py b/Lib/Queue.py index c6c608b..285cd17 100644 --- a/Lib/Queue.py +++ b/Lib/Queue.py @@ -35,6 +35,50 @@ class Queue: # Notify not_full whenever an item is removed from the queue; # a thread waiting to put is notified then. self.not_full = threading.Condition(self.mutex) + # Notify all_tasks_done whenever the number of unfinished tasks + # drops to zero; thread waiting to join() is notified to resume + self.all_tasks_done = threading.Condition(self.mutex) + self.unfinished_tasks = 0 + + def task_done(self): + """Indicate that a formerly enqueued task is complete. + + Used by Queue consumer threads. For each get() used to fetch a task, + a subsequent call to task_done() tells the queue that the processing + on the task is complete. + + If a join() is currently blocking, it will resume when all items + have been processed (meaning that a task_done() call was received + for every item that had been put() into the queue). + + Raises a ValueError if called more times than there were items + placed in the queue. + """ + self.all_tasks_done.acquire() + try: + self.unfinished_tasks = unfinished = self.unfinished_tasks - 1 + if unfinished <= 0: + if unfinished < 0: + raise ValueError('task_done() called too many times') + self.all_tasks_done.notifyAll() + finally: + self.all_tasks_done.release() + + def join(self): + """Blocks until all items in the Queue have been gotten and processed. + + The count of unfinished tasks goes up whenever an item is added to the + queue. The count goes down whenever a consumer thread calls task_done() + to indicate the item was retrieved and all work on it is complete. + + When the count of unfinished tasks drops to zero, join() unblocks. + """ + self.all_tasks_done.acquire() + try: + while self.unfinished_tasks: + self.all_tasks_done.wait() + finally: + self.all_tasks_done.release() def qsize(self): """Return the approximate size of the queue (not reliable!).""" @@ -86,6 +130,7 @@ class Queue: raise Full self.not_full.wait(remaining) self._put(item) + self.unfinished_tasks += 1 self.not_empty.notify() finally: self.not_full.release() diff --git a/Lib/test/test_queue.py b/Lib/test/test_queue.py index b55dd01..17c1def 100644 --- a/Lib/test/test_queue.py +++ b/Lib/test/test_queue.py @@ -221,7 +221,37 @@ def SimpleQueueTest(q): _doBlockingTest(q.get, (), q.put, ('empty',)) _doBlockingTest(q.get, (True, 10), q.put, ('empty',)) +cum = 0 +cumlock = threading.Lock() + +def worker(q): + global cum + while True: + x = q.get() + cumlock.acquire() + try: + cum += x + finally: + cumlock.release() + q.task_done() + +def QueueJoinTest(q): + global cum + cum = 0 + for i in (0,1): + t = threading.Thread(target=worker, args=(q,)) + t.setDaemon(True) + t.start() + for i in xrange(100): + q.put(i) + q.join() + verify(cum==sum(range(100)), "q.join() did not block until all tasks were done") + def test(): + q = Queue.Queue() + QueueJoinTest(q) + QueueJoinTest(q) + q = Queue.Queue(QUEUE_SIZE) # Do it a couple of times on the same queue SimpleQueueTest(q) diff --git a/Misc/NEWS b/Misc/NEWS index ba078b2..1d75424 100644 --- a/Misc/NEWS +++ b/Misc/NEWS @@ -483,6 +483,10 @@ Extension Modules Library ------- +- Queue.Queue objects now support .task_done() and .join() methods + to make it easier to monitor when daemon threads have completed + processing all enqueued tasks. Patch #1455676. + - popen2.Popen objects now preserve the command in a .cmd attribute. - Added the ctypes ffi package. -- cgit v0.12