summaryrefslogtreecommitdiffstats
path: root/Lib/asyncio/streams.py
diff options
context:
space:
mode:
authorYury Selivanov <yselivanov@sprymix.com>2014-02-05 23:11:13 (GMT)
committerYury Selivanov <yselivanov@sprymix.com>2014-02-05 23:11:13 (GMT)
commite694c9745f2a9cbb46154e290c84c02d77066055 (patch)
tree9d8512b2f6f727605c1a7e2b8fc38135c4c9a183 /Lib/asyncio/streams.py
parent58af25e93008c5bd1468871a132435a5d09b6035 (diff)
downloadcpython-e694c9745f2a9cbb46154e290c84c02d77066055.zip
cpython-e694c9745f2a9cbb46154e290c84c02d77066055.tar.gz
cpython-e694c9745f2a9cbb46154e290c84c02d77066055.tar.bz2
asyncio.streams: Use bytebuffer in StreamReader; Add assertion in feed_data
Diffstat (limited to 'Lib/asyncio/streams.py')
-rw-r--r--Lib/asyncio/streams.py73
1 files changed, 24 insertions, 49 deletions
diff --git a/Lib/asyncio/streams.py b/Lib/asyncio/streams.py
index 06f052a..3da1d10 100644
--- a/Lib/asyncio/streams.py
+++ b/Lib/asyncio/streams.py
@@ -4,8 +4,6 @@ __all__ = ['StreamReader', 'StreamWriter', 'StreamReaderProtocol',
'open_connection', 'start_server', 'IncompleteReadError',
]
-import collections
-
from . import events
from . import futures
from . import protocols
@@ -259,9 +257,7 @@ class StreamReader:
if loop is None:
loop = events.get_event_loop()
self._loop = loop
- # TODO: Use a bytearray for a buffer, like the transport.
- self._buffer = collections.deque() # Deque of bytes objects.
- self._byte_count = 0 # Bytes in buffer.
+ self._buffer = bytearray()
self._eof = False # Whether we're done.
self._waiter = None # A future.
self._exception = None
@@ -285,7 +281,7 @@ class StreamReader:
self._transport = transport
def _maybe_resume_transport(self):
- if self._paused and self._byte_count <= self._limit:
+ if self._paused and len(self._buffer) <= self._limit:
self._paused = False
self._transport.resume_reading()
@@ -298,11 +294,12 @@ class StreamReader:
waiter.set_result(True)
def feed_data(self, data):
+ assert not self._eof, 'feed_data after feed_eof'
+
if not data:
return
- self._buffer.append(data)
- self._byte_count += len(data)
+ self._buffer.extend(data)
waiter = self._waiter
if waiter is not None:
@@ -312,7 +309,7 @@ class StreamReader:
if (self._transport is not None and
not self._paused and
- self._byte_count > 2*self._limit):
+ len(self._buffer) > 2*self._limit):
try:
self._transport.pause_reading()
except NotImplementedError:
@@ -338,28 +335,22 @@ class StreamReader:
if self._exception is not None:
raise self._exception
- parts = []
- parts_size = 0
+ line = bytearray()
not_enough = True
while not_enough:
while self._buffer and not_enough:
- data = self._buffer.popleft()
- ichar = data.find(b'\n')
+ ichar = self._buffer.find(b'\n')
if ichar < 0:
- parts.append(data)
- parts_size += len(data)
+ line.extend(self._buffer)
+ self._buffer.clear()
else:
ichar += 1
- head, tail = data[:ichar], data[ichar:]
- if tail:
- self._buffer.appendleft(tail)
+ line.extend(self._buffer[:ichar])
+ del self._buffer[:ichar]
not_enough = False
- parts.append(head)
- parts_size += len(head)
- if parts_size > self._limit:
- self._byte_count -= parts_size
+ if len(line) > self._limit:
self._maybe_resume_transport()
raise ValueError('Line is too long')
@@ -373,11 +364,8 @@ class StreamReader:
finally:
self._waiter = None
- line = b''.join(parts)
- self._byte_count -= parts_size
self._maybe_resume_transport()
-
- return line
+ return bytes(line)
@tasks.coroutine
def read(self, n=-1):
@@ -395,36 +383,23 @@ class StreamReader:
finally:
self._waiter = None
else:
- if not self._byte_count and not self._eof:
+ if not self._buffer and not self._eof:
self._waiter = self._create_waiter('read')
try:
yield from self._waiter
finally:
self._waiter = None
- if n < 0 or self._byte_count <= n:
- data = b''.join(self._buffer)
+ if n < 0 or len(self._buffer) <= n:
+ data = bytes(self._buffer)
self._buffer.clear()
- self._byte_count = 0
- self._maybe_resume_transport()
- return data
-
- parts = []
- parts_bytes = 0
- while self._buffer and parts_bytes < n:
- data = self._buffer.popleft()
- data_bytes = len(data)
- if n < parts_bytes + data_bytes:
- data_bytes = n - parts_bytes
- data, rest = data[:data_bytes], data[data_bytes:]
- self._buffer.appendleft(rest)
-
- parts.append(data)
- parts_bytes += data_bytes
- self._byte_count -= data_bytes
- self._maybe_resume_transport()
-
- return b''.join(parts)
+ else:
+ # n > 0 and len(self._buffer) > n
+ data = bytes(self._buffer[:n])
+ del self._buffer[:n]
+
+ self._maybe_resume_transport()
+ return data
@tasks.coroutine
def readexactly(self, n):