diff options
author | Victor Stinner <victor.stinner@gmail.com> | 2014-07-22 10:03:40 (GMT) |
---|---|---|
committer | Victor Stinner <victor.stinner@gmail.com> | 2014-07-22 10:03:40 (GMT) |
commit | 31e7bfa6ba5354ba44677736a23facb8463077a9 (patch) | |
tree | 328b5c21b47753e1b62916d5f9545cea012eb17e /Lib | |
parent | 1392df96efdccc7ed369fe0ab8e55bf4e9c9e0c4 (diff) | |
download | cpython-31e7bfa6ba5354ba44677736a23facb8463077a9.zip cpython-31e7bfa6ba5354ba44677736a23facb8463077a9.tar.gz cpython-31e7bfa6ba5354ba44677736a23facb8463077a9.tar.bz2 |
asyncio, tulip issue 193: Convert StreamWriter.drain() to a classic coroutine
Replace also _make_drain_waiter() function with a classic _drain_helper()
coroutine.
Diffstat (limited to 'Lib')
-rw-r--r-- | Lib/asyncio/streams.py | 37 |
1 files changed, 18 insertions, 19 deletions
diff --git a/Lib/asyncio/streams.py b/Lib/asyncio/streams.py index d18db77..c77eb60 100644 --- a/Lib/asyncio/streams.py +++ b/Lib/asyncio/streams.py @@ -141,15 +141,14 @@ class FlowControlMixin(protocols.Protocol): resume_reading() and connection_lost(). If the subclass overrides these it must call the super methods. - StreamWriter.drain() must check for error conditions and then call - _make_drain_waiter(), which will return either () or a Future - depending on the paused state. + StreamWriter.drain() must wait for _drain_helper() coroutine. """ def __init__(self, loop=None): self._loop = loop # May be None; we may never need it. self._paused = False self._drain_waiter = None + self._connection_lost = False def pause_writing(self): assert not self._paused @@ -170,6 +169,7 @@ class FlowControlMixin(protocols.Protocol): waiter.set_result(None) def connection_lost(self, exc): + self._connection_lost = True # Wake up the writer if currently paused. if not self._paused: return @@ -184,14 +184,17 @@ class FlowControlMixin(protocols.Protocol): else: waiter.set_exception(exc) - def _make_drain_waiter(self): + @coroutine + def _drain_helper(self): + if self._connection_lost: + raise ConnectionResetError('Connection lost') if not self._paused: - return () + return waiter = self._drain_waiter assert waiter is None or waiter.cancelled() waiter = futures.Future(loop=self._loop) self._drain_waiter = waiter - return waiter + yield from waiter class StreamReaderProtocol(FlowControlMixin, protocols.Protocol): @@ -247,6 +250,8 @@ class StreamWriter: def __init__(self, transport, protocol, reader, loop): self._transport = transport self._protocol = protocol + # drain() expects that the reader has a exception() method + assert reader is None or isinstance(reader, StreamReader) self._reader = reader self._loop = loop @@ -278,26 +283,20 @@ class StreamWriter: def get_extra_info(self, name, default=None): return self._transport.get_extra_info(name, default) + @coroutine def drain(self): - """This method has an unusual return value. + """Flush the write buffer. The intended use is to write w.write(data) yield from w.drain() - - When there's nothing to wait for, drain() returns (), and the - yield-from continues immediately. When the transport buffer - is full (the protocol is paused), drain() creates and returns - a Future and the yield-from will block until that Future is - completed, which will happen when the buffer is (partially) - drained and the protocol is resumed. """ - if self._reader is not None and self._reader._exception is not None: - raise self._reader._exception - if self._transport._conn_lost: # Uses private variable. - raise ConnectionResetError('Connection lost') - return self._protocol._make_drain_waiter() + if self._reader is not None: + exc = self._reader.exception() + if exc is not None: + raise exc + yield from self._protocol._drain_helper() class StreamReader: |