diff options
author | Yury Selivanov <yury@magic.io> | 2018-05-28 18:31:28 (GMT) |
---|---|---|
committer | GitHub <noreply@github.com> | 2018-05-28 18:31:28 (GMT) |
commit | dbf102271fcc316f353c7e0a283811b661d128f2 (patch) | |
tree | 8807a0305490616dc3b480fae5e50e98c80b4fa8 /Lib/asyncio | |
parent | e549c4be5fb010f5faf12236af8faa720a1429be (diff) | |
download | cpython-dbf102271fcc316f353c7e0a283811b661d128f2.zip cpython-dbf102271fcc316f353c7e0a283811b661d128f2.tar.gz cpython-dbf102271fcc316f353c7e0a283811b661d128f2.tar.bz2 |
bpo-33654: Support BufferedProtocol in set_protocol() and start_tls() (GH-7130)
In this commit:
* Support BufferedProtocol in set_protocol() and start_tls()
* Fix proactor to cancel readers reliably
* Update tests to be compatible with OpenSSL 1.1.1
* Clarify BufferedProtocol docs
* Bump TLS tests timeouts to 60 seconds; eliminate possible race from start_serving
* Rewrite test_start_tls_server_1
Diffstat (limited to 'Lib/asyncio')
-rw-r--r-- | Lib/asyncio/base_events.py | 7 | ||||
-rw-r--r-- | Lib/asyncio/proactor_events.py | 52 | ||||
-rw-r--r-- | Lib/asyncio/protocols.py | 10 | ||||
-rw-r--r-- | Lib/asyncio/selector_events.py | 28 | ||||
-rw-r--r-- | Lib/asyncio/sslproto.py | 32 | ||||
-rw-r--r-- | Lib/asyncio/unix_events.py | 4 |
6 files changed, 107 insertions, 26 deletions
diff --git a/Lib/asyncio/base_events.py b/Lib/asyncio/base_events.py index 09eb440..a0243f5 100644 --- a/Lib/asyncio/base_events.py +++ b/Lib/asyncio/base_events.py @@ -157,7 +157,6 @@ def _run_until_complete_cb(fut): futures._get_loop(fut).stop() - class _SendfileFallbackProtocol(protocols.Protocol): def __init__(self, transp): if not isinstance(transp, transports._FlowControlMixin): @@ -304,6 +303,9 @@ class Server(events.AbstractServer): async def start_serving(self): self._start_serving() + # Skip one loop iteration so that all 'loop.add_reader' + # go through. + await tasks.sleep(0, loop=self._loop) async def serve_forever(self): if self._serving_forever_fut is not None: @@ -1363,6 +1365,9 @@ class BaseEventLoop(events.AbstractEventLoop): ssl, backlog, ssl_handshake_timeout) if start_serving: server._start_serving() + # Skip one loop iteration so that all 'loop.add_reader' + # go through. + await tasks.sleep(0, loop=self) if self._debug: logger.info("%r is serving", server) diff --git a/Lib/asyncio/proactor_events.py b/Lib/asyncio/proactor_events.py index 877dfb0..337ed0f 100644 --- a/Lib/asyncio/proactor_events.py +++ b/Lib/asyncio/proactor_events.py @@ -30,7 +30,7 @@ class _ProactorBasePipeTransport(transports._FlowControlMixin, super().__init__(extra, loop) self._set_extra(sock) self._sock = sock - self._protocol = protocol + self.set_protocol(protocol) self._server = server self._buffer = None # None or bytearray. self._read_fut = None @@ -159,16 +159,26 @@ class _ProactorReadPipeTransport(_ProactorBasePipeTransport, def __init__(self, loop, sock, protocol, waiter=None, extra=None, server=None): + self._loop_reading_cb = None + self._paused = True super().__init__(loop, sock, protocol, waiter, extra, server) - self._paused = False + self._reschedule_on_resume = False + self._loop.call_soon(self._loop_reading) + self._paused = False - if protocols._is_buffered_protocol(protocol): - self._loop_reading = self._loop_reading__get_buffer + def set_protocol(self, protocol): + if isinstance(protocol, protocols.BufferedProtocol): + self._loop_reading_cb = self._loop_reading__get_buffer else: - self._loop_reading = self._loop_reading__data_received + self._loop_reading_cb = self._loop_reading__data_received - self._loop.call_soon(self._loop_reading) + super().set_protocol(protocol) + + if self.is_reading(): + # reset reading callback / buffers / self._read_fut + self.pause_reading() + self.resume_reading() def is_reading(self): return not self._paused and not self._closing @@ -179,6 +189,13 @@ class _ProactorReadPipeTransport(_ProactorBasePipeTransport, self._paused = True if self._read_fut is not None and not self._read_fut.done(): + # TODO: This is an ugly hack to cancel the current read future + # *and* avoid potential race conditions, as read cancellation + # goes through `future.cancel()` and `loop.call_soon()`. + # We then use this special attribute in the reader callback to + # exit *immediately* without doing any cleanup/rescheduling. + self._read_fut.__asyncio_cancelled_on_pause__ = True + self._read_fut.cancel() self._read_fut = None self._reschedule_on_resume = True @@ -210,7 +227,14 @@ class _ProactorReadPipeTransport(_ProactorBasePipeTransport, if not keep_open: self.close() - def _loop_reading__data_received(self, fut=None): + def _loop_reading(self, fut=None): + self._loop_reading_cb(fut) + + def _loop_reading__data_received(self, fut): + if (fut is not None and + getattr(fut, '__asyncio_cancelled_on_pause__', False)): + return + if self._paused: self._reschedule_on_resume = True return @@ -253,14 +277,18 @@ class _ProactorReadPipeTransport(_ProactorBasePipeTransport, if not self._closing: raise else: - self._read_fut.add_done_callback(self._loop_reading) + self._read_fut.add_done_callback(self._loop_reading__data_received) finally: if data: self._protocol.data_received(data) elif data == b'': self._loop_reading__on_eof() - def _loop_reading__get_buffer(self, fut=None): + def _loop_reading__get_buffer(self, fut): + if (fut is not None and + getattr(fut, '__asyncio_cancelled_on_pause__', False)): + return + if self._paused: self._reschedule_on_resume = True return @@ -310,7 +338,9 @@ class _ProactorReadPipeTransport(_ProactorBasePipeTransport, return try: - buf = self._protocol.get_buffer() + buf = self._protocol.get_buffer(-1) + if not len(buf): + raise RuntimeError('get_buffer() returned an empty buffer') except Exception as exc: self._fatal_error( exc, 'Fatal error: protocol.get_buffer() call failed.') @@ -319,7 +349,7 @@ class _ProactorReadPipeTransport(_ProactorBasePipeTransport, try: # schedule a new read self._read_fut = self._loop._proactor.recv_into(self._sock, buf) - self._read_fut.add_done_callback(self._loop_reading) + self._read_fut.add_done_callback(self._loop_reading__get_buffer) except ConnectionAbortedError as exc: if not self._closing: self._fatal_error(exc, 'Fatal read error on pipe transport') diff --git a/Lib/asyncio/protocols.py b/Lib/asyncio/protocols.py index dc298a8..b8d2e6b 100644 --- a/Lib/asyncio/protocols.py +++ b/Lib/asyncio/protocols.py @@ -130,11 +130,15 @@ class BufferedProtocol(BaseProtocol): * CL: connection_lost() """ - def get_buffer(self): + def get_buffer(self, sizehint): """Called to allocate a new receive buffer. + *sizehint* is a recommended minimal size for the returned + buffer. When set to -1, the buffer size can be arbitrary. + Must return an object that implements the :ref:`buffer protocol <bufferobjects>`. + It is an error to return a zero-sized buffer. """ def buffer_updated(self, nbytes): @@ -185,7 +189,3 @@ class SubprocessProtocol(BaseProtocol): def process_exited(self): """Called when subprocess has exited.""" - - -def _is_buffered_protocol(proto): - return hasattr(proto, 'get_buffer') and not hasattr(proto, 'data_received') diff --git a/Lib/asyncio/selector_events.py b/Lib/asyncio/selector_events.py index 5473c70..116c08d 100644 --- a/Lib/asyncio/selector_events.py +++ b/Lib/asyncio/selector_events.py @@ -597,8 +597,10 @@ class _SelectorTransport(transports._FlowControlMixin, self._extra['peername'] = None self._sock = sock self._sock_fd = sock.fileno() - self._protocol = protocol - self._protocol_connected = True + + self._protocol_connected = False + self.set_protocol(protocol) + self._server = server self._buffer = self._buffer_factory() self._conn_lost = 0 # Set when call to connection_lost scheduled. @@ -640,6 +642,7 @@ class _SelectorTransport(transports._FlowControlMixin, def set_protocol(self, protocol): self._protocol = protocol + self._protocol_connected = True def get_protocol(self): return self._protocol @@ -721,11 +724,7 @@ class _SelectorSocketTransport(_SelectorTransport): def __init__(self, loop, sock, protocol, waiter=None, extra=None, server=None): - if protocols._is_buffered_protocol(protocol): - self._read_ready = self._read_ready__get_buffer - else: - self._read_ready = self._read_ready__data_received - + self._read_ready_cb = None super().__init__(loop, sock, protocol, extra, server) self._eof = False self._paused = False @@ -745,6 +744,14 @@ class _SelectorSocketTransport(_SelectorTransport): self._loop.call_soon(futures._set_result_unless_cancelled, waiter, None) + def set_protocol(self, protocol): + if isinstance(protocol, protocols.BufferedProtocol): + self._read_ready_cb = self._read_ready__get_buffer + else: + self._read_ready_cb = self._read_ready__data_received + + super().set_protocol(protocol) + def is_reading(self): return not self._paused and not self._closing @@ -764,12 +771,17 @@ class _SelectorSocketTransport(_SelectorTransport): if self._loop.get_debug(): logger.debug("%r resumes reading", self) + def _read_ready(self): + self._read_ready_cb() + def _read_ready__get_buffer(self): if self._conn_lost: return try: - buf = self._protocol.get_buffer() + buf = self._protocol.get_buffer(-1) + if not len(buf): + raise RuntimeError('get_buffer() returned an empty buffer') except Exception as exc: self._fatal_error( exc, 'Fatal error: protocol.get_buffer() call failed.') diff --git a/Lib/asyncio/sslproto.py b/Lib/asyncio/sslproto.py index 2bbf134..2bfa45d 100644 --- a/Lib/asyncio/sslproto.py +++ b/Lib/asyncio/sslproto.py @@ -441,6 +441,8 @@ class SSLProtocol(protocols.Protocol): self._waiter = waiter self._loop = loop self._app_protocol = app_protocol + self._app_protocol_is_buffer = \ + isinstance(app_protocol, protocols.BufferedProtocol) self._app_transport = _SSLProtocolTransport(self._loop, self) # _SSLPipe instance (None until the connection is made) self._sslpipe = None @@ -522,7 +524,16 @@ class SSLProtocol(protocols.Protocol): for chunk in appdata: if chunk: - self._app_protocol.data_received(chunk) + try: + if self._app_protocol_is_buffer: + _feed_data_to_bufferred_proto( + self._app_protocol, chunk) + else: + self._app_protocol.data_received(chunk) + except Exception as ex: + self._fatal_error( + ex, 'application protocol failed to receive SSL data') + return else: self._start_shutdown() break @@ -709,3 +720,22 @@ class SSLProtocol(protocols.Protocol): self._transport.abort() finally: self._finalize() + + +def _feed_data_to_bufferred_proto(proto, data): + data_len = len(data) + while data_len: + buf = proto.get_buffer(data_len) + buf_len = len(buf) + if not buf_len: + raise RuntimeError('get_buffer() returned an empty buffer') + + if buf_len >= data_len: + buf[:data_len] = data + proto.buffer_updated(data_len) + return + else: + buf[:buf_len] = data[:buf_len] + proto.buffer_updated(buf_len) + data = data[buf_len:] + data_len = len(data) diff --git a/Lib/asyncio/unix_events.py b/Lib/asyncio/unix_events.py index f64037a..7cad7e3 100644 --- a/Lib/asyncio/unix_events.py +++ b/Lib/asyncio/unix_events.py @@ -20,6 +20,7 @@ from . import coroutines from . import events from . import futures from . import selector_events +from . import tasks from . import transports from .log import logger @@ -308,6 +309,9 @@ class _UnixSelectorEventLoop(selector_events.BaseSelectorEventLoop): ssl, backlog, ssl_handshake_timeout) if start_serving: server._start_serving() + # Skip one loop iteration so that all 'loop.add_reader' + # go through. + await tasks.sleep(0, loop=self) return server |