diff options
Diffstat (limited to 'Lib/Queue.py')
| -rw-r--r-- | Lib/Queue.py | 95 |
1 files changed, 62 insertions, 33 deletions
diff --git a/Lib/Queue.py b/Lib/Queue.py index 79b0abf..773b680 100644 --- a/Lib/Queue.py +++ b/Lib/Queue.py @@ -2,8 +2,9 @@ from time import time as _time from collections import deque +import heapq -__all__ = ['Empty', 'Full', 'Queue'] +__all__ = ['Empty', 'Full', 'Queue', 'PriorityQueue', 'LifoQueue'] class Empty(Exception): "Exception raised by Queue.get(block=0)/get_nowait()." @@ -23,6 +24,7 @@ class Queue: import threading except ImportError: import dummy_threading as threading + self.maxsize = maxsize self._init(maxsize) # mutex must be held whenever the queue is mutating. All methods # that acquire mutex must release it before returning. mutex @@ -60,7 +62,7 @@ class Queue: if unfinished <= 0: if unfinished < 0: raise ValueError('task_done() called too many times') - self.all_tasks_done.notifyAll() + self.all_tasks_done.notify_all() self.unfinished_tasks = unfinished finally: self.all_tasks_done.release() @@ -91,14 +93,14 @@ class Queue: def empty(self): """Return True if the queue is empty, False otherwise (not reliable!).""" self.mutex.acquire() - n = self._empty() + n = not self._qsize() self.mutex.release() return n def full(self): """Return True if the queue is full, False otherwise (not reliable!).""" self.mutex.acquire() - n = self._full() + n = 0 < self.maxsize == self._qsize() self.mutex.release() return n @@ -115,21 +117,22 @@ class Queue: """ self.not_full.acquire() try: - if not block: - if self._full(): - raise Full - elif timeout is None: - while self._full(): - self.not_full.wait() - else: - if timeout < 0: - raise ValueError("'timeout' must be a positive number") - endtime = _time() + timeout - while self._full(): - remaining = endtime - _time() - if remaining <= 0.0: + if self.maxsize > 0: + if not block: + if self._qsize() == self.maxsize: raise Full - self.not_full.wait(remaining) + elif timeout is None: + while self._qsize() == self.maxsize: + self.not_full.wait() + elif timeout < 0: + raise ValueError("'timeout' must be a positive number") + else: + endtime = _time() + timeout + while self._qsize() == self.maxsize: + remaining = endtime - _time() + if remaining <= 0.0: + raise Full + self.not_full.wait(remaining) self._put(item) self.unfinished_tasks += 1 self.not_empty.notify() @@ -158,16 +161,16 @@ class Queue: self.not_empty.acquire() try: if not block: - if self._empty(): + if not self._qsize(): raise Empty elif timeout is None: - while self._empty(): + while not self._qsize(): self.not_empty.wait() + elif timeout < 0: + raise ValueError("'timeout' must be a positive number") else: - if timeout < 0: - raise ValueError("'timeout' must be a positive number") endtime = _time() + timeout - while self._empty(): + while not self._qsize(): remaining = endtime - _time() if remaining <= 0.0: raise Empty @@ -192,20 +195,11 @@ class Queue: # Initialize the queue representation def _init(self, maxsize): - self.maxsize = maxsize self.queue = deque() - def _qsize(self): + def _qsize(self, len=len): return len(self.queue) - # Check whether the queue is empty - def _empty(self): - return not self.queue - - # Check whether the queue is full - def _full(self): - return self.maxsize > 0 and len(self.queue) == self.maxsize - # Put a new item in the queue def _put(self, item): self.queue.append(item) @@ -213,3 +207,38 @@ class Queue: # Get an item from the queue def _get(self): return self.queue.popleft() + + +class PriorityQueue(Queue): + '''Variant of Queue that retrieves open entries in priority order (lowest first). + + Entries are typically tuples of the form: (priority number, data). + ''' + + def _init(self, maxsize): + self.queue = [] + + def _qsize(self, len=len): + return len(self.queue) + + def _put(self, item, heappush=heapq.heappush): + heappush(self.queue, item) + + def _get(self, heappop=heapq.heappop): + return heappop(self.queue) + + +class LifoQueue(Queue): + '''Variant of Queue that retrieves most recently added entries first.''' + + def _init(self, maxsize): + self.queue = [] + + def _qsize(self, len=len): + return len(self.queue) + + def _put(self, item): + self.queue.append(item) + + def _get(self): + return self.queue.pop() |
