diff options
author | Yury Selivanov <yselivanov@sprymix.com> | 2014-02-05 23:11:13 (GMT) |
---|---|---|
committer | Yury Selivanov <yselivanov@sprymix.com> | 2014-02-05 23:11:13 (GMT) |
commit | e694c9745f2a9cbb46154e290c84c02d77066055 (patch) | |
tree | 9d8512b2f6f727605c1a7e2b8fc38135c4c9a183 /Lib/asyncio/streams.py | |
parent | 58af25e93008c5bd1468871a132435a5d09b6035 (diff) | |
download | cpython-e694c9745f2a9cbb46154e290c84c02d77066055.zip cpython-e694c9745f2a9cbb46154e290c84c02d77066055.tar.gz cpython-e694c9745f2a9cbb46154e290c84c02d77066055.tar.bz2 |
asyncio.streams: Use bytebuffer in StreamReader; Add assertion in feed_data
Diffstat (limited to 'Lib/asyncio/streams.py')
-rw-r--r-- | Lib/asyncio/streams.py | 73 |
1 files changed, 24 insertions, 49 deletions
diff --git a/Lib/asyncio/streams.py b/Lib/asyncio/streams.py index 06f052a..3da1d10 100644 --- a/Lib/asyncio/streams.py +++ b/Lib/asyncio/streams.py @@ -4,8 +4,6 @@ __all__ = ['StreamReader', 'StreamWriter', 'StreamReaderProtocol', 'open_connection', 'start_server', 'IncompleteReadError', ] -import collections - from . import events from . import futures from . import protocols @@ -259,9 +257,7 @@ class StreamReader: if loop is None: loop = events.get_event_loop() self._loop = loop - # TODO: Use a bytearray for a buffer, like the transport. - self._buffer = collections.deque() # Deque of bytes objects. - self._byte_count = 0 # Bytes in buffer. + self._buffer = bytearray() self._eof = False # Whether we're done. self._waiter = None # A future. self._exception = None @@ -285,7 +281,7 @@ class StreamReader: self._transport = transport def _maybe_resume_transport(self): - if self._paused and self._byte_count <= self._limit: + if self._paused and len(self._buffer) <= self._limit: self._paused = False self._transport.resume_reading() @@ -298,11 +294,12 @@ class StreamReader: waiter.set_result(True) def feed_data(self, data): + assert not self._eof, 'feed_data after feed_eof' + if not data: return - self._buffer.append(data) - self._byte_count += len(data) + self._buffer.extend(data) waiter = self._waiter if waiter is not None: @@ -312,7 +309,7 @@ class StreamReader: if (self._transport is not None and not self._paused and - self._byte_count > 2*self._limit): + len(self._buffer) > 2*self._limit): try: self._transport.pause_reading() except NotImplementedError: @@ -338,28 +335,22 @@ class StreamReader: if self._exception is not None: raise self._exception - parts = [] - parts_size = 0 + line = bytearray() not_enough = True while not_enough: while self._buffer and not_enough: - data = self._buffer.popleft() - ichar = data.find(b'\n') + ichar = self._buffer.find(b'\n') if ichar < 0: - parts.append(data) - parts_size += len(data) + line.extend(self._buffer) + self._buffer.clear() else: ichar += 1 - head, tail = data[:ichar], data[ichar:] - if tail: - self._buffer.appendleft(tail) + line.extend(self._buffer[:ichar]) + del self._buffer[:ichar] not_enough = False - parts.append(head) - parts_size += len(head) - if parts_size > self._limit: - self._byte_count -= parts_size + if len(line) > self._limit: self._maybe_resume_transport() raise ValueError('Line is too long') @@ -373,11 +364,8 @@ class StreamReader: finally: self._waiter = None - line = b''.join(parts) - self._byte_count -= parts_size self._maybe_resume_transport() - - return line + return bytes(line) @tasks.coroutine def read(self, n=-1): @@ -395,36 +383,23 @@ class StreamReader: finally: self._waiter = None else: - if not self._byte_count and not self._eof: + if not self._buffer and not self._eof: self._waiter = self._create_waiter('read') try: yield from self._waiter finally: self._waiter = None - if n < 0 or self._byte_count <= n: - data = b''.join(self._buffer) + if n < 0 or len(self._buffer) <= n: + data = bytes(self._buffer) self._buffer.clear() - self._byte_count = 0 - self._maybe_resume_transport() - return data - - parts = [] - parts_bytes = 0 - while self._buffer and parts_bytes < n: - data = self._buffer.popleft() - data_bytes = len(data) - if n < parts_bytes + data_bytes: - data_bytes = n - parts_bytes - data, rest = data[:data_bytes], data[data_bytes:] - self._buffer.appendleft(rest) - - parts.append(data) - parts_bytes += data_bytes - self._byte_count -= data_bytes - self._maybe_resume_transport() - - return b''.join(parts) + else: + # n > 0 and len(self._buffer) > n + data = bytes(self._buffer[:n]) + del self._buffer[:n] + + self._maybe_resume_transport() + return data @tasks.coroutine def readexactly(self, n): |