summaryrefslogtreecommitdiffstats
path: root/Lib/asyncio/streams.py
diff options
context:
space:
mode:
Diffstat (limited to 'Lib/asyncio/streams.py')
-rw-r--r--Lib/asyncio/streams.py129
1 files changed, 72 insertions, 57 deletions
diff --git a/Lib/asyncio/streams.py b/Lib/asyncio/streams.py
index 0008d51..a82cc79 100644
--- a/Lib/asyncio/streams.py
+++ b/Lib/asyncio/streams.py
@@ -14,13 +14,12 @@ if hasattr(socket, 'AF_UNIX'):
from . import coroutines
from . import compat
from . import events
-from . import futures
from . import protocols
from .coroutines import coroutine
from .log import logger
-_DEFAULT_LIMIT = 2**16
+_DEFAULT_LIMIT = 2 ** 16
class IncompleteReadError(EOFError):
@@ -38,15 +37,13 @@ class IncompleteReadError(EOFError):
class LimitOverrunError(Exception):
- """Reached buffer limit while looking for the separator.
+ """Reached the buffer limit while looking for a separator.
Attributes:
- - message: error message
- - consumed: total number of bytes that should be consumed
+ - consumed: total number of to be consumed bytes.
"""
def __init__(self, message, consumed):
super().__init__(message)
- self.message = message
self.consumed = consumed
@@ -132,7 +129,6 @@ if hasattr(socket, 'AF_UNIX'):
writer = StreamWriter(transport, protocol, reader, loop)
return reader, writer
-
@coroutine
def start_unix_server(client_connected_cb, path=None, *,
loop=None, limit=_DEFAULT_LIMIT, **kwds):
@@ -210,7 +206,7 @@ class FlowControlMixin(protocols.Protocol):
return
waiter = self._drain_waiter
assert waiter is None or waiter.cancelled()
- waiter = futures.Future(loop=self._loop)
+ waiter = self._loop.create_future()
self._drain_waiter = waiter
yield from waiter
@@ -229,9 +225,11 @@ class StreamReaderProtocol(FlowControlMixin, protocols.Protocol):
self._stream_reader = stream_reader
self._stream_writer = None
self._client_connected_cb = client_connected_cb
+ self._over_ssl = False
def connection_made(self, transport):
self._stream_reader.set_transport(transport)
+ self._over_ssl = transport.get_extra_info('sslcontext') is not None
if self._client_connected_cb is not None:
self._stream_writer = StreamWriter(transport, self,
self._stream_reader,
@@ -242,17 +240,25 @@ class StreamReaderProtocol(FlowControlMixin, protocols.Protocol):
self._loop.create_task(res)
def connection_lost(self, exc):
- if exc is None:
- self._stream_reader.feed_eof()
- else:
- self._stream_reader.set_exception(exc)
+ if self._stream_reader is not None:
+ if exc is None:
+ self._stream_reader.feed_eof()
+ else:
+ self._stream_reader.set_exception(exc)
super().connection_lost(exc)
+ self._stream_reader = None
+ self._stream_writer = None
def data_received(self, data):
self._stream_reader.feed_data(data)
def eof_received(self):
self._stream_reader.feed_eof()
+ if self._over_ssl:
+ # Prevent a warning in SSLProtocol.eof_received:
+ # "returning true from eof_received()
+ # has no effect when using ssl"
+ return False
return True
@@ -413,8 +419,8 @@ class StreamReader:
self._wakeup_waiter()
if (self._transport is not None and
- not self._paused and
- len(self._buffer) > 2*self._limit):
+ not self._paused and
+ len(self._buffer) > 2 * self._limit):
try:
self._transport.pause_reading()
except NotImplementedError:
@@ -442,11 +448,12 @@ class StreamReader:
assert not self._eof, '_wait_for_data after EOF'
# Waiting for data while paused will make deadlock, so prevent it.
+ # This is essential for readexactly(n) for case when n > self._limit.
if self._paused:
self._paused = False
self._transport.resume_reading()
- self._waiter = futures.Future(loop=self._loop)
+ self._waiter = self._loop.create_future()
try:
yield from self._waiter
finally:
@@ -486,24 +493,24 @@ class StreamReader:
@coroutine
def readuntil(self, separator=b'\n'):
- """Read chunk of data from the stream until `separator` is found.
+ """Read data from the stream until ``separator`` is found.
- On success, chunk and its separator will be removed from internal buffer
- (i.e. consumed). Returned chunk will include separator at the end.
+ On success, the data and separator will be removed from the
+ internal buffer (consumed). Returned data will include the
+ separator at the end.
- Configured stream limit is used to check result. Limit means maximal
- length of chunk that can be returned, not counting the separator.
+ Configured stream limit is used to check result. Limit sets the
+ maximal length of data that can be returned, not counting the
+ separator.
- If EOF occurs and complete separator still not found,
- IncompleteReadError(<partial data>, None) will be raised and internal
- buffer becomes empty. This partial data may contain a partial separator.
+ If an EOF occurs and the complete separator is still not found,
+ an IncompleteReadError exception will be raised, and the internal
+ buffer will be reset. The IncompleteReadError.partial attribute
+ may contain the separator partially.
- If chunk cannot be read due to overlimit, LimitOverrunError will be raised
- and data will be left in internal buffer, so it can be read again, in
- some different way.
-
- If stream was paused, this function will automatically resume it if
- needed.
+ If the data cannot be read because of over limit, a
+ LimitOverrunError exception will be raised, and the data
+ will be left in the internal buffer, so it can be read again.
"""
seplen = len(separator)
if seplen == 0:
@@ -529,8 +536,8 @@ class StreamReader:
# performance problems. Even when reading MIME-encoded
# messages :)
- # `offset` is the number of bytes from the beginning of the buffer where
- # is no occurrence of `separator`.
+ # `offset` is the number of bytes from the beginning of the buffer
+ # where there is no occurrence of `separator`.
offset = 0
# Loop until we find `separator` in the buffer, exceed the buffer size,
@@ -544,14 +551,16 @@ class StreamReader:
isep = self._buffer.find(separator, offset)
if isep != -1:
- # `separator` is in the buffer. `isep` will be used later to
- # retrieve the data.
+ # `separator` is in the buffer. `isep` will be used later
+ # to retrieve the data.
break
# see upper comment for explanation.
offset = buflen + 1 - seplen
if offset > self._limit:
- raise LimitOverrunError('Separator is not found, and chunk exceed the limit', offset)
+ raise LimitOverrunError(
+ 'Separator is not found, and chunk exceed the limit',
+ offset)
# Complete message (with full separator) may be present in buffer
# even when EOF flag is set. This may happen when the last chunk
@@ -566,7 +575,8 @@ class StreamReader:
yield from self._wait_for_data('readuntil')
if isep > self._limit:
- raise LimitOverrunError('Separator is found, but chunk is longer than limit', isep)
+ raise LimitOverrunError(
+ 'Separator is found, but chunk is longer than limit', isep)
chunk = self._buffer[:isep + seplen]
del self._buffer[:isep + seplen]
@@ -581,14 +591,15 @@ class StreamReader:
bytes. If the EOF was received and the internal buffer is empty, return
an empty bytes object.
- If n is zero, return empty bytes object immediatelly.
+ If n is zero, return empty bytes object immediately.
If n is positive, this function try to read `n` bytes, and may return
less or equal bytes than requested, but at least one byte. If EOF was
received before any byte is read, this function returns empty byte
object.
- Returned value is not limited with limit, configured at stream creation.
+ Returned value is not limited with limit, configured at stream
+ creation.
If stream was paused, this function will automatically resume it if
needed.
@@ -627,13 +638,14 @@ class StreamReader:
def readexactly(self, n):
"""Read exactly `n` bytes.
- Raise an `IncompleteReadError` if EOF is reached before `n` bytes can be
- read. The `IncompleteReadError.partial` attribute of the exception will
+ Raise an IncompleteReadError if EOF is reached before `n` bytes can be
+ read. The IncompleteReadError.partial attribute of the exception will
contain the partial read bytes.
if n is zero, return empty bytes object.
- Returned value is not limited with limit, configured at stream creation.
+ Returned value is not limited with limit, configured at stream
+ creation.
If stream was paused, this function will automatically resume it if
needed.
@@ -647,25 +659,22 @@ class StreamReader:
if n == 0:
return b''
- # There used to be "optimized" code here. It created its own
- # Future and waited until self._buffer had at least the n
- # bytes, then called read(n). Unfortunately, this could pause
- # the transport if the argument was larger than the pause
- # limit (which is twice self._limit). So now we just read()
- # into a local buffer.
-
- blocks = []
- while n > 0:
- block = yield from self.read(n)
- if not block:
- partial = b''.join(blocks)
- raise IncompleteReadError(partial, len(partial) + n)
- blocks.append(block)
- n -= len(block)
+ while len(self._buffer) < n:
+ if self._eof:
+ incomplete = bytes(self._buffer)
+ self._buffer.clear()
+ raise IncompleteReadError(incomplete, n)
- assert n == 0
+ yield from self._wait_for_data('readexactly')
- return b''.join(blocks)
+ if len(self._buffer) == n:
+ data = bytes(self._buffer)
+ self._buffer.clear()
+ else:
+ data = bytes(self._buffer[:n])
+ del self._buffer[:n]
+ self._maybe_resume_transport()
+ return data
if compat.PY35:
@coroutine
@@ -678,3 +687,9 @@ class StreamReader:
if val == b'':
raise StopAsyncIteration
return val
+
+ if compat.PY352:
+ # In Python 3.5.2 and greater, __aiter__ should return
+ # the asynchronous iterator directly.
+ def __aiter__(self):
+ return self