summaryrefslogtreecommitdiffstats
path: root/Lib/asyncio/selector_events.py
diff options
context:
space:
mode:
authorYury Selivanov <yury@magic.io>2018-01-28 21:30:26 (GMT)
committerGitHub <noreply@github.com>2018-01-28 21:30:26 (GMT)
commit631fd38dbf04dbf0127881f3977982e401a849e4 (patch)
tree1d252eeef90eced0770b2d906b8540ec28012060 /Lib/asyncio/selector_events.py
parent0ceb717689b04c0540d78c1ba93c0572c66c0994 (diff)
downloadcpython-631fd38dbf04dbf0127881f3977982e401a849e4.zip
cpython-631fd38dbf04dbf0127881f3977982e401a849e4.tar.gz
cpython-631fd38dbf04dbf0127881f3977982e401a849e4.tar.bz2
bpo-32251: Implement asyncio.BufferedProtocol. (#4755)
Diffstat (limited to 'Lib/asyncio/selector_events.py')
-rw-r--r--Lib/asyncio/selector_events.py84
1 files changed, 68 insertions, 16 deletions
diff --git a/Lib/asyncio/selector_events.py b/Lib/asyncio/selector_events.py
index 5956f2d..354bf9d 100644
--- a/Lib/asyncio/selector_events.py
+++ b/Lib/asyncio/selector_events.py
@@ -22,8 +22,9 @@ from . import base_events
from . import constants
from . import events
from . import futures
-from . import transports
+from . import protocols
from . import sslproto
+from . import transports
from .log import logger
@@ -713,6 +714,12 @@ class _SelectorSocketTransport(_SelectorTransport):
def __init__(self, loop, sock, protocol, waiter=None,
extra=None, server=None):
+
+ if protocols._is_buffered_protocol(protocol):
+ self._read_ready = self._read_ready__get_buffer
+ else:
+ self._read_ready = self._read_ready__data_received
+
super().__init__(loop, sock, protocol, extra, server)
self._eof = False
self._paused = False
@@ -751,29 +758,74 @@ class _SelectorSocketTransport(_SelectorTransport):
if self._loop.get_debug():
logger.debug("%r resumes reading", self)
- def _read_ready(self):
+ def _read_ready__get_buffer(self):
+ if self._conn_lost:
+ return
+
+ try:
+ buf = self._protocol.get_buffer()
+ except Exception as exc:
+ self._fatal_error(
+ exc, 'Fatal error: protocol.get_buffer() call failed.')
+ return
+
+ try:
+ nbytes = self._sock.recv_into(buf)
+ except (BlockingIOError, InterruptedError):
+ return
+ except Exception as exc:
+ self._fatal_error(exc, 'Fatal read error on socket transport')
+ return
+
+ if not nbytes:
+ self._read_ready__on_eof()
+ return
+
+ try:
+ self._protocol.buffer_updated(nbytes)
+ except Exception as exc:
+ self._fatal_error(
+ exc, 'Fatal error: protocol.buffer_updated() call failed.')
+
+ def _read_ready__data_received(self):
if self._conn_lost:
return
try:
data = self._sock.recv(self.max_size)
except (BlockingIOError, InterruptedError):
- pass
+ return
except Exception as exc:
self._fatal_error(exc, 'Fatal read error on socket transport')
+ return
+
+ if not data:
+ self._read_ready__on_eof()
+ return
+
+ try:
+ self._protocol.data_received(data)
+ except Exception as exc:
+ self._fatal_error(
+ exc, 'Fatal error: protocol.data_received() call failed.')
+
+ def _read_ready__on_eof(self):
+ if self._loop.get_debug():
+ logger.debug("%r received EOF", self)
+
+ try:
+ keep_open = self._protocol.eof_received()
+ except Exception as exc:
+ self._fatal_error(
+ exc, 'Fatal error: protocol.eof_received() call failed.')
+ return
+
+ if keep_open:
+ # We're keeping the connection open so the
+ # protocol can write more, but we still can't
+ # receive more, so remove the reader callback.
+ self._loop._remove_reader(self._sock_fd)
else:
- if data:
- self._protocol.data_received(data)
- else:
- if self._loop.get_debug():
- logger.debug("%r received EOF", self)
- keep_open = self._protocol.eof_received()
- if keep_open:
- # We're keeping the connection open so the
- # protocol can write more, but we still can't
- # receive more, so remove the reader callback.
- self._loop._remove_reader(self._sock_fd)
- else:
- self.close()
+ self.close()
def write(self, data):
if not isinstance(data, (bytes, bytearray, memoryview)):