diff options
author | Tony Solomonik <tony.solomonik@gmail.com> | 2020-07-14 19:41:24 (GMT) |
---|---|---|
committer | GitHub <noreply@github.com> | 2020-07-14 19:41:24 (GMT) |
commit | 568fb0ff4aa641329261cdde20795b0aa9278175 (patch) | |
tree | ca72a11fd5ac68d82b6f37c0484c22f27af3b525 /Lib/asyncio/proactor_events.py | |
parent | 2a5181829af394b82e8e8c917183c709ee72a2b7 (diff) | |
download | cpython-568fb0ff4aa641329261cdde20795b0aa9278175.zip cpython-568fb0ff4aa641329261cdde20795b0aa9278175.tar.gz cpython-568fb0ff4aa641329261cdde20795b0aa9278175.tar.bz2 |
bpo-41273: asyncio's proactor read transport's better performance by using recv_into instead of recv (#21442)
* bpo-41273: Proactor transport read loop to use recv_into
By using recv_into instead of recv we do not allocate a new buffer each
time _loop_reading calls recv.
This betters performance for any stream using proactor (basically any
asyncio stream on windows).
* bpo-41273: Double proactor read transport buffer size
By doubling the read buffer size we get better performance.
Diffstat (limited to 'Lib/asyncio/proactor_events.py')
-rw-r--r-- | Lib/asyncio/proactor_events.py | 40 |
1 files changed, 21 insertions, 19 deletions
diff --git a/Lib/asyncio/proactor_events.py b/Lib/asyncio/proactor_events.py index 8338449..d0b7100 100644 --- a/Lib/asyncio/proactor_events.py +++ b/Lib/asyncio/proactor_events.py @@ -179,11 +179,12 @@ class _ProactorReadPipeTransport(_ProactorBasePipeTransport, """Transport for read pipes.""" def __init__(self, loop, sock, protocol, waiter=None, - extra=None, server=None): - self._pending_data = None + extra=None, server=None, buffer_size=65536): + self._pending_data_length = -1 self._paused = True super().__init__(loop, sock, protocol, waiter, extra, server) + self._data = bytearray(buffer_size) self._loop.call_soon(self._loop_reading) self._paused = False @@ -217,12 +218,12 @@ class _ProactorReadPipeTransport(_ProactorBasePipeTransport, if self._read_fut is None: self._loop.call_soon(self._loop_reading, None) - data = self._pending_data - self._pending_data = None - if data is not None: + length = self._pending_data_length + self._pending_data_length = -1 + if length > -1: # Call the protocol methode after calling _loop_reading(), # since the protocol can decide to pause reading again. - self._loop.call_soon(self._data_received, data) + self._loop.call_soon(self._data_received, self._data[:length], length) if self._loop.get_debug(): logger.debug("%r resumes reading", self) @@ -243,15 +244,15 @@ class _ProactorReadPipeTransport(_ProactorBasePipeTransport, if not keep_open: self.close() - def _data_received(self, data): + def _data_received(self, data, length): if self._paused: # Don't call any protocol method while reading is paused. # The protocol will be called on resume_reading(). - assert self._pending_data is None - self._pending_data = data + assert self._pending_data_length == -1 + self._pending_data_length = length return - if not data: + if length == 0: self._eof_received() return @@ -269,6 +270,7 @@ class _ProactorReadPipeTransport(_ProactorBasePipeTransport, self._protocol.data_received(data) def _loop_reading(self, fut=None): + length = -1 data = None try: if fut is not None: @@ -277,18 +279,18 @@ class _ProactorReadPipeTransport(_ProactorBasePipeTransport, self._read_fut = None if fut.done(): # deliver data later in "finally" clause - data = fut.result() + length = fut.result() + if length == 0: + # we got end-of-file so no need to reschedule a new read + return + + data = self._data[:length] else: # the future will be replaced by next proactor.recv call fut.cancel() if self._closing: # since close() has been called we ignore any read data - data = None - return - - if data == b'': - # we got end-of-file so no need to reschedule a new read return # bpo-33694: buffer_updated() has currently no fast path because of @@ -296,7 +298,7 @@ class _ProactorReadPipeTransport(_ProactorBasePipeTransport, if not self._paused: # reschedule a new read - self._read_fut = self._loop._proactor.recv(self._sock, 32768) + self._read_fut = self._loop._proactor.recv_into(self._sock, self._data) except ConnectionAbortedError as exc: if not self._closing: self._fatal_error(exc, 'Fatal read error on pipe transport') @@ -314,8 +316,8 @@ class _ProactorReadPipeTransport(_ProactorBasePipeTransport, if not self._paused: self._read_fut.add_done_callback(self._loop_reading) finally: - if data is not None: - self._data_received(data) + if length > -1: + self._data_received(data, length) class _ProactorBaseWritePipeTransport(_ProactorBasePipeTransport, |