summaryrefslogtreecommitdiffstats
path: root/Lib/asyncio/base_events.py
diff options
context:
space:
mode:
authorAndrew Svetlov <andrew.svetlov@gmail.com>2017-12-08 22:23:48 (GMT)
committerGitHub <noreply@github.com>2017-12-08 22:23:48 (GMT)
commit5f841b553814969220b096a2b4f959b7f6fcbaf6 (patch)
treeb48ea916d9585efa9bf7ff370b50c4e2dfb30247 /Lib/asyncio/base_events.py
parentede157331b4f9e550334900b3b4de1c8590688de (diff)
downloadcpython-5f841b553814969220b096a2b4f959b7f6fcbaf6.zip
cpython-5f841b553814969220b096a2b4f959b7f6fcbaf6.tar.gz
cpython-5f841b553814969220b096a2b4f959b7f6fcbaf6.tar.bz2
bpo-32193: Convert asyncio to async/await usage (#4753)
* Convert asyncio/tasks.py to async/await * Convert asyncio/queues.py to async/await * Convert asyncio/test_utils.py to async/await * Convert asyncio/base_subprocess.py to async/await * Convert asyncio/subprocess.py to async/await * Convert asyncio/streams.py to async/await * Fix comments * Convert asyncio/locks.py to async/await * Convert asyncio.sleep to async def * Add a comment * Add missing news * Convert stubs from AbstrctEventLoop to async functions * Convert subprocess_shell/subprocess_exec * Convert connect_read_pipe/connect_write_pip to async/await syntax * Convert create_datagram_endpoint * Convert create_unix_server/create_unix_connection * Get rid of old style coroutines in unix_events.py * Convert selector_events.py to async/await * Convert wait_closed and create_connection * Drop redundant line * Convert base_events.py * Code cleanup * Drop redundant comments * Fix indentation * Add explicit tests for compatibility between old and new coroutines * Convert windows event loop to use async/await * Fix double awaiting of async function * Convert asyncio/locks.py * Improve docstring * Convert tests to async/await * Convert more tests * Convert more tests * Convert more tests * Convert tests * Improve test
Diffstat (limited to 'Lib/asyncio/base_events.py')
-rw-r--r--Lib/asyncio/base_events.py128
1 files changed, 59 insertions, 69 deletions
diff --git a/Lib/asyncio/base_events.py b/Lib/asyncio/base_events.py
index ffdb50f..ab92a0b 100644
--- a/Lib/asyncio/base_events.py
+++ b/Lib/asyncio/base_events.py
@@ -33,7 +33,6 @@ from . import coroutines
from . import events
from . import futures
from . import tasks
-from .coroutines import coroutine
from .log import logger
@@ -220,13 +219,12 @@ class Server(events.AbstractServer):
if not waiter.done():
waiter.set_result(waiter)
- @coroutine
- def wait_closed(self):
+ async def wait_closed(self):
if self.sockets is None or self._waiters is None:
return
waiter = self._loop.create_future()
self._waiters.append(waiter)
- yield from waiter
+ await waiter
class BaseEventLoop(events.AbstractEventLoop):
@@ -330,10 +328,9 @@ class BaseEventLoop(events.AbstractEventLoop):
"""Create write pipe transport."""
raise NotImplementedError
- @coroutine
- def _make_subprocess_transport(self, protocol, args, shell,
- stdin, stdout, stderr, bufsize,
- extra=None, **kwargs):
+ async def _make_subprocess_transport(self, protocol, args, shell,
+ stdin, stdout, stderr, bufsize,
+ extra=None, **kwargs):
"""Create subprocess transport."""
raise NotImplementedError
@@ -371,8 +368,7 @@ class BaseEventLoop(events.AbstractEventLoop):
self._asyncgens.add(agen)
- @coroutine
- def shutdown_asyncgens(self):
+ async def shutdown_asyncgens(self):
"""Shutdown all active asynchronous generators."""
self._asyncgens_shutdown_called = True
@@ -384,12 +380,11 @@ class BaseEventLoop(events.AbstractEventLoop):
closing_agens = list(self._asyncgens)
self._asyncgens.clear()
- shutdown_coro = tasks.gather(
+ results = await tasks.gather(
*[ag.aclose() for ag in closing_agens],
return_exceptions=True,
loop=self)
- results = yield from shutdown_coro
for result, agen in zip(results, closing_agens):
if isinstance(result, Exception):
self.call_exception_handler({
@@ -671,10 +666,10 @@ class BaseEventLoop(events.AbstractEventLoop):
def getnameinfo(self, sockaddr, flags=0):
return self.run_in_executor(None, socket.getnameinfo, sockaddr, flags)
- @coroutine
- def create_connection(self, protocol_factory, host=None, port=None, *,
- ssl=None, family=0, proto=0, flags=0, sock=None,
- local_addr=None, server_hostname=None):
+ async def create_connection(self, protocol_factory, host=None, port=None,
+ *, ssl=None, family=0,
+ proto=0, flags=0, sock=None,
+ local_addr=None, server_hostname=None):
"""Connect to a TCP server.
Create a streaming transport connection to a given Internet host and
@@ -722,7 +717,7 @@ class BaseEventLoop(events.AbstractEventLoop):
else:
f2 = None
- yield from tasks.wait(fs, loop=self)
+ await tasks.wait(fs, loop=self)
infos = f1.result()
if not infos:
@@ -755,7 +750,7 @@ class BaseEventLoop(events.AbstractEventLoop):
continue
if self._debug:
logger.debug("connect %r to %r", sock, address)
- yield from self.sock_connect(sock, address)
+ await self.sock_connect(sock, address)
except OSError as exc:
if sock is not None:
sock.close()
@@ -793,7 +788,7 @@ class BaseEventLoop(events.AbstractEventLoop):
raise ValueError(
'A Stream Socket was expected, got {!r}'.format(sock))
- transport, protocol = yield from self._create_connection_transport(
+ transport, protocol = await self._create_connection_transport(
sock, protocol_factory, ssl, server_hostname)
if self._debug:
# Get the socket from the transport because SSL transport closes
@@ -803,9 +798,8 @@ class BaseEventLoop(events.AbstractEventLoop):
sock, host, port, transport, protocol)
return transport, protocol
- @coroutine
- def _create_connection_transport(self, sock, protocol_factory, ssl,
- server_hostname, server_side=False):
+ async def _create_connection_transport(self, sock, protocol_factory, ssl,
+ server_hostname, server_side=False):
sock.setblocking(False)
@@ -820,19 +814,18 @@ class BaseEventLoop(events.AbstractEventLoop):
transport = self._make_socket_transport(sock, protocol, waiter)
try:
- yield from waiter
+ await waiter
except:
transport.close()
raise
return transport, protocol
- @coroutine
- def create_datagram_endpoint(self, protocol_factory,
- local_addr=None, remote_addr=None, *,
- family=0, proto=0, flags=0,
- reuse_address=None, reuse_port=None,
- allow_broadcast=None, sock=None):
+ async def create_datagram_endpoint(self, protocol_factory,
+ local_addr=None, remote_addr=None, *,
+ family=0, proto=0, flags=0,
+ reuse_address=None, reuse_port=None,
+ allow_broadcast=None, sock=None):
"""Create datagram connection."""
if sock is not None:
if not _is_dgram_socket(sock):
@@ -872,7 +865,7 @@ class BaseEventLoop(events.AbstractEventLoop):
assert isinstance(addr, tuple) and len(addr) == 2, (
'2-tuple is expected')
- infos = yield from _ensure_resolved(
+ infos = await _ensure_resolved(
addr, family=family, type=socket.SOCK_DGRAM,
proto=proto, flags=flags, loop=self)
if not infos:
@@ -918,7 +911,7 @@ class BaseEventLoop(events.AbstractEventLoop):
if local_addr:
sock.bind(local_address)
if remote_addr:
- yield from self.sock_connect(sock, remote_address)
+ await self.sock_connect(sock, remote_address)
r_addr = remote_address
except OSError as exc:
if sock is not None:
@@ -948,32 +941,30 @@ class BaseEventLoop(events.AbstractEventLoop):
remote_addr, transport, protocol)
try:
- yield from waiter
+ await waiter
except:
transport.close()
raise
return transport, protocol
- @coroutine
- def _create_server_getaddrinfo(self, host, port, family, flags):
- infos = yield from _ensure_resolved((host, port), family=family,
- type=socket.SOCK_STREAM,
- flags=flags, loop=self)
+ async def _create_server_getaddrinfo(self, host, port, family, flags):
+ infos = await _ensure_resolved((host, port), family=family,
+ type=socket.SOCK_STREAM,
+ flags=flags, loop=self)
if not infos:
raise OSError('getaddrinfo({!r}) returned empty list'.format(host))
return infos
- @coroutine
- def create_server(self, protocol_factory, host=None, port=None,
- *,
- family=socket.AF_UNSPEC,
- flags=socket.AI_PASSIVE,
- sock=None,
- backlog=100,
- ssl=None,
- reuse_address=None,
- reuse_port=None):
+ async def create_server(self, protocol_factory, host=None, port=None,
+ *,
+ family=socket.AF_UNSPEC,
+ flags=socket.AI_PASSIVE,
+ sock=None,
+ backlog=100,
+ ssl=None,
+ reuse_address=None,
+ reuse_port=None):
"""Create a TCP server.
The host parameter can be a string, in that case the TCP server is bound
@@ -1011,7 +1002,7 @@ class BaseEventLoop(events.AbstractEventLoop):
fs = [self._create_server_getaddrinfo(host, port, family=family,
flags=flags)
for host in hosts]
- infos = yield from tasks.gather(*fs, loop=self)
+ infos = await tasks.gather(*fs, loop=self)
infos = set(itertools.chain.from_iterable(infos))
completed = False
@@ -1068,8 +1059,8 @@ class BaseEventLoop(events.AbstractEventLoop):
logger.info("%r is serving", server)
return server
- @coroutine
- def connect_accepted_socket(self, protocol_factory, sock, *, ssl=None):
+ async def connect_accepted_socket(self, protocol_factory, sock,
+ *, ssl=None):
"""Handle an accepted connection.
This is used by servers that accept connections outside of
@@ -1082,7 +1073,7 @@ class BaseEventLoop(events.AbstractEventLoop):
raise ValueError(
'A Stream Socket was expected, got {!r}'.format(sock))
- transport, protocol = yield from self._create_connection_transport(
+ transport, protocol = await self._create_connection_transport(
sock, protocol_factory, ssl, '', server_side=True)
if self._debug:
# Get the socket from the transport because SSL transport closes
@@ -1091,14 +1082,13 @@ class BaseEventLoop(events.AbstractEventLoop):
logger.debug("%r handled: (%r, %r)", sock, transport, protocol)
return transport, protocol
- @coroutine
- def connect_read_pipe(self, protocol_factory, pipe):
+ async def connect_read_pipe(self, protocol_factory, pipe):
protocol = protocol_factory()
waiter = self.create_future()
transport = self._make_read_pipe_transport(pipe, protocol, waiter)
try:
- yield from waiter
+ await waiter
except:
transport.close()
raise
@@ -1108,14 +1098,13 @@ class BaseEventLoop(events.AbstractEventLoop):
pipe.fileno(), transport, protocol)
return transport, protocol
- @coroutine
- def connect_write_pipe(self, protocol_factory, pipe):
+ async def connect_write_pipe(self, protocol_factory, pipe):
protocol = protocol_factory()
waiter = self.create_future()
transport = self._make_write_pipe_transport(pipe, protocol, waiter)
try:
- yield from waiter
+ await waiter
except:
transport.close()
raise
@@ -1138,11 +1127,13 @@ class BaseEventLoop(events.AbstractEventLoop):
info.append('stderr=%s' % _format_pipe(stderr))
logger.debug(' '.join(info))
- @coroutine
- def subprocess_shell(self, protocol_factory, cmd, *, stdin=subprocess.PIPE,
- stdout=subprocess.PIPE, stderr=subprocess.PIPE,
- universal_newlines=False, shell=True, bufsize=0,
- **kwargs):
+ async def subprocess_shell(self, protocol_factory, cmd, *,
+ stdin=subprocess.PIPE,
+ stdout=subprocess.PIPE,
+ stderr=subprocess.PIPE,
+ universal_newlines=False,
+ shell=True, bufsize=0,
+ **kwargs):
if not isinstance(cmd, (bytes, str)):
raise ValueError("cmd must be a string")
if universal_newlines:
@@ -1157,17 +1148,16 @@ class BaseEventLoop(events.AbstractEventLoop):
# (password) and may be too long
debug_log = 'run shell command %r' % cmd
self._log_subprocess(debug_log, stdin, stdout, stderr)
- transport = yield from self._make_subprocess_transport(
+ transport = await self._make_subprocess_transport(
protocol, cmd, True, stdin, stdout, stderr, bufsize, **kwargs)
if self._debug:
logger.info('%s: %r', debug_log, transport)
return transport, protocol
- @coroutine
- def subprocess_exec(self, protocol_factory, program, *args,
- stdin=subprocess.PIPE, stdout=subprocess.PIPE,
- stderr=subprocess.PIPE, universal_newlines=False,
- shell=False, bufsize=0, **kwargs):
+ async def subprocess_exec(self, protocol_factory, program, *args,
+ stdin=subprocess.PIPE, stdout=subprocess.PIPE,
+ stderr=subprocess.PIPE, universal_newlines=False,
+ shell=False, bufsize=0, **kwargs):
if universal_newlines:
raise ValueError("universal_newlines must be False")
if shell:
@@ -1186,7 +1176,7 @@ class BaseEventLoop(events.AbstractEventLoop):
# (password) and may be too long
debug_log = 'execute program %r' % program
self._log_subprocess(debug_log, stdin, stdout, stderr)
- transport = yield from self._make_subprocess_transport(
+ transport = await self._make_subprocess_transport(
protocol, popen_args, False, stdin, stdout, stderr,
bufsize, **kwargs)
if self._debug: