From 30ece44f2e5397e8501380349fd5278e6f64f555 Mon Sep 17 00:00:00 2001 From: Alexandre Vassalotti Date: Sun, 11 May 2008 19:39:48 +0000 Subject: Added stub for the Queue module to be renamed in 3.0. Use the 3.0 module name to avoid spurious warnings. --- Lib/Queue.py | 248 +----------------------------------------- Lib/idlelib/rpc.py | 8 +- Lib/idlelib/run.py | 4 +- Lib/queue.py | 244 +++++++++++++++++++++++++++++++++++++++++ Lib/test/test_dummy_thread.py | 6 +- Lib/test/test_queue.py | 26 ++--- Lib/test/test_socket.py | 4 +- Tools/webchecker/wsgui.py | 4 +- 8 files changed, 276 insertions(+), 268 deletions(-) create mode 100644 Lib/queue.py diff --git a/Lib/Queue.py b/Lib/Queue.py index 7b0b328..e358a04 100644 --- a/Lib/Queue.py +++ b/Lib/Queue.py @@ -1,244 +1,8 @@ -"""A multi-producer, multi-consumer queue.""" +import sys +from warnings import warnpy3k -from time import time as _time -from collections import deque -import heapq +warnpy3k("the Queue module has been renamed " + "to 'queue' in Python 3.0", stacklevel=2) -__all__ = ['Empty', 'Full', 'Queue', 'PriorityQueue', 'LifoQueue'] - -class Empty(Exception): - "Exception raised by Queue.get(block=0)/get_nowait()." - pass - -class Full(Exception): - "Exception raised by Queue.put(block=0)/put_nowait()." - pass - -class Queue: - """Create a queue object with a given maximum size. - - If maxsize is <= 0, the queue size is infinite. - """ - def __init__(self, maxsize=0): - try: - import threading - except ImportError: - import dummy_threading as threading - self.maxsize = maxsize - self._init(maxsize) - # mutex must be held whenever the queue is mutating. All methods - # that acquire mutex must release it before returning. mutex - # is shared between the three conditions, so acquiring and - # releasing the conditions also acquires and releases mutex. - self.mutex = threading.Lock() - # Notify not_empty whenever an item is added to the queue; a - # thread waiting to get is notified then. - self.not_empty = threading.Condition(self.mutex) - # Notify not_full whenever an item is removed from the queue; - # a thread waiting to put is notified then. - self.not_full = threading.Condition(self.mutex) - # Notify all_tasks_done whenever the number of unfinished tasks - # drops to zero; thread waiting to join() is notified to resume - self.all_tasks_done = threading.Condition(self.mutex) - self.unfinished_tasks = 0 - - def task_done(self): - """Indicate that a formerly enqueued task is complete. - - Used by Queue consumer threads. For each get() used to fetch a task, - a subsequent call to task_done() tells the queue that the processing - on the task is complete. - - If a join() is currently blocking, it will resume when all items - have been processed (meaning that a task_done() call was received - for every item that had been put() into the queue). - - Raises a ValueError if called more times than there were items - placed in the queue. - """ - self.all_tasks_done.acquire() - try: - unfinished = self.unfinished_tasks - 1 - if unfinished <= 0: - if unfinished < 0: - raise ValueError('task_done() called too many times') - self.all_tasks_done.notifyAll() - self.unfinished_tasks = unfinished - finally: - self.all_tasks_done.release() - - def join(self): - """Blocks until all items in the Queue have been gotten and processed. - - The count of unfinished tasks goes up whenever an item is added to the - queue. The count goes down whenever a consumer thread calls task_done() - to indicate the item was retrieved and all work on it is complete. - - When the count of unfinished tasks drops to zero, join() unblocks. - """ - self.all_tasks_done.acquire() - try: - while self.unfinished_tasks: - self.all_tasks_done.wait() - finally: - self.all_tasks_done.release() - - def qsize(self): - """Return the approximate size of the queue (not reliable!).""" - self.mutex.acquire() - n = self._qsize() - self.mutex.release() - return n - - def empty(self): - """Return True if the queue is empty, False otherwise (not reliable!).""" - self.mutex.acquire() - n = not self._qsize() - self.mutex.release() - return n - - def full(self): - """Return True if the queue is full, False otherwise (not reliable!).""" - self.mutex.acquire() - n = 0 < self.maxsize == self._qsize() - self.mutex.release() - return n - - def put(self, item, block=True, timeout=None): - """Put an item into the queue. - - If optional args 'block' is true and 'timeout' is None (the default), - block if necessary until a free slot is available. If 'timeout' is - a positive number, it blocks at most 'timeout' seconds and raises - the Full exception if no free slot was available within that time. - Otherwise ('block' is false), put an item on the queue if a free slot - is immediately available, else raise the Full exception ('timeout' - is ignored in that case). - """ - self.not_full.acquire() - try: - if self.maxsize > 0: - if not block: - if self._qsize() == self.maxsize: - raise Full - elif timeout is None: - while self._qsize() == self.maxsize: - self.not_full.wait() - elif timeout < 0: - raise ValueError("'timeout' must be a positive number") - else: - endtime = _time() + timeout - while self._qsize() == self.maxsize: - remaining = endtime - _time() - if remaining <= 0.0: - raise Full - self.not_full.wait(remaining) - self._put(item) - self.unfinished_tasks += 1 - self.not_empty.notify() - finally: - self.not_full.release() - - def put_nowait(self, item): - """Put an item into the queue without blocking. - - Only enqueue the item if a free slot is immediately available. - Otherwise raise the Full exception. - """ - return self.put(item, False) - - def get(self, block=True, timeout=None): - """Remove and return an item from the queue. - - If optional args 'block' is true and 'timeout' is None (the default), - block if necessary until an item is available. If 'timeout' is - a positive number, it blocks at most 'timeout' seconds and raises - the Empty exception if no item was available within that time. - Otherwise ('block' is false), return an item if one is immediately - available, else raise the Empty exception ('timeout' is ignored - in that case). - """ - self.not_empty.acquire() - try: - if not block: - if not self._qsize(): - raise Empty - elif timeout is None: - while not self._qsize(): - self.not_empty.wait() - elif timeout < 0: - raise ValueError("'timeout' must be a positive number") - else: - endtime = _time() + timeout - while not self._qsize(): - remaining = endtime - _time() - if remaining <= 0.0: - raise Empty - self.not_empty.wait(remaining) - item = self._get() - self.not_full.notify() - return item - finally: - self.not_empty.release() - - def get_nowait(self): - """Remove and return an item from the queue without blocking. - - Only get an item if one is immediately available. Otherwise - raise the Empty exception. - """ - return self.get(False) - - # Override these methods to implement other queue organizations - # (e.g. stack or priority queue). - # These will only be called with appropriate locks held - - # Initialize the queue representation - def _init(self, maxsize): - self.queue = deque() - - def _qsize(self, len=len): - return len(self.queue) - - # Put a new item in the queue - def _put(self, item): - self.queue.append(item) - - # Get an item from the queue - def _get(self): - return self.queue.popleft() - - -class PriorityQueue(Queue): - '''Variant of Queue that retrieves open entries in priority order (lowest first). - - Entries are typically tuples of the form: (priority number, data). - ''' - - def _init(self, maxsize): - self.queue = [] - - def _qsize(self, len=len): - return len(self.queue) - - def _put(self, item, heappush=heapq.heappush): - heappush(self.queue, item) - - def _get(self, heappop=heapq.heappop): - return heappop(self.queue) - - -class LifoQueue(Queue): - '''Variant of Queue that retrieves most recently added entries first.''' - - def _init(self, maxsize): - self.queue = [] - - def _qsize(self, len=len): - return len(self.queue) - - def _put(self, item): - self.queue.append(item) - - def _get(self): - return self.queue.pop() +import queue +sys.modules[__name__] = queue diff --git a/Lib/idlelib/rpc.py b/Lib/idlelib/rpc.py index 52a2eaf..7d36b2a 100644 --- a/Lib/idlelib/rpc.py +++ b/Lib/idlelib/rpc.py @@ -35,7 +35,7 @@ import SocketServer import struct import cPickle as pickle import threading -import Queue +import queue import traceback import copyreg import types @@ -117,8 +117,8 @@ class RPCServer(SocketServer.TCPServer): #----------------- end class RPCServer -------------------- objecttable = {} -request_queue = Queue.Queue(0) -response_queue = Queue.Queue(0) +request_queue = queue.Queue(0) +response_queue = queue.Queue(0) class SocketIO(object): @@ -413,7 +413,7 @@ class SocketIO(object): # send queued response if there is one available try: qmsg = response_queue.get(0) - except Queue.Empty: + except queue.Empty: pass else: seq, response = qmsg diff --git a/Lib/idlelib/run.py b/Lib/idlelib/run.py index 7827c74..8abbe50 100644 --- a/Lib/idlelib/run.py +++ b/Lib/idlelib/run.py @@ -5,7 +5,7 @@ import socket import traceback import thread import threading -import Queue +import queue import CallTips import AutoComplete @@ -85,7 +85,7 @@ def main(del_exitfunc=False): continue try: seq, request = rpc.request_queue.get(block=True, timeout=0.05) - except Queue.Empty: + except queue.Empty: continue method, args, kwargs = request ret = method(*args, **kwargs) diff --git a/Lib/queue.py b/Lib/queue.py new file mode 100644 index 0000000..7b0b328 --- /dev/null +++ b/Lib/queue.py @@ -0,0 +1,244 @@ +"""A multi-producer, multi-consumer queue.""" + +from time import time as _time +from collections import deque +import heapq + +__all__ = ['Empty', 'Full', 'Queue', 'PriorityQueue', 'LifoQueue'] + +class Empty(Exception): + "Exception raised by Queue.get(block=0)/get_nowait()." + pass + +class Full(Exception): + "Exception raised by Queue.put(block=0)/put_nowait()." + pass + +class Queue: + """Create a queue object with a given maximum size. + + If maxsize is <= 0, the queue size is infinite. + """ + def __init__(self, maxsize=0): + try: + import threading + except ImportError: + import dummy_threading as threading + self.maxsize = maxsize + self._init(maxsize) + # mutex must be held whenever the queue is mutating. All methods + # that acquire mutex must release it before returning. mutex + # is shared between the three conditions, so acquiring and + # releasing the conditions also acquires and releases mutex. + self.mutex = threading.Lock() + # Notify not_empty whenever an item is added to the queue; a + # thread waiting to get is notified then. + self.not_empty = threading.Condition(self.mutex) + # Notify not_full whenever an item is removed from the queue; + # a thread waiting to put is notified then. + self.not_full = threading.Condition(self.mutex) + # Notify all_tasks_done whenever the number of unfinished tasks + # drops to zero; thread waiting to join() is notified to resume + self.all_tasks_done = threading.Condition(self.mutex) + self.unfinished_tasks = 0 + + def task_done(self): + """Indicate that a formerly enqueued task is complete. + + Used by Queue consumer threads. For each get() used to fetch a task, + a subsequent call to task_done() tells the queue that the processing + on the task is complete. + + If a join() is currently blocking, it will resume when all items + have been processed (meaning that a task_done() call was received + for every item that had been put() into the queue). + + Raises a ValueError if called more times than there were items + placed in the queue. + """ + self.all_tasks_done.acquire() + try: + unfinished = self.unfinished_tasks - 1 + if unfinished <= 0: + if unfinished < 0: + raise ValueError('task_done() called too many times') + self.all_tasks_done.notifyAll() + self.unfinished_tasks = unfinished + finally: + self.all_tasks_done.release() + + def join(self): + """Blocks until all items in the Queue have been gotten and processed. + + The count of unfinished tasks goes up whenever an item is added to the + queue. The count goes down whenever a consumer thread calls task_done() + to indicate the item was retrieved and all work on it is complete. + + When the count of unfinished tasks drops to zero, join() unblocks. + """ + self.all_tasks_done.acquire() + try: + while self.unfinished_tasks: + self.all_tasks_done.wait() + finally: + self.all_tasks_done.release() + + def qsize(self): + """Return the approximate size of the queue (not reliable!).""" + self.mutex.acquire() + n = self._qsize() + self.mutex.release() + return n + + def empty(self): + """Return True if the queue is empty, False otherwise (not reliable!).""" + self.mutex.acquire() + n = not self._qsize() + self.mutex.release() + return n + + def full(self): + """Return True if the queue is full, False otherwise (not reliable!).""" + self.mutex.acquire() + n = 0 < self.maxsize == self._qsize() + self.mutex.release() + return n + + def put(self, item, block=True, timeout=None): + """Put an item into the queue. + + If optional args 'block' is true and 'timeout' is None (the default), + block if necessary until a free slot is available. If 'timeout' is + a positive number, it blocks at most 'timeout' seconds and raises + the Full exception if no free slot was available within that time. + Otherwise ('block' is false), put an item on the queue if a free slot + is immediately available, else raise the Full exception ('timeout' + is ignored in that case). + """ + self.not_full.acquire() + try: + if self.maxsize > 0: + if not block: + if self._qsize() == self.maxsize: + raise Full + elif timeout is None: + while self._qsize() == self.maxsize: + self.not_full.wait() + elif timeout < 0: + raise ValueError("'timeout' must be a positive number") + else: + endtime = _time() + timeout + while self._qsize() == self.maxsize: + remaining = endtime - _time() + if remaining <= 0.0: + raise Full + self.not_full.wait(remaining) + self._put(item) + self.unfinished_tasks += 1 + self.not_empty.notify() + finally: + self.not_full.release() + + def put_nowait(self, item): + """Put an item into the queue without blocking. + + Only enqueue the item if a free slot is immediately available. + Otherwise raise the Full exception. + """ + return self.put(item, False) + + def get(self, block=True, timeout=None): + """Remove and return an item from the queue. + + If optional args 'block' is true and 'timeout' is None (the default), + block if necessary until an item is available. If 'timeout' is + a positive number, it blocks at most 'timeout' seconds and raises + the Empty exception if no item was available within that time. + Otherwise ('block' is false), return an item if one is immediately + available, else raise the Empty exception ('timeout' is ignored + in that case). + """ + self.not_empty.acquire() + try: + if not block: + if not self._qsize(): + raise Empty + elif timeout is None: + while not self._qsize(): + self.not_empty.wait() + elif timeout < 0: + raise ValueError("'timeout' must be a positive number") + else: + endtime = _time() + timeout + while not self._qsize(): + remaining = endtime - _time() + if remaining <= 0.0: + raise Empty + self.not_empty.wait(remaining) + item = self._get() + self.not_full.notify() + return item + finally: + self.not_empty.release() + + def get_nowait(self): + """Remove and return an item from the queue without blocking. + + Only get an item if one is immediately available. Otherwise + raise the Empty exception. + """ + return self.get(False) + + # Override these methods to implement other queue organizations + # (e.g. stack or priority queue). + # These will only be called with appropriate locks held + + # Initialize the queue representation + def _init(self, maxsize): + self.queue = deque() + + def _qsize(self, len=len): + return len(self.queue) + + # Put a new item in the queue + def _put(self, item): + self.queue.append(item) + + # Get an item from the queue + def _get(self): + return self.queue.popleft() + + +class PriorityQueue(Queue): + '''Variant of Queue that retrieves open entries in priority order (lowest first). + + Entries are typically tuples of the form: (priority number, data). + ''' + + def _init(self, maxsize): + self.queue = [] + + def _qsize(self, len=len): + return len(self.queue) + + def _put(self, item, heappush=heapq.heappush): + heappush(self.queue, item) + + def _get(self, heappop=heapq.heappop): + return heappop(self.queue) + + +class LifoQueue(Queue): + '''Variant of Queue that retrieves most recently added entries first.''' + + def _init(self, maxsize): + self.queue = [] + + def _qsize(self, len=len): + return len(self.queue) + + def _put(self, item): + self.queue.append(item) + + def _get(self): + return self.queue.pop() diff --git a/Lib/test/test_dummy_thread.py b/Lib/test/test_dummy_thread.py index f274e0a..fa87af9 100644 --- a/Lib/test/test_dummy_thread.py +++ b/Lib/test/test_dummy_thread.py @@ -7,7 +7,7 @@ implementation as its sole argument. """ import dummy_thread as _thread import time -import Queue +import queue import random import unittest from test import test_support @@ -124,7 +124,7 @@ class ThreadTests(unittest.TestCase): """Use to test _thread.start_new_thread() passes args properly.""" queue.put((arg1, arg2)) - testing_queue = Queue.Queue(1) + testing_queue = queue.Queue(1) _thread.start_new_thread(arg_tester, (testing_queue, True, True)) result = testing_queue.get() self.failUnless(result[0] and result[1], @@ -148,7 +148,7 @@ class ThreadTests(unittest.TestCase): queue.put(_thread.get_ident()) thread_count = 5 - testing_queue = Queue.Queue(thread_count) + testing_queue = queue.Queue(thread_count) if test_support.verbose: print print "*** Testing multiple thread creation "\ diff --git a/Lib/test/test_queue.py b/Lib/test/test_queue.py index fdee1fa..7cc9329 100644 --- a/Lib/test/test_queue.py +++ b/Lib/test/test_queue.py @@ -1,6 +1,6 @@ -# Some simple Queue module tests, plus some failure conditions +# Some simple queue module tests, plus some failure conditions # to ensure the Queue locks remain stable. -import Queue +import queue import sys import threading import time @@ -107,12 +107,12 @@ class BaseQueueTest(unittest.TestCase, BlockingTestMixin): try: q.put("full", block=0) self.fail("Didn't appear to block with a full queue") - except Queue.Full: + except queue.Full: pass try: q.put("full", timeout=0.01) self.fail("Didn't appear to time-out with a full queue") - except Queue.Full: + except queue.Full: pass # Test a blocking put self.do_blocking_test(q.put, ("full",), q.get, ()) @@ -124,12 +124,12 @@ class BaseQueueTest(unittest.TestCase, BlockingTestMixin): try: q.get(block=0) self.fail("Didn't appear to block with an empty queue") - except Queue.Empty: + except queue.Empty: pass try: q.get(timeout=0.01) self.fail("Didn't appear to time-out with an empty queue") - except Queue.Empty: + except queue.Empty: pass # Test a blocking get self.do_blocking_test(q.get, (), q.put, ('empty',)) @@ -191,13 +191,13 @@ class BaseQueueTest(unittest.TestCase, BlockingTestMixin): class QueueTest(BaseQueueTest): - type2test = Queue.Queue + type2test = queue.Queue class LifoQueueTest(BaseQueueTest): - type2test = Queue.LifoQueue + type2test = queue.LifoQueue class PriorityQueueTest(BaseQueueTest): - type2test = Queue.PriorityQueue + type2test = queue.PriorityQueue @@ -205,21 +205,21 @@ class PriorityQueueTest(BaseQueueTest): class FailingQueueException(Exception): pass -class FailingQueue(Queue.Queue): +class FailingQueue(queue.Queue): def __init__(self, *args): self.fail_next_put = False self.fail_next_get = False - Queue.Queue.__init__(self, *args) + queue.Queue.__init__(self, *args) def _put(self, item): if self.fail_next_put: self.fail_next_put = False raise FailingQueueException, "You Lose" - return Queue.Queue._put(self, item) + return queue.Queue._put(self, item) def _get(self): if self.fail_next_get: self.fail_next_get = False raise FailingQueueException, "You Lose" - return Queue.Queue._get(self) + return queue.Queue._get(self) class FailingQueueTest(unittest.TestCase, BlockingTestMixin): diff --git a/Lib/test/test_socket.py b/Lib/test/test_socket.py index 48c9346..81ef6d3 100644 --- a/Lib/test/test_socket.py +++ b/Lib/test/test_socket.py @@ -9,7 +9,7 @@ import select import thread, threading import time import traceback -import Queue +import queue import sys import os import array @@ -96,7 +96,7 @@ class ThreadableTest: self.server_ready = threading.Event() self.client_ready = threading.Event() self.done = threading.Event() - self.queue = Queue.Queue(1) + self.queue = queue.Queue(1) # Do some munging to start the client test. methodname = self.id() diff --git a/Tools/webchecker/wsgui.py b/Tools/webchecker/wsgui.py index 95c7ab9..b2223c4 100755 --- a/Tools/webchecker/wsgui.py +++ b/Tools/webchecker/wsgui.py @@ -10,7 +10,7 @@ from Tkinter import * import websucker import os import threading -import Queue +import queue import time VERBOSE = 2 @@ -139,7 +139,7 @@ class App: def go(self, event=None): if not self.msgq: - self.msgq = Queue.Queue(0) + self.msgq = queue.Queue(0) self.check_msgq() if not self.sucker: self.sucker = SuckerThread(self.msgq) -- cgit v0.12