diff options
author | Andrew Svetlov <andrew.svetlov@gmail.com> | 2022-03-16 14:59:12 (GMT) |
---|---|---|
committer | GitHub <noreply@github.com> | 2022-03-16 14:59:12 (GMT) |
commit | e707ceb6e464b24ddc9fdbdac7bfe15f7eeb43fd (patch) | |
tree | 967fd4011b46d798bdd17432b6b18426fbd153b3 /Lib/test/test_asyncio | |
parent | 7c776521418676c074a483266339d31c950f516e (diff) | |
download | cpython-e707ceb6e464b24ddc9fdbdac7bfe15f7eeb43fd.zip cpython-e707ceb6e464b24ddc9fdbdac7bfe15f7eeb43fd.tar.gz cpython-e707ceb6e464b24ddc9fdbdac7bfe15f7eeb43fd.tar.bz2 |
Rewrite asyncio.Queue tests with IsolatedAsyncioTestCace (#31935)
Diffstat (limited to 'Lib/test/test_asyncio')
-rw-r--r-- | Lib/test/test_asyncio/test_queues.py | 503 |
1 files changed, 179 insertions, 324 deletions
diff --git a/Lib/test/test_asyncio/test_queues.py b/Lib/test/test_asyncio/test_queues.py index b1a53b8..55588e8 100644 --- a/Lib/test/test_asyncio/test_queues.py +++ b/Lib/test/test_asyncio/test_queues.py @@ -1,118 +1,94 @@ """Tests for queues.py""" -import unittest import asyncio +import unittest from types import GenericAlias -from test.test_asyncio import utils as test_utils def tearDownModule(): asyncio.set_event_loop_policy(None) -class _QueueTestBase(test_utils.TestCase): - - def setUp(self): - super().setUp() - self.loop = self.new_test_loop() +class QueueBasicTests(unittest.IsolatedAsyncioTestCase): - -class QueueBasicTests(_QueueTestBase): - - def _test_repr_or_str(self, fn, expect_id): + async def _test_repr_or_str(self, fn, expect_id): """Test Queue's repr or str. fn is repr or str. expect_id is True if we expect the Queue's id to appear in fn(Queue()). """ - def gen(): - when = yield - self.assertAlmostEqual(0.1, when) - when = yield 0.1 - self.assertAlmostEqual(0.2, when) - yield 0.1 - - loop = self.new_test_loop(gen) - q = asyncio.Queue() self.assertTrue(fn(q).startswith('<Queue'), fn(q)) id_is_present = hex(id(q)) in fn(q) self.assertEqual(expect_id, id_is_present) - async def add_getter(): - q = asyncio.Queue() + # getters + q = asyncio.Queue() + async with asyncio.TaskGroup() as tg: # Start a task that waits to get. - loop.create_task(q.get()) + getter = tg.create_task(q.get()) # Let it start waiting. await asyncio.sleep(0.1) self.assertTrue('_getters[1]' in fn(q)) # resume q.get coroutine to finish generator q.put_nowait(0) - loop.run_until_complete(add_getter()) + self.assertEqual(0, await getter) - async def add_putter(): - q = asyncio.Queue(maxsize=1) + # putters + q = asyncio.Queue(maxsize=1) + async with asyncio.TaskGroup() as tg: q.put_nowait(1) # Start a task that waits to put. - loop.create_task(q.put(2)) + putter = tg.create_task(q.put(2)) # Let it start waiting. await asyncio.sleep(0.1) self.assertTrue('_putters[1]' in fn(q)) # resume q.put coroutine to finish generator q.get_nowait() - loop.run_until_complete(add_putter()) + self.assertTrue(putter.done()) + q = asyncio.Queue() q.put_nowait(1) self.assertTrue('_queue=[1]' in fn(q)) - def test_repr(self): - self._test_repr_or_str(repr, True) + async def test_repr(self): + await self._test_repr_or_str(repr, True) - def test_str(self): - self._test_repr_or_str(str, False) + async def test_str(self): + await self._test_repr_or_str(str, False) def test_generic_alias(self): q = asyncio.Queue[int] self.assertEqual(q.__args__, (int,)) self.assertIsInstance(q, GenericAlias) - def test_empty(self): + async def test_empty(self): q = asyncio.Queue() self.assertTrue(q.empty()) - q.put_nowait(1) + await q.put(1) self.assertFalse(q.empty()) - self.assertEqual(1, q.get_nowait()) + self.assertEqual(1, await q.get()) self.assertTrue(q.empty()) - def test_full(self): + async def test_full(self): q = asyncio.Queue() self.assertFalse(q.full()) q = asyncio.Queue(maxsize=1) - q.put_nowait(1) + await q.put(1) self.assertTrue(q.full()) - def test_order(self): + async def test_order(self): q = asyncio.Queue() for i in [1, 3, 2]: - q.put_nowait(i) + await q.put(i) - items = [q.get_nowait() for _ in range(3)] + items = [await q.get() for _ in range(3)] self.assertEqual([1, 3, 2], items) - def test_maxsize(self): - - def gen(): - when = yield - self.assertAlmostEqual(0.01, when) - when = yield 0.01 - self.assertAlmostEqual(0.02, when) - yield 0.01 - - loop = self.new_test_loop(gen) - + async def test_maxsize(self): q = asyncio.Queue(maxsize=2) self.assertEqual(2, q.maxsize) have_been_put = [] @@ -123,60 +99,46 @@ class QueueBasicTests(_QueueTestBase): have_been_put.append(i) return True - async def test(): - t = loop.create_task(putter()) - await asyncio.sleep(0.01) + t = asyncio.create_task(putter()) + await asyncio.sleep(0.01) - # The putter is blocked after putting two items. - self.assertEqual([0, 1], have_been_put) - self.assertEqual(0, q.get_nowait()) + # The putter is blocked after putting two items. + self.assertEqual([0, 1], have_been_put) + self.assertEqual(0, await q.get()) - # Let the putter resume and put last item. - await asyncio.sleep(0.01) - self.assertEqual([0, 1, 2], have_been_put) - self.assertEqual(1, q.get_nowait()) - self.assertEqual(2, q.get_nowait()) + # Let the putter resume and put last item. + await asyncio.sleep(0.01) + self.assertEqual([0, 1, 2], have_been_put) + self.assertEqual(1, await q.get()) + self.assertEqual(2, await q.get()) - self.assertTrue(t.done()) - self.assertTrue(t.result()) - - loop.run_until_complete(test()) - self.assertAlmostEqual(0.02, loop.time()) + self.assertTrue(t.done()) + self.assertTrue(t.result()) -class QueueGetTests(_QueueTestBase): +class QueueGetTests(unittest.IsolatedAsyncioTestCase): - def test_blocking_get(self): + async def test_blocking_get(self): q = asyncio.Queue() q.put_nowait(1) - async def queue_get(): - return await q.get() + self.assertEqual(1, await q.get()) - res = self.loop.run_until_complete(queue_get()) - self.assertEqual(1, res) + async def test_get_with_putters(self): + loop = asyncio.get_running_loop() - def test_get_with_putters(self): q = asyncio.Queue(1) - q.put_nowait(1) + await q.put(1) - waiter = self.loop.create_future() + waiter = loop.create_future() q._putters.append(waiter) - res = self.loop.run_until_complete(q.get()) - self.assertEqual(1, res) + self.assertEqual(1, await q.get()) self.assertTrue(waiter.done()) self.assertIsNone(waiter.result()) - def test_blocking_get_wait(self): - - def gen(): - when = yield - self.assertAlmostEqual(0.01, when) - yield 0.01 - - loop = self.new_test_loop(gen) - + async def test_blocking_get_wait(self): + loop = asyncio.get_running_loop() q = asyncio.Queue() started = asyncio.Event() finished = False @@ -188,18 +150,13 @@ class QueueGetTests(_QueueTestBase): finished = True return res - async def queue_put(): - loop.call_later(0.01, q.put_nowait, 1) - queue_get_task = loop.create_task(queue_get()) - await started.wait() - self.assertFalse(finished) - res = await queue_get_task - self.assertTrue(finished) - return res - - res = loop.run_until_complete(queue_put()) + loop.call_later(0.01, q.put_nowait, 1) + queue_get_task = asyncio.create_task(queue_get()) + await started.wait() + self.assertFalse(finished) + res = await queue_get_task + self.assertTrue(finished) self.assertEqual(1, res) - self.assertAlmostEqual(0.01, loop.time()) def test_nonblocking_get(self): q = asyncio.Queue() @@ -210,57 +167,39 @@ class QueueGetTests(_QueueTestBase): q = asyncio.Queue() self.assertRaises(asyncio.QueueEmpty, q.get_nowait) - def test_get_cancelled(self): - - def gen(): - when = yield - self.assertAlmostEqual(0.01, when) - when = yield 0.01 - self.assertAlmostEqual(0.061, when) - yield 0.05 - - loop = self.new_test_loop(gen) - + async def test_get_cancelled(self): q = asyncio.Queue() async def queue_get(): return await asyncio.wait_for(q.get(), 0.051) - async def test(): - get_task = loop.create_task(queue_get()) - await asyncio.sleep(0.01) # let the task start - q.put_nowait(1) - return await get_task - - self.assertEqual(1, loop.run_until_complete(test())) - self.assertAlmostEqual(0.06, loop.time()) + get_task = asyncio.create_task(queue_get()) + await asyncio.sleep(0.01) # let the task start + q.put_nowait(1) + self.assertEqual(1, await get_task) - def test_get_cancelled_race(self): + async def test_get_cancelled_race(self): q = asyncio.Queue() - t1 = self.loop.create_task(q.get()) - t2 = self.loop.create_task(q.get()) + t1 = asyncio.create_task(q.get()) + t2 = asyncio.create_task(q.get()) - test_utils.run_briefly(self.loop) + await asyncio.sleep(0) t1.cancel() - test_utils.run_briefly(self.loop) + await asyncio.sleep(0) self.assertTrue(t1.done()) - q.put_nowait('a') - test_utils.run_briefly(self.loop) - self.assertEqual(t2.result(), 'a') + await q.put('a') + await asyncio.sleep(0) + self.assertEqual('a', await t2) - def test_get_with_waiting_putters(self): + async def test_get_with_waiting_putters(self): q = asyncio.Queue(maxsize=1) - self.loop.create_task(q.put('a')) - self.loop.create_task(q.put('b')) - test_utils.run_briefly(self.loop) - self.assertEqual(self.loop.run_until_complete(q.get()), 'a') - self.assertEqual(self.loop.run_until_complete(q.get()), 'b') - - def test_why_are_getters_waiting(self): - # From issue #268. - asyncio.set_event_loop(self.loop) + asyncio.create_task(q.put('a')) + asyncio.create_task(q.put('b')) + self.assertEqual(await q.get(), 'a') + self.assertEqual(await q.get(), 'b') + async def test_why_are_getters_waiting(self): async def consumer(queue, num_expected): for _ in range(num_expected): await queue.get() @@ -269,59 +208,32 @@ class QueueGetTests(_QueueTestBase): for i in range(num_items): await queue.put(i) - queue_size = 1 producer_num_items = 5 - async def create_queue(): - queue = asyncio.Queue(queue_size) - queue._get_loop() - return queue - - async def test(): - q = await create_queue() - await asyncio.gather(producer(q, producer_num_items), - consumer(q, producer_num_items)) - - self.loop.run_until_complete(test()) - - def test_cancelled_getters_not_being_held_in_self_getters(self): - def a_generator(): - yield 0.1 - yield 0.2 + q = asyncio.Queue(1) + async with asyncio.TaskGroup() as tg: + tg.create_task(producer(q, producer_num_items)) + tg.create_task(consumer(q, producer_num_items)) - self.loop = self.new_test_loop(a_generator) + async def test_cancelled_getters_not_being_held_in_self_getters(self): + queue = asyncio.Queue(maxsize=5) - async def consumer(queue): - try: - item = await asyncio.wait_for(queue.get(), 0.1) - except asyncio.TimeoutError: - pass + with self.assertRaises(TimeoutError): + await asyncio.wait_for(queue.get(), 0.1) - queue = asyncio.Queue(maxsize=5) - self.loop.run_until_complete(self.loop.create_task(consumer(queue))) self.assertEqual(len(queue._getters), 0) -class QueuePutTests(_QueueTestBase): +class QueuePutTests(unittest.IsolatedAsyncioTestCase): - def test_blocking_put(self): + async def test_blocking_put(self): q = asyncio.Queue() - async def queue_put(): - # No maxsize, won't block. - await q.put(1) - - self.loop.run_until_complete(queue_put()) - - def test_blocking_put_wait(self): - - def gen(): - when = yield - self.assertAlmostEqual(0.01, when) - yield 0.01 - - loop = self.new_test_loop(gen) + # No maxsize, won't block. + await q.put(1) + self.assertEqual(1, await q.get()) + async def test_blocking_put_wait(self): q = asyncio.Queue(maxsize=1) started = asyncio.Event() finished = False @@ -333,106 +245,82 @@ class QueuePutTests(_QueueTestBase): await q.put(2) finished = True - async def queue_get(): - loop.call_later(0.01, q.get_nowait) - queue_put_task = loop.create_task(queue_put()) - await started.wait() - self.assertFalse(finished) - await queue_put_task - self.assertTrue(finished) - - loop.run_until_complete(queue_get()) - self.assertAlmostEqual(0.01, loop.time()) + loop = asyncio.get_running_loop() + loop.call_later(0.01, q.get_nowait) + queue_put_task = asyncio.create_task(queue_put()) + await started.wait() + self.assertFalse(finished) + await queue_put_task + self.assertTrue(finished) def test_nonblocking_put(self): q = asyncio.Queue() q.put_nowait(1) self.assertEqual(1, q.get_nowait()) - def test_get_cancel_drop_one_pending_reader(self): - def gen(): - yield 0.01 - yield 0.1 - - loop = self.new_test_loop(gen) - + async def test_get_cancel_drop_one_pending_reader(self): q = asyncio.Queue() - reader = loop.create_task(q.get()) + reader = asyncio.create_task(q.get()) - loop.run_until_complete(asyncio.sleep(0.01)) + await asyncio.sleep(0.01) q.put_nowait(1) q.put_nowait(2) reader.cancel() try: - loop.run_until_complete(reader) + await reader except asyncio.CancelledError: # try again - reader = loop.create_task(q.get()) - loop.run_until_complete(reader) + reader = asyncio.create_task(q.get()) + await reader result = reader.result() # if we get 2, it means 1 got dropped! self.assertEqual(1, result) - def test_get_cancel_drop_many_pending_readers(self): - def gen(): - yield 0.01 - yield 0.1 - - loop = self.new_test_loop(gen) - loop.set_debug(True) - + async def test_get_cancel_drop_many_pending_readers(self): q = asyncio.Queue() - reader1 = loop.create_task(q.get()) - reader2 = loop.create_task(q.get()) - reader3 = loop.create_task(q.get()) + async with asyncio.TaskGroup() as tg: + reader1 = tg.create_task(q.get()) + reader2 = tg.create_task(q.get()) + reader3 = tg.create_task(q.get()) - loop.run_until_complete(asyncio.sleep(0.01)) + await asyncio.sleep(0.01) - q.put_nowait(1) - q.put_nowait(2) - reader1.cancel() + q.put_nowait(1) + q.put_nowait(2) + reader1.cancel() - try: - loop.run_until_complete(reader1) - except asyncio.CancelledError: - pass + with self.assertRaises(asyncio.CancelledError): + await reader1 - loop.run_until_complete(reader3) + await reader3 # It is undefined in which order concurrent readers receive results. self.assertEqual({reader2.result(), reader3.result()}, {1, 2}) - def test_put_cancel_drop(self): - - def gen(): - yield 0.01 - yield 0.1 - - loop = self.new_test_loop(gen) - + async def test_put_cancel_drop(self): q = asyncio.Queue(1) q.put_nowait(1) # putting a second item in the queue has to block (qsize=1) - writer = loop.create_task(q.put(2)) - loop.run_until_complete(asyncio.sleep(0.01)) + writer = asyncio.create_task(q.put(2)) + await asyncio.sleep(0.01) value1 = q.get_nowait() self.assertEqual(value1, 1) writer.cancel() try: - loop.run_until_complete(writer) + await writer except asyncio.CancelledError: # try again - writer = loop.create_task(q.put(2)) - loop.run_until_complete(writer) + writer = asyncio.create_task(q.put(2)) + await writer value2 = q.get_nowait() self.assertEqual(value2, 2) @@ -443,7 +331,7 @@ class QueuePutTests(_QueueTestBase): q.put_nowait(1) self.assertRaises(asyncio.QueueFull, q.put_nowait, 2) - def test_float_maxsize(self): + async def test_float_maxsize(self): q = asyncio.Queue(maxsize=1.3, ) q.put_nowait(1) q.put_nowait(2) @@ -452,64 +340,52 @@ class QueuePutTests(_QueueTestBase): q = asyncio.Queue(maxsize=1.3, ) - async def queue_put(): - await q.put(1) - await q.put(2) - self.assertTrue(q.full()) - self.loop.run_until_complete(queue_put()) + await q.put(1) + await q.put(2) + self.assertTrue(q.full()) - def test_put_cancelled(self): + async def test_put_cancelled(self): q = asyncio.Queue() async def queue_put(): await q.put(1) return True - async def test(): - return await q.get() + t = asyncio.create_task(queue_put()) - t = self.loop.create_task(queue_put()) - self.assertEqual(1, self.loop.run_until_complete(test())) + self.assertEqual(1, await q.get()) self.assertTrue(t.done()) self.assertTrue(t.result()) - def test_put_cancelled_race(self): + async def test_put_cancelled_race(self): q = asyncio.Queue(maxsize=1) - put_a = self.loop.create_task(q.put('a')) - put_b = self.loop.create_task(q.put('b')) - put_c = self.loop.create_task(q.put('X')) + put_a = asyncio.create_task(q.put('a')) + put_b = asyncio.create_task(q.put('b')) + put_c = asyncio.create_task(q.put('X')) - test_utils.run_briefly(self.loop) + await asyncio.sleep(0) self.assertTrue(put_a.done()) self.assertFalse(put_b.done()) put_c.cancel() - test_utils.run_briefly(self.loop) + await asyncio.sleep(0) self.assertTrue(put_c.done()) self.assertEqual(q.get_nowait(), 'a') - test_utils.run_briefly(self.loop) + await asyncio.sleep(0) self.assertEqual(q.get_nowait(), 'b') - self.loop.run_until_complete(put_b) + await put_b - def test_put_with_waiting_getters(self): + async def test_put_with_waiting_getters(self): q = asyncio.Queue() - t = self.loop.create_task(q.get()) - test_utils.run_briefly(self.loop) - self.loop.run_until_complete(q.put('a')) - self.assertEqual(self.loop.run_until_complete(t), 'a') - - def test_why_are_putters_waiting(self): - # From issue #265. - asyncio.set_event_loop(self.loop) - - async def create_queue(): - q = asyncio.Queue(2) - q._get_loop() - return q + t = asyncio.create_task(q.get()) + await asyncio.sleep(0) + await q.put('a') + self.assertEqual(await t, 'a') - queue = self.loop.run_until_complete(create_queue()) + async def test_why_are_putters_waiting(self): + queue = asyncio.Queue(2) async def putter(item): await queue.put(item) @@ -518,54 +394,40 @@ class QueuePutTests(_QueueTestBase): await asyncio.sleep(0) num = queue.qsize() for _ in range(num): - item = queue.get_nowait() - - async def test(): - t0 = putter(0) - t1 = putter(1) - t2 = putter(2) - t3 = putter(3) - await asyncio.gather(getter(), t0, t1, t2, t3) - - self.loop.run_until_complete(test()) + queue.get_nowait() - def test_cancelled_puts_not_being_held_in_self_putters(self): - def a_generator(): - yield 0.01 - yield 0.1 - - loop = self.new_test_loop(a_generator) + async with asyncio.TaskGroup() as tg: + tg.create_task(getter()) + tg.create_task(putter(0)) + tg.create_task(putter(1)) + tg.create_task(putter(2)) + tg.create_task(putter(3)) + async def test_cancelled_puts_not_being_held_in_self_putters(self): # Full queue. queue = asyncio.Queue(maxsize=1) queue.put_nowait(1) # Task waiting for space to put an item in the queue. - put_task = loop.create_task(queue.put(1)) - loop.run_until_complete(asyncio.sleep(0.01)) + put_task = asyncio.create_task(queue.put(1)) + await asyncio.sleep(0.01) # Check that the putter is correctly removed from queue._putters when # the task is canceled. self.assertEqual(len(queue._putters), 1) put_task.cancel() with self.assertRaises(asyncio.CancelledError): - loop.run_until_complete(put_task) + await put_task self.assertEqual(len(queue._putters), 0) - def test_cancelled_put_silence_value_error_exception(self): - def gen(): - yield 0.01 - yield 0.1 - - loop = self.new_test_loop(gen) - + async def test_cancelled_put_silence_value_error_exception(self): # Full Queue. queue = asyncio.Queue(1) queue.put_nowait(1) # Task waiting for space to put a item in the queue. - put_task = loop.create_task(queue.put(1)) - loop.run_until_complete(asyncio.sleep(0.01)) + put_task = asyncio.create_task(queue.put(1)) + await asyncio.sleep(0.01) # get_nowait() remove the future of put_task from queue._putters. queue.get_nowait() @@ -577,28 +439,28 @@ class QueuePutTests(_QueueTestBase): # inside queue.put should be silenced. # If the ValueError is silenced we should catch a CancelledError. with self.assertRaises(asyncio.CancelledError): - loop.run_until_complete(put_task) + await put_task -class LifoQueueTests(_QueueTestBase): +class LifoQueueTests(unittest.IsolatedAsyncioTestCase): - def test_order(self): + async def test_order(self): q = asyncio.LifoQueue() for i in [1, 3, 2]: - q.put_nowait(i) + await q.put(i) - items = [q.get_nowait() for _ in range(3)] + items = [await q.get() for _ in range(3)] self.assertEqual([2, 3, 1], items) -class PriorityQueueTests(_QueueTestBase): +class PriorityQueueTests(unittest.IsolatedAsyncioTestCase): - def test_order(self): + async def test_order(self): q = asyncio.PriorityQueue() for i in [1, 3, 2]: - q.put_nowait(i) + await q.put(i) - items = [q.get_nowait() for _ in range(3)] + items = [await q.get() for _ in range(3)] self.assertEqual([1, 2, 3], items) @@ -610,7 +472,7 @@ class _QueueJoinTestMixin: q = self.q_class() self.assertRaises(ValueError, q.task_done) - def test_task_done(self): + async def test_task_done(self): q = self.q_class() for i in range(100): q.put_nowait(i) @@ -629,35 +491,28 @@ class _QueueJoinTestMixin: accumulator += item q.task_done() - async def test(): - tasks = [self.loop.create_task(worker()) + async with asyncio.TaskGroup() as tg: + tasks = [tg.create_task(worker()) for index in range(2)] await q.join() - return tasks + self.assertEqual(sum(range(100)), accumulator) - tasks = self.loop.run_until_complete(test()) - self.assertEqual(sum(range(100)), accumulator) + # close running generators + running = False + for i in range(len(tasks)): + q.put_nowait(0) - # close running generators - running = False - for i in range(len(tasks)): - q.put_nowait(0) - self.loop.run_until_complete(asyncio.wait(tasks)) - - def test_join_empty_queue(self): + async def test_join_empty_queue(self): q = self.q_class() # Test that a queue join()s successfully, and before anything else # (done twice for insurance). - async def join(): - await q.join() - await q.join() - - self.loop.run_until_complete(join()) + await q.join() + await q.join() - def test_format(self): + async def test_format(self): q = self.q_class() self.assertEqual(q._format(), 'maxsize=0') @@ -665,15 +520,15 @@ class _QueueJoinTestMixin: self.assertEqual(q._format(), 'maxsize=0 tasks=2') -class QueueJoinTests(_QueueJoinTestMixin, _QueueTestBase): +class QueueJoinTests(_QueueJoinTestMixin, unittest.IsolatedAsyncioTestCase): q_class = asyncio.Queue -class LifoQueueJoinTests(_QueueJoinTestMixin, _QueueTestBase): +class LifoQueueJoinTests(_QueueJoinTestMixin, unittest.IsolatedAsyncioTestCase): q_class = asyncio.LifoQueue -class PriorityQueueJoinTests(_QueueJoinTestMixin, _QueueTestBase): +class PriorityQueueJoinTests(_QueueJoinTestMixin, unittest.IsolatedAsyncioTestCase): q_class = asyncio.PriorityQueue |