1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
|
import asyncio
import unittest
from unittest import mock
from . import utils as test_utils
class TestPolicy(asyncio.AbstractEventLoopPolicy):
def __init__(self, loop_factory):
self.loop_factory = loop_factory
self.loop = None
def get_event_loop(self):
# shouldn't ever be called by asyncio.run()
raise RuntimeError
def new_event_loop(self):
return self.loop_factory()
def set_event_loop(self, loop):
if loop is not None:
# we want to check if the loop is closed
# in BaseTest.tearDown
self.loop = loop
class BaseTest(unittest.TestCase):
def new_loop(self):
loop = asyncio.BaseEventLoop()
loop._process_events = mock.Mock()
loop._selector = mock.Mock()
loop._selector.select.return_value = ()
loop.shutdown_ag_run = False
async def shutdown_asyncgens():
loop.shutdown_ag_run = True
loop.shutdown_asyncgens = shutdown_asyncgens
return loop
def setUp(self):
super().setUp()
policy = TestPolicy(self.new_loop)
asyncio.set_event_loop_policy(policy)
def tearDown(self):
policy = asyncio.get_event_loop_policy()
if policy.loop is not None:
self.assertTrue(policy.loop.is_closed())
self.assertTrue(policy.loop.shutdown_ag_run)
asyncio.set_event_loop_policy(None)
super().tearDown()
class RunTests(BaseTest):
def test_asyncio_run_return(self):
async def main():
await asyncio.sleep(0)
return 42
self.assertEqual(asyncio.run(main()), 42)
def test_asyncio_run_raises(self):
async def main():
await asyncio.sleep(0)
raise ValueError('spam')
with self.assertRaisesRegex(ValueError, 'spam'):
asyncio.run(main())
def test_asyncio_run_only_coro(self):
for o in {1, lambda: None}:
with self.subTest(obj=o), \
self.assertRaisesRegex(ValueError,
'a coroutine was expected'):
asyncio.run(o)
def test_asyncio_run_debug(self):
async def main(expected):
loop = asyncio.get_event_loop()
self.assertIs(loop.get_debug(), expected)
asyncio.run(main(False))
asyncio.run(main(True), debug=True)
def test_asyncio_run_from_running_loop(self):
async def main():
coro = main()
try:
asyncio.run(coro)
finally:
coro.close() # Suppress ResourceWarning
with self.assertRaisesRegex(RuntimeError,
'cannot be called from a running'):
asyncio.run(main())
def test_asyncio_run_cancels_hanging_tasks(self):
lo_task = None
async def leftover():
await asyncio.sleep(0.1)
async def main():
nonlocal lo_task
lo_task = asyncio.create_task(leftover())
return 123
self.assertEqual(asyncio.run(main()), 123)
self.assertTrue(lo_task.done())
def test_asyncio_run_reports_hanging_tasks_errors(self):
lo_task = None
call_exc_handler_mock = mock.Mock()
async def leftover():
try:
await asyncio.sleep(0.1)
except asyncio.CancelledError:
1 / 0
async def main():
loop = asyncio.get_running_loop()
loop.call_exception_handler = call_exc_handler_mock
nonlocal lo_task
lo_task = asyncio.create_task(leftover())
return 123
self.assertEqual(asyncio.run(main()), 123)
self.assertTrue(lo_task.done())
call_exc_handler_mock.assert_called_with({
'message': test_utils.MockPattern(r'asyncio.run.*shutdown'),
'task': lo_task,
'exception': test_utils.MockInstanceOf(ZeroDivisionError)
})
def test_asyncio_run_closes_gens_after_hanging_tasks_errors(self):
spinner = None
lazyboy = None
class FancyExit(Exception):
pass
async def fidget():
while True:
yield 1
await asyncio.sleep(1)
async def spin():
nonlocal spinner
spinner = fidget()
try:
async for the_meaning_of_life in spinner: # NoQA
pass
except asyncio.CancelledError:
1 / 0
async def main():
loop = asyncio.get_running_loop()
loop.call_exception_handler = mock.Mock()
nonlocal lazyboy
lazyboy = asyncio.create_task(spin())
raise FancyExit
with self.assertRaises(FancyExit):
asyncio.run(main())
self.assertTrue(lazyboy.done())
self.assertIsNone(spinner.ag_frame)
self.assertFalse(spinner.ag_running)
|