diff options
author | Andrew Svetlov <andrew.svetlov@gmail.com> | 2018-10-09 04:52:57 (GMT) |
---|---|---|
committer | GitHub <noreply@github.com> | 2018-10-09 04:52:57 (GMT) |
commit | 2b2758d0b30f4ed7d37319d6c18552eccbc8e7b7 (patch) | |
tree | c75330f29ba7fc380dc182bfd16657c51e2c84bb /Lib/test/test_asyncio/test_events.py | |
parent | 199a280af540e3194405eb250ca1a8d487f6a4f7 (diff) | |
download | cpython-2b2758d0b30f4ed7d37319d6c18552eccbc8e7b7.zip cpython-2b2758d0b30f4ed7d37319d6c18552eccbc8e7b7.tar.gz cpython-2b2758d0b30f4ed7d37319d6c18552eccbc8e7b7.tar.bz2 |
Extract sendfile tests into a separate test file (#9757)
Diffstat (limited to 'Lib/test/test_asyncio/test_events.py')
-rw-r--r-- | Lib/test/test_asyncio/test_events.py | 451 |
1 files changed, 1 insertions, 450 deletions
diff --git a/Lib/test/test_asyncio/test_events.py b/Lib/test/test_asyncio/test_events.py index 607c195..b76cfb7 100644 --- a/Lib/test/test_asyncio/test_events.py +++ b/Lib/test/test_asyncio/test_events.py @@ -15,7 +15,6 @@ except ImportError: ssl = None import subprocess import sys -import tempfile import threading import time import errno @@ -1987,461 +1986,15 @@ class SubprocessTestsMixin: self.loop.run_until_complete(connect(shell=False)) -class SendfileBase: - - DATA = b"SendfileBaseData" * (1024 * 8) # 128 KiB - - # Reduce socket buffer size to test on relative small data sets. - BUF_SIZE = 4 * 1024 # 4 KiB - - @classmethod - def setUpClass(cls): - with open(support.TESTFN, 'wb') as fp: - fp.write(cls.DATA) - super().setUpClass() - - @classmethod - def tearDownClass(cls): - support.unlink(support.TESTFN) - super().tearDownClass() - - def setUp(self): - self.file = open(support.TESTFN, 'rb') - self.addCleanup(self.file.close) - super().setUp() - - def run_loop(self, coro): - return self.loop.run_until_complete(coro) - - -class SockSendfileMixin(SendfileBase): - - class MyProto(asyncio.Protocol): - - def __init__(self, loop): - self.started = False - self.closed = False - self.data = bytearray() - self.fut = loop.create_future() - self.transport = None - - def connection_made(self, transport): - self.started = True - self.transport = transport - - def data_received(self, data): - self.data.extend(data) - - def connection_lost(self, exc): - self.closed = True - self.fut.set_result(None) - - async def wait_closed(self): - await self.fut - - @classmethod - def setUpClass(cls): - cls.__old_bufsize = constants.SENDFILE_FALLBACK_READBUFFER_SIZE - constants.SENDFILE_FALLBACK_READBUFFER_SIZE = 1024 * 16 - super().setUpClass() - - @classmethod - def tearDownClass(cls): - constants.SENDFILE_FALLBACK_READBUFFER_SIZE = cls.__old_bufsize - super().tearDownClass() - - def make_socket(self, cleanup=True): - sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) - sock.setblocking(False) - if cleanup: - self.addCleanup(sock.close) - return sock - - def reduce_receive_buffer_size(self, sock): - # Reduce receive socket buffer size to test on relative - # small data sets. - sock.setsockopt(socket.SOL_SOCKET, socket.SO_RCVBUF, self.BUF_SIZE) - - def reduce_send_buffer_size(self, sock, transport=None): - # Reduce send socket buffer size to test on relative small data sets. - - # On macOS, SO_SNDBUF is reset by connect(). So this method - # should be called after the socket is connected. - sock.setsockopt(socket.SOL_SOCKET, socket.SO_SNDBUF, self.BUF_SIZE) - - if transport is not None: - transport.set_write_buffer_limits(high=self.BUF_SIZE) - - def prepare_socksendfile(self): - proto = self.MyProto(self.loop) - port = support.find_unused_port() - srv_sock = self.make_socket(cleanup=False) - srv_sock.bind((support.HOST, port)) - server = self.run_loop(self.loop.create_server( - lambda: proto, sock=srv_sock)) - self.reduce_receive_buffer_size(srv_sock) - - sock = self.make_socket() - self.run_loop(self.loop.sock_connect(sock, ('127.0.0.1', port))) - self.reduce_send_buffer_size(sock) - - def cleanup(): - if proto.transport is not None: - # can be None if the task was cancelled before - # connection_made callback - proto.transport.close() - self.run_loop(proto.wait_closed()) - - server.close() - self.run_loop(server.wait_closed()) - - self.addCleanup(cleanup) - - return sock, proto - - def test_sock_sendfile_success(self): - sock, proto = self.prepare_socksendfile() - ret = self.run_loop(self.loop.sock_sendfile(sock, self.file)) - sock.close() - self.run_loop(proto.wait_closed()) - - self.assertEqual(ret, len(self.DATA)) - self.assertEqual(proto.data, self.DATA) - self.assertEqual(self.file.tell(), len(self.DATA)) - - def test_sock_sendfile_with_offset_and_count(self): - sock, proto = self.prepare_socksendfile() - ret = self.run_loop(self.loop.sock_sendfile(sock, self.file, - 1000, 2000)) - sock.close() - self.run_loop(proto.wait_closed()) - - self.assertEqual(proto.data, self.DATA[1000:3000]) - self.assertEqual(self.file.tell(), 3000) - self.assertEqual(ret, 2000) - - def test_sock_sendfile_zero_size(self): - sock, proto = self.prepare_socksendfile() - with tempfile.TemporaryFile() as f: - ret = self.run_loop(self.loop.sock_sendfile(sock, f, - 0, None)) - sock.close() - self.run_loop(proto.wait_closed()) - - self.assertEqual(ret, 0) - self.assertEqual(self.file.tell(), 0) - - def test_sock_sendfile_mix_with_regular_send(self): - buf = b"mix_regular_send" * (4 * 1024) # 64 KiB - sock, proto = self.prepare_socksendfile() - self.run_loop(self.loop.sock_sendall(sock, buf)) - ret = self.run_loop(self.loop.sock_sendfile(sock, self.file)) - self.run_loop(self.loop.sock_sendall(sock, buf)) - sock.close() - self.run_loop(proto.wait_closed()) - - self.assertEqual(ret, len(self.DATA)) - expected = buf + self.DATA + buf - self.assertEqual(proto.data, expected) - self.assertEqual(self.file.tell(), len(self.DATA)) - - -class SendfileMixin(SendfileBase): - - class MySendfileProto(MyBaseProto): - - def __init__(self, loop=None, close_after=0): - super().__init__(loop) - self.data = bytearray() - self.close_after = close_after - - def data_received(self, data): - self.data.extend(data) - super().data_received(data) - if self.close_after and self.nbytes >= self.close_after: - self.transport.close() - - - # Note: sendfile via SSL transport is equal to sendfile fallback - - def prepare_sendfile(self, *, is_ssl=False, close_after=0): - port = support.find_unused_port() - srv_proto = self.MySendfileProto(loop=self.loop, - close_after=close_after) - if is_ssl: - if not ssl: - self.skipTest("No ssl module") - srv_ctx = test_utils.simple_server_sslcontext() - cli_ctx = test_utils.simple_client_sslcontext() - else: - srv_ctx = None - cli_ctx = None - srv_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) - srv_sock.bind((support.HOST, port)) - server = self.run_loop(self.loop.create_server( - lambda: srv_proto, sock=srv_sock, ssl=srv_ctx)) - self.reduce_receive_buffer_size(srv_sock) - - if is_ssl: - server_hostname = support.HOST - else: - server_hostname = None - cli_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) - cli_sock.connect((support.HOST, port)) - - cli_proto = self.MySendfileProto(loop=self.loop) - tr, pr = self.run_loop(self.loop.create_connection( - lambda: cli_proto, sock=cli_sock, - ssl=cli_ctx, server_hostname=server_hostname)) - self.reduce_send_buffer_size(cli_sock, transport=tr) - - def cleanup(): - srv_proto.transport.close() - cli_proto.transport.close() - self.run_loop(srv_proto.done) - self.run_loop(cli_proto.done) - - server.close() - self.run_loop(server.wait_closed()) - - self.addCleanup(cleanup) - return srv_proto, cli_proto - - @unittest.skipIf(sys.platform == 'win32', "UDP sockets are not supported") - def test_sendfile_not_supported(self): - tr, pr = self.run_loop( - self.loop.create_datagram_endpoint( - lambda: MyDatagramProto(loop=self.loop), - family=socket.AF_INET)) - try: - with self.assertRaisesRegex(RuntimeError, "not supported"): - self.run_loop( - self.loop.sendfile(tr, self.file)) - self.assertEqual(0, self.file.tell()) - finally: - # don't use self.addCleanup because it produces resource warning - tr.close() - - def test_sendfile(self): - srv_proto, cli_proto = self.prepare_sendfile() - ret = self.run_loop( - self.loop.sendfile(cli_proto.transport, self.file)) - cli_proto.transport.close() - self.run_loop(srv_proto.done) - self.assertEqual(ret, len(self.DATA)) - self.assertEqual(srv_proto.nbytes, len(self.DATA)) - self.assertEqual(srv_proto.data, self.DATA) - self.assertEqual(self.file.tell(), len(self.DATA)) - - def test_sendfile_force_fallback(self): - srv_proto, cli_proto = self.prepare_sendfile() - - def sendfile_native(transp, file, offset, count): - # to raise SendfileNotAvailableError - return base_events.BaseEventLoop._sendfile_native( - self.loop, transp, file, offset, count) - - self.loop._sendfile_native = sendfile_native - - ret = self.run_loop( - self.loop.sendfile(cli_proto.transport, self.file)) - cli_proto.transport.close() - self.run_loop(srv_proto.done) - self.assertEqual(ret, len(self.DATA)) - self.assertEqual(srv_proto.nbytes, len(self.DATA)) - self.assertEqual(srv_proto.data, self.DATA) - self.assertEqual(self.file.tell(), len(self.DATA)) - - def test_sendfile_force_unsupported_native(self): - if sys.platform == 'win32': - if isinstance(self.loop, asyncio.ProactorEventLoop): - self.skipTest("Fails on proactor event loop") - srv_proto, cli_proto = self.prepare_sendfile() - - def sendfile_native(transp, file, offset, count): - # to raise SendfileNotAvailableError - return base_events.BaseEventLoop._sendfile_native( - self.loop, transp, file, offset, count) - - self.loop._sendfile_native = sendfile_native - - with self.assertRaisesRegex(asyncio.SendfileNotAvailableError, - "not supported"): - self.run_loop( - self.loop.sendfile(cli_proto.transport, self.file, - fallback=False)) - - cli_proto.transport.close() - self.run_loop(srv_proto.done) - self.assertEqual(srv_proto.nbytes, 0) - self.assertEqual(self.file.tell(), 0) - - def test_sendfile_ssl(self): - srv_proto, cli_proto = self.prepare_sendfile(is_ssl=True) - ret = self.run_loop( - self.loop.sendfile(cli_proto.transport, self.file)) - cli_proto.transport.close() - self.run_loop(srv_proto.done) - self.assertEqual(ret, len(self.DATA)) - self.assertEqual(srv_proto.nbytes, len(self.DATA)) - self.assertEqual(srv_proto.data, self.DATA) - self.assertEqual(self.file.tell(), len(self.DATA)) - - def test_sendfile_for_closing_transp(self): - srv_proto, cli_proto = self.prepare_sendfile() - cli_proto.transport.close() - with self.assertRaisesRegex(RuntimeError, "is closing"): - self.run_loop(self.loop.sendfile(cli_proto.transport, self.file)) - self.run_loop(srv_proto.done) - self.assertEqual(srv_proto.nbytes, 0) - self.assertEqual(self.file.tell(), 0) - - def test_sendfile_pre_and_post_data(self): - srv_proto, cli_proto = self.prepare_sendfile() - PREFIX = b'PREFIX__' * 1024 # 8 KiB - SUFFIX = b'--SUFFIX' * 1024 # 8 KiB - cli_proto.transport.write(PREFIX) - ret = self.run_loop( - self.loop.sendfile(cli_proto.transport, self.file)) - cli_proto.transport.write(SUFFIX) - cli_proto.transport.close() - self.run_loop(srv_proto.done) - self.assertEqual(ret, len(self.DATA)) - self.assertEqual(srv_proto.data, PREFIX + self.DATA + SUFFIX) - self.assertEqual(self.file.tell(), len(self.DATA)) - - def test_sendfile_ssl_pre_and_post_data(self): - srv_proto, cli_proto = self.prepare_sendfile(is_ssl=True) - PREFIX = b'zxcvbnm' * 1024 - SUFFIX = b'0987654321' * 1024 - cli_proto.transport.write(PREFIX) - ret = self.run_loop( - self.loop.sendfile(cli_proto.transport, self.file)) - cli_proto.transport.write(SUFFIX) - cli_proto.transport.close() - self.run_loop(srv_proto.done) - self.assertEqual(ret, len(self.DATA)) - self.assertEqual(srv_proto.data, PREFIX + self.DATA + SUFFIX) - self.assertEqual(self.file.tell(), len(self.DATA)) - - def test_sendfile_partial(self): - srv_proto, cli_proto = self.prepare_sendfile() - ret = self.run_loop( - self.loop.sendfile(cli_proto.transport, self.file, 1000, 100)) - cli_proto.transport.close() - self.run_loop(srv_proto.done) - self.assertEqual(ret, 100) - self.assertEqual(srv_proto.nbytes, 100) - self.assertEqual(srv_proto.data, self.DATA[1000:1100]) - self.assertEqual(self.file.tell(), 1100) - - def test_sendfile_ssl_partial(self): - srv_proto, cli_proto = self.prepare_sendfile(is_ssl=True) - ret = self.run_loop( - self.loop.sendfile(cli_proto.transport, self.file, 1000, 100)) - cli_proto.transport.close() - self.run_loop(srv_proto.done) - self.assertEqual(ret, 100) - self.assertEqual(srv_proto.nbytes, 100) - self.assertEqual(srv_proto.data, self.DATA[1000:1100]) - self.assertEqual(self.file.tell(), 1100) - - def test_sendfile_close_peer_after_receiving(self): - srv_proto, cli_proto = self.prepare_sendfile( - close_after=len(self.DATA)) - ret = self.run_loop( - self.loop.sendfile(cli_proto.transport, self.file)) - cli_proto.transport.close() - self.run_loop(srv_proto.done) - self.assertEqual(ret, len(self.DATA)) - self.assertEqual(srv_proto.nbytes, len(self.DATA)) - self.assertEqual(srv_proto.data, self.DATA) - self.assertEqual(self.file.tell(), len(self.DATA)) - - def test_sendfile_ssl_close_peer_after_receiving(self): - srv_proto, cli_proto = self.prepare_sendfile( - is_ssl=True, close_after=len(self.DATA)) - ret = self.run_loop( - self.loop.sendfile(cli_proto.transport, self.file)) - self.run_loop(srv_proto.done) - self.assertEqual(ret, len(self.DATA)) - self.assertEqual(srv_proto.nbytes, len(self.DATA)) - self.assertEqual(srv_proto.data, self.DATA) - self.assertEqual(self.file.tell(), len(self.DATA)) - - def test_sendfile_close_peer_in_the_middle_of_receiving(self): - srv_proto, cli_proto = self.prepare_sendfile(close_after=1024) - with self.assertRaises(ConnectionError): - self.run_loop( - self.loop.sendfile(cli_proto.transport, self.file)) - self.run_loop(srv_proto.done) - - self.assertTrue(1024 <= srv_proto.nbytes < len(self.DATA), - srv_proto.nbytes) - self.assertTrue(1024 <= self.file.tell() < len(self.DATA), - self.file.tell()) - self.assertTrue(cli_proto.transport.is_closing()) - - def test_sendfile_fallback_close_peer_in_the_middle_of_receiving(self): - - def sendfile_native(transp, file, offset, count): - # to raise SendfileNotAvailableError - return base_events.BaseEventLoop._sendfile_native( - self.loop, transp, file, offset, count) - - self.loop._sendfile_native = sendfile_native - - srv_proto, cli_proto = self.prepare_sendfile(close_after=1024) - with self.assertRaises(ConnectionError): - self.run_loop( - self.loop.sendfile(cli_proto.transport, self.file)) - self.run_loop(srv_proto.done) - - self.assertTrue(1024 <= srv_proto.nbytes < len(self.DATA), - srv_proto.nbytes) - self.assertTrue(1024 <= self.file.tell() < len(self.DATA), - self.file.tell()) - - @unittest.skipIf(not hasattr(os, 'sendfile'), - "Don't have native sendfile support") - def test_sendfile_prevents_bare_write(self): - srv_proto, cli_proto = self.prepare_sendfile() - fut = self.loop.create_future() - - async def coro(): - fut.set_result(None) - return await self.loop.sendfile(cli_proto.transport, self.file) - - t = self.loop.create_task(coro()) - self.run_loop(fut) - with self.assertRaisesRegex(RuntimeError, - "sendfile is in progress"): - cli_proto.transport.write(b'data') - ret = self.run_loop(t) - self.assertEqual(ret, len(self.DATA)) - - def test_sendfile_no_fallback_for_fallback_transport(self): - transport = mock.Mock() - transport.is_closing.side_effect = lambda: False - transport._sendfile_compatible = constants._SendfileMode.FALLBACK - with self.assertRaisesRegex(RuntimeError, 'fallback is disabled'): - self.loop.run_until_complete( - self.loop.sendfile(transport, None, fallback=False)) - - if sys.platform == 'win32': class SelectEventLoopTests(EventLoopTestsMixin, - SendfileMixin, - SockSendfileMixin, test_utils.TestCase): def create_event_loop(self): return asyncio.SelectorEventLoop() class ProactorEventLoopTests(EventLoopTestsMixin, - SendfileMixin, - SockSendfileMixin, SubprocessTestsMixin, test_utils.TestCase): @@ -2469,9 +2022,7 @@ if sys.platform == 'win32': else: import selectors - class UnixEventLoopTestsMixin(EventLoopTestsMixin, - SendfileMixin, - SockSendfileMixin): + class UnixEventLoopTestsMixin(EventLoopTestsMixin): def setUp(self): super().setUp() watcher = asyncio.SafeChildWatcher() |