diff options
author | Kumar Aditya <59607654+kumaraditya303@users.noreply.github.com> | 2022-12-24 05:51:11 (GMT) |
---|---|---|
committer | GitHub <noreply@github.com> | 2022-12-24 05:51:11 (GMT) |
commit | c122390a5557b042ca6e0aece22149b972d68c9b (patch) | |
tree | b3fba638c85b29b4166bc3efd21e4820503ac087 /Lib/asyncio | |
parent | 0f6420640c0f3462e6b76b01a392844676de1fb9 (diff) | |
download | cpython-c122390a5557b042ca6e0aece22149b972d68c9b.zip cpython-c122390a5557b042ca6e0aece22149b972d68c9b.tar.gz cpython-c122390a5557b042ca6e0aece22149b972d68c9b.tar.bz2 |
GH-91166: Implement zero copy writes for `SelectorSocketTransport` in asyncio (#31871)
Co-authored-by: Guido van Rossum <gvanrossum@gmail.com>
Diffstat (limited to 'Lib/asyncio')
-rw-r--r-- | Lib/asyncio/selector_events.py | 86 |
1 files changed, 75 insertions, 11 deletions
diff --git a/Lib/asyncio/selector_events.py b/Lib/asyncio/selector_events.py index 74f289f..de5076a 100644 --- a/Lib/asyncio/selector_events.py +++ b/Lib/asyncio/selector_events.py @@ -9,6 +9,8 @@ __all__ = 'BaseSelectorEventLoop', import collections import errno import functools +import itertools +import os import selectors import socket import warnings @@ -28,6 +30,14 @@ from . import transports from . import trsock from .log import logger +_HAS_SENDMSG = hasattr(socket.socket, 'sendmsg') + +if _HAS_SENDMSG: + try: + SC_IOV_MAX = os.sysconf('SC_IOV_MAX') + except OSError: + # Fallback to send + _HAS_SENDMSG = False def _test_selector_event(selector, fd, event): # Test if the selector is monitoring 'event' events @@ -757,8 +767,6 @@ class _SelectorTransport(transports._FlowControlMixin, max_size = 256 * 1024 # Buffer size passed to recv(). - _buffer_factory = bytearray # Constructs initial value for self._buffer. - # Attribute used in the destructor: it must be set even if the constructor # is not called (see _SelectorSslTransport which may start by raising an # exception) @@ -783,7 +791,7 @@ class _SelectorTransport(transports._FlowControlMixin, self.set_protocol(protocol) self._server = server - self._buffer = self._buffer_factory() + self._buffer = collections.deque() self._conn_lost = 0 # Set when call to connection_lost scheduled. self._closing = False # Set when close() called. if self._server is not None: @@ -887,7 +895,7 @@ class _SelectorTransport(transports._FlowControlMixin, self._server = None def get_write_buffer_size(self): - return len(self._buffer) + return sum(map(len, self._buffer)) def _add_reader(self, fd, callback, *args): if self._closing: @@ -909,7 +917,10 @@ class _SelectorSocketTransport(_SelectorTransport): self._eof = False self._paused = False self._empty_waiter = None - + if _HAS_SENDMSG: + self._write_ready = self._write_sendmsg + else: + self._write_ready = self._write_send # Disable the Nagle algorithm -- small writes will be # sent without waiting for the TCP ACK. This generally # decreases the latency (in some cases significantly.) @@ -1066,23 +1077,68 @@ class _SelectorSocketTransport(_SelectorTransport): self._fatal_error(exc, 'Fatal write error on socket transport') return else: - data = data[n:] + data = memoryview(data)[n:] if not data: return # Not all was written; register write handler. self._loop._add_writer(self._sock_fd, self._write_ready) # Add it to the buffer. - self._buffer.extend(data) + self._buffer.append(data) self._maybe_pause_protocol() - def _write_ready(self): + def _get_sendmsg_buffer(self): + return itertools.islice(self._buffer, SC_IOV_MAX) + + def _write_sendmsg(self): assert self._buffer, 'Data should not be empty' + if self._conn_lost: + return + try: + nbytes = self._sock.sendmsg(self._get_sendmsg_buffer()) + self._adjust_leftover_buffer(nbytes) + except (BlockingIOError, InterruptedError): + pass + except (SystemExit, KeyboardInterrupt): + raise + except BaseException as exc: + self._loop._remove_writer(self._sock_fd) + self._buffer.clear() + self._fatal_error(exc, 'Fatal write error on socket transport') + if self._empty_waiter is not None: + self._empty_waiter.set_exception(exc) + else: + self._maybe_resume_protocol() # May append to buffer. + if not self._buffer: + self._loop._remove_writer(self._sock_fd) + if self._empty_waiter is not None: + self._empty_waiter.set_result(None) + if self._closing: + self._call_connection_lost(None) + elif self._eof: + self._sock.shutdown(socket.SHUT_WR) + def _adjust_leftover_buffer(self, nbytes: int) -> None: + buffer = self._buffer + while nbytes: + b = buffer.popleft() + b_len = len(b) + if b_len <= nbytes: + nbytes -= b_len + else: + buffer.appendleft(b[nbytes:]) + break + + def _write_send(self): + assert self._buffer, 'Data should not be empty' if self._conn_lost: return try: - n = self._sock.send(self._buffer) + buffer = self._buffer.popleft() + n = self._sock.send(buffer) + if n != len(buffer): + # Not all data was written + self._buffer.appendleft(buffer[n:]) except (BlockingIOError, InterruptedError): pass except (SystemExit, KeyboardInterrupt): @@ -1094,8 +1150,6 @@ class _SelectorSocketTransport(_SelectorTransport): if self._empty_waiter is not None: self._empty_waiter.set_exception(exc) else: - if n: - del self._buffer[:n] self._maybe_resume_protocol() # May append to buffer. if not self._buffer: self._loop._remove_writer(self._sock_fd) @@ -1113,6 +1167,16 @@ class _SelectorSocketTransport(_SelectorTransport): if not self._buffer: self._sock.shutdown(socket.SHUT_WR) + def writelines(self, list_of_data): + if self._eof: + raise RuntimeError('Cannot call writelines() after write_eof()') + if self._empty_waiter is not None: + raise RuntimeError('unable to writelines; sendfile is in progress') + if not list_of_data: + return + self._buffer.extend([memoryview(data) for data in list_of_data]) + self._write_ready() + def can_write_eof(self): return True |