diff options
Diffstat (limited to 'Lib/asyncio/streams.py')
-rw-r--r-- | Lib/asyncio/streams.py | 35 |
1 files changed, 27 insertions, 8 deletions
diff --git a/Lib/asyncio/streams.py b/Lib/asyncio/streams.py index 79adf02..d9a9f5e 100644 --- a/Lib/asyncio/streams.py +++ b/Lib/asyncio/streams.py @@ -352,6 +352,8 @@ class StreamWriter: assert reader is None or isinstance(reader, StreamReader) self._reader = reader self._loop = loop + self._complete_fut = self._loop.create_future() + self._complete_fut.set_result(None) def __repr__(self): info = [self.__class__.__name__, f'transport={self._transport!r}'] @@ -365,9 +367,33 @@ class StreamWriter: def write(self, data): self._transport.write(data) + return self._fast_drain() def writelines(self, data): self._transport.writelines(data) + return self._fast_drain() + + def _fast_drain(self): + # The helper tries to use fast-path to return already existing complete future + # object if underlying transport is not paused and actual waiting for writing + # resume is not needed + if self._reader is not None: + # this branch will be simplified after merging reader with writer + exc = self._reader.exception() + if exc is not None: + fut = self._loop.create_future() + fut.set_exception(exc) + return fut + if not self._transport.is_closing(): + if self._protocol._connection_lost: + fut = self._loop.create_future() + fut.set_exception(ConnectionResetError('Connection lost')) + return fut + if not self._protocol._paused: + # fast path, the stream is not paused + # no need to wait for resume signal + return self._complete_fut + return self._loop.create_task(self.drain()) def write_eof(self): return self._transport.write_eof() @@ -377,6 +403,7 @@ class StreamWriter: def close(self): self._transport.close() + return self._protocol._get_close_waiter(self) def is_closing(self): return self._transport.is_closing() @@ -408,14 +435,6 @@ class StreamWriter: raise ConnectionResetError('Connection lost') await self._protocol._drain_helper() - async def aclose(self): - self.close() - await self.wait_closed() - - async def awrite(self, data): - self.write(data) - await self.drain() - class StreamReader: |