diff options
Diffstat (limited to 'Lib/asyncio/streams.py')
-rw-r--r-- | Lib/asyncio/streams.py | 129 |
1 files changed, 72 insertions, 57 deletions
diff --git a/Lib/asyncio/streams.py b/Lib/asyncio/streams.py index 0008d51..a82cc79 100644 --- a/Lib/asyncio/streams.py +++ b/Lib/asyncio/streams.py @@ -14,13 +14,12 @@ if hasattr(socket, 'AF_UNIX'): from . import coroutines from . import compat from . import events -from . import futures from . import protocols from .coroutines import coroutine from .log import logger -_DEFAULT_LIMIT = 2**16 +_DEFAULT_LIMIT = 2 ** 16 class IncompleteReadError(EOFError): @@ -38,15 +37,13 @@ class IncompleteReadError(EOFError): class LimitOverrunError(Exception): - """Reached buffer limit while looking for the separator. + """Reached the buffer limit while looking for a separator. Attributes: - - message: error message - - consumed: total number of bytes that should be consumed + - consumed: total number of to be consumed bytes. """ def __init__(self, message, consumed): super().__init__(message) - self.message = message self.consumed = consumed @@ -132,7 +129,6 @@ if hasattr(socket, 'AF_UNIX'): writer = StreamWriter(transport, protocol, reader, loop) return reader, writer - @coroutine def start_unix_server(client_connected_cb, path=None, *, loop=None, limit=_DEFAULT_LIMIT, **kwds): @@ -210,7 +206,7 @@ class FlowControlMixin(protocols.Protocol): return waiter = self._drain_waiter assert waiter is None or waiter.cancelled() - waiter = futures.Future(loop=self._loop) + waiter = self._loop.create_future() self._drain_waiter = waiter yield from waiter @@ -229,9 +225,11 @@ class StreamReaderProtocol(FlowControlMixin, protocols.Protocol): self._stream_reader = stream_reader self._stream_writer = None self._client_connected_cb = client_connected_cb + self._over_ssl = False def connection_made(self, transport): self._stream_reader.set_transport(transport) + self._over_ssl = transport.get_extra_info('sslcontext') is not None if self._client_connected_cb is not None: self._stream_writer = StreamWriter(transport, self, self._stream_reader, @@ -242,17 +240,25 @@ class StreamReaderProtocol(FlowControlMixin, protocols.Protocol): self._loop.create_task(res) def connection_lost(self, exc): - if exc is None: - self._stream_reader.feed_eof() - else: - self._stream_reader.set_exception(exc) + if self._stream_reader is not None: + if exc is None: + self._stream_reader.feed_eof() + else: + self._stream_reader.set_exception(exc) super().connection_lost(exc) + self._stream_reader = None + self._stream_writer = None def data_received(self, data): self._stream_reader.feed_data(data) def eof_received(self): self._stream_reader.feed_eof() + if self._over_ssl: + # Prevent a warning in SSLProtocol.eof_received: + # "returning true from eof_received() + # has no effect when using ssl" + return False return True @@ -413,8 +419,8 @@ class StreamReader: self._wakeup_waiter() if (self._transport is not None and - not self._paused and - len(self._buffer) > 2*self._limit): + not self._paused and + len(self._buffer) > 2 * self._limit): try: self._transport.pause_reading() except NotImplementedError: @@ -442,11 +448,12 @@ class StreamReader: assert not self._eof, '_wait_for_data after EOF' # Waiting for data while paused will make deadlock, so prevent it. + # This is essential for readexactly(n) for case when n > self._limit. if self._paused: self._paused = False self._transport.resume_reading() - self._waiter = futures.Future(loop=self._loop) + self._waiter = self._loop.create_future() try: yield from self._waiter finally: @@ -486,24 +493,24 @@ class StreamReader: @coroutine def readuntil(self, separator=b'\n'): - """Read chunk of data from the stream until `separator` is found. + """Read data from the stream until ``separator`` is found. - On success, chunk and its separator will be removed from internal buffer - (i.e. consumed). Returned chunk will include separator at the end. + On success, the data and separator will be removed from the + internal buffer (consumed). Returned data will include the + separator at the end. - Configured stream limit is used to check result. Limit means maximal - length of chunk that can be returned, not counting the separator. + Configured stream limit is used to check result. Limit sets the + maximal length of data that can be returned, not counting the + separator. - If EOF occurs and complete separator still not found, - IncompleteReadError(<partial data>, None) will be raised and internal - buffer becomes empty. This partial data may contain a partial separator. + If an EOF occurs and the complete separator is still not found, + an IncompleteReadError exception will be raised, and the internal + buffer will be reset. The IncompleteReadError.partial attribute + may contain the separator partially. - If chunk cannot be read due to overlimit, LimitOverrunError will be raised - and data will be left in internal buffer, so it can be read again, in - some different way. - - If stream was paused, this function will automatically resume it if - needed. + If the data cannot be read because of over limit, a + LimitOverrunError exception will be raised, and the data + will be left in the internal buffer, so it can be read again. """ seplen = len(separator) if seplen == 0: @@ -529,8 +536,8 @@ class StreamReader: # performance problems. Even when reading MIME-encoded # messages :) - # `offset` is the number of bytes from the beginning of the buffer where - # is no occurrence of `separator`. + # `offset` is the number of bytes from the beginning of the buffer + # where there is no occurrence of `separator`. offset = 0 # Loop until we find `separator` in the buffer, exceed the buffer size, @@ -544,14 +551,16 @@ class StreamReader: isep = self._buffer.find(separator, offset) if isep != -1: - # `separator` is in the buffer. `isep` will be used later to - # retrieve the data. + # `separator` is in the buffer. `isep` will be used later + # to retrieve the data. break # see upper comment for explanation. offset = buflen + 1 - seplen if offset > self._limit: - raise LimitOverrunError('Separator is not found, and chunk exceed the limit', offset) + raise LimitOverrunError( + 'Separator is not found, and chunk exceed the limit', + offset) # Complete message (with full separator) may be present in buffer # even when EOF flag is set. This may happen when the last chunk @@ -566,7 +575,8 @@ class StreamReader: yield from self._wait_for_data('readuntil') if isep > self._limit: - raise LimitOverrunError('Separator is found, but chunk is longer than limit', isep) + raise LimitOverrunError( + 'Separator is found, but chunk is longer than limit', isep) chunk = self._buffer[:isep + seplen] del self._buffer[:isep + seplen] @@ -581,14 +591,15 @@ class StreamReader: bytes. If the EOF was received and the internal buffer is empty, return an empty bytes object. - If n is zero, return empty bytes object immediatelly. + If n is zero, return empty bytes object immediately. If n is positive, this function try to read `n` bytes, and may return less or equal bytes than requested, but at least one byte. If EOF was received before any byte is read, this function returns empty byte object. - Returned value is not limited with limit, configured at stream creation. + Returned value is not limited with limit, configured at stream + creation. If stream was paused, this function will automatically resume it if needed. @@ -627,13 +638,14 @@ class StreamReader: def readexactly(self, n): """Read exactly `n` bytes. - Raise an `IncompleteReadError` if EOF is reached before `n` bytes can be - read. The `IncompleteReadError.partial` attribute of the exception will + Raise an IncompleteReadError if EOF is reached before `n` bytes can be + read. The IncompleteReadError.partial attribute of the exception will contain the partial read bytes. if n is zero, return empty bytes object. - Returned value is not limited with limit, configured at stream creation. + Returned value is not limited with limit, configured at stream + creation. If stream was paused, this function will automatically resume it if needed. @@ -647,25 +659,22 @@ class StreamReader: if n == 0: return b'' - # There used to be "optimized" code here. It created its own - # Future and waited until self._buffer had at least the n - # bytes, then called read(n). Unfortunately, this could pause - # the transport if the argument was larger than the pause - # limit (which is twice self._limit). So now we just read() - # into a local buffer. - - blocks = [] - while n > 0: - block = yield from self.read(n) - if not block: - partial = b''.join(blocks) - raise IncompleteReadError(partial, len(partial) + n) - blocks.append(block) - n -= len(block) + while len(self._buffer) < n: + if self._eof: + incomplete = bytes(self._buffer) + self._buffer.clear() + raise IncompleteReadError(incomplete, n) - assert n == 0 + yield from self._wait_for_data('readexactly') - return b''.join(blocks) + if len(self._buffer) == n: + data = bytes(self._buffer) + self._buffer.clear() + else: + data = bytes(self._buffer[:n]) + del self._buffer[:n] + self._maybe_resume_transport() + return data if compat.PY35: @coroutine @@ -678,3 +687,9 @@ class StreamReader: if val == b'': raise StopAsyncIteration return val + + if compat.PY352: + # In Python 3.5.2 and greater, __aiter__ should return + # the asynchronous iterator directly. + def __aiter__(self): + return self |