diff options
Diffstat (limited to 'Lib/test/test_asyncio')
-rw-r--r-- | Lib/test/test_asyncio/test_base_events.py | 140 | ||||
-rw-r--r-- | Lib/test/test_asyncio/test_events.py | 52 |
2 files changed, 190 insertions, 2 deletions
diff --git a/Lib/test/test_asyncio/test_base_events.py b/Lib/test/test_asyncio/test_base_events.py index b1f1e56..1568440 100644 --- a/Lib/test/test_asyncio/test_base_events.py +++ b/Lib/test/test_asyncio/test_base_events.py @@ -3,6 +3,7 @@ import errno import logging import math +import os import socket import sys import threading @@ -790,11 +791,11 @@ class MyProto(asyncio.Protocol): class MyDatagramProto(asyncio.DatagramProtocol): done = None - def __init__(self, create_future=False): + def __init__(self, create_future=False, loop=None): self.state = 'INITIAL' self.nbytes = 0 if create_future: - self.done = asyncio.Future() + self.done = asyncio.Future(loop=loop) def connection_made(self, transport): self.transport = transport @@ -1100,6 +1101,19 @@ class BaseEventLoopWithSelectorTests(test_utils.TestCase): self.assertRaises(OSError, self.loop.run_until_complete, f) @mock.patch('asyncio.base_events.socket') + def test_create_server_nosoreuseport(self, m_socket): + m_socket.getaddrinfo = socket.getaddrinfo + m_socket.SOCK_STREAM = socket.SOCK_STREAM + m_socket.SOL_SOCKET = socket.SOL_SOCKET + del m_socket.SO_REUSEPORT + m_socket.socket.return_value = mock.Mock() + + f = self.loop.create_server( + MyProto, '0.0.0.0', 0, reuse_port=True) + + self.assertRaises(ValueError, self.loop.run_until_complete, f) + + @mock.patch('asyncio.base_events.socket') def test_create_server_cant_bind(self, m_socket): class Err(OSError): @@ -1199,6 +1213,128 @@ class BaseEventLoopWithSelectorTests(test_utils.TestCase): self.assertRaises(Err, self.loop.run_until_complete, fut) self.assertTrue(m_sock.close.called) + def test_create_datagram_endpoint_sock(self): + sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) + fut = self.loop.create_datagram_endpoint( + lambda: MyDatagramProto(create_future=True, loop=self.loop), + sock=sock) + transport, protocol = self.loop.run_until_complete(fut) + transport.close() + self.loop.run_until_complete(protocol.done) + self.assertEqual('CLOSED', protocol.state) + + def test_create_datagram_endpoint_sock_sockopts(self): + fut = self.loop.create_datagram_endpoint( + MyDatagramProto, local_addr=('127.0.0.1', 0), sock=object()) + self.assertRaises(ValueError, self.loop.run_until_complete, fut) + + fut = self.loop.create_datagram_endpoint( + MyDatagramProto, remote_addr=('127.0.0.1', 0), sock=object()) + self.assertRaises(ValueError, self.loop.run_until_complete, fut) + + fut = self.loop.create_datagram_endpoint( + MyDatagramProto, family=1, sock=object()) + self.assertRaises(ValueError, self.loop.run_until_complete, fut) + + fut = self.loop.create_datagram_endpoint( + MyDatagramProto, proto=1, sock=object()) + self.assertRaises(ValueError, self.loop.run_until_complete, fut) + + fut = self.loop.create_datagram_endpoint( + MyDatagramProto, flags=1, sock=object()) + self.assertRaises(ValueError, self.loop.run_until_complete, fut) + + fut = self.loop.create_datagram_endpoint( + MyDatagramProto, reuse_address=True, sock=object()) + self.assertRaises(ValueError, self.loop.run_until_complete, fut) + + fut = self.loop.create_datagram_endpoint( + MyDatagramProto, reuse_port=True, sock=object()) + self.assertRaises(ValueError, self.loop.run_until_complete, fut) + + fut = self.loop.create_datagram_endpoint( + MyDatagramProto, allow_broadcast=True, sock=object()) + self.assertRaises(ValueError, self.loop.run_until_complete, fut) + + def test_create_datagram_endpoint_sockopts(self): + # Socket options should not be applied unless asked for. + # SO_REUSEADDR defaults to on for UNIX. + # SO_REUSEPORT is not available on all platforms. + + coro = self.loop.create_datagram_endpoint( + lambda: MyDatagramProto(create_future=True, loop=self.loop), + local_addr=('127.0.0.1', 0)) + transport, protocol = self.loop.run_until_complete(coro) + sock = transport.get_extra_info('socket') + + reuse_address_default_on = ( + os.name == 'posix' and sys.platform != 'cygwin') + reuseport_supported = hasattr(socket, 'SO_REUSEPORT') + + if reuse_address_default_on: + self.assertTrue( + sock.getsockopt( + socket.SOL_SOCKET, socket.SO_REUSEADDR)) + else: + self.assertFalse( + sock.getsockopt( + socket.SOL_SOCKET, socket.SO_REUSEADDR)) + if reuseport_supported: + self.assertFalse( + sock.getsockopt( + socket.SOL_SOCKET, socket.SO_REUSEPORT)) + self.assertFalse( + sock.getsockopt( + socket.SOL_SOCKET, socket.SO_BROADCAST)) + + transport.close() + self.loop.run_until_complete(protocol.done) + self.assertEqual('CLOSED', protocol.state) + + coro = self.loop.create_datagram_endpoint( + lambda: MyDatagramProto(create_future=True, loop=self.loop), + local_addr=('127.0.0.1', 0), + reuse_address=True, + reuse_port=reuseport_supported, + allow_broadcast=True) + transport, protocol = self.loop.run_until_complete(coro) + sock = transport.get_extra_info('socket') + + self.assertTrue( + sock.getsockopt( + socket.SOL_SOCKET, socket.SO_REUSEADDR)) + if reuseport_supported: + self.assertTrue( + sock.getsockopt( + socket.SOL_SOCKET, socket.SO_REUSEPORT)) + else: + self.assertFalse( + sock.getsockopt( + socket.SOL_SOCKET, socket.SO_REUSEPORT)) + self.assertTrue( + sock.getsockopt( + socket.SOL_SOCKET, socket.SO_BROADCAST)) + + transport.close() + self.loop.run_until_complete(protocol.done) + self.assertEqual('CLOSED', protocol.state) + + @mock.patch('asyncio.base_events.socket') + def test_create_datagram_endpoint_nosoreuseport(self, m_socket): + m_socket.getaddrinfo = socket.getaddrinfo + m_socket.SOCK_DGRAM = socket.SOCK_DGRAM + m_socket.SOL_SOCKET = socket.SOL_SOCKET + del m_socket.SO_REUSEPORT + m_socket.socket.return_value = mock.Mock() + + coro = self.loop.create_datagram_endpoint( + lambda: MyDatagramProto(loop=self.loop), + local_addr=('127.0.0.1', 0), + reuse_address=False, + reuse_port=True) + + self.assertRaises(ValueError, self.loop.run_until_complete, coro) + def test_accept_connection_retry(self): sock = mock.Mock() sock.accept.side_effect = BlockingIOError() diff --git a/Lib/test/test_asyncio/test_events.py b/Lib/test/test_asyncio/test_events.py index 9801d22..141fde7 100644 --- a/Lib/test/test_asyncio/test_events.py +++ b/Lib/test/test_asyncio/test_events.py @@ -814,6 +814,32 @@ class EventLoopTestsMixin: # close server server.close() + @unittest.skipUnless(hasattr(socket, 'SO_REUSEPORT'), 'No SO_REUSEPORT') + def test_create_server_reuse_port(self): + proto = MyProto(self.loop) + f = self.loop.create_server( + lambda: proto, '0.0.0.0', 0) + server = self.loop.run_until_complete(f) + self.assertEqual(len(server.sockets), 1) + sock = server.sockets[0] + self.assertFalse( + sock.getsockopt( + socket.SOL_SOCKET, socket.SO_REUSEPORT)) + server.close() + + test_utils.run_briefly(self.loop) + + proto = MyProto(self.loop) + f = self.loop.create_server( + lambda: proto, '0.0.0.0', 0, reuse_port=True) + server = self.loop.run_until_complete(f) + self.assertEqual(len(server.sockets), 1) + sock = server.sockets[0] + self.assertTrue( + sock.getsockopt( + socket.SOL_SOCKET, socket.SO_REUSEPORT)) + server.close() + def _make_unix_server(self, factory, **kwargs): path = test_utils.gen_unix_socket_path() self.addCleanup(lambda: os.path.exists(path) and os.unlink(path)) @@ -1264,6 +1290,32 @@ class EventLoopTestsMixin: self.assertEqual('CLOSED', client.state) server.transport.close() + def test_create_datagram_endpoint_sock(self): + sock = None + local_address = ('127.0.0.1', 0) + infos = self.loop.run_until_complete( + self.loop.getaddrinfo( + *local_address, type=socket.SOCK_DGRAM)) + for family, type, proto, cname, address in infos: + try: + sock = socket.socket(family=family, type=type, proto=proto) + sock.setblocking(False) + sock.bind(address) + except: + pass + else: + break + else: + assert False, 'Can not create socket.' + + f = self.loop.create_connection( + lambda: MyDatagramProto(loop=self.loop), sock=sock) + tr, pr = self.loop.run_until_complete(f) + self.assertIsInstance(tr, asyncio.Transport) + self.assertIsInstance(pr, MyDatagramProto) + tr.close() + self.loop.run_until_complete(pr.done) + def test_internal_fds(self): loop = self.create_event_loop() if not isinstance(loop, selector_events.BaseSelectorEventLoop): |