summaryrefslogtreecommitdiffstats
path: root/Lib
diff options
context:
space:
mode:
authorVictor Stinner <victor.stinner@gmail.com>2015-01-13 23:53:37 (GMT)
committerVictor Stinner <victor.stinner@gmail.com>2015-01-13 23:53:37 (GMT)
commitc2c12e433aa47149c692eef5e5debd7c475b04c7 (patch)
tree15e9b44d4598413e8a78f1e64b1dda0f8bbd6a9a /Lib
parent231b404cb026649d4b7172e75ac394ef558efe60 (diff)
downloadcpython-c2c12e433aa47149c692eef5e5debd7c475b04c7.zip
cpython-c2c12e433aa47149c692eef5e5debd7c475b04c7.tar.gz
cpython-c2c12e433aa47149c692eef5e5debd7c475b04c7.tar.bz2
Issue #23198: Reactor asyncio.StreamReader
- Add a new _wakeup_waiter() method - Replace _create_waiter() method with a _wait_for_data() coroutine function - Use the value None instead of True or False to wake up the waiter
Diffstat (limited to 'Lib')
-rw-r--r--Lib/asyncio/streams.py47
1 files changed, 22 insertions, 25 deletions
diff --git a/Lib/asyncio/streams.py b/Lib/asyncio/streams.py
index 5a96b24..7ff16a4 100644
--- a/Lib/asyncio/streams.py
+++ b/Lib/asyncio/streams.py
@@ -313,8 +313,8 @@ class StreamReader:
else:
self._loop = loop
self._buffer = bytearray()
- self._eof = False # Whether we're done.
- self._waiter = None # A future.
+ self._eof = False # Whether we're done.
+ self._waiter = None # A future used by _wait_for_data()
self._exception = None
self._transport = None
self._paused = False
@@ -331,6 +331,14 @@ class StreamReader:
if not waiter.cancelled():
waiter.set_exception(exc)
+ def _wakeup_waiter(self):
+ """Wakeup read() or readline() function waiting for data or EOF."""
+ waiter = self._waiter
+ if waiter is not None:
+ self._waiter = None
+ if not waiter.cancelled():
+ waiter.set_result(None)
+
def set_transport(self, transport):
assert self._transport is None, 'Transport already set'
self._transport = transport
@@ -342,11 +350,7 @@ class StreamReader:
def feed_eof(self):
self._eof = True
- waiter = self._waiter
- if waiter is not None:
- self._waiter = None
- if not waiter.cancelled():
- waiter.set_result(True)
+ self._wakeup_waiter()
def at_eof(self):
"""Return True if the buffer is empty and 'feed_eof' was called."""
@@ -359,12 +363,7 @@ class StreamReader:
return
self._buffer.extend(data)
-
- waiter = self._waiter
- if waiter is not None:
- self._waiter = None
- if not waiter.cancelled():
- waiter.set_result(False)
+ self._wakeup_waiter()
if (self._transport is not None and
not self._paused and
@@ -379,7 +378,8 @@ class StreamReader:
else:
self._paused = True
- def _create_waiter(self, func_name):
+ def _wait_for_data(self, func_name):
+ """Wait until feed_data() or feed_eof() is called."""
# StreamReader uses a future to link the protocol feed_data() method
# to a read coroutine. Running two read coroutines at the same time
# would have an unexpected behaviour. It would not possible to know
@@ -387,7 +387,12 @@ class StreamReader:
if self._waiter is not None:
raise RuntimeError('%s() called while another coroutine is '
'already waiting for incoming data' % func_name)
- return futures.Future(loop=self._loop)
+
+ self._waiter = futures.Future(loop=self._loop)
+ try:
+ yield from self._waiter
+ finally:
+ self._waiter = None
@coroutine
def readline(self):
@@ -417,11 +422,7 @@ class StreamReader:
break
if not_enough:
- self._waiter = self._create_waiter('readline')
- try:
- yield from self._waiter
- finally:
- self._waiter = None
+ yield from self._wait_for_data('readline')
self._maybe_resume_transport()
return bytes(line)
@@ -448,11 +449,7 @@ class StreamReader:
return b''.join(blocks)
else:
if not self._buffer and not self._eof:
- self._waiter = self._create_waiter('read')
- try:
- yield from self._waiter
- finally:
- self._waiter = None
+ yield from self._wait_for_data('read')
if n < 0 or len(self._buffer) <= n:
data = bytes(self._buffer)