diff options
-rw-r--r-- | Doc/lib/libqueue.tex | 6 | ||||
-rw-r--r-- | Lib/Queue.py | 147 | ||||
-rw-r--r-- | Misc/NEWS | 11 |
3 files changed, 76 insertions, 88 deletions
diff --git a/Doc/lib/libqueue.tex b/Doc/lib/libqueue.tex index 52c1125..f1d892a 100644 --- a/Doc/lib/libqueue.tex +++ b/Doc/lib/libqueue.tex @@ -26,13 +26,13 @@ zero, the queue size is infinite. \begin{excdesc}{Empty} Exception raised when non-blocking \method{get()} (or \method{get_nowait()}) is called on a \class{Queue} object which is -empty or locked. +empty. \end{excdesc} \begin{excdesc}{Full} Exception raised when non-blocking \method{put()} (or \method{put_nowait()}) is called on a \class{Queue} object which is -full or locked. +full. \end{excdesc} \subsection{Queue Objects} @@ -51,7 +51,7 @@ semantics, this number is not reliable. \begin{methoddesc}{empty}{} Return \code{True} if the queue is empty, \code{False} otherwise. -Becauseof multithreading semantics, this is not reliable. +Because of multithreading semantics, this is not reliable. \end{methoddesc} \begin{methoddesc}{full}{} diff --git a/Lib/Queue.py b/Lib/Queue.py index 44c9ca3..65d1eea 100644 --- a/Lib/Queue.py +++ b/Lib/Queue.py @@ -1,6 +1,6 @@ """A multi-producer, multi-consumer queue.""" -from time import time as _time, sleep as _sleep +from time import time as _time from collections import deque __all__ = ['Empty', 'Full', 'Queue'] @@ -20,14 +20,21 @@ class Queue: If maxsize is <= 0, the queue size is infinite. """ try: - import thread + import threading except ImportError: - import dummy_thread as thread + import dummy_threading as threading self._init(maxsize) - self.mutex = thread.allocate_lock() - self.esema = thread.allocate_lock() - self.esema.acquire() - self.fsema = thread.allocate_lock() + # mutex must be held whenever the queue is mutating. All methods + # that acquire mutex must release it before returning. mutex + # is shared between the two conditions, so acquiring and + # releasing the conditions also acquires and releases mutex. + self.mutex = threading.Lock() + # Notify not_empty whenever an item is added to the queue; a + # thread waiting to get is notified then. + self.not_empty = threading.Condition(self.mutex) + # Notify not_full whenever an item is removed from the queue; + # a thread waiting to put is notified then. + self.not_full = threading.Condition(self.mutex) def qsize(self): """Return the approximate size of the queue (not reliable!).""" @@ -61,51 +68,26 @@ class Queue: is immediately available, else raise the Full exception ('timeout' is ignored in that case). """ - if block: + if not block: + return self.put_nowait(item) + self.not_full.acquire() + try: if timeout is None: - # blocking, w/o timeout, i.e. forever - self.fsema.acquire() - elif timeout >= 0: - # waiting max. 'timeout' seconds. - # this code snipped is from threading.py: _Event.wait(): - # Balancing act: We can't afford a pure busy loop, so we - # have to sleep; but if we sleep the whole timeout time, - # we'll be unresponsive. The scheme here sleeps very - # little at first, longer as time goes on, but never longer - # than 20 times per second (or the timeout time remaining). - delay = 0.0005 # 500 us -> initial delay of 1 ms + while self._full(): + self.not_full.wait() + else: + if timeout < 0: + raise ValueError("'timeout' must be a positive number") endtime = _time() + timeout - while True: - if self.fsema.acquire(0): - break + while self._full(): remaining = endtime - _time() - if remaining <= 0: #time is over and no slot was free + if remaining < 0.0: raise Full - delay = min(delay * 2, remaining, .05) - _sleep(delay) #reduce CPU usage by using a sleep - else: - raise ValueError("'timeout' must be a positive number") - elif not self.fsema.acquire(0): - raise Full - self.mutex.acquire() - release_fsema = True - try: - was_empty = self._empty() + self.not_full.wait(remaining) self._put(item) - # If we fail before here, the empty state has - # not changed, so we can skip the release of esema - if was_empty: - self.esema.release() - # If we fail before here, the queue can not be full, so - # release_full_sema remains True - release_fsema = not self._full() + self.not_empty.notify() finally: - # Catching system level exceptions here (RecursionDepth, - # OutOfMemory, etc) - so do as little as possible in terms - # of Python calls. - if release_fsema: - self.fsema.release() - self.mutex.release() + self.not_full.release() def put_nowait(self, item): """Put an item into the queue without blocking. @@ -113,7 +95,15 @@ class Queue: Only enqueue the item if a free slot is immediately available. Otherwise raise the Full exception. """ - return self.put(item, False) + self.not_full.acquire() + try: + if self._full(): + raise Full + else: + self._put(item) + self.not_empty.notify() + finally: + self.not_full.release() def get(self, block=True, timeout=None): """Remove and return an item from the queue. @@ -126,49 +116,27 @@ class Queue: available, else raise the Empty exception ('timeout' is ignored in that case). """ - if block: + if not block: + return self.get_nowait() + self.not_empty.acquire() + try: if timeout is None: - # blocking, w/o timeout, i.e. forever - self.esema.acquire() - elif timeout >= 0: - # waiting max. 'timeout' seconds. - # this code snipped is from threading.py: _Event.wait(): - # Balancing act: We can't afford a pure busy loop, so we - # have to sleep; but if we sleep the whole timeout time, - # we'll be unresponsive. The scheme here sleeps very - # little at first, longer as time goes on, but never longer - # than 20 times per second (or the timeout time remaining). - delay = 0.0005 # 500 us -> initial delay of 1 ms + while self._empty(): + self.not_empty.wait() + else: + if timeout < 0: + raise ValueError("'timeout' must be a positive number") endtime = _time() + timeout - while 1: - if self.esema.acquire(0): - break + while self._empty(): remaining = endtime - _time() - if remaining <= 0: #time is over and no element arrived + if remaining < 0.0: raise Empty - delay = min(delay * 2, remaining, .05) - _sleep(delay) #reduce CPU usage by using a sleep - else: - raise ValueError("'timeout' must be a positive number") - elif not self.esema.acquire(0): - raise Empty - self.mutex.acquire() - release_esema = True - try: - was_full = self._full() + self.not_empty.wait(remaining) item = self._get() - # If we fail before here, the full state has - # not changed, so we can skip the release of fsema - if was_full: - self.fsema.release() - # Failure means empty state also unchanged - release_esema - # remains True. - release_esema = not self._empty() + self.not_full.notify() + return item finally: - if release_esema: - self.esema.release() - self.mutex.release() - return item + self.not_empty.release() def get_nowait(self): """Remove and return an item from the queue without blocking. @@ -176,7 +144,16 @@ class Queue: Only get an item if one is immediately available. Otherwise raise the Empty exception. """ - return self.get(False) + self.not_empty.acquire() + try: + if self._empty(): + raise Empty + else: + item = self._get() + self.not_full.notify() + return item + finally: + self.not_empty.release() # Override these methods to implement other queue organizations # (e.g. stack or priority queue). @@ -29,6 +29,17 @@ Extension modules Library ------- +- Bug #788520. Queue.{get, get_nowait, put, put_nowait} have new + implementations, exploiting Conditions (which didn't exist at the time + Queue was introduced). A minor semantic change is that the Full and + Empty exceptions raised by non-blocking calls now occur only if the + queue truly was full or empty at the instant the queue was checked (of + course the Queue may no longer be full or empty by the time a calling + thread sees those exceptions, though). Before, the exceptions could + also be raised if it was "merely inconvenient" for the implementation + to determine the true state of the Queue (because the Queue was locked + by some other method in progress). + - Bugs #979794 and #980117: difflib.get_grouped_opcodes() now handles the case of comparing two empty lists. This affected both context_diff() and unified_diff(), |