summaryrefslogtreecommitdiffstats
path: root/Lib/asyncio/streams.py
diff options
context:
space:
mode:
authorAndrew Svetlov <andrew.svetlov@gmail.com>2017-12-08 22:23:48 (GMT)
committerGitHub <noreply@github.com>2017-12-08 22:23:48 (GMT)
commit5f841b553814969220b096a2b4f959b7f6fcbaf6 (patch)
treeb48ea916d9585efa9bf7ff370b50c4e2dfb30247 /Lib/asyncio/streams.py
parentede157331b4f9e550334900b3b4de1c8590688de (diff)
downloadcpython-5f841b553814969220b096a2b4f959b7f6fcbaf6.zip
cpython-5f841b553814969220b096a2b4f959b7f6fcbaf6.tar.gz
cpython-5f841b553814969220b096a2b4f959b7f6fcbaf6.tar.bz2
bpo-32193: Convert asyncio to async/await usage (#4753)
* Convert asyncio/tasks.py to async/await * Convert asyncio/queues.py to async/await * Convert asyncio/test_utils.py to async/await * Convert asyncio/base_subprocess.py to async/await * Convert asyncio/subprocess.py to async/await * Convert asyncio/streams.py to async/await * Fix comments * Convert asyncio/locks.py to async/await * Convert asyncio.sleep to async def * Add a comment * Add missing news * Convert stubs from AbstrctEventLoop to async functions * Convert subprocess_shell/subprocess_exec * Convert connect_read_pipe/connect_write_pip to async/await syntax * Convert create_datagram_endpoint * Convert create_unix_server/create_unix_connection * Get rid of old style coroutines in unix_events.py * Convert selector_events.py to async/await * Convert wait_closed and create_connection * Drop redundant line * Convert base_events.py * Code cleanup * Drop redundant comments * Fix indentation * Add explicit tests for compatibility between old and new coroutines * Convert windows event loop to use async/await * Fix double awaiting of async function * Convert asyncio/locks.py * Improve docstring * Convert tests to async/await * Convert more tests * Convert more tests * Convert more tests * Convert tests * Improve test
Diffstat (limited to 'Lib/asyncio/streams.py')
-rw-r--r--Lib/asyncio/streams.py78
1 files changed, 33 insertions, 45 deletions
diff --git a/Lib/asyncio/streams.py b/Lib/asyncio/streams.py
index 15c9513..baa9ec9 100644
--- a/Lib/asyncio/streams.py
+++ b/Lib/asyncio/streams.py
@@ -14,8 +14,8 @@ if hasattr(socket, 'AF_UNIX'):
from . import coroutines
from . import events
from . import protocols
-from .coroutines import coroutine
from .log import logger
+from .tasks import sleep
_DEFAULT_LIMIT = 2 ** 16
@@ -52,9 +52,8 @@ class LimitOverrunError(Exception):
return type(self), (self.args[0], self.consumed)
-@coroutine
-def open_connection(host=None, port=None, *,
- loop=None, limit=_DEFAULT_LIMIT, **kwds):
+async def open_connection(host=None, port=None, *,
+ loop=None, limit=_DEFAULT_LIMIT, **kwds):
"""A wrapper for create_connection() returning a (reader, writer) pair.
The reader returned is a StreamReader instance; the writer is a
@@ -76,15 +75,14 @@ def open_connection(host=None, port=None, *,
loop = events.get_event_loop()
reader = StreamReader(limit=limit, loop=loop)
protocol = StreamReaderProtocol(reader, loop=loop)
- transport, _ = yield from loop.create_connection(
+ transport, _ = await loop.create_connection(
lambda: protocol, host, port, **kwds)
writer = StreamWriter(transport, protocol, reader, loop)
return reader, writer
-@coroutine
-def start_server(client_connected_cb, host=None, port=None, *,
- loop=None, limit=_DEFAULT_LIMIT, **kwds):
+async def start_server(client_connected_cb, host=None, port=None, *,
+ loop=None, limit=_DEFAULT_LIMIT, **kwds):
"""Start a socket server, call back for each client connected.
The first parameter, `client_connected_cb`, takes two parameters:
@@ -115,28 +113,26 @@ def start_server(client_connected_cb, host=None, port=None, *,
loop=loop)
return protocol
- return (yield from loop.create_server(factory, host, port, **kwds))
+ return await loop.create_server(factory, host, port, **kwds)
if hasattr(socket, 'AF_UNIX'):
# UNIX Domain Sockets are supported on this platform
- @coroutine
- def open_unix_connection(path=None, *,
- loop=None, limit=_DEFAULT_LIMIT, **kwds):
+ async def open_unix_connection(path=None, *,
+ loop=None, limit=_DEFAULT_LIMIT, **kwds):
"""Similar to `open_connection` but works with UNIX Domain Sockets."""
if loop is None:
loop = events.get_event_loop()
reader = StreamReader(limit=limit, loop=loop)
protocol = StreamReaderProtocol(reader, loop=loop)
- transport, _ = yield from loop.create_unix_connection(
+ transport, _ = await loop.create_unix_connection(
lambda: protocol, path, **kwds)
writer = StreamWriter(transport, protocol, reader, loop)
return reader, writer
- @coroutine
- def start_unix_server(client_connected_cb, path=None, *,
- loop=None, limit=_DEFAULT_LIMIT, **kwds):
+ async def start_unix_server(client_connected_cb, path=None, *,
+ loop=None, limit=_DEFAULT_LIMIT, **kwds):
"""Similar to `start_server` but works with UNIX Domain Sockets."""
if loop is None:
loop = events.get_event_loop()
@@ -147,7 +143,7 @@ if hasattr(socket, 'AF_UNIX'):
loop=loop)
return protocol
- return (yield from loop.create_unix_server(factory, path, **kwds))
+ return await loop.create_unix_server(factory, path, **kwds)
class FlowControlMixin(protocols.Protocol):
@@ -203,8 +199,7 @@ class FlowControlMixin(protocols.Protocol):
else:
waiter.set_exception(exc)
- @coroutine
- def _drain_helper(self):
+ async def _drain_helper(self):
if self._connection_lost:
raise ConnectionResetError('Connection lost')
if not self._paused:
@@ -213,7 +208,7 @@ class FlowControlMixin(protocols.Protocol):
assert waiter is None or waiter.cancelled()
waiter = self._loop.create_future()
self._drain_waiter = waiter
- yield from waiter
+ await waiter
class StreamReaderProtocol(FlowControlMixin, protocols.Protocol):
@@ -313,14 +308,13 @@ class StreamWriter:
def get_extra_info(self, name, default=None):
return self._transport.get_extra_info(name, default)
- @coroutine
- def drain(self):
+ async def drain(self):
"""Flush the write buffer.
The intended use is to write
w.write(data)
- yield from w.drain()
+ await w.drain()
"""
if self._reader is not None:
exc = self._reader.exception()
@@ -331,11 +325,11 @@ class StreamWriter:
# Yield to the event loop so connection_lost() may be
# called. Without this, _drain_helper() would return
# immediately, and code that calls
- # write(...); yield from drain()
+ # write(...); await drain()
# in a loop would never call connection_lost(), so it
# would not see an error when the socket is closed.
- yield
- yield from self._protocol._drain_helper()
+ await sleep(0, loop=self._loop)
+ await self._protocol._drain_helper()
class StreamReader:
@@ -436,8 +430,7 @@ class StreamReader:
else:
self._paused = True
- @coroutine
- def _wait_for_data(self, func_name):
+ async def _wait_for_data(self, func_name):
"""Wait until feed_data() or feed_eof() is called.
If stream was paused, automatically resume it.
@@ -460,12 +453,11 @@ class StreamReader:
self._waiter = self._loop.create_future()
try:
- yield from self._waiter
+ await self._waiter
finally:
self._waiter = None
- @coroutine
- def readline(self):
+ async def readline(self):
"""Read chunk of data from the stream until newline (b'\n') is found.
On success, return chunk that ends with newline. If only partial
@@ -484,7 +476,7 @@ class StreamReader:
sep = b'\n'
seplen = len(sep)
try:
- line = yield from self.readuntil(sep)
+ line = await self.readuntil(sep)
except IncompleteReadError as e:
return e.partial
except LimitOverrunError as e:
@@ -496,8 +488,7 @@ class StreamReader:
raise ValueError(e.args[0])
return line
- @coroutine
- def readuntil(self, separator=b'\n'):
+ async def readuntil(self, separator=b'\n'):
"""Read data from the stream until ``separator`` is found.
On success, the data and separator will be removed from the
@@ -577,7 +568,7 @@ class StreamReader:
raise IncompleteReadError(chunk, None)
# _wait_for_data() will resume reading if stream was paused.
- yield from self._wait_for_data('readuntil')
+ await self._wait_for_data('readuntil')
if isep > self._limit:
raise LimitOverrunError(
@@ -588,8 +579,7 @@ class StreamReader:
self._maybe_resume_transport()
return bytes(chunk)
- @coroutine
- def read(self, n=-1):
+ async def read(self, n=-1):
"""Read up to `n` bytes from the stream.
If n is not provided, or set to -1, read until EOF and return all read
@@ -623,14 +613,14 @@ class StreamReader:
# bytes. So just call self.read(self._limit) until EOF.
blocks = []
while True:
- block = yield from self.read(self._limit)
+ block = await self.read(self._limit)
if not block:
break
blocks.append(block)
return b''.join(blocks)
if not self._buffer and not self._eof:
- yield from self._wait_for_data('read')
+ await self._wait_for_data('read')
# This will work right even if buffer is less than n bytes
data = bytes(self._buffer[:n])
@@ -639,8 +629,7 @@ class StreamReader:
self._maybe_resume_transport()
return data
- @coroutine
- def readexactly(self, n):
+ async def readexactly(self, n):
"""Read exactly `n` bytes.
Raise an IncompleteReadError if EOF is reached before `n` bytes can be
@@ -670,7 +659,7 @@ class StreamReader:
self._buffer.clear()
raise IncompleteReadError(incomplete, n)
- yield from self._wait_for_data('readexactly')
+ await self._wait_for_data('readexactly')
if len(self._buffer) == n:
data = bytes(self._buffer)
@@ -684,9 +673,8 @@ class StreamReader:
def __aiter__(self):
return self
- @coroutine
- def __anext__(self):
- val = yield from self.readline()
+ async def __anext__(self):
+ val = await self.readline()
if val == b'':
raise StopAsyncIteration
return val