diff options
author | Andrew Svetlov <andrew.svetlov@gmail.com> | 2017-12-08 22:23:48 (GMT) |
---|---|---|
committer | GitHub <noreply@github.com> | 2017-12-08 22:23:48 (GMT) |
commit | 5f841b553814969220b096a2b4f959b7f6fcbaf6 (patch) | |
tree | b48ea916d9585efa9bf7ff370b50c4e2dfb30247 /Lib/asyncio/streams.py | |
parent | ede157331b4f9e550334900b3b4de1c8590688de (diff) | |
download | cpython-5f841b553814969220b096a2b4f959b7f6fcbaf6.zip cpython-5f841b553814969220b096a2b4f959b7f6fcbaf6.tar.gz cpython-5f841b553814969220b096a2b4f959b7f6fcbaf6.tar.bz2 |
bpo-32193: Convert asyncio to async/await usage (#4753)
* Convert asyncio/tasks.py to async/await
* Convert asyncio/queues.py to async/await
* Convert asyncio/test_utils.py to async/await
* Convert asyncio/base_subprocess.py to async/await
* Convert asyncio/subprocess.py to async/await
* Convert asyncio/streams.py to async/await
* Fix comments
* Convert asyncio/locks.py to async/await
* Convert asyncio.sleep to async def
* Add a comment
* Add missing news
* Convert stubs from AbstrctEventLoop to async functions
* Convert subprocess_shell/subprocess_exec
* Convert connect_read_pipe/connect_write_pip to async/await syntax
* Convert create_datagram_endpoint
* Convert create_unix_server/create_unix_connection
* Get rid of old style coroutines in unix_events.py
* Convert selector_events.py to async/await
* Convert wait_closed and create_connection
* Drop redundant line
* Convert base_events.py
* Code cleanup
* Drop redundant comments
* Fix indentation
* Add explicit tests for compatibility between old and new coroutines
* Convert windows event loop to use async/await
* Fix double awaiting of async function
* Convert asyncio/locks.py
* Improve docstring
* Convert tests to async/await
* Convert more tests
* Convert more tests
* Convert more tests
* Convert tests
* Improve test
Diffstat (limited to 'Lib/asyncio/streams.py')
-rw-r--r-- | Lib/asyncio/streams.py | 78 |
1 files changed, 33 insertions, 45 deletions
diff --git a/Lib/asyncio/streams.py b/Lib/asyncio/streams.py index 15c9513..baa9ec9 100644 --- a/Lib/asyncio/streams.py +++ b/Lib/asyncio/streams.py @@ -14,8 +14,8 @@ if hasattr(socket, 'AF_UNIX'): from . import coroutines from . import events from . import protocols -from .coroutines import coroutine from .log import logger +from .tasks import sleep _DEFAULT_LIMIT = 2 ** 16 @@ -52,9 +52,8 @@ class LimitOverrunError(Exception): return type(self), (self.args[0], self.consumed) -@coroutine -def open_connection(host=None, port=None, *, - loop=None, limit=_DEFAULT_LIMIT, **kwds): +async def open_connection(host=None, port=None, *, + loop=None, limit=_DEFAULT_LIMIT, **kwds): """A wrapper for create_connection() returning a (reader, writer) pair. The reader returned is a StreamReader instance; the writer is a @@ -76,15 +75,14 @@ def open_connection(host=None, port=None, *, loop = events.get_event_loop() reader = StreamReader(limit=limit, loop=loop) protocol = StreamReaderProtocol(reader, loop=loop) - transport, _ = yield from loop.create_connection( + transport, _ = await loop.create_connection( lambda: protocol, host, port, **kwds) writer = StreamWriter(transport, protocol, reader, loop) return reader, writer -@coroutine -def start_server(client_connected_cb, host=None, port=None, *, - loop=None, limit=_DEFAULT_LIMIT, **kwds): +async def start_server(client_connected_cb, host=None, port=None, *, + loop=None, limit=_DEFAULT_LIMIT, **kwds): """Start a socket server, call back for each client connected. The first parameter, `client_connected_cb`, takes two parameters: @@ -115,28 +113,26 @@ def start_server(client_connected_cb, host=None, port=None, *, loop=loop) return protocol - return (yield from loop.create_server(factory, host, port, **kwds)) + return await loop.create_server(factory, host, port, **kwds) if hasattr(socket, 'AF_UNIX'): # UNIX Domain Sockets are supported on this platform - @coroutine - def open_unix_connection(path=None, *, - loop=None, limit=_DEFAULT_LIMIT, **kwds): + async def open_unix_connection(path=None, *, + loop=None, limit=_DEFAULT_LIMIT, **kwds): """Similar to `open_connection` but works with UNIX Domain Sockets.""" if loop is None: loop = events.get_event_loop() reader = StreamReader(limit=limit, loop=loop) protocol = StreamReaderProtocol(reader, loop=loop) - transport, _ = yield from loop.create_unix_connection( + transport, _ = await loop.create_unix_connection( lambda: protocol, path, **kwds) writer = StreamWriter(transport, protocol, reader, loop) return reader, writer - @coroutine - def start_unix_server(client_connected_cb, path=None, *, - loop=None, limit=_DEFAULT_LIMIT, **kwds): + async def start_unix_server(client_connected_cb, path=None, *, + loop=None, limit=_DEFAULT_LIMIT, **kwds): """Similar to `start_server` but works with UNIX Domain Sockets.""" if loop is None: loop = events.get_event_loop() @@ -147,7 +143,7 @@ if hasattr(socket, 'AF_UNIX'): loop=loop) return protocol - return (yield from loop.create_unix_server(factory, path, **kwds)) + return await loop.create_unix_server(factory, path, **kwds) class FlowControlMixin(protocols.Protocol): @@ -203,8 +199,7 @@ class FlowControlMixin(protocols.Protocol): else: waiter.set_exception(exc) - @coroutine - def _drain_helper(self): + async def _drain_helper(self): if self._connection_lost: raise ConnectionResetError('Connection lost') if not self._paused: @@ -213,7 +208,7 @@ class FlowControlMixin(protocols.Protocol): assert waiter is None or waiter.cancelled() waiter = self._loop.create_future() self._drain_waiter = waiter - yield from waiter + await waiter class StreamReaderProtocol(FlowControlMixin, protocols.Protocol): @@ -313,14 +308,13 @@ class StreamWriter: def get_extra_info(self, name, default=None): return self._transport.get_extra_info(name, default) - @coroutine - def drain(self): + async def drain(self): """Flush the write buffer. The intended use is to write w.write(data) - yield from w.drain() + await w.drain() """ if self._reader is not None: exc = self._reader.exception() @@ -331,11 +325,11 @@ class StreamWriter: # Yield to the event loop so connection_lost() may be # called. Without this, _drain_helper() would return # immediately, and code that calls - # write(...); yield from drain() + # write(...); await drain() # in a loop would never call connection_lost(), so it # would not see an error when the socket is closed. - yield - yield from self._protocol._drain_helper() + await sleep(0, loop=self._loop) + await self._protocol._drain_helper() class StreamReader: @@ -436,8 +430,7 @@ class StreamReader: else: self._paused = True - @coroutine - def _wait_for_data(self, func_name): + async def _wait_for_data(self, func_name): """Wait until feed_data() or feed_eof() is called. If stream was paused, automatically resume it. @@ -460,12 +453,11 @@ class StreamReader: self._waiter = self._loop.create_future() try: - yield from self._waiter + await self._waiter finally: self._waiter = None - @coroutine - def readline(self): + async def readline(self): """Read chunk of data from the stream until newline (b'\n') is found. On success, return chunk that ends with newline. If only partial @@ -484,7 +476,7 @@ class StreamReader: sep = b'\n' seplen = len(sep) try: - line = yield from self.readuntil(sep) + line = await self.readuntil(sep) except IncompleteReadError as e: return e.partial except LimitOverrunError as e: @@ -496,8 +488,7 @@ class StreamReader: raise ValueError(e.args[0]) return line - @coroutine - def readuntil(self, separator=b'\n'): + async def readuntil(self, separator=b'\n'): """Read data from the stream until ``separator`` is found. On success, the data and separator will be removed from the @@ -577,7 +568,7 @@ class StreamReader: raise IncompleteReadError(chunk, None) # _wait_for_data() will resume reading if stream was paused. - yield from self._wait_for_data('readuntil') + await self._wait_for_data('readuntil') if isep > self._limit: raise LimitOverrunError( @@ -588,8 +579,7 @@ class StreamReader: self._maybe_resume_transport() return bytes(chunk) - @coroutine - def read(self, n=-1): + async def read(self, n=-1): """Read up to `n` bytes from the stream. If n is not provided, or set to -1, read until EOF and return all read @@ -623,14 +613,14 @@ class StreamReader: # bytes. So just call self.read(self._limit) until EOF. blocks = [] while True: - block = yield from self.read(self._limit) + block = await self.read(self._limit) if not block: break blocks.append(block) return b''.join(blocks) if not self._buffer and not self._eof: - yield from self._wait_for_data('read') + await self._wait_for_data('read') # This will work right even if buffer is less than n bytes data = bytes(self._buffer[:n]) @@ -639,8 +629,7 @@ class StreamReader: self._maybe_resume_transport() return data - @coroutine - def readexactly(self, n): + async def readexactly(self, n): """Read exactly `n` bytes. Raise an IncompleteReadError if EOF is reached before `n` bytes can be @@ -670,7 +659,7 @@ class StreamReader: self._buffer.clear() raise IncompleteReadError(incomplete, n) - yield from self._wait_for_data('readexactly') + await self._wait_for_data('readexactly') if len(self._buffer) == n: data = bytes(self._buffer) @@ -684,9 +673,8 @@ class StreamReader: def __aiter__(self): return self - @coroutine - def __anext__(self): - val = yield from self.readline() + async def __anext__(self): + val = await self.readline() if val == b'': raise StopAsyncIteration return val |