diff options
author | Georg Brandl <georg@python.org> | 2008-05-25 07:20:14 (GMT) |
---|---|---|
committer | Georg Brandl <georg@python.org> | 2008-05-25 07:20:14 (GMT) |
commit | a6168f9e0a0ff77910e783df36f5dd4b5dfa7372 (patch) | |
tree | 5424d6e36503bda00ee3a63638737a9973b4a4fb /Lib/Queue.py | |
parent | 8107290fa186bd5efa2a9c158000fd578d228a6c (diff) | |
download | cpython-a6168f9e0a0ff77910e783df36f5dd4b5dfa7372.zip cpython-a6168f9e0a0ff77910e783df36f5dd4b5dfa7372.tar.gz cpython-a6168f9e0a0ff77910e783df36f5dd4b5dfa7372.tar.bz2 |
Queue renaming reversal part 3: move module into place and
change imports and other references. Closes #2925.
Diffstat (limited to 'Lib/Queue.py')
-rw-r--r-- | Lib/Queue.py | 244 |
1 files changed, 244 insertions, 0 deletions
diff --git a/Lib/Queue.py b/Lib/Queue.py new file mode 100644 index 0000000..7b0b328 --- /dev/null +++ b/Lib/Queue.py @@ -0,0 +1,244 @@ +"""A multi-producer, multi-consumer queue.""" + +from time import time as _time +from collections import deque +import heapq + +__all__ = ['Empty', 'Full', 'Queue', 'PriorityQueue', 'LifoQueue'] + +class Empty(Exception): + "Exception raised by Queue.get(block=0)/get_nowait()." + pass + +class Full(Exception): + "Exception raised by Queue.put(block=0)/put_nowait()." + pass + +class Queue: + """Create a queue object with a given maximum size. + + If maxsize is <= 0, the queue size is infinite. + """ + def __init__(self, maxsize=0): + try: + import threading + except ImportError: + import dummy_threading as threading + self.maxsize = maxsize + self._init(maxsize) + # mutex must be held whenever the queue is mutating. All methods + # that acquire mutex must release it before returning. mutex + # is shared between the three conditions, so acquiring and + # releasing the conditions also acquires and releases mutex. + self.mutex = threading.Lock() + # Notify not_empty whenever an item is added to the queue; a + # thread waiting to get is notified then. + self.not_empty = threading.Condition(self.mutex) + # Notify not_full whenever an item is removed from the queue; + # a thread waiting to put is notified then. + self.not_full = threading.Condition(self.mutex) + # Notify all_tasks_done whenever the number of unfinished tasks + # drops to zero; thread waiting to join() is notified to resume + self.all_tasks_done = threading.Condition(self.mutex) + self.unfinished_tasks = 0 + + def task_done(self): + """Indicate that a formerly enqueued task is complete. + + Used by Queue consumer threads. For each get() used to fetch a task, + a subsequent call to task_done() tells the queue that the processing + on the task is complete. + + If a join() is currently blocking, it will resume when all items + have been processed (meaning that a task_done() call was received + for every item that had been put() into the queue). + + Raises a ValueError if called more times than there were items + placed in the queue. + """ + self.all_tasks_done.acquire() + try: + unfinished = self.unfinished_tasks - 1 + if unfinished <= 0: + if unfinished < 0: + raise ValueError('task_done() called too many times') + self.all_tasks_done.notifyAll() + self.unfinished_tasks = unfinished + finally: + self.all_tasks_done.release() + + def join(self): + """Blocks until all items in the Queue have been gotten and processed. + + The count of unfinished tasks goes up whenever an item is added to the + queue. The count goes down whenever a consumer thread calls task_done() + to indicate the item was retrieved and all work on it is complete. + + When the count of unfinished tasks drops to zero, join() unblocks. + """ + self.all_tasks_done.acquire() + try: + while self.unfinished_tasks: + self.all_tasks_done.wait() + finally: + self.all_tasks_done.release() + + def qsize(self): + """Return the approximate size of the queue (not reliable!).""" + self.mutex.acquire() + n = self._qsize() + self.mutex.release() + return n + + def empty(self): + """Return True if the queue is empty, False otherwise (not reliable!).""" + self.mutex.acquire() + n = not self._qsize() + self.mutex.release() + return n + + def full(self): + """Return True if the queue is full, False otherwise (not reliable!).""" + self.mutex.acquire() + n = 0 < self.maxsize == self._qsize() + self.mutex.release() + return n + + def put(self, item, block=True, timeout=None): + """Put an item into the queue. + + If optional args 'block' is true and 'timeout' is None (the default), + block if necessary until a free slot is available. If 'timeout' is + a positive number, it blocks at most 'timeout' seconds and raises + the Full exception if no free slot was available within that time. + Otherwise ('block' is false), put an item on the queue if a free slot + is immediately available, else raise the Full exception ('timeout' + is ignored in that case). + """ + self.not_full.acquire() + try: + if self.maxsize > 0: + if not block: + if self._qsize() == self.maxsize: + raise Full + elif timeout is None: + while self._qsize() == self.maxsize: + self.not_full.wait() + elif timeout < 0: + raise ValueError("'timeout' must be a positive number") + else: + endtime = _time() + timeout + while self._qsize() == self.maxsize: + remaining = endtime - _time() + if remaining <= 0.0: + raise Full + self.not_full.wait(remaining) + self._put(item) + self.unfinished_tasks += 1 + self.not_empty.notify() + finally: + self.not_full.release() + + def put_nowait(self, item): + """Put an item into the queue without blocking. + + Only enqueue the item if a free slot is immediately available. + Otherwise raise the Full exception. + """ + return self.put(item, False) + + def get(self, block=True, timeout=None): + """Remove and return an item from the queue. + + If optional args 'block' is true and 'timeout' is None (the default), + block if necessary until an item is available. If 'timeout' is + a positive number, it blocks at most 'timeout' seconds and raises + the Empty exception if no item was available within that time. + Otherwise ('block' is false), return an item if one is immediately + available, else raise the Empty exception ('timeout' is ignored + in that case). + """ + self.not_empty.acquire() + try: + if not block: + if not self._qsize(): + raise Empty + elif timeout is None: + while not self._qsize(): + self.not_empty.wait() + elif timeout < 0: + raise ValueError("'timeout' must be a positive number") + else: + endtime = _time() + timeout + while not self._qsize(): + remaining = endtime - _time() + if remaining <= 0.0: + raise Empty + self.not_empty.wait(remaining) + item = self._get() + self.not_full.notify() + return item + finally: + self.not_empty.release() + + def get_nowait(self): + """Remove and return an item from the queue without blocking. + + Only get an item if one is immediately available. Otherwise + raise the Empty exception. + """ + return self.get(False) + + # Override these methods to implement other queue organizations + # (e.g. stack or priority queue). + # These will only be called with appropriate locks held + + # Initialize the queue representation + def _init(self, maxsize): + self.queue = deque() + + def _qsize(self, len=len): + return len(self.queue) + + # Put a new item in the queue + def _put(self, item): + self.queue.append(item) + + # Get an item from the queue + def _get(self): + return self.queue.popleft() + + +class PriorityQueue(Queue): + '''Variant of Queue that retrieves open entries in priority order (lowest first). + + Entries are typically tuples of the form: (priority number, data). + ''' + + def _init(self, maxsize): + self.queue = [] + + def _qsize(self, len=len): + return len(self.queue) + + def _put(self, item, heappush=heapq.heappush): + heappush(self.queue, item) + + def _get(self, heappop=heapq.heappop): + return heappop(self.queue) + + +class LifoQueue(Queue): + '''Variant of Queue that retrieves most recently added entries first.''' + + def _init(self, maxsize): + self.queue = [] + + def _qsize(self, len=len): + return len(self.queue) + + def _put(self, item): + self.queue.append(item) + + def _get(self): + return self.queue.pop() |