summaryrefslogtreecommitdiffstats
path: root/Lib/test
diff options
context:
space:
mode:
Diffstat (limited to 'Lib/test')
-rw-r--r--Lib/test/test_itertools.py3
-rw-r--r--Lib/test/test_queue.py46
2 files changed, 29 insertions, 20 deletions
diff --git a/Lib/test/test_itertools.py b/Lib/test/test_itertools.py
index 17e0058..4c0af07 100644
--- a/Lib/test/test_itertools.py
+++ b/Lib/test/test_itertools.py
@@ -300,7 +300,8 @@ class TestBasicOps(unittest.TestCase):
self.assertEqual(take(3, starmap(operator.pow, izip(count(), count(1)))),
[0**1, 1**2, 2**3])
self.assertEqual(list(starmap(operator.pow, [])), [])
- self.assertRaises(TypeError, list, starmap(operator.pow, [[4,5]]))
+ self.assertEqual(list(starmap(operator.pow, [iter([4,5])])), [4**5])
+ self.assertRaises(TypeError, list, starmap(operator.pow, [None]))
self.assertRaises(TypeError, starmap)
self.assertRaises(TypeError, starmap, operator.pow, [(4,5)], 'extra')
self.assertRaises(TypeError, next, starmap(10, [(4,5)]))
diff --git a/Lib/test/test_queue.py b/Lib/test/test_queue.py
index 4d89ed2..e15ee39 100644
--- a/Lib/test/test_queue.py
+++ b/Lib/test/test_queue.py
@@ -184,28 +184,35 @@ def SimpleQueueTest(q):
raise RuntimeError("Call this function with an empty queue")
# I guess we better check things actually queue correctly a little :)
q.put(111)
+ q.put(333)
q.put(222)
- verify(q.get() == 111 and q.get() == 222,
+ target_order = dict(Queue = [111, 333, 222],
+ LifoQueue = [222, 333, 111],
+ PriorityQueue = [111, 222, 333])
+ actual_order = [q.get(), q.get(), q.get()]
+ verify(actual_order == target_order[q.__class__.__name__],
"Didn't seem to queue the correct data!")
for i in range(QUEUE_SIZE-1):
q.put(i)
verify(q.qsize(), "Queue should not be empty")
verify(not qfull(q), "Queue should not be full")
- q.put("last")
+ last = 2*QUEUE_SIZE
+ full = 3*2*QUEUE_SIZE
+ q.put(last)
verify(qfull(q), "Queue should be full")
try:
- q.put("full", block=0)
+ q.put(full, block=0)
raise TestFailed("Didn't appear to block with a full queue")
except Queue.Full:
pass
try:
- q.put("full", timeout=0.01)
+ q.put(full, timeout=0.01)
raise TestFailed("Didn't appear to time-out with a full queue")
except Queue.Full:
pass
# Test a blocking put
- _doBlockingTest(q.put, ("full",), q.get, ())
- _doBlockingTest(q.put, ("full", True, 10), q.get, ())
+ _doBlockingTest(q.put, (full,), q.get, ())
+ _doBlockingTest(q.put, (full, True, 10), q.get, ())
# Empty it
for i in range(QUEUE_SIZE):
q.get()
@@ -250,8 +257,7 @@ def QueueJoinTest(q):
q.put(i)
q.join()
verify(cum==sum(range(100)), "q.join() did not block until all tasks were done")
- for i in (0,1):
- q.put(None) # instruct the threads to close
+ q.put(None) # instruct the threads to close
q.join() # verify that you can join twice
def QueueTaskDoneTest(q):
@@ -263,18 +269,20 @@ def QueueTaskDoneTest(q):
raise TestFailed("Did not detect task count going negative")
def test():
- q = Queue.Queue()
- QueueTaskDoneTest(q)
- QueueJoinTest(q)
- QueueJoinTest(q)
- QueueTaskDoneTest(q)
+ for Q in Queue.Queue, Queue.LifoQueue, Queue.PriorityQueue:
+ q = Q()
+ QueueTaskDoneTest(q)
+ QueueJoinTest(q)
+ QueueJoinTest(q)
+ QueueTaskDoneTest(q)
+
+ q = Q(QUEUE_SIZE)
+ # Do it a couple of times on the same queue
+ SimpleQueueTest(q)
+ SimpleQueueTest(q)
+ if verbose:
+ print("Simple Queue tests seemed to work for", Q.__name__)
- q = Queue.Queue(QUEUE_SIZE)
- # Do it a couple of times on the same queue
- SimpleQueueTest(q)
- SimpleQueueTest(q)
- if verbose:
- print("Simple Queue tests seemed to work")
q = FailingQueue(QUEUE_SIZE)
FailingQueueTest(q)
FailingQueueTest(q)