summaryrefslogtreecommitdiffstats
path: root/Lib/asyncio/streams.py
diff options
context:
space:
mode:
Diffstat (limited to 'Lib/asyncio/streams.py')
-rw-r--r--Lib/asyncio/streams.py225
1 files changed, 187 insertions, 38 deletions
diff --git a/Lib/asyncio/streams.py b/Lib/asyncio/streams.py
index 9097e38..0008d51 100644
--- a/Lib/asyncio/streams.py
+++ b/Lib/asyncio/streams.py
@@ -3,6 +3,7 @@
__all__ = ['StreamReader', 'StreamWriter', 'StreamReaderProtocol',
'open_connection', 'start_server',
'IncompleteReadError',
+ 'LimitOverrunError',
]
import socket
@@ -27,15 +28,28 @@ class IncompleteReadError(EOFError):
Incomplete read error. Attributes:
- partial: read bytes string before the end of stream was reached
- - expected: total number of expected bytes
+ - expected: total number of expected bytes (or None if unknown)
"""
def __init__(self, partial, expected):
- EOFError.__init__(self, "%s bytes read on a total of %s expected bytes"
- % (len(partial), expected))
+ super().__init__("%d bytes read on a total of %r expected bytes"
+ % (len(partial), expected))
self.partial = partial
self.expected = expected
+class LimitOverrunError(Exception):
+ """Reached buffer limit while looking for the separator.
+
+ Attributes:
+ - message: error message
+ - consumed: total number of bytes that should be consumed
+ """
+ def __init__(self, message, consumed):
+ super().__init__(message)
+ self.message = message
+ self.consumed = consumed
+
+
@coroutine
def open_connection(host=None, port=None, *,
loop=None, limit=_DEFAULT_LIMIT, **kwds):
@@ -318,6 +332,10 @@ class StreamReader:
def __init__(self, limit=_DEFAULT_LIMIT, loop=None):
# The line length limit is a security feature;
# it also doubles as half the buffer limit.
+
+ if limit <= 0:
+ raise ValueError('Limit cannot be <= 0')
+
self._limit = limit
if loop is None:
self._loop = events.get_event_loop()
@@ -361,7 +379,7 @@ class StreamReader:
waiter.set_exception(exc)
def _wakeup_waiter(self):
- """Wakeup read() or readline() function waiting for data or EOF."""
+ """Wakeup read*() functions waiting for data or EOF."""
waiter = self._waiter
if waiter is not None:
self._waiter = None
@@ -409,7 +427,10 @@ class StreamReader:
@coroutine
def _wait_for_data(self, func_name):
- """Wait until feed_data() or feed_eof() is called."""
+ """Wait until feed_data() or feed_eof() is called.
+
+ If stream was paused, automatically resume it.
+ """
# StreamReader uses a future to link the protocol feed_data() method
# to a read coroutine. Running two read coroutines at the same time
# would have an unexpected behaviour. It would not possible to know
@@ -418,6 +439,13 @@ class StreamReader:
raise RuntimeError('%s() called while another coroutine is '
'already waiting for incoming data' % func_name)
+ assert not self._eof, '_wait_for_data after EOF'
+
+ # Waiting for data while paused will make deadlock, so prevent it.
+ if self._paused:
+ self._paused = False
+ self._transport.resume_reading()
+
self._waiter = futures.Future(loop=self._loop)
try:
yield from self._waiter
@@ -426,43 +454,150 @@ class StreamReader:
@coroutine
def readline(self):
+ """Read chunk of data from the stream until newline (b'\n') is found.
+
+ On success, return chunk that ends with newline. If only partial
+ line can be read due to EOF, return incomplete line without
+ terminating newline. When EOF was reached while no bytes read, empty
+ bytes object is returned.
+
+ If limit is reached, ValueError will be raised. In that case, if
+ newline was found, complete line including newline will be removed
+ from internal buffer. Else, internal buffer will be cleared. Limit is
+ compared against part of the line without newline.
+
+ If stream was paused, this function will automatically resume it if
+ needed.
+ """
+ sep = b'\n'
+ seplen = len(sep)
+ try:
+ line = yield from self.readuntil(sep)
+ except IncompleteReadError as e:
+ return e.partial
+ except LimitOverrunError as e:
+ if self._buffer.startswith(sep, e.consumed):
+ del self._buffer[:e.consumed + seplen]
+ else:
+ self._buffer.clear()
+ self._maybe_resume_transport()
+ raise ValueError(e.args[0])
+ return line
+
+ @coroutine
+ def readuntil(self, separator=b'\n'):
+ """Read chunk of data from the stream until `separator` is found.
+
+ On success, chunk and its separator will be removed from internal buffer
+ (i.e. consumed). Returned chunk will include separator at the end.
+
+ Configured stream limit is used to check result. Limit means maximal
+ length of chunk that can be returned, not counting the separator.
+
+ If EOF occurs and complete separator still not found,
+ IncompleteReadError(<partial data>, None) will be raised and internal
+ buffer becomes empty. This partial data may contain a partial separator.
+
+ If chunk cannot be read due to overlimit, LimitOverrunError will be raised
+ and data will be left in internal buffer, so it can be read again, in
+ some different way.
+
+ If stream was paused, this function will automatically resume it if
+ needed.
+ """
+ seplen = len(separator)
+ if seplen == 0:
+ raise ValueError('Separator should be at least one-byte string')
+
if self._exception is not None:
raise self._exception
- line = bytearray()
- not_enough = True
-
- while not_enough:
- while self._buffer and not_enough:
- ichar = self._buffer.find(b'\n')
- if ichar < 0:
- line.extend(self._buffer)
- self._buffer.clear()
- else:
- ichar += 1
- line.extend(self._buffer[:ichar])
- del self._buffer[:ichar]
- not_enough = False
-
- if len(line) > self._limit:
- self._maybe_resume_transport()
- raise ValueError('Line is too long')
+ # Consume whole buffer except last bytes, which length is
+ # one less than seplen. Let's check corner cases with
+ # separator='SEPARATOR':
+ # * we have received almost complete separator (without last
+ # byte). i.e buffer='some textSEPARATO'. In this case we
+ # can safely consume len(separator) - 1 bytes.
+ # * last byte of buffer is first byte of separator, i.e.
+ # buffer='abcdefghijklmnopqrS'. We may safely consume
+ # everything except that last byte, but this require to
+ # analyze bytes of buffer that match partial separator.
+ # This is slow and/or require FSM. For this case our
+ # implementation is not optimal, since require rescanning
+ # of data that is known to not belong to separator. In
+ # real world, separator will not be so long to notice
+ # performance problems. Even when reading MIME-encoded
+ # messages :)
+
+ # `offset` is the number of bytes from the beginning of the buffer where
+ # is no occurrence of `separator`.
+ offset = 0
+
+ # Loop until we find `separator` in the buffer, exceed the buffer size,
+ # or an EOF has happened.
+ while True:
+ buflen = len(self._buffer)
+
+ # Check if we now have enough data in the buffer for `separator` to
+ # fit.
+ if buflen - offset >= seplen:
+ isep = self._buffer.find(separator, offset)
+
+ if isep != -1:
+ # `separator` is in the buffer. `isep` will be used later to
+ # retrieve the data.
+ break
+
+ # see upper comment for explanation.
+ offset = buflen + 1 - seplen
+ if offset > self._limit:
+ raise LimitOverrunError('Separator is not found, and chunk exceed the limit', offset)
+ # Complete message (with full separator) may be present in buffer
+ # even when EOF flag is set. This may happen when the last chunk
+ # adds data which makes separator be found. That's why we check for
+ # EOF *ater* inspecting the buffer.
if self._eof:
- break
+ chunk = bytes(self._buffer)
+ self._buffer.clear()
+ raise IncompleteReadError(chunk, None)
+
+ # _wait_for_data() will resume reading if stream was paused.
+ yield from self._wait_for_data('readuntil')
- if not_enough:
- yield from self._wait_for_data('readline')
+ if isep > self._limit:
+ raise LimitOverrunError('Separator is found, but chunk is longer than limit', isep)
+ chunk = self._buffer[:isep + seplen]
+ del self._buffer[:isep + seplen]
self._maybe_resume_transport()
- return bytes(line)
+ return bytes(chunk)
@coroutine
def read(self, n=-1):
+ """Read up to `n` bytes from the stream.
+
+ If n is not provided, or set to -1, read until EOF and return all read
+ bytes. If the EOF was received and the internal buffer is empty, return
+ an empty bytes object.
+
+ If n is zero, return empty bytes object immediatelly.
+
+ If n is positive, this function try to read `n` bytes, and may return
+ less or equal bytes than requested, but at least one byte. If EOF was
+ received before any byte is read, this function returns empty byte
+ object.
+
+ Returned value is not limited with limit, configured at stream creation.
+
+ If stream was paused, this function will automatically resume it if
+ needed.
+ """
+
if self._exception is not None:
raise self._exception
- if not n:
+ if n == 0:
return b''
if n < 0:
@@ -477,29 +612,41 @@ class StreamReader:
break
blocks.append(block)
return b''.join(blocks)
- else:
- if not self._buffer and not self._eof:
- yield from self._wait_for_data('read')
- if n < 0 or len(self._buffer) <= n:
- data = bytes(self._buffer)
- self._buffer.clear()
- else:
- # n > 0 and len(self._buffer) > n
- data = bytes(self._buffer[:n])
- del self._buffer[:n]
+ if not self._buffer and not self._eof:
+ yield from self._wait_for_data('read')
+
+ # This will work right even if buffer is less than n bytes
+ data = bytes(self._buffer[:n])
+ del self._buffer[:n]
self._maybe_resume_transport()
return data
@coroutine
def readexactly(self, n):
+ """Read exactly `n` bytes.
+
+ Raise an `IncompleteReadError` if EOF is reached before `n` bytes can be
+ read. The `IncompleteReadError.partial` attribute of the exception will
+ contain the partial read bytes.
+
+ if n is zero, return empty bytes object.
+
+ Returned value is not limited with limit, configured at stream creation.
+
+ If stream was paused, this function will automatically resume it if
+ needed.
+ """
if n < 0:
raise ValueError('readexactly size can not be less than zero')
if self._exception is not None:
raise self._exception
+ if n == 0:
+ return b''
+
# There used to be "optimized" code here. It created its own
# Future and waited until self._buffer had at least the n
# bytes, then called read(n). Unfortunately, this could pause
@@ -516,6 +663,8 @@ class StreamReader:
blocks.append(block)
n -= len(block)
+ assert n == 0
+
return b''.join(blocks)
if compat.PY35: