diff options
author | Victor Stinner <victor.stinner@gmail.com> | 2015-01-13 23:53:37 (GMT) |
---|---|---|
committer | Victor Stinner <victor.stinner@gmail.com> | 2015-01-13 23:53:37 (GMT) |
commit | c2c12e433aa47149c692eef5e5debd7c475b04c7 (patch) | |
tree | 15e9b44d4598413e8a78f1e64b1dda0f8bbd6a9a /Lib | |
parent | 231b404cb026649d4b7172e75ac394ef558efe60 (diff) | |
download | cpython-c2c12e433aa47149c692eef5e5debd7c475b04c7.zip cpython-c2c12e433aa47149c692eef5e5debd7c475b04c7.tar.gz cpython-c2c12e433aa47149c692eef5e5debd7c475b04c7.tar.bz2 |
Issue #23198: Reactor asyncio.StreamReader
- Add a new _wakeup_waiter() method
- Replace _create_waiter() method with a _wait_for_data() coroutine function
- Use the value None instead of True or False to wake up the waiter
Diffstat (limited to 'Lib')
-rw-r--r-- | Lib/asyncio/streams.py | 47 |
1 files changed, 22 insertions, 25 deletions
diff --git a/Lib/asyncio/streams.py b/Lib/asyncio/streams.py index 5a96b24..7ff16a4 100644 --- a/Lib/asyncio/streams.py +++ b/Lib/asyncio/streams.py @@ -313,8 +313,8 @@ class StreamReader: else: self._loop = loop self._buffer = bytearray() - self._eof = False # Whether we're done. - self._waiter = None # A future. + self._eof = False # Whether we're done. + self._waiter = None # A future used by _wait_for_data() self._exception = None self._transport = None self._paused = False @@ -331,6 +331,14 @@ class StreamReader: if not waiter.cancelled(): waiter.set_exception(exc) + def _wakeup_waiter(self): + """Wakeup read() or readline() function waiting for data or EOF.""" + waiter = self._waiter + if waiter is not None: + self._waiter = None + if not waiter.cancelled(): + waiter.set_result(None) + def set_transport(self, transport): assert self._transport is None, 'Transport already set' self._transport = transport @@ -342,11 +350,7 @@ class StreamReader: def feed_eof(self): self._eof = True - waiter = self._waiter - if waiter is not None: - self._waiter = None - if not waiter.cancelled(): - waiter.set_result(True) + self._wakeup_waiter() def at_eof(self): """Return True if the buffer is empty and 'feed_eof' was called.""" @@ -359,12 +363,7 @@ class StreamReader: return self._buffer.extend(data) - - waiter = self._waiter - if waiter is not None: - self._waiter = None - if not waiter.cancelled(): - waiter.set_result(False) + self._wakeup_waiter() if (self._transport is not None and not self._paused and @@ -379,7 +378,8 @@ class StreamReader: else: self._paused = True - def _create_waiter(self, func_name): + def _wait_for_data(self, func_name): + """Wait until feed_data() or feed_eof() is called.""" # StreamReader uses a future to link the protocol feed_data() method # to a read coroutine. Running two read coroutines at the same time # would have an unexpected behaviour. It would not possible to know @@ -387,7 +387,12 @@ class StreamReader: if self._waiter is not None: raise RuntimeError('%s() called while another coroutine is ' 'already waiting for incoming data' % func_name) - return futures.Future(loop=self._loop) + + self._waiter = futures.Future(loop=self._loop) + try: + yield from self._waiter + finally: + self._waiter = None @coroutine def readline(self): @@ -417,11 +422,7 @@ class StreamReader: break if not_enough: - self._waiter = self._create_waiter('readline') - try: - yield from self._waiter - finally: - self._waiter = None + yield from self._wait_for_data('readline') self._maybe_resume_transport() return bytes(line) @@ -448,11 +449,7 @@ class StreamReader: return b''.join(blocks) else: if not self._buffer and not self._eof: - self._waiter = self._create_waiter('read') - try: - yield from self._waiter - finally: - self._waiter = None + yield from self._wait_for_data('read') if n < 0 or len(self._buffer) <= n: data = bytes(self._buffer) |