diff options
Diffstat (limited to 'Lib/test/test_queue.py')
-rw-r--r-- | Lib/test/test_queue.py | 46 |
1 files changed, 27 insertions, 19 deletions
diff --git a/Lib/test/test_queue.py b/Lib/test/test_queue.py index 4d89ed2..e15ee39 100644 --- a/Lib/test/test_queue.py +++ b/Lib/test/test_queue.py @@ -184,28 +184,35 @@ def SimpleQueueTest(q): raise RuntimeError("Call this function with an empty queue") # I guess we better check things actually queue correctly a little :) q.put(111) + q.put(333) q.put(222) - verify(q.get() == 111 and q.get() == 222, + target_order = dict(Queue = [111, 333, 222], + LifoQueue = [222, 333, 111], + PriorityQueue = [111, 222, 333]) + actual_order = [q.get(), q.get(), q.get()] + verify(actual_order == target_order[q.__class__.__name__], "Didn't seem to queue the correct data!") for i in range(QUEUE_SIZE-1): q.put(i) verify(q.qsize(), "Queue should not be empty") verify(not qfull(q), "Queue should not be full") - q.put("last") + last = 2*QUEUE_SIZE + full = 3*2*QUEUE_SIZE + q.put(last) verify(qfull(q), "Queue should be full") try: - q.put("full", block=0) + q.put(full, block=0) raise TestFailed("Didn't appear to block with a full queue") except Queue.Full: pass try: - q.put("full", timeout=0.01) + q.put(full, timeout=0.01) raise TestFailed("Didn't appear to time-out with a full queue") except Queue.Full: pass # Test a blocking put - _doBlockingTest(q.put, ("full",), q.get, ()) - _doBlockingTest(q.put, ("full", True, 10), q.get, ()) + _doBlockingTest(q.put, (full,), q.get, ()) + _doBlockingTest(q.put, (full, True, 10), q.get, ()) # Empty it for i in range(QUEUE_SIZE): q.get() @@ -250,8 +257,7 @@ def QueueJoinTest(q): q.put(i) q.join() verify(cum==sum(range(100)), "q.join() did not block until all tasks were done") - for i in (0,1): - q.put(None) # instruct the threads to close + q.put(None) # instruct the threads to close q.join() # verify that you can join twice def QueueTaskDoneTest(q): @@ -263,18 +269,20 @@ def QueueTaskDoneTest(q): raise TestFailed("Did not detect task count going negative") def test(): - q = Queue.Queue() - QueueTaskDoneTest(q) - QueueJoinTest(q) - QueueJoinTest(q) - QueueTaskDoneTest(q) + for Q in Queue.Queue, Queue.LifoQueue, Queue.PriorityQueue: + q = Q() + QueueTaskDoneTest(q) + QueueJoinTest(q) + QueueJoinTest(q) + QueueTaskDoneTest(q) + + q = Q(QUEUE_SIZE) + # Do it a couple of times on the same queue + SimpleQueueTest(q) + SimpleQueueTest(q) + if verbose: + print("Simple Queue tests seemed to work for", Q.__name__) - q = Queue.Queue(QUEUE_SIZE) - # Do it a couple of times on the same queue - SimpleQueueTest(q) - SimpleQueueTest(q) - if verbose: - print("Simple Queue tests seemed to work") q = FailingQueue(QUEUE_SIZE) FailingQueueTest(q) FailingQueueTest(q) |