summaryrefslogtreecommitdiffstats
path: root/Lib/test/test_queue.py
diff options
context:
space:
mode:
authorGeorg Brandl <georg@python.org>2008-02-02 11:39:29 (GMT)
committerGeorg Brandl <georg@python.org>2008-02-02 11:39:29 (GMT)
commitd22b4661fdea6bd4aa4375976b058ad534575f1f (patch)
tree1adec6321a21b52ce0e3872113a7eef87bafb818 /Lib/test/test_queue.py
parent593b77cb5a40b4613317fbc765f4d22310661c3b (diff)
downloadcpython-d22b4661fdea6bd4aa4375976b058ad534575f1f.zip
cpython-d22b4661fdea6bd4aa4375976b058ad534575f1f.tar.gz
cpython-d22b4661fdea6bd4aa4375976b058ad534575f1f.tar.bz2
Rewrite test_queue as unittest. Written for GHOP by Ian Seyer.
Diffstat (limited to 'Lib/test/test_queue.py')
-rw-r--r--Lib/test/test_queue.py498
1 files changed, 267 insertions, 231 deletions
diff --git a/Lib/test/test_queue.py b/Lib/test/test_queue.py
index 2a76cda..7db10c8 100644
--- a/Lib/test/test_queue.py
+++ b/Lib/test/test_queue.py
@@ -4,8 +4,8 @@ import Queue
import sys
import threading
import time
-
-from test.test_support import verify, TestFailed, verbose
+import unittest
+from test import test_support
QUEUE_SIZE = 5
@@ -30,50 +30,179 @@ class _TriggerThread(threading.Thread):
self.startedEvent.set()
self.fn(*self.args)
+
# Execute a function that blocks, and in a separate thread, a function that
-# triggers the release. Returns the result of the blocking function.
-# Caution: block_func must guarantee to block until trigger_func is
-# called, and trigger_func must guarantee to change queue state so that
-# block_func can make enough progress to return. In particular, a
-# block_func that just raises an exception regardless of whether trigger_func
-# is called will lead to timing-dependent sporadic failures, and one of
-# those went rarely seen but undiagnosed for years. Now block_func
-# must be unexceptional. If block_func is supposed to raise an exception,
-# call _doExceptionalBlockingTest() instead.
-def _doBlockingTest(block_func, block_args, trigger_func, trigger_args):
- t = _TriggerThread(trigger_func, trigger_args)
- t.start()
- result = block_func(*block_args)
- # If block_func returned before our thread made the call, we failed!
- if not t.startedEvent.isSet():
- raise TestFailed("blocking function '%r' appeared not to block" %
- block_func)
- t.join(10) # make sure the thread terminates
- if t.isAlive():
- raise TestFailed("trigger function '%r' appeared to not return" %
- trigger_func)
- return result
-
-# Call this instead if block_func is supposed to raise an exception.
-def _doExceptionalBlockingTest(block_func, block_args, trigger_func,
- trigger_args, expected_exception_class):
- t = _TriggerThread(trigger_func, trigger_args)
- t.start()
- try:
+# triggers the release. Returns the result of the blocking function. Caution:
+# block_func must guarantee to block until trigger_func is called, and
+# trigger_func must guarantee to change queue state so that block_func can make
+# enough progress to return. In particular, a block_func that just raises an
+# exception regardless of whether trigger_func is called will lead to
+# timing-dependent sporadic failures, and one of those went rarely seen but
+# undiagnosed for years. Now block_func must be unexceptional. If block_func
+# is supposed to raise an exception, call do_exceptional_blocking_test()
+# instead.
+
+class BlockingTestMixin:
+
+ def do_blocking_test(self, block_func, block_args, trigger_func, trigger_args):
+ self.t = _TriggerThread(trigger_func, trigger_args)
+ self.t.start()
+ self.result = block_func(*block_args)
+ # If block_func returned before our thread made the call, we failed!
+ if not self.t.startedEvent.isSet():
+ self.fail("blocking function '%r' appeared not to block" %
+ block_func)
+ self.t.join(10) # make sure the thread terminates
+ if self.t.isAlive():
+ self.fail("trigger function '%r' appeared to not return" %
+ trigger_func)
+ return self.result
+
+ # Call this instead if block_func is supposed to raise an exception.
+ def do_exceptional_blocking_test(self,block_func, block_args, trigger_func,
+ trigger_args, expected_exception_class):
+ self.t = _TriggerThread(trigger_func, trigger_args)
+ self.t.start()
+ try:
+ try:
+ block_func(*block_args)
+ except expected_exception_class:
+ raise
+ else:
+ self.fail("expected exception of kind %r" %
+ expected_exception_class)
+ finally:
+ self.t.join(10) # make sure the thread terminates
+ if self.t.isAlive():
+ self.fail("trigger function '%r' appeared to not return" %
+ trigger_func)
+ if not self.t.startedEvent.isSet():
+ self.fail("trigger thread ended but event never set")
+
+
+class BaseQueueTest(unittest.TestCase, BlockingTestMixin):
+ def setUp(self):
+ self.cum = 0
+ self.cumlock = threading.Lock()
+
+ def simple_queue_test(self, q):
+ if not q.empty():
+ raise RuntimeError, "Call this function with an empty queue"
+ # I guess we better check things actually queue correctly a little :)
+ q.put(111)
+ q.put(333)
+ q.put(222)
+ target_order = dict(Queue = [111, 333, 222],
+ LifoQueue = [222, 333, 111],
+ PriorityQueue = [111, 222, 333])
+ actual_order = [q.get(), q.get(), q.get()]
+ self.assertEquals(actual_order, target_order[q.__class__.__name__],
+ "Didn't seem to queue the correct data!")
+ for i in range(QUEUE_SIZE-1):
+ q.put(i)
+ self.assert_(not q.empty(), "Queue should not be empty")
+ self.assert_(not q.full(), "Queue should not be full")
+ q.put("last")
+ self.assert_(q.full(), "Queue should be full")
+ try:
+ q.put("full", block=0)
+ self.fail("Didn't appear to block with a full queue")
+ except Queue.Full:
+ pass
+ try:
+ q.put("full", timeout=0.01)
+ self.fail("Didn't appear to time-out with a full queue")
+ except Queue.Full:
+ pass
+ # Test a blocking put
+ self.do_blocking_test(q.put, ("full",), q.get, ())
+ self.do_blocking_test(q.put, ("full", True, 10), q.get, ())
+ # Empty it
+ for i in range(QUEUE_SIZE):
+ q.get()
+ self.assert_(q.empty(), "Queue should be empty")
try:
- block_func(*block_args)
- except expected_exception_class:
- raise
+ q.get(block=0)
+ self.fail("Didn't appear to block with an empty queue")
+ except Queue.Empty:
+ pass
+ try:
+ q.get(timeout=0.01)
+ self.fail("Didn't appear to time-out with an empty queue")
+ except Queue.Empty:
+ pass
+ # Test a blocking get
+ self.do_blocking_test(q.get, (), q.put, ('empty',))
+ self.do_blocking_test(q.get, (True, 10), q.put, ('empty',))
+
+
+ def worker(self, q):
+ while True:
+ self.x = q.get()
+ if self.x is None:
+ q.task_done()
+ return
+ self.cumlock.acquire()
+ try:
+ self.cum += self.x
+ finally:
+ self.cumlock.release()
+ q.task_done()
+
+ def queue_join_test(self, q):
+ self.cum = 0
+ for i in (0,1):
+ threading.Thread(target=self.worker, args=(q,)).start()
+ for i in xrange(100):
+ q.put(i)
+ q.join()
+ self.assertEquals(self.cum, sum(range(100)),
+ "q.join() did not block until all tasks were done")
+ for i in (0,1):
+ q.put(None) # instruct the threads to close
+ q.join() # verify that you can join twice
+
+ def test_queue_task_done(self):
+ # Test to make sure a queue task completed successfully.
+ q = self.type2test()
+ try:
+ q.task_done()
+ except ValueError:
+ pass
+ else:
+ self.fail("Did not detect task count going negative")
+
+ def test_queue_join(self):
+ # Test that a queue join()s successfully, and before anything else
+ # (done twice for insurance).
+ q = self.type2test()
+ self.queue_join_test(q)
+ self.queue_join_test(q)
+ try:
+ q.task_done()
+ except ValueError:
+ pass
else:
- raise TestFailed("expected exception of kind %r" %
- expected_exception_class)
- finally:
- t.join(10) # make sure the thread terminates
- if t.isAlive():
- raise TestFailed("trigger function '%r' appeared to not return" %
- trigger_func)
- if not t.startedEvent.isSet():
- raise TestFailed("trigger thread ended but event never set")
+ self.fail("Did not detect task count going negative")
+
+ def test_simple_queue(self):
+ # Do it a couple of times on the same queue.
+ # Done twice to make sure works with same instance reused.
+ q = self.type2test(QUEUE_SIZE)
+ self.simple_queue_test(q)
+ self.simple_queue_test(q)
+
+
+class QueueTest(BaseQueueTest):
+ type2test = Queue.Queue
+
+class LifoQueueTest(BaseQueueTest):
+ type2test = Queue.LifoQueue
+
+class PriorityQueueTest(BaseQueueTest):
+ type2test = Queue.PriorityQueue
+
+
# A Queue subclass that can provoke failure at a moment's notice :)
class FailingQueueException(Exception):
@@ -95,194 +224,101 @@ class FailingQueue(Queue.Queue):
raise FailingQueueException, "You Lose"
return Queue.Queue._get(self)
-def FailingQueueTest(q):
- if not q.empty():
- raise RuntimeError, "Call this function with an empty queue"
- for i in range(QUEUE_SIZE-1):
- q.put(i)
- # Test a failing non-blocking put.
- q.fail_next_put = True
- try:
- q.put("oops", block=0)
- raise TestFailed("The queue didn't fail when it should have")
- except FailingQueueException:
- pass
- q.fail_next_put = True
- try:
- q.put("oops", timeout=0.1)
- raise TestFailed("The queue didn't fail when it should have")
- except FailingQueueException:
- pass
- q.put("last")
- verify(q.full(), "Queue should be full")
- # Test a failing blocking put
- q.fail_next_put = True
- try:
- _doBlockingTest(q.put, ("full",), q.get, ())
- raise TestFailed("The queue didn't fail when it should have")
- except FailingQueueException:
- pass
- # Check the Queue isn't damaged.
- # put failed, but get succeeded - re-add
- q.put("last")
- # Test a failing timeout put
- q.fail_next_put = True
- try:
- _doExceptionalBlockingTest(q.put, ("full", True, 10), q.get, (),
- FailingQueueException)
- raise TestFailed("The queue didn't fail when it should have")
- except FailingQueueException:
- pass
- # Check the Queue isn't damaged.
- # put failed, but get succeeded - re-add
- q.put("last")
- verify(q.full(), "Queue should be full")
- q.get()
- verify(not q.full(), "Queue should not be full")
- q.put("last")
- verify(q.full(), "Queue should be full")
- # Test a blocking put
- _doBlockingTest( q.put, ("full",), q.get, ())
- # Empty it
- for i in range(QUEUE_SIZE):
- q.get()
- verify(q.empty(), "Queue should be empty")
- q.put("first")
- q.fail_next_get = True
- try:
+class FailingQueueTest(unittest.TestCase, BlockingTestMixin):
+
+ def failing_queue_test(self, q):
+ if not q.empty():
+ raise RuntimeError, "Call this function with an empty queue"
+ for i in range(QUEUE_SIZE-1):
+ q.put(i)
+ # Test a failing non-blocking put.
+ q.fail_next_put = True
+ try:
+ q.put("oops", block=0)
+ self.fail("The queue didn't fail when it should have")
+ except FailingQueueException:
+ pass
+ q.fail_next_put = True
+ try:
+ q.put("oops", timeout=0.1)
+ self.fail("The queue didn't fail when it should have")
+ except FailingQueueException:
+ pass
+ q.put("last")
+ self.assert_(q.full(), "Queue should be full")
+ # Test a failing blocking put
+ q.fail_next_put = True
+ try:
+ self.do_blocking_test(q.put, ("full",), q.get, ())
+ self.fail("The queue didn't fail when it should have")
+ except FailingQueueException:
+ pass
+ # Check the Queue isn't damaged.
+ # put failed, but get succeeded - re-add
+ q.put("last")
+ # Test a failing timeout put
+ q.fail_next_put = True
+ try:
+ self.do_exceptional_blocking_test(q.put, ("full", True, 10), q.get, (),
+ FailingQueueException)
+ self.fail("The queue didn't fail when it should have")
+ except FailingQueueException:
+ pass
+ # Check the Queue isn't damaged.
+ # put failed, but get succeeded - re-add
+ q.put("last")
+ self.assert_(q.full(), "Queue should be full")
q.get()
- raise TestFailed("The queue didn't fail when it should have")
- except FailingQueueException:
- pass
- verify(not q.empty(), "Queue should not be empty")
- q.fail_next_get = True
- try:
- q.get(timeout=0.1)
- raise TestFailed("The queue didn't fail when it should have")
- except FailingQueueException:
- pass
- verify(not q.empty(), "Queue should not be empty")
- q.get()
- verify(q.empty(), "Queue should be empty")
- q.fail_next_get = True
- try:
- _doExceptionalBlockingTest(q.get, (), q.put, ('empty',),
- FailingQueueException)
- raise TestFailed("The queue didn't fail when it should have")
- except FailingQueueException:
- pass
- # put succeeded, but get failed.
- verify(not q.empty(), "Queue should not be empty")
- q.get()
- verify(q.empty(), "Queue should be empty")
-
-def SimpleQueueTest(q):
- if not q.empty():
- raise RuntimeError, "Call this function with an empty queue"
- # I guess we better check things actually queue correctly a little :)
- q.put(111)
- q.put(333)
- q.put(222)
- target_order = dict(Queue = [111, 333, 222],
- LifoQueue = [222, 333, 111],
- PriorityQueue = [111, 222, 333])
- actual_order = [q.get(), q.get(), q.get()]
- verify(actual_order == target_order[q.__class__.__name__],
- "Didn't seem to queue the correct data!")
- for i in range(QUEUE_SIZE-1):
- q.put(i)
- verify(not q.empty(), "Queue should not be empty")
- verify(not q.full(), "Queue should not be full")
- q.put("last")
- verify(q.full(), "Queue should be full")
- try:
- q.put("full", block=0)
- raise TestFailed("Didn't appear to block with a full queue")
- except Queue.Full:
- pass
- try:
- q.put("full", timeout=0.01)
- raise TestFailed("Didn't appear to time-out with a full queue")
- except Queue.Full:
- pass
- # Test a blocking put
- _doBlockingTest(q.put, ("full",), q.get, ())
- _doBlockingTest(q.put, ("full", True, 10), q.get, ())
- # Empty it
- for i in range(QUEUE_SIZE):
+ self.assert_(not q.full(), "Queue should not be full")
+ q.put("last")
+ self.assert_(q.full(), "Queue should be full")
+ # Test a blocking put
+ self.do_blocking_test(q.put, ("full",), q.get, ())
+ # Empty it
+ for i in range(QUEUE_SIZE):
+ q.get()
+ self.assert_(q.empty(), "Queue should be empty")
+ q.put("first")
+ q.fail_next_get = True
+ try:
+ q.get()
+ self.fail("The queue didn't fail when it should have")
+ except FailingQueueException:
+ pass
+ self.assert_(not q.empty(), "Queue should not be empty")
+ q.fail_next_get = True
+ try:
+ q.get(timeout=0.1)
+ self.fail("The queue didn't fail when it should have")
+ except FailingQueueException:
+ pass
+ self.assert_(not q.empty(), "Queue should not be empty")
q.get()
- verify(q.empty(), "Queue should be empty")
- try:
- q.get(block=0)
- raise TestFailed("Didn't appear to block with an empty queue")
- except Queue.Empty:
- pass
- try:
- q.get(timeout=0.01)
- raise TestFailed("Didn't appear to time-out with an empty queue")
- except Queue.Empty:
- pass
- # Test a blocking get
- _doBlockingTest(q.get, (), q.put, ('empty',))
- _doBlockingTest(q.get, (True, 10), q.put, ('empty',))
-
-cum = 0
-cumlock = threading.Lock()
-
-def worker(q):
- global cum
- while True:
- x = q.get()
- if x is None:
- q.task_done()
- return
- cumlock.acquire()
+ self.assert_(q.empty(), "Queue should be empty")
+ q.fail_next_get = True
try:
- cum += x
- finally:
- cumlock.release()
- q.task_done()
-
-def QueueJoinTest(q):
- global cum
- cum = 0
- for i in (0,1):
- threading.Thread(target=worker, args=(q,)).start()
- for i in xrange(100):
- q.put(i)
- q.join()
- verify(cum==sum(range(100)), "q.join() did not block until all tasks were done")
- for i in (0,1):
- q.put(None) # instruct the threads to close
- q.join() # verify that you can join twice
-
-def QueueTaskDoneTest(q):
- try:
- q.task_done()
- except ValueError:
- pass
- else:
- raise TestFailed("Did not detect task count going negative")
-
-def test():
- for Q in Queue.Queue, Queue.LifoQueue, Queue.PriorityQueue:
- q = Q()
- QueueTaskDoneTest(q)
- QueueJoinTest(q)
- QueueJoinTest(q)
- QueueTaskDoneTest(q)
-
- q = Q(QUEUE_SIZE)
- # Do it a couple of times on the same queue
- SimpleQueueTest(q)
- SimpleQueueTest(q)
- if verbose:
- print "Simple Queue tests seemed to work for", Q.__name__
-
- q = FailingQueue(QUEUE_SIZE)
- FailingQueueTest(q)
- FailingQueueTest(q)
- if verbose:
- print "Failing Queue tests seemed to work"
-
-test()
+ self.do_exceptional_blocking_test(q.get, (), q.put, ('empty',),
+ FailingQueueException)
+ self.fail("The queue didn't fail when it should have")
+ except FailingQueueException:
+ pass
+ # put succeeded, but get failed.
+ self.assert_(not q.empty(), "Queue should not be empty")
+ q.get()
+ self.assert_(q.empty(), "Queue should be empty")
+
+ def test_failing_queue(self):
+ # Test to make sure a queue is functioning correctly.
+ # Done twice to the same instance.
+ q = FailingQueue(QUEUE_SIZE)
+ self.failing_queue_test(q)
+ self.failing_queue_test(q)
+
+
+def test_main():
+ test_support.run_unittest(QueueTest, LifoQueueTest, PriorityQueueTest,
+ FailingQueueTest)
+
+
+if __name__ == "__main__":
+ test_main()