summaryrefslogtreecommitdiffstats
path: root/Lib/test/test_asyncio/test_windows_events.py
blob: 6b4f65c3376f5d2b7f2ed8544c8f63c466d5e61f (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
import os
import signal
import socket
import sys
import time
import threading
import unittest
from unittest import mock

# Skip the whole module on any platform other than Windows.  The check
# sits above the imports that follow so they are never attempted
# elsewhere (presumably _overlapped and _winapi exist only on
# Windows -- confirm).
if sys.platform != 'win32':
    raise unittest.SkipTest('Windows only')

import _overlapped
import _winapi

import asyncio
from asyncio import windows_events
from test.test_asyncio import utils as test_utils


def tearDownModule():
    """Reset asyncio's global event loop policy after this module's tests."""
    # Passing None tells asyncio to go back to its default policy, so a
    # policy installed by a test here does not leak into other modules.
    asyncio.set_event_loop_policy(None)


class UpperProto(asyncio.Protocol):
    """Protocol that replies once with the upper-cased data it received.

    Incoming chunks are accumulated.  As soon as a chunk containing a
    newline arrives, everything received so far is joined, upper-cased,
    written to the transport, and the transport is closed.
    """

    def __init__(self):
        # Chunks received so far, in arrival order.
        self.buf = []

    def connection_made(self, trans):
        # Keep the transport so data_received() can reply and close it.
        self.trans = trans

    def data_received(self, data):
        self.buf.append(data)
        if b'\n' not in data:
            # No newline in this chunk yet; keep collecting.
            return
        reply = b''.join(self.buf).upper()
        self.trans.write(reply)
        self.trans.close()


class ProactorLoopCtrlC(test_utils.TestCase):
    # Checks that a SIGINT raised while the event loop is running
    # surfaces as KeyboardInterrupt out of run_forever().

    def test_ctrl_c(self):

        def SIGINT_after_delay():
            # Runs in a helper thread: wait briefly, then raise SIGINT.
            time.sleep(0.1)
            signal.raise_signal(signal.SIGINT)

        thread = threading.Thread(target=SIGINT_after_delay)
        loop = asyncio.new_event_loop()
        try:
            # only start the thread once the event loop is running
            loop.call_soon(thread.start)
            loop.run_forever()
            # Nothing in this test stops the loop, so returning normally
            # from run_forever() counts as a failure.
            self.fail("should not fall through 'run_forever'")
        except KeyboardInterrupt:
            # Expected outcome: the signal interrupted run_forever().
            pass
        finally:
            self.close_loop(loop)
        thread.join()


class ProactorMultithreading(test_utils.TestCase):
    # Checks that a new event loop can be created, run and closed
    # entirely from a thread other than the main one.

    def test_run_from_nonmain_thread(self):
        # Stays False if anything raises inside the worker thread, so the
        # final assertion also catches failures that happen there.
        finished = False

        async def yield_once():
            await asyncio.sleep(0)

        def run_loop_in_thread():
            nonlocal finished
            worker_loop = asyncio.new_event_loop()
            worker_loop.run_until_complete(yield_once())
            # close() must not call signal.set_wakeup_fd()
            worker_loop.close()
            finished = True

        worker = threading.Thread(target=run_loop_in_thread)
        worker.start()
        worker.join()
        self.assertTrue(finished)


class ProactorTests(test_utils.TestCase):
    # Tests that drive a real asyncio.ProactorEventLoop, created afresh
    # for every test in setUp().

    def setUp(self):
        super().setUp()
        self.loop = asyncio.ProactorEventLoop()
        # NOTE(review): set_event_loop() is provided by test_utils.TestCase;
        # presumably it also arranges for the loop to be closed -- confirm.
        self.set_event_loop(self.loop)

    def test_close(self):
        # Closing the transport that wraps one end of a socket pair must
        # make a pending recv on the other end complete with b''.
        a, b = socket.socketpair()
        # 'b' stays owned by the test; the context manager closes it even
        # when run_until_complete() or the assertion below fails.
        with b:
            trans = self.loop._make_socket_transport(a, asyncio.Protocol())
            f = asyncio.ensure_future(self.loop.sock_recv(b, 100),
                                      loop=self.loop)
            trans.close()
            self.loop.run_until_complete(f)
            self.assertEqual(f.result(), b'')

    def test_double_bind(self):
        # Creating a second PipeServer on an address that already has one
        # must raise PermissionError.
        ADDRESS = r'\\.\pipe\test_double_bind-%s' % os.getpid()
        server1 = windows_events.PipeServer(ADDRESS)
        try:
            with self.assertRaises(PermissionError):
                windows_events.PipeServer(ADDRESS)
        finally:
            # Close the first server even if the check above fails.
            server1.close()

    def test_pipe(self):
        res = self.loop.run_until_complete(self._test_pipe())
        self.assertEqual(res, 'done')

    async def _test_pipe(self):
        # Coroutine body of test_pipe(); returns 'done' when every check
        # passed.
        ADDRESS = r'\\.\pipe\_test_pipe-%s' % os.getpid()

        # Nothing is serving the address yet, so connecting must fail.
        with self.assertRaises(FileNotFoundError):
            await self.loop.create_pipe_connection(
                asyncio.Protocol, ADDRESS)

        [server] = await self.loop.start_serving_pipe(
            UpperProto, ADDRESS)
        try:
            self.assertIsInstance(server, windows_events.PipeServer)

            # Open five client connections, each with its own reader.
            clients = []
            for _ in range(5):
                stream_reader = asyncio.StreamReader(loop=self.loop)
                protocol = asyncio.StreamReaderProtocol(stream_reader,
                                                        loop=self.loop)
                trans, proto = await self.loop.create_pipe_connection(
                    lambda: protocol, ADDRESS)
                self.assertIsInstance(trans, asyncio.Transport)
                self.assertEqual(protocol, proto)
                clients.append((stream_reader, trans))

            # Send every request first, then collect the replies.
            for i, (_, w) in enumerate(clients):
                w.write(f'lower-{i}\n'.encode())

            for i, (r, w) in enumerate(clients):
                response = await r.readline()
                self.assertEqual(response, f'LOWER-{i}\n'.encode())
                w.close()
        finally:
            # Stop serving even when one of the checks above fails, so the
            # pipe server is not left open.
            server.close()

        # With the server closed, connecting must fail again.
        with self.assertRaises(FileNotFoundError):
            await self.loop.create_pipe_connection(
                asyncio.Protocol, ADDRESS)

        return 'done'

    def test_connect_pipe_cancel(self):
        # Make every ConnectPipe() call fail with ERROR_PIPE_BUSY.
        exc = OSError()
        exc.winerror = _overlapped.ERROR_PIPE_BUSY
        with mock.patch.object(_overlapped, 'ConnectPipe',
                               side_effect=exc):
            coro = self.loop._proactor.connect_pipe('pipe_address')
            task = self.loop.create_task(coro)

            # check that it's possible to cancel connect_pipe()
            task.cancel()
            with self.assertRaises(asyncio.CancelledError):
                self.loop.run_until_complete(task)

    def test_wait_for_handle(self):
        event = _overlapped.CreateEvent(None, True, False, None)
        self.addCleanup(_winapi.CloseHandle, event)

        # Wait for unset event with 0.5s timeout;
        # result should be False at timeout
        fut = self.loop._proactor.wait_for_handle(event, 0.5)
        start = self.loop.time()
        done = self.loop.run_until_complete(fut)
        elapsed = self.loop.time() - start

        self.assertEqual(done, False)
        self.assertFalse(fut.result())
        # bpo-31008: Tolerate only 450 ms (at least 500 ms expected),
        # because of bad clock resolution on Windows
        self.assertTrue(0.45 <= elapsed <= 0.9, elapsed)

        _overlapped.SetEvent(event)

        # Wait for set event;
        # result should be True immediately
        fut = self.loop._proactor.wait_for_handle(event, 10)
        start = self.loop.time()
        done = self.loop.run_until_complete(fut)
        elapsed = self.loop.time() - start

        self.assertEqual(done, True)
        self.assertTrue(fut.result())
        self.assertTrue(0 <= elapsed < 0.3, elapsed)

        # asyncio issue #195: cancelling a done _WaitHandleFuture
        # must not crash
        fut.cancel()

    def test_wait_for_handle_cancel(self):
        event = _overlapped.CreateEvent(None, True, False, None)
        self.addCleanup(_winapi.CloseHandle, event)

        # Wait for unset event with a cancelled future;
        # CancelledError should be raised immediately
        fut = self.loop._proactor.wait_for_handle(event, 10)
        fut.cancel()
        start = self.loop.time()
        with self.assertRaises(asyncio.CancelledError):
            self.loop.run_until_complete(fut)
        elapsed = self.loop.time() - start
        self.assertTrue(0 <= elapsed < 0.1, elapsed)

        # asyncio issue #195: cancelling a _WaitHandleFuture twice
        # must not crash
        fut = self.loop._proactor.wait_for_handle(event)
        fut.cancel()
        fut.cancel()

    def test_read_self_pipe_restart(self):
        # Regression test for https://bugs.python.org/issue39010
        # Previously, restarting a proactor event loop in certain states
        # would lead to spurious ConnectionResetErrors being logged.
        self.loop.call_exception_handler = mock.Mock()
        # Start an operation in another thread so that the self-pipe is used.
        # This is theoretically timing-dependent (the task in the executor
        # must complete before our start/stop cycles), but in practice it
        # seems to work every time.
        f = self.loop.run_in_executor(None, lambda: None)
        self.loop.stop()
        self.loop.run_forever()
        self.loop.stop()
        self.loop.run_forever()

        # Shut everything down cleanly. This is an important part of the
        # test - in issue 39010, the error occurred during loop.close(),
        # so we want to close the loop during the test instead of leaving
        # it for tearDown.
        #
        # First wait for f to complete to avoid a "future's result was never
        # retrieved" error.
        self.loop.run_until_complete(f)
        # Now shut down the loop itself (self.close_loop also shuts down the
        # loop's default executor).
        self.close_loop(self.loop)
        self.assertFalse(self.loop.call_exception_handler.called)


class WinPolicyTests(test_utils.TestCase):
    # Checks which event loop class asyncio.run() ends up using under
    # each of the two Windows event loop policies.

    def _check_running_loop_type(self, policy_factory, expected_loop_type):
        # Install policy_factory() as the global policy, then verify from
        # inside asyncio.run() that the running loop is an instance of
        # expected_loop_type.  The previously active policy is restored
        # afterwards, whether or not the check passed.
        async def main():
            self.assertIsInstance(
                asyncio.get_running_loop(),
                expected_loop_type)

        saved_policy = asyncio.get_event_loop_policy()
        try:
            asyncio.set_event_loop_policy(policy_factory())
            asyncio.run(main())
        finally:
            asyncio.set_event_loop_policy(saved_policy)

    def test_selector_win_policy(self):
        self._check_running_loop_type(
            asyncio.WindowsSelectorEventLoopPolicy,
            asyncio.SelectorEventLoop)

    def test_proactor_win_policy(self):
        self._check_running_loop_type(
            asyncio.WindowsProactorEventLoopPolicy,
            asyncio.ProactorEventLoop)


# Run this module's tests when the file is executed directly as a script.
if __name__ == '__main__':
    unittest.main()