summaryrefslogtreecommitdiffstats
path: root/Lib/test/test_queue.py
diff options
context:
space:
mode:
authorGeorg Brandl <georg@python.org>2008-02-05 18:48:51 (GMT)
committerGeorg Brandl <georg@python.org>2008-02-05 18:48:51 (GMT)
commit0e3b0ec29c4a5f14c8872ff93e2476ca99567da9 (patch)
tree4a9bc6bdebf2cb3e944ad28a1c2eb9877ef9eb42 /Lib/test/test_queue.py
parent4b130e9fa8f6101c2b044dc7695659001afe4710 (diff)
downloadcpython-0e3b0ec29c4a5f14c8872ff93e2476ca99567da9.zip
cpython-0e3b0ec29c4a5f14c8872ff93e2476ca99567da9.tar.gz
cpython-0e3b0ec29c4a5f14c8872ff93e2476ca99567da9.tar.bz2
Merge r60528, r60534 and r60539 from trunk.
Diffstat (limited to 'Lib/test/test_queue.py')
-rw-r--r--Lib/test/test_queue.py497
1 files changed, 265 insertions, 232 deletions
diff --git a/Lib/test/test_queue.py b/Lib/test/test_queue.py
index e15ee39..06e32ec 100644
--- a/Lib/test/test_queue.py
+++ b/Lib/test/test_queue.py
@@ -4,8 +4,8 @@ import Queue
import sys
import threading
import time
-
-from test.test_support import verify, TestFailed, verbose
+import unittest
+from test import test_support
QUEUE_SIZE = 5
@@ -33,50 +33,177 @@ class _TriggerThread(threading.Thread):
self.startedEvent.set()
self.fn(*self.args)
+
# Execute a function that blocks, and in a separate thread, a function that
-# triggers the release. Returns the result of the blocking function.
-# Caution: block_func must guarantee to block until trigger_func is
-# called, and trigger_func must guarantee to change queue state so that
-# block_func can make enough progress to return. In particular, a
-# block_func that just raises an exception regardless of whether trigger_func
-# is called will lead to timing-dependent sporadic failures, and one of
-# those went rarely seen but undiagnosed for years. Now block_func
-# must be unexceptional. If block_func is supposed to raise an exception,
-# call _doExceptionalBlockingTest() instead.
-def _doBlockingTest(block_func, block_args, trigger_func, trigger_args):
- t = _TriggerThread(trigger_func, trigger_args)
- t.start()
- result = block_func(*block_args)
- # If block_func returned before our thread made the call, we failed!
- if not t.startedEvent.isSet():
- raise TestFailed("blocking function '%r' appeared not to block" %
- block_func)
- t.join(10) # make sure the thread terminates
- if t.isAlive():
- raise TestFailed("trigger function '%r' appeared to not return" %
- trigger_func)
- return result
-
-# Call this instead if block_func is supposed to raise an exception.
-def _doExceptionalBlockingTest(block_func, block_args, trigger_func,
- trigger_args, expected_exception_class):
- t = _TriggerThread(trigger_func, trigger_args)
- t.start()
- try:
+# triggers the release. Returns the result of the blocking function. Caution:
+# block_func must guarantee to block until trigger_func is called, and
+# trigger_func must guarantee to change queue state so that block_func can make
+# enough progress to return. In particular, a block_func that just raises an
+# exception regardless of whether trigger_func is called will lead to
+# timing-dependent sporadic failures, and one of those went rarely seen but
+# undiagnosed for years. Now block_func must be unexceptional. If block_func
+# is supposed to raise an exception, call do_exceptional_blocking_test()
+# instead.
+
+class BlockingTestMixin:
+
+ def do_blocking_test(self, block_func, block_args, trigger_func, trigger_args):
+ self.t = _TriggerThread(trigger_func, trigger_args)
+ self.t.start()
+ self.result = block_func(*block_args)
+ # If block_func returned before our thread made the call, we failed!
+ if not self.t.startedEvent.isSet():
+ self.fail("blocking function '%r' appeared not to block" %
+ block_func)
+ self.t.join(10) # make sure the thread terminates
+ if self.t.isAlive():
+ self.fail("trigger function '%r' appeared to not return" %
+ trigger_func)
+ return self.result
+
+ # Call this instead if block_func is supposed to raise an exception.
+ def do_exceptional_blocking_test(self,block_func, block_args, trigger_func,
+ trigger_args, expected_exception_class):
+ self.t = _TriggerThread(trigger_func, trigger_args)
+ self.t.start()
+ try:
+ try:
+ block_func(*block_args)
+ except expected_exception_class:
+ raise
+ else:
+ self.fail("expected exception of kind %r" %
+ expected_exception_class)
+ finally:
+ self.t.join(10) # make sure the thread terminates
+ if self.t.isAlive():
+ self.fail("trigger function '%r' appeared to not return" %
+ trigger_func)
+ if not self.t.startedEvent.isSet():
+ self.fail("trigger thread ended but event never set")
+
+
+class BaseQueueTest(unittest.TestCase, BlockingTestMixin):
+ def setUp(self):
+ self.cum = 0
+ self.cumlock = threading.Lock()
+
+ def simple_queue_test(self, q):
+ if q.qsize():
+ raise RuntimeError("Call this function with an empty queue")
+ # I guess we better check things actually queue correctly a little :)
+ q.put(111)
+ q.put(333)
+ q.put(222)
+ target_order = dict(Queue = [111, 333, 222],
+ LifoQueue = [222, 333, 111],
+ PriorityQueue = [111, 222, 333])
+ actual_order = [q.get(), q.get(), q.get()]
+ self.assertEquals(actual_order, target_order[q.__class__.__name__],
+ "Didn't seem to queue the correct data!")
+ for i in range(QUEUE_SIZE-1):
+ q.put(i)
+ self.assert_(q.qsize(), "Queue should not be empty")
+ self.assert_(not qfull(q), "Queue should not be full")
+ last = 2 * QUEUE_SIZE
+ full = 3 * 2 * QUEUE_SIZE
+ q.put(last)
+ self.assert_(qfull(q), "Queue should be full")
+ try:
+ q.put(full, block=0)
+ self.fail("Didn't appear to block with a full queue")
+ except Queue.Full:
+ pass
+ try:
+ q.put(full, timeout=0.01)
+ self.fail("Didn't appear to time-out with a full queue")
+ except Queue.Full:
+ pass
+ # Test a blocking put
+ self.do_blocking_test(q.put, (full,), q.get, ())
+ self.do_blocking_test(q.put, (full, True, 10), q.get, ())
+ # Empty it
+ for i in range(QUEUE_SIZE):
+ q.get()
+ self.assert_(not q.qsize(), "Queue should be empty")
try:
- block_func(*block_args)
- except expected_exception_class:
- raise
+ q.get(block=0)
+ self.fail("Didn't appear to block with an empty queue")
+ except Queue.Empty:
+ pass
+ try:
+ q.get(timeout=0.01)
+ self.fail("Didn't appear to time-out with an empty queue")
+ except Queue.Empty:
+ pass
+ # Test a blocking get
+ self.do_blocking_test(q.get, (), q.put, ('empty',))
+ self.do_blocking_test(q.get, (True, 10), q.put, ('empty',))
+
+
+ def worker(self, q):
+ while True:
+ x = q.get()
+ if x is None:
+ q.task_done()
+ return
+ with self.cumlock:
+ self.cum += x
+ q.task_done()
+
+ def queue_join_test(self, q):
+ self.cum = 0
+ for i in (0,1):
+ threading.Thread(target=self.worker, args=(q,)).start()
+ for i in range(100):
+ q.put(i)
+ q.join()
+ self.assertEquals(self.cum, sum(range(100)),
+ "q.join() did not block until all tasks were done")
+ q.put(None) # instruct the threads to close
+ q.join() # verify that you can join twice
+
+ def test_queue_task_done(self):
+ # Test to make sure a queue task completed successfully.
+ q = self.type2test()
+ try:
+ q.task_done()
+ except ValueError:
+ pass
+ else:
+ self.fail("Did not detect task count going negative")
+
+ def test_queue_join(self):
+ # Test that a queue join()s successfully, and before anything else
+ # (done twice for insurance).
+ q = self.type2test()
+ self.queue_join_test(q)
+ self.queue_join_test(q)
+ try:
+ q.task_done()
+ except ValueError:
+ pass
else:
- raise TestFailed("expected exception of kind %r" %
- expected_exception_class)
- finally:
- t.join(10) # make sure the thread terminates
- if t.isAlive():
- raise TestFailed("trigger function '%r' appeared to not return" %
- trigger_func)
- if not t.startedEvent.isSet():
- raise TestFailed("trigger thread ended but event never set")
+ self.fail("Did not detect task count going negative")
+
+ def test_simple_queue(self):
+ # Do it a couple of times on the same queue.
+ # Done twice to make sure works with same instance reused.
+ q = self.type2test(QUEUE_SIZE)
+ self.simple_queue_test(q)
+ self.simple_queue_test(q)
+
+
+class QueueTest(BaseQueueTest):
+ type2test = Queue.Queue
+
+class LifoQueueTest(BaseQueueTest):
+ type2test = Queue.LifoQueue
+
+class PriorityQueueTest(BaseQueueTest):
+ type2test = Queue.PriorityQueue
+
+
# A Queue subclass that can provoke failure at a moment's notice :)
class FailingQueueException(Exception):
@@ -98,195 +225,101 @@ class FailingQueue(Queue.Queue):
raise FailingQueueException("You Lose")
return Queue.Queue._get(self)
-def FailingQueueTest(q):
- if q.qsize():
- raise RuntimeError("Call this function with an empty queue")
- for i in range(QUEUE_SIZE-1):
- q.put(i)
- # Test a failing non-blocking put.
- q.fail_next_put = True
- try:
- q.put("oops", block=0)
- raise TestFailed("The queue didn't fail when it should have")
- except FailingQueueException:
- pass
- q.fail_next_put = True
- try:
- q.put("oops", timeout=0.1)
- raise TestFailed("The queue didn't fail when it should have")
- except FailingQueueException:
- pass
- q.put("last")
- verify(qfull(q), "Queue should be full")
- # Test a failing blocking put
- q.fail_next_put = True
- try:
- _doBlockingTest(q.put, ("full",), q.get, ())
- raise TestFailed("The queue didn't fail when it should have")
- except FailingQueueException:
- pass
- # Check the Queue isn't damaged.
- # put failed, but get succeeded - re-add
- q.put("last")
- # Test a failing timeout put
- q.fail_next_put = True
- try:
- _doExceptionalBlockingTest(q.put, ("full", True, 10), q.get, (),
- FailingQueueException)
- raise TestFailed("The queue didn't fail when it should have")
- except FailingQueueException:
- pass
- # Check the Queue isn't damaged.
- # put failed, but get succeeded - re-add
- q.put("last")
- verify(qfull(q), "Queue should be full")
- q.get()
- verify(not qfull(q), "Queue should not be full")
- q.put("last")
- verify(qfull(q), "Queue should be full")
- # Test a blocking put
- _doBlockingTest( q.put, ("full",), q.get, ())
- # Empty it
- for i in range(QUEUE_SIZE):
- q.get()
- verify(not q.qsize(), "Queue should be empty")
- q.put("first")
- q.fail_next_get = True
- try:
+class FailingQueueTest(unittest.TestCase, BlockingTestMixin):
+
+ def failing_queue_test(self, q):
+ if q.qsize():
+ raise RuntimeError("Call this function with an empty queue")
+ for i in range(QUEUE_SIZE-1):
+ q.put(i)
+ # Test a failing non-blocking put.
+ q.fail_next_put = True
+ try:
+ q.put("oops", block=0)
+ self.fail("The queue didn't fail when it should have")
+ except FailingQueueException:
+ pass
+ q.fail_next_put = True
+ try:
+ q.put("oops", timeout=0.1)
+ self.fail("The queue didn't fail when it should have")
+ except FailingQueueException:
+ pass
+ q.put("last")
+ self.assert_(qfull(q), "Queue should be full")
+ # Test a failing blocking put
+ q.fail_next_put = True
+ try:
+ self.do_blocking_test(q.put, ("full",), q.get, ())
+ self.fail("The queue didn't fail when it should have")
+ except FailingQueueException:
+ pass
+ # Check the Queue isn't damaged.
+ # put failed, but get succeeded - re-add
+ q.put("last")
+ # Test a failing timeout put
+ q.fail_next_put = True
+ try:
+ self.do_exceptional_blocking_test(q.put, ("full", True, 10), q.get, (),
+ FailingQueueException)
+ self.fail("The queue didn't fail when it should have")
+ except FailingQueueException:
+ pass
+ # Check the Queue isn't damaged.
+ # put failed, but get succeeded - re-add
+ q.put("last")
+ self.assert_(qfull(q), "Queue should be full")
q.get()
- raise TestFailed("The queue didn't fail when it should have")
- except FailingQueueException:
- pass
- verify(q.qsize(), "Queue should not be empty")
- q.fail_next_get = True
- try:
- q.get(timeout=0.1)
- raise TestFailed("The queue didn't fail when it should have")
- except FailingQueueException:
- pass
- verify(q.qsize(), "Queue should not be empty")
- q.get()
- verify(not q.qsize(), "Queue should be empty")
- q.fail_next_get = True
- try:
- _doExceptionalBlockingTest(q.get, (), q.put, ('empty',),
- FailingQueueException)
- raise TestFailed("The queue didn't fail when it should have")
- except FailingQueueException:
- pass
- # put succeeded, but get failed.
- verify(q.qsize(), "Queue should not be empty")
- q.get()
- verify(not q.qsize(), "Queue should be empty")
-
-def SimpleQueueTest(q):
- if q.qsize():
- raise RuntimeError("Call this function with an empty queue")
- # I guess we better check things actually queue correctly a little :)
- q.put(111)
- q.put(333)
- q.put(222)
- target_order = dict(Queue = [111, 333, 222],
- LifoQueue = [222, 333, 111],
- PriorityQueue = [111, 222, 333])
- actual_order = [q.get(), q.get(), q.get()]
- verify(actual_order == target_order[q.__class__.__name__],
- "Didn't seem to queue the correct data!")
- for i in range(QUEUE_SIZE-1):
- q.put(i)
- verify(q.qsize(), "Queue should not be empty")
- verify(not qfull(q), "Queue should not be full")
- last = 2*QUEUE_SIZE
- full = 3*2*QUEUE_SIZE
- q.put(last)
- verify(qfull(q), "Queue should be full")
- try:
- q.put(full, block=0)
- raise TestFailed("Didn't appear to block with a full queue")
- except Queue.Full:
- pass
- try:
- q.put(full, timeout=0.01)
- raise TestFailed("Didn't appear to time-out with a full queue")
- except Queue.Full:
- pass
- # Test a blocking put
- _doBlockingTest(q.put, (full,), q.get, ())
- _doBlockingTest(q.put, (full, True, 10), q.get, ())
- # Empty it
- for i in range(QUEUE_SIZE):
+ self.assert_(not qfull(q), "Queue should not be full")
+ q.put("last")
+ self.assert_(qfull(q), "Queue should be full")
+ # Test a blocking put
+ self.do_blocking_test(q.put, ("full",), q.get, ())
+ # Empty it
+ for i in range(QUEUE_SIZE):
+ q.get()
+ self.assert_(not q.qsize(), "Queue should be empty")
+ q.put("first")
+ q.fail_next_get = True
+ try:
+ q.get()
+ self.fail("The queue didn't fail when it should have")
+ except FailingQueueException:
+ pass
+ self.assert_(q.qsize(), "Queue should not be empty")
+ q.fail_next_get = True
+ try:
+ q.get(timeout=0.1)
+ self.fail("The queue didn't fail when it should have")
+ except FailingQueueException:
+ pass
+ self.assert_(q.qsize(), "Queue should not be empty")
q.get()
- verify(not q.qsize(), "Queue should be empty")
- try:
- q.get(block=0)
- raise TestFailed("Didn't appear to block with an empty queue")
- except Queue.Empty:
- pass
- try:
- q.get(timeout=0.01)
- raise TestFailed("Didn't appear to time-out with an empty queue")
- except Queue.Empty:
- pass
- # Test a blocking get
- _doBlockingTest(q.get, (), q.put, ('empty',))
- _doBlockingTest(q.get, (True, 10), q.put, ('empty',))
-
-cum = 0
-cumlock = threading.Lock()
-
-def worker(q):
- global cum
- while True:
- x = q.get()
- if x is None:
- q.task_done()
- return
- cumlock.acquire()
+ self.assert_(not q.qsize(), "Queue should be empty")
+ q.fail_next_get = True
try:
- cum += x
- finally:
- cumlock.release()
- q.task_done()
-
-def QueueJoinTest(q):
- global cum
- cum = 0
- for i in (0,1):
- threading.Thread(target=worker, args=(q,)).start()
- for i in range(100):
- q.put(i)
- q.join()
- verify(cum==sum(range(100)), "q.join() did not block until all tasks were done")
- q.put(None) # instruct the threads to close
- q.join() # verify that you can join twice
-
-def QueueTaskDoneTest(q):
- try:
- q.task_done()
- except ValueError:
- pass
- else:
- raise TestFailed("Did not detect task count going negative")
-
-def test():
- for Q in Queue.Queue, Queue.LifoQueue, Queue.PriorityQueue:
- q = Q()
- QueueTaskDoneTest(q)
- QueueJoinTest(q)
- QueueJoinTest(q)
- QueueTaskDoneTest(q)
-
- q = Q(QUEUE_SIZE)
- # Do it a couple of times on the same queue
- SimpleQueueTest(q)
- SimpleQueueTest(q)
- if verbose:
- print("Simple Queue tests seemed to work for", Q.__name__)
-
- q = FailingQueue(QUEUE_SIZE)
- FailingQueueTest(q)
- FailingQueueTest(q)
- if verbose:
- print("Failing Queue tests seemed to work")
-
-test()
+ self.do_exceptional_blocking_test(q.get, (), q.put, ('empty',),
+ FailingQueueException)
+ self.fail("The queue didn't fail when it should have")
+ except FailingQueueException:
+ pass
+ # put succeeded, but get failed.
+ self.assert_(q.qsize(), "Queue should not be empty")
+ q.get()
+ self.assert_(not q.qsize(), "Queue should be empty")
+
+ def test_failing_queue(self):
+ # Test to make sure a queue is functioning correctly.
+ # Done twice to the same instance.
+ q = FailingQueue(QUEUE_SIZE)
+ self.failing_queue_test(q)
+ self.failing_queue_test(q)
+
+
+def test_main():
+ test_support.run_unittest(QueueTest, LifoQueueTest, PriorityQueueTest,
+ FailingQueueTest)
+
+
+if __name__ == "__main__":
+ test_main()