summaryrefslogtreecommitdiffstats
path: root/Lib/asyncio
diff options
context:
space:
mode:
authorYury Selivanov <yury@magic.io>2018-05-28 18:31:28 (GMT)
committerGitHub <noreply@github.com>2018-05-28 18:31:28 (GMT)
commitdbf102271fcc316f353c7e0a283811b661d128f2 (patch)
tree8807a0305490616dc3b480fae5e50e98c80b4fa8 /Lib/asyncio
parente549c4be5fb010f5faf12236af8faa720a1429be (diff)
downloadcpython-dbf102271fcc316f353c7e0a283811b661d128f2.zip
cpython-dbf102271fcc316f353c7e0a283811b661d128f2.tar.gz
cpython-dbf102271fcc316f353c7e0a283811b661d128f2.tar.bz2
bpo-33654: Support BufferedProtocol in set_protocol() and start_tls() (GH-7130)
In this commit: * Support BufferedProtocol in set_protocol() and start_tls() * Fix proactor to cancel readers reliably * Update tests to be compatible with OpenSSL 1.1.1 * Clarify BufferedProtocol docs * Bump TLS tests timeouts to 60 seconds; eliminate possible race from start_serving * Rewrite test_start_tls_server_1
Diffstat (limited to 'Lib/asyncio')
-rw-r--r--Lib/asyncio/base_events.py7
-rw-r--r--Lib/asyncio/proactor_events.py52
-rw-r--r--Lib/asyncio/protocols.py10
-rw-r--r--Lib/asyncio/selector_events.py28
-rw-r--r--Lib/asyncio/sslproto.py32
-rw-r--r--Lib/asyncio/unix_events.py4
6 files changed, 107 insertions, 26 deletions
diff --git a/Lib/asyncio/base_events.py b/Lib/asyncio/base_events.py
index 09eb440..a0243f5 100644
--- a/Lib/asyncio/base_events.py
+++ b/Lib/asyncio/base_events.py
@@ -157,7 +157,6 @@ def _run_until_complete_cb(fut):
futures._get_loop(fut).stop()
-
class _SendfileFallbackProtocol(protocols.Protocol):
def __init__(self, transp):
if not isinstance(transp, transports._FlowControlMixin):
@@ -304,6 +303,9 @@ class Server(events.AbstractServer):
async def start_serving(self):
self._start_serving()
+ # Skip one loop iteration so that all 'loop.add_reader'
+ # go through.
+ await tasks.sleep(0, loop=self._loop)
async def serve_forever(self):
if self._serving_forever_fut is not None:
@@ -1363,6 +1365,9 @@ class BaseEventLoop(events.AbstractEventLoop):
ssl, backlog, ssl_handshake_timeout)
if start_serving:
server._start_serving()
+ # Skip one loop iteration so that all 'loop.add_reader'
+ # go through.
+ await tasks.sleep(0, loop=self)
if self._debug:
logger.info("%r is serving", server)
diff --git a/Lib/asyncio/proactor_events.py b/Lib/asyncio/proactor_events.py
index 877dfb0..337ed0f 100644
--- a/Lib/asyncio/proactor_events.py
+++ b/Lib/asyncio/proactor_events.py
@@ -30,7 +30,7 @@ class _ProactorBasePipeTransport(transports._FlowControlMixin,
super().__init__(extra, loop)
self._set_extra(sock)
self._sock = sock
- self._protocol = protocol
+ self.set_protocol(protocol)
self._server = server
self._buffer = None # None or bytearray.
self._read_fut = None
@@ -159,16 +159,26 @@ class _ProactorReadPipeTransport(_ProactorBasePipeTransport,
def __init__(self, loop, sock, protocol, waiter=None,
extra=None, server=None):
+ self._loop_reading_cb = None
+ self._paused = True
super().__init__(loop, sock, protocol, waiter, extra, server)
- self._paused = False
+
self._reschedule_on_resume = False
+ self._loop.call_soon(self._loop_reading)
+ self._paused = False
- if protocols._is_buffered_protocol(protocol):
- self._loop_reading = self._loop_reading__get_buffer
+ def set_protocol(self, protocol):
+ if isinstance(protocol, protocols.BufferedProtocol):
+ self._loop_reading_cb = self._loop_reading__get_buffer
else:
- self._loop_reading = self._loop_reading__data_received
+ self._loop_reading_cb = self._loop_reading__data_received
- self._loop.call_soon(self._loop_reading)
+ super().set_protocol(protocol)
+
+ if self.is_reading():
+ # reset reading callback / buffers / self._read_fut
+ self.pause_reading()
+ self.resume_reading()
def is_reading(self):
return not self._paused and not self._closing
@@ -179,6 +189,13 @@ class _ProactorReadPipeTransport(_ProactorBasePipeTransport,
self._paused = True
if self._read_fut is not None and not self._read_fut.done():
+ # TODO: This is an ugly hack to cancel the current read future
+ # *and* avoid potential race conditions, as read cancellation
+ # goes through `future.cancel()` and `loop.call_soon()`.
+ # We then use this special attribute in the reader callback to
+ # exit *immediately* without doing any cleanup/rescheduling.
+ self._read_fut.__asyncio_cancelled_on_pause__ = True
+
self._read_fut.cancel()
self._read_fut = None
self._reschedule_on_resume = True
@@ -210,7 +227,14 @@ class _ProactorReadPipeTransport(_ProactorBasePipeTransport,
if not keep_open:
self.close()
- def _loop_reading__data_received(self, fut=None):
+ def _loop_reading(self, fut=None):
+ self._loop_reading_cb(fut)
+
+ def _loop_reading__data_received(self, fut):
+ if (fut is not None and
+ getattr(fut, '__asyncio_cancelled_on_pause__', False)):
+ return
+
if self._paused:
self._reschedule_on_resume = True
return
@@ -253,14 +277,18 @@ class _ProactorReadPipeTransport(_ProactorBasePipeTransport,
if not self._closing:
raise
else:
- self._read_fut.add_done_callback(self._loop_reading)
+ self._read_fut.add_done_callback(self._loop_reading__data_received)
finally:
if data:
self._protocol.data_received(data)
elif data == b'':
self._loop_reading__on_eof()
- def _loop_reading__get_buffer(self, fut=None):
+ def _loop_reading__get_buffer(self, fut):
+ if (fut is not None and
+ getattr(fut, '__asyncio_cancelled_on_pause__', False)):
+ return
+
if self._paused:
self._reschedule_on_resume = True
return
@@ -310,7 +338,9 @@ class _ProactorReadPipeTransport(_ProactorBasePipeTransport,
return
try:
- buf = self._protocol.get_buffer()
+ buf = self._protocol.get_buffer(-1)
+ if not len(buf):
+ raise RuntimeError('get_buffer() returned an empty buffer')
except Exception as exc:
self._fatal_error(
exc, 'Fatal error: protocol.get_buffer() call failed.')
@@ -319,7 +349,7 @@ class _ProactorReadPipeTransport(_ProactorBasePipeTransport,
try:
# schedule a new read
self._read_fut = self._loop._proactor.recv_into(self._sock, buf)
- self._read_fut.add_done_callback(self._loop_reading)
+ self._read_fut.add_done_callback(self._loop_reading__get_buffer)
except ConnectionAbortedError as exc:
if not self._closing:
self._fatal_error(exc, 'Fatal read error on pipe transport')
diff --git a/Lib/asyncio/protocols.py b/Lib/asyncio/protocols.py
index dc298a8..b8d2e6b 100644
--- a/Lib/asyncio/protocols.py
+++ b/Lib/asyncio/protocols.py
@@ -130,11 +130,15 @@ class BufferedProtocol(BaseProtocol):
* CL: connection_lost()
"""
- def get_buffer(self):
+ def get_buffer(self, sizehint):
"""Called to allocate a new receive buffer.
+ *sizehint* is a recommended minimal size for the returned
+ buffer. When set to -1, the buffer size can be arbitrary.
+
Must return an object that implements the
:ref:`buffer protocol <bufferobjects>`.
+ It is an error to return a zero-sized buffer.
"""
def buffer_updated(self, nbytes):
@@ -185,7 +189,3 @@ class SubprocessProtocol(BaseProtocol):
def process_exited(self):
"""Called when subprocess has exited."""
-
-
-def _is_buffered_protocol(proto):
- return hasattr(proto, 'get_buffer') and not hasattr(proto, 'data_received')
diff --git a/Lib/asyncio/selector_events.py b/Lib/asyncio/selector_events.py
index 5473c70..116c08d 100644
--- a/Lib/asyncio/selector_events.py
+++ b/Lib/asyncio/selector_events.py
@@ -597,8 +597,10 @@ class _SelectorTransport(transports._FlowControlMixin,
self._extra['peername'] = None
self._sock = sock
self._sock_fd = sock.fileno()
- self._protocol = protocol
- self._protocol_connected = True
+
+ self._protocol_connected = False
+ self.set_protocol(protocol)
+
self._server = server
self._buffer = self._buffer_factory()
self._conn_lost = 0 # Set when call to connection_lost scheduled.
@@ -640,6 +642,7 @@ class _SelectorTransport(transports._FlowControlMixin,
def set_protocol(self, protocol):
self._protocol = protocol
+ self._protocol_connected = True
def get_protocol(self):
return self._protocol
@@ -721,11 +724,7 @@ class _SelectorSocketTransport(_SelectorTransport):
def __init__(self, loop, sock, protocol, waiter=None,
extra=None, server=None):
- if protocols._is_buffered_protocol(protocol):
- self._read_ready = self._read_ready__get_buffer
- else:
- self._read_ready = self._read_ready__data_received
-
+ self._read_ready_cb = None
super().__init__(loop, sock, protocol, extra, server)
self._eof = False
self._paused = False
@@ -745,6 +744,14 @@ class _SelectorSocketTransport(_SelectorTransport):
self._loop.call_soon(futures._set_result_unless_cancelled,
waiter, None)
+ def set_protocol(self, protocol):
+ if isinstance(protocol, protocols.BufferedProtocol):
+ self._read_ready_cb = self._read_ready__get_buffer
+ else:
+ self._read_ready_cb = self._read_ready__data_received
+
+ super().set_protocol(protocol)
+
def is_reading(self):
return not self._paused and not self._closing
@@ -764,12 +771,17 @@ class _SelectorSocketTransport(_SelectorTransport):
if self._loop.get_debug():
logger.debug("%r resumes reading", self)
+ def _read_ready(self):
+ self._read_ready_cb()
+
def _read_ready__get_buffer(self):
if self._conn_lost:
return
try:
- buf = self._protocol.get_buffer()
+ buf = self._protocol.get_buffer(-1)
+ if not len(buf):
+ raise RuntimeError('get_buffer() returned an empty buffer')
except Exception as exc:
self._fatal_error(
exc, 'Fatal error: protocol.get_buffer() call failed.')
diff --git a/Lib/asyncio/sslproto.py b/Lib/asyncio/sslproto.py
index 2bbf134..2bfa45d 100644
--- a/Lib/asyncio/sslproto.py
+++ b/Lib/asyncio/sslproto.py
@@ -441,6 +441,8 @@ class SSLProtocol(protocols.Protocol):
self._waiter = waiter
self._loop = loop
self._app_protocol = app_protocol
+ self._app_protocol_is_buffer = \
+ isinstance(app_protocol, protocols.BufferedProtocol)
self._app_transport = _SSLProtocolTransport(self._loop, self)
# _SSLPipe instance (None until the connection is made)
self._sslpipe = None
@@ -522,7 +524,16 @@ class SSLProtocol(protocols.Protocol):
for chunk in appdata:
if chunk:
- self._app_protocol.data_received(chunk)
+ try:
+ if self._app_protocol_is_buffer:
+ _feed_data_to_bufferred_proto(
+ self._app_protocol, chunk)
+ else:
+ self._app_protocol.data_received(chunk)
+ except Exception as ex:
+ self._fatal_error(
+ ex, 'application protocol failed to receive SSL data')
+ return
else:
self._start_shutdown()
break
@@ -709,3 +720,22 @@ class SSLProtocol(protocols.Protocol):
self._transport.abort()
finally:
self._finalize()
+
+
+def _feed_data_to_bufferred_proto(proto, data):
+ data_len = len(data)
+ while data_len:
+ buf = proto.get_buffer(data_len)
+ buf_len = len(buf)
+ if not buf_len:
+ raise RuntimeError('get_buffer() returned an empty buffer')
+
+ if buf_len >= data_len:
+ buf[:data_len] = data
+ proto.buffer_updated(data_len)
+ return
+ else:
+ buf[:buf_len] = data[:buf_len]
+ proto.buffer_updated(buf_len)
+ data = data[buf_len:]
+ data_len = len(data)
diff --git a/Lib/asyncio/unix_events.py b/Lib/asyncio/unix_events.py
index f64037a..7cad7e3 100644
--- a/Lib/asyncio/unix_events.py
+++ b/Lib/asyncio/unix_events.py
@@ -20,6 +20,7 @@ from . import coroutines
from . import events
from . import futures
from . import selector_events
+from . import tasks
from . import transports
from .log import logger
@@ -308,6 +309,9 @@ class _UnixSelectorEventLoop(selector_events.BaseSelectorEventLoop):
ssl, backlog, ssl_handshake_timeout)
if start_serving:
server._start_serving()
+ # Skip one loop iteration so that all 'loop.add_reader'
+ # go through.
+ await tasks.sleep(0, loop=self)
return server