summaryrefslogtreecommitdiffstats
path: root/Lib/test/test_unittest/test_async_case.py
diff options
context:
space:
mode:
authorVictor Stinner <vstinner@python.org>2022-06-21 08:27:59 (GMT)
committerGitHub <noreply@github.com>2022-06-21 08:27:59 (GMT)
commitc735d545343c3ab002c62596b2fb2cfa4488b0af (patch)
treeadbb76c603493bd734f4dcc9cc4871c74f22fb43 /Lib/test/test_unittest/test_async_case.py
parentd82e0bfe8b98a122ca443b356d81998c804b686e (diff)
downloadcpython-c735d545343c3ab002c62596b2fb2cfa4488b0af.zip
cpython-c735d545343c3ab002c62596b2fb2cfa4488b0af.tar.gz
cpython-c735d545343c3ab002c62596b2fb2cfa4488b0af.tar.bz2
gh-93839: Move Lib/unttest/test/ to Lib/test/test_unittest/ (#94043)
* Move Lib/unittest/test/ to Lib/test/test_unittest/ * Remove Lib/test/test_unittest.py * Replace unittest.test with test.test_unittest * Remove unittest.load_tests() * Rewrite unittest __init__.py and __main__.py * Update build system, CODEOWNERS, and wasm_assets.py
Diffstat (limited to 'Lib/test/test_unittest/test_async_case.py')
-rw-r--r--Lib/test/test_unittest/test_async_case.py439
1 files changed, 439 insertions, 0 deletions
diff --git a/Lib/test/test_unittest/test_async_case.py b/Lib/test/test_unittest/test_async_case.py
new file mode 100644
index 0000000..beadcac
--- /dev/null
+++ b/Lib/test/test_unittest/test_async_case.py
@@ -0,0 +1,439 @@
+import asyncio
+import contextvars
+import unittest
+from test import support
+
+support.requires_working_socket(module=True)
+
+
+class MyException(Exception):
+ pass
+
+
+def tearDownModule():
+ asyncio.set_event_loop_policy(None)
+
+
+class TestCM:
+ def __init__(self, ordering, enter_result=None):
+ self.ordering = ordering
+ self.enter_result = enter_result
+
+ async def __aenter__(self):
+ self.ordering.append('enter')
+ return self.enter_result
+
+ async def __aexit__(self, *exc_info):
+ self.ordering.append('exit')
+
+
+class LacksEnterAndExit:
+ pass
+class LacksEnter:
+ async def __aexit__(self, *exc_info):
+ pass
+class LacksExit:
+ async def __aenter__(self):
+ pass
+
+
+VAR = contextvars.ContextVar('VAR', default=())
+
+
+class TestAsyncCase(unittest.TestCase):
+ maxDiff = None
+
+ def tearDown(self):
+ # Ensure that IsolatedAsyncioTestCase instances are destroyed before
+ # starting a new event loop
+ support.gc_collect()
+
+ def test_full_cycle(self):
+ class Test(unittest.IsolatedAsyncioTestCase):
+ def setUp(self):
+ self.assertEqual(events, [])
+ events.append('setUp')
+ VAR.set(VAR.get() + ('setUp',))
+
+ async def asyncSetUp(self):
+ self.assertEqual(events, ['setUp'])
+ events.append('asyncSetUp')
+ VAR.set(VAR.get() + ('asyncSetUp',))
+ self.addAsyncCleanup(self.on_cleanup1)
+
+ async def test_func(self):
+ self.assertEqual(events, ['setUp',
+ 'asyncSetUp'])
+ events.append('test')
+ VAR.set(VAR.get() + ('test',))
+ self.addAsyncCleanup(self.on_cleanup2)
+
+ async def asyncTearDown(self):
+ self.assertEqual(events, ['setUp',
+ 'asyncSetUp',
+ 'test'])
+ VAR.set(VAR.get() + ('asyncTearDown',))
+ events.append('asyncTearDown')
+
+ def tearDown(self):
+ self.assertEqual(events, ['setUp',
+ 'asyncSetUp',
+ 'test',
+ 'asyncTearDown'])
+ events.append('tearDown')
+ VAR.set(VAR.get() + ('tearDown',))
+
+ async def on_cleanup1(self):
+ self.assertEqual(events, ['setUp',
+ 'asyncSetUp',
+ 'test',
+ 'asyncTearDown',
+ 'tearDown',
+ 'cleanup2'])
+ events.append('cleanup1')
+ VAR.set(VAR.get() + ('cleanup1',))
+ nonlocal cvar
+ cvar = VAR.get()
+
+ async def on_cleanup2(self):
+ self.assertEqual(events, ['setUp',
+ 'asyncSetUp',
+ 'test',
+ 'asyncTearDown',
+ 'tearDown'])
+ events.append('cleanup2')
+ VAR.set(VAR.get() + ('cleanup2',))
+
+ events = []
+ cvar = ()
+ test = Test("test_func")
+ result = test.run()
+ self.assertEqual(result.errors, [])
+ self.assertEqual(result.failures, [])
+ expected = ['setUp', 'asyncSetUp', 'test',
+ 'asyncTearDown', 'tearDown', 'cleanup2', 'cleanup1']
+ self.assertEqual(events, expected)
+ self.assertEqual(cvar, tuple(expected))
+
+ events = []
+ cvar = ()
+ test = Test("test_func")
+ test.debug()
+ self.assertEqual(events, expected)
+ self.assertEqual(cvar, tuple(expected))
+ test.doCleanups()
+ self.assertEqual(events, expected)
+ self.assertEqual(cvar, tuple(expected))
+
+ def test_exception_in_setup(self):
+ class Test(unittest.IsolatedAsyncioTestCase):
+ async def asyncSetUp(self):
+ events.append('asyncSetUp')
+ self.addAsyncCleanup(self.on_cleanup)
+ raise MyException()
+
+ async def test_func(self):
+ events.append('test')
+
+ async def asyncTearDown(self):
+ events.append('asyncTearDown')
+
+ async def on_cleanup(self):
+ events.append('cleanup')
+
+
+ events = []
+ test = Test("test_func")
+ result = test.run()
+ self.assertEqual(events, ['asyncSetUp', 'cleanup'])
+ self.assertIs(result.errors[0][0], test)
+ self.assertIn('MyException', result.errors[0][1])
+
+ events = []
+ test = Test("test_func")
+ try:
+ test.debug()
+ except MyException:
+ pass
+ else:
+ self.fail('Expected a MyException exception')
+ self.assertEqual(events, ['asyncSetUp'])
+ test.doCleanups()
+ self.assertEqual(events, ['asyncSetUp', 'cleanup'])
+
+ def test_exception_in_test(self):
+ class Test(unittest.IsolatedAsyncioTestCase):
+ async def asyncSetUp(self):
+ events.append('asyncSetUp')
+
+ async def test_func(self):
+ events.append('test')
+ self.addAsyncCleanup(self.on_cleanup)
+ raise MyException()
+
+ async def asyncTearDown(self):
+ events.append('asyncTearDown')
+
+ async def on_cleanup(self):
+ events.append('cleanup')
+
+ events = []
+ test = Test("test_func")
+ result = test.run()
+ self.assertEqual(events, ['asyncSetUp', 'test', 'asyncTearDown', 'cleanup'])
+ self.assertIs(result.errors[0][0], test)
+ self.assertIn('MyException', result.errors[0][1])
+
+ events = []
+ test = Test("test_func")
+ try:
+ test.debug()
+ except MyException:
+ pass
+ else:
+ self.fail('Expected a MyException exception')
+ self.assertEqual(events, ['asyncSetUp', 'test'])
+ test.doCleanups()
+ self.assertEqual(events, ['asyncSetUp', 'test', 'cleanup'])
+
+ def test_exception_in_tear_down(self):
+ class Test(unittest.IsolatedAsyncioTestCase):
+ async def asyncSetUp(self):
+ events.append('asyncSetUp')
+
+ async def test_func(self):
+ events.append('test')
+ self.addAsyncCleanup(self.on_cleanup)
+
+ async def asyncTearDown(self):
+ events.append('asyncTearDown')
+ raise MyException()
+
+ async def on_cleanup(self):
+ events.append('cleanup')
+
+ events = []
+ test = Test("test_func")
+ result = test.run()
+ self.assertEqual(events, ['asyncSetUp', 'test', 'asyncTearDown', 'cleanup'])
+ self.assertIs(result.errors[0][0], test)
+ self.assertIn('MyException', result.errors[0][1])
+
+ events = []
+ test = Test("test_func")
+ try:
+ test.debug()
+ except MyException:
+ pass
+ else:
+ self.fail('Expected a MyException exception')
+ self.assertEqual(events, ['asyncSetUp', 'test', 'asyncTearDown'])
+ test.doCleanups()
+ self.assertEqual(events, ['asyncSetUp', 'test', 'asyncTearDown', 'cleanup'])
+
+ def test_exception_in_tear_clean_up(self):
+ class Test(unittest.IsolatedAsyncioTestCase):
+ async def asyncSetUp(self):
+ events.append('asyncSetUp')
+
+ async def test_func(self):
+ events.append('test')
+ self.addAsyncCleanup(self.on_cleanup1)
+ self.addAsyncCleanup(self.on_cleanup2)
+
+ async def asyncTearDown(self):
+ events.append('asyncTearDown')
+
+ async def on_cleanup1(self):
+ events.append('cleanup1')
+ raise MyException('some error')
+
+ async def on_cleanup2(self):
+ events.append('cleanup2')
+ raise MyException('other error')
+
+ events = []
+ test = Test("test_func")
+ result = test.run()
+ self.assertEqual(events, ['asyncSetUp', 'test', 'asyncTearDown', 'cleanup2', 'cleanup1'])
+ self.assertIs(result.errors[0][0], test)
+ self.assertIn('MyException: other error', result.errors[0][1])
+ self.assertIn('MyException: some error', result.errors[1][1])
+
+ events = []
+ test = Test("test_func")
+ try:
+ test.debug()
+ except MyException:
+ pass
+ else:
+ self.fail('Expected a MyException exception')
+ self.assertEqual(events, ['asyncSetUp', 'test', 'asyncTearDown', 'cleanup2'])
+ test.doCleanups()
+ self.assertEqual(events, ['asyncSetUp', 'test', 'asyncTearDown', 'cleanup2', 'cleanup1'])
+
+ def test_deprecation_of_return_val_from_test(self):
+ # Issue 41322 - deprecate return of value!=None from a test
+ class Test(unittest.IsolatedAsyncioTestCase):
+ async def test1(self):
+ return 1
+ async def test2(self):
+ yield 1
+
+ with self.assertWarns(DeprecationWarning) as w:
+ Test('test1').run()
+ self.assertIn('It is deprecated to return a value!=None', str(w.warning))
+ self.assertIn('test1', str(w.warning))
+ self.assertEqual(w.filename, __file__)
+
+ with self.assertWarns(DeprecationWarning) as w:
+ Test('test2').run()
+ self.assertIn('It is deprecated to return a value!=None', str(w.warning))
+ self.assertIn('test2', str(w.warning))
+ self.assertEqual(w.filename, __file__)
+
+ def test_cleanups_interleave_order(self):
+ events = []
+
+ class Test(unittest.IsolatedAsyncioTestCase):
+ async def test_func(self):
+ self.addAsyncCleanup(self.on_sync_cleanup, 1)
+ self.addAsyncCleanup(self.on_async_cleanup, 2)
+ self.addAsyncCleanup(self.on_sync_cleanup, 3)
+ self.addAsyncCleanup(self.on_async_cleanup, 4)
+
+ async def on_sync_cleanup(self, val):
+ events.append(f'sync_cleanup {val}')
+
+ async def on_async_cleanup(self, val):
+ events.append(f'async_cleanup {val}')
+
+ test = Test("test_func")
+ test.run()
+ self.assertEqual(events, ['async_cleanup 4',
+ 'sync_cleanup 3',
+ 'async_cleanup 2',
+ 'sync_cleanup 1'])
+
+ def test_base_exception_from_async_method(self):
+ events = []
+ class Test(unittest.IsolatedAsyncioTestCase):
+ async def test_base(self):
+ events.append("test_base")
+ raise BaseException()
+ events.append("not it")
+
+ async def test_no_err(self):
+ events.append("test_no_err")
+
+ async def test_cancel(self):
+ raise asyncio.CancelledError()
+
+ test = Test("test_base")
+ output = test.run()
+ self.assertFalse(output.wasSuccessful())
+
+ test = Test("test_no_err")
+ test.run()
+ self.assertEqual(events, ['test_base', 'test_no_err'])
+
+ test = Test("test_cancel")
+ output = test.run()
+ self.assertFalse(output.wasSuccessful())
+
+ def test_cancellation_hanging_tasks(self):
+ cancelled = False
+ class Test(unittest.IsolatedAsyncioTestCase):
+ async def test_leaking_task(self):
+ async def coro():
+ nonlocal cancelled
+ try:
+ await asyncio.sleep(1)
+ except asyncio.CancelledError:
+ cancelled = True
+ raise
+
+ # Leave this running in the background
+ asyncio.create_task(coro())
+
+ test = Test("test_leaking_task")
+ output = test.run()
+ self.assertTrue(cancelled)
+
+ def test_enterAsyncContext(self):
+ events = []
+
+ class Test(unittest.IsolatedAsyncioTestCase):
+ async def test_func(slf):
+ slf.addAsyncCleanup(events.append, 'cleanup1')
+ cm = TestCM(events, 42)
+ self.assertEqual(await slf.enterAsyncContext(cm), 42)
+ slf.addAsyncCleanup(events.append, 'cleanup2')
+ events.append('test')
+
+ test = Test('test_func')
+ output = test.run()
+ self.assertTrue(output.wasSuccessful(), output)
+ self.assertEqual(events, ['enter', 'test', 'cleanup2', 'exit', 'cleanup1'])
+
+ def test_enterAsyncContext_arg_errors(self):
+ class Test(unittest.IsolatedAsyncioTestCase):
+ async def test_func(slf):
+ with self.assertRaisesRegex(TypeError, 'asynchronous context manager'):
+ await slf.enterAsyncContext(LacksEnterAndExit())
+ with self.assertRaisesRegex(TypeError, 'asynchronous context manager'):
+ await slf.enterAsyncContext(LacksEnter())
+ with self.assertRaisesRegex(TypeError, 'asynchronous context manager'):
+ await slf.enterAsyncContext(LacksExit())
+
+ test = Test('test_func')
+ output = test.run()
+ self.assertTrue(output.wasSuccessful())
+
+ def test_debug_cleanup_same_loop(self):
+ class Test(unittest.IsolatedAsyncioTestCase):
+ async def asyncSetUp(self):
+ async def coro():
+ await asyncio.sleep(0)
+ fut = asyncio.ensure_future(coro())
+ self.addAsyncCleanup(self.cleanup, fut)
+ events.append('asyncSetUp')
+
+ async def test_func(self):
+ events.append('test')
+ raise MyException()
+
+ async def asyncTearDown(self):
+ events.append('asyncTearDown')
+
+ async def cleanup(self, fut):
+ try:
+ # Raises an exception if in different loop
+ await asyncio.wait([fut])
+ events.append('cleanup')
+ except:
+ import traceback
+ traceback.print_exc()
+ raise
+
+ events = []
+ test = Test("test_func")
+ result = test.run()
+ self.assertEqual(events, ['asyncSetUp', 'test', 'asyncTearDown', 'cleanup'])
+ self.assertIn('MyException', result.errors[0][1])
+
+ events = []
+ test = Test("test_func")
+ try:
+ test.debug()
+ except MyException:
+ pass
+ else:
+ self.fail('Expected a MyException exception')
+ self.assertEqual(events, ['asyncSetUp', 'test'])
+ test.doCleanups()
+ self.assertEqual(events, ['asyncSetUp', 'test', 'cleanup'])
+
+
+if __name__ == "__main__":
+ unittest.main()