diff options
Diffstat (limited to 'Lib/Queue.py')
-rw-r--r-- | Lib/Queue.py | 45 |
1 files changed, 45 insertions, 0 deletions
diff --git a/Lib/Queue.py b/Lib/Queue.py index c6c608b..285cd17 100644 --- a/Lib/Queue.py +++ b/Lib/Queue.py @@ -35,6 +35,50 @@ class Queue: # Notify not_full whenever an item is removed from the queue; # a thread waiting to put is notified then. self.not_full = threading.Condition(self.mutex) + # Notify all_tasks_done whenever the number of unfinished tasks + # drops to zero; thread waiting to join() is notified to resume + self.all_tasks_done = threading.Condition(self.mutex) + self.unfinished_tasks = 0 + + def task_done(self): + """Indicate that a formerly enqueued task is complete. + + Used by Queue consumer threads. For each get() used to fetch a task, + a subsequent call to task_done() tells the queue that the processing + on the task is complete. + + If a join() is currently blocking, it will resume when all items + have been processed (meaning that a task_done() call was received + for every item that had been put() into the queue). + + Raises a ValueError if called more times than there were items + placed in the queue. + """ + self.all_tasks_done.acquire() + try: + self.unfinished_tasks = unfinished = self.unfinished_tasks - 1 + if unfinished <= 0: + if unfinished < 0: + raise ValueError('task_done() called too many times') + self.all_tasks_done.notifyAll() + finally: + self.all_tasks_done.release() + + def join(self): + """Blocks until all items in the Queue have been gotten and processed. + + The count of unfinished tasks goes up whenever an item is added to the + queue. The count goes down whenever a consumer thread calls task_done() + to indicate the item was retrieved and all work on it is complete. + + When the count of unfinished tasks drops to zero, join() unblocks. + """ + self.all_tasks_done.acquire() + try: + while self.unfinished_tasks: + self.all_tasks_done.wait() + finally: + self.all_tasks_done.release() def qsize(self): """Return the approximate size of the queue (not reliable!).""" @@ -86,6 +130,7 @@ class Queue: raise Full self.not_full.wait(remaining) self._put(item) + self.unfinished_tasks += 1 self.not_empty.notify() finally: self.not_full.release() |