diff options
| author | Guido van Rossum <guido@dropbox.com> | 2013-10-17 20:40:50 (GMT) |
|---|---|---|
| committer | Guido van Rossum <guido@dropbox.com> | 2013-10-17 20:40:50 (GMT) |
| commit | 27b7c7ebf1039e96cac41b6330cf16b5632d9e49 (patch) | |
| tree | 814505b0f9d02a5cabdec733dcde70250b04ee28 /Lib/test/test_asyncio/test_windows_events.py | |
| parent | 5b37f97ea5ac9f6b33b0e0269c69539cbb478142 (diff) | |
| download | cpython-27b7c7ebf1039e96cac41b6330cf16b5632d9e49.zip cpython-27b7c7ebf1039e96cac41b6330cf16b5632d9e49.tar.gz cpython-27b7c7ebf1039e96cac41b6330cf16b5632d9e49.tar.bz2 | |
Initial checkin of asyncio package (== Tulip, == PEP 3156).
Diffstat (limited to 'Lib/test/test_asyncio/test_windows_events.py')
| -rw-r--r-- | Lib/test/test_asyncio/test_windows_events.py | 95 |
1 files changed, 95 insertions, 0 deletions
diff --git a/Lib/test/test_asyncio/test_windows_events.py b/Lib/test/test_asyncio/test_windows_events.py new file mode 100644 index 0000000..4b04073 --- /dev/null +++ b/Lib/test/test_asyncio/test_windows_events.py @@ -0,0 +1,95 @@ +import os +import sys +import unittest + +if sys.platform != 'win32': + raise unittest.SkipTest('Windows only') + +import asyncio + +from asyncio import windows_events +from asyncio import protocols +from asyncio import streams +from asyncio import transports +from asyncio import test_utils + + +class UpperProto(protocols.Protocol): + def __init__(self): + self.buf = [] + + def connection_made(self, trans): + self.trans = trans + + def data_received(self, data): + self.buf.append(data) + if b'\n' in data: + self.trans.write(b''.join(self.buf).upper()) + self.trans.close() + + +class ProactorTests(unittest.TestCase): + + def setUp(self): + self.loop = windows_events.ProactorEventLoop() + asyncio.set_event_loop(None) + + def tearDown(self): + self.loop.close() + self.loop = None + + def test_close(self): + a, b = self.loop._socketpair() + trans = self.loop._make_socket_transport(a, protocols.Protocol()) + f = asyncio.async(self.loop.sock_recv(b, 100)) + trans.close() + self.loop.run_until_complete(f) + self.assertEqual(f.result(), b'') + + def test_double_bind(self): + ADDRESS = r'\\.\pipe\test_double_bind-%s' % os.getpid() + server1 = windows_events.PipeServer(ADDRESS) + with self.assertRaises(PermissionError): + server2 = windows_events.PipeServer(ADDRESS) + server1.close() + + def test_pipe(self): + res = self.loop.run_until_complete(self._test_pipe()) + self.assertEqual(res, 'done') + + def _test_pipe(self): + ADDRESS = r'\\.\pipe\_test_pipe-%s' % os.getpid() + + with self.assertRaises(FileNotFoundError): + yield from self.loop.create_pipe_connection( + protocols.Protocol, ADDRESS) + + [server] = yield from self.loop.start_serving_pipe( + UpperProto, ADDRESS) + self.assertIsInstance(server, windows_events.PipeServer) + + clients = [] + for i in range(5): + stream_reader = streams.StreamReader(loop=self.loop) + protocol = streams.StreamReaderProtocol(stream_reader) + trans, proto = yield from self.loop.create_pipe_connection( + lambda:protocol, ADDRESS) + self.assertIsInstance(trans, transports.Transport) + self.assertEqual(protocol, proto) + clients.append((stream_reader, trans)) + + for i, (r, w) in enumerate(clients): + w.write('lower-{}\n'.format(i).encode()) + + for i, (r, w) in enumerate(clients): + response = yield from r.readline() + self.assertEqual(response, 'LOWER-{}\n'.format(i).encode()) + w.close() + + server.close() + + with self.assertRaises(FileNotFoundError): + yield from self.loop.create_pipe_connection( + protocols.Protocol, ADDRESS) + + return 'done' |
