summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorXiao Chen <chenxiao_7@163.com>2022-11-12 20:16:44 (GMT)
committerGitHub <noreply@github.com>2022-11-12 20:16:44 (GMT)
commit99972dc7450f1266e39202012827f4f3c995b0ca (patch)
tree74234bd71fd72007f72441f10b8adcd3de20c44e
parentdfc1b17a23fed933cffa09eec125a7e8c90ea867 (diff)
downloadcpython-99972dc7450f1266e39202012827f4f3c995b0ca.zip
cpython-99972dc7450f1266e39202012827f4f3c995b0ca.tar.gz
cpython-99972dc7450f1266e39202012827f4f3c995b0ca.tar.bz2
gh-99357: Close the event loop when it is no longer used in test_uncancel_structured_blocks (#99414)
-rw-r--r--Lib/test/test_asyncio/test_tasks.py45
1 files changed, 24 insertions, 21 deletions
diff --git a/Lib/test/test_asyncio/test_tasks.py b/Lib/test/test_asyncio/test_tasks.py
index 0e38e6a..d8ba2f4 100644
--- a/Lib/test/test_asyncio/test_tasks.py
+++ b/Lib/test/test_asyncio/test_tasks.py
@@ -635,27 +635,30 @@ class BaseTaskTests:
await asyncio.sleep(0)
return timed_out, structured_block_finished, outer_code_reached
- # Test which timed out.
- t1 = self.new_task(loop, make_request_with_timeout(sleep=10.0, timeout=0.1))
- timed_out, structured_block_finished, outer_code_reached = (
- loop.run_until_complete(t1)
- )
- self.assertTrue(timed_out)
- self.assertFalse(structured_block_finished) # it was cancelled
- self.assertTrue(outer_code_reached) # task got uncancelled after leaving
- # the structured block and continued until
- # completion
- self.assertEqual(t1.cancelling(), 0) # no pending cancellation of the outer task
-
- # Test which did not time out.
- t2 = self.new_task(loop, make_request_with_timeout(sleep=0, timeout=10.0))
- timed_out, structured_block_finished, outer_code_reached = (
- loop.run_until_complete(t2)
- )
- self.assertFalse(timed_out)
- self.assertTrue(structured_block_finished)
- self.assertTrue(outer_code_reached)
- self.assertEqual(t2.cancelling(), 0)
+ try:
+ # Test which timed out.
+ t1 = self.new_task(loop, make_request_with_timeout(sleep=10.0, timeout=0.1))
+ timed_out, structured_block_finished, outer_code_reached = (
+ loop.run_until_complete(t1)
+ )
+ self.assertTrue(timed_out)
+ self.assertFalse(structured_block_finished) # it was cancelled
+ self.assertTrue(outer_code_reached) # task got uncancelled after leaving
+ # the structured block and continued until
+ # completion
+ self.assertEqual(t1.cancelling(), 0) # no pending cancellation of the outer task
+
+ # Test which did not time out.
+ t2 = self.new_task(loop, make_request_with_timeout(sleep=0, timeout=10.0))
+ timed_out, structured_block_finished, outer_code_reached = (
+ loop.run_until_complete(t2)
+ )
+ self.assertFalse(timed_out)
+ self.assertTrue(structured_block_finished)
+ self.assertTrue(outer_code_reached)
+ self.assertEqual(t2.cancelling(), 0)
+ finally:
+ loop.close()
def test_cancel(self):