diff options
author | Andrew Svetlov <andrew.svetlov@gmail.com> | 2017-12-08 22:23:48 (GMT) |
---|---|---|
committer | GitHub <noreply@github.com> | 2017-12-08 22:23:48 (GMT) |
commit | 5f841b553814969220b096a2b4f959b7f6fcbaf6 (patch) | |
tree | b48ea916d9585efa9bf7ff370b50c4e2dfb30247 /Lib/asyncio/base_events.py | |
parent | ede157331b4f9e550334900b3b4de1c8590688de (diff) | |
download | cpython-5f841b553814969220b096a2b4f959b7f6fcbaf6.zip cpython-5f841b553814969220b096a2b4f959b7f6fcbaf6.tar.gz cpython-5f841b553814969220b096a2b4f959b7f6fcbaf6.tar.bz2 |
bpo-32193: Convert asyncio to async/await usage (#4753)
* Convert asyncio/tasks.py to async/await
* Convert asyncio/queues.py to async/await
* Convert asyncio/test_utils.py to async/await
* Convert asyncio/base_subprocess.py to async/await
* Convert asyncio/subprocess.py to async/await
* Convert asyncio/streams.py to async/await
* Fix comments
* Convert asyncio/locks.py to async/await
* Convert asyncio.sleep to async def
* Add a comment
* Add missing news
* Convert stubs from AbstrctEventLoop to async functions
* Convert subprocess_shell/subprocess_exec
* Convert connect_read_pipe/connect_write_pip to async/await syntax
* Convert create_datagram_endpoint
* Convert create_unix_server/create_unix_connection
* Get rid of old style coroutines in unix_events.py
* Convert selector_events.py to async/await
* Convert wait_closed and create_connection
* Drop redundant line
* Convert base_events.py
* Code cleanup
* Drop redundant comments
* Fix indentation
* Add explicit tests for compatibility between old and new coroutines
* Convert windows event loop to use async/await
* Fix double awaiting of async function
* Convert asyncio/locks.py
* Improve docstring
* Convert tests to async/await
* Convert more tests
* Convert more tests
* Convert more tests
* Convert tests
* Improve test
Diffstat (limited to 'Lib/asyncio/base_events.py')
-rw-r--r-- | Lib/asyncio/base_events.py | 128 |
1 files changed, 59 insertions, 69 deletions
diff --git a/Lib/asyncio/base_events.py b/Lib/asyncio/base_events.py index ffdb50f..ab92a0b 100644 --- a/Lib/asyncio/base_events.py +++ b/Lib/asyncio/base_events.py @@ -33,7 +33,6 @@ from . import coroutines from . import events from . import futures from . import tasks -from .coroutines import coroutine from .log import logger @@ -220,13 +219,12 @@ class Server(events.AbstractServer): if not waiter.done(): waiter.set_result(waiter) - @coroutine - def wait_closed(self): + async def wait_closed(self): if self.sockets is None or self._waiters is None: return waiter = self._loop.create_future() self._waiters.append(waiter) - yield from waiter + await waiter class BaseEventLoop(events.AbstractEventLoop): @@ -330,10 +328,9 @@ class BaseEventLoop(events.AbstractEventLoop): """Create write pipe transport.""" raise NotImplementedError - @coroutine - def _make_subprocess_transport(self, protocol, args, shell, - stdin, stdout, stderr, bufsize, - extra=None, **kwargs): + async def _make_subprocess_transport(self, protocol, args, shell, + stdin, stdout, stderr, bufsize, + extra=None, **kwargs): """Create subprocess transport.""" raise NotImplementedError @@ -371,8 +368,7 @@ class BaseEventLoop(events.AbstractEventLoop): self._asyncgens.add(agen) - @coroutine - def shutdown_asyncgens(self): + async def shutdown_asyncgens(self): """Shutdown all active asynchronous generators.""" self._asyncgens_shutdown_called = True @@ -384,12 +380,11 @@ class BaseEventLoop(events.AbstractEventLoop): closing_agens = list(self._asyncgens) self._asyncgens.clear() - shutdown_coro = tasks.gather( + results = await tasks.gather( *[ag.aclose() for ag in closing_agens], return_exceptions=True, loop=self) - results = yield from shutdown_coro for result, agen in zip(results, closing_agens): if isinstance(result, Exception): self.call_exception_handler({ @@ -671,10 +666,10 @@ class BaseEventLoop(events.AbstractEventLoop): def getnameinfo(self, sockaddr, flags=0): return self.run_in_executor(None, socket.getnameinfo, sockaddr, flags) - @coroutine - def create_connection(self, protocol_factory, host=None, port=None, *, - ssl=None, family=0, proto=0, flags=0, sock=None, - local_addr=None, server_hostname=None): + async def create_connection(self, protocol_factory, host=None, port=None, + *, ssl=None, family=0, + proto=0, flags=0, sock=None, + local_addr=None, server_hostname=None): """Connect to a TCP server. Create a streaming transport connection to a given Internet host and @@ -722,7 +717,7 @@ class BaseEventLoop(events.AbstractEventLoop): else: f2 = None - yield from tasks.wait(fs, loop=self) + await tasks.wait(fs, loop=self) infos = f1.result() if not infos: @@ -755,7 +750,7 @@ class BaseEventLoop(events.AbstractEventLoop): continue if self._debug: logger.debug("connect %r to %r", sock, address) - yield from self.sock_connect(sock, address) + await self.sock_connect(sock, address) except OSError as exc: if sock is not None: sock.close() @@ -793,7 +788,7 @@ class BaseEventLoop(events.AbstractEventLoop): raise ValueError( 'A Stream Socket was expected, got {!r}'.format(sock)) - transport, protocol = yield from self._create_connection_transport( + transport, protocol = await self._create_connection_transport( sock, protocol_factory, ssl, server_hostname) if self._debug: # Get the socket from the transport because SSL transport closes @@ -803,9 +798,8 @@ class BaseEventLoop(events.AbstractEventLoop): sock, host, port, transport, protocol) return transport, protocol - @coroutine - def _create_connection_transport(self, sock, protocol_factory, ssl, - server_hostname, server_side=False): + async def _create_connection_transport(self, sock, protocol_factory, ssl, + server_hostname, server_side=False): sock.setblocking(False) @@ -820,19 +814,18 @@ class BaseEventLoop(events.AbstractEventLoop): transport = self._make_socket_transport(sock, protocol, waiter) try: - yield from waiter + await waiter except: transport.close() raise return transport, protocol - @coroutine - def create_datagram_endpoint(self, protocol_factory, - local_addr=None, remote_addr=None, *, - family=0, proto=0, flags=0, - reuse_address=None, reuse_port=None, - allow_broadcast=None, sock=None): + async def create_datagram_endpoint(self, protocol_factory, + local_addr=None, remote_addr=None, *, + family=0, proto=0, flags=0, + reuse_address=None, reuse_port=None, + allow_broadcast=None, sock=None): """Create datagram connection.""" if sock is not None: if not _is_dgram_socket(sock): @@ -872,7 +865,7 @@ class BaseEventLoop(events.AbstractEventLoop): assert isinstance(addr, tuple) and len(addr) == 2, ( '2-tuple is expected') - infos = yield from _ensure_resolved( + infos = await _ensure_resolved( addr, family=family, type=socket.SOCK_DGRAM, proto=proto, flags=flags, loop=self) if not infos: @@ -918,7 +911,7 @@ class BaseEventLoop(events.AbstractEventLoop): if local_addr: sock.bind(local_address) if remote_addr: - yield from self.sock_connect(sock, remote_address) + await self.sock_connect(sock, remote_address) r_addr = remote_address except OSError as exc: if sock is not None: @@ -948,32 +941,30 @@ class BaseEventLoop(events.AbstractEventLoop): remote_addr, transport, protocol) try: - yield from waiter + await waiter except: transport.close() raise return transport, protocol - @coroutine - def _create_server_getaddrinfo(self, host, port, family, flags): - infos = yield from _ensure_resolved((host, port), family=family, - type=socket.SOCK_STREAM, - flags=flags, loop=self) + async def _create_server_getaddrinfo(self, host, port, family, flags): + infos = await _ensure_resolved((host, port), family=family, + type=socket.SOCK_STREAM, + flags=flags, loop=self) if not infos: raise OSError('getaddrinfo({!r}) returned empty list'.format(host)) return infos - @coroutine - def create_server(self, protocol_factory, host=None, port=None, - *, - family=socket.AF_UNSPEC, - flags=socket.AI_PASSIVE, - sock=None, - backlog=100, - ssl=None, - reuse_address=None, - reuse_port=None): + async def create_server(self, protocol_factory, host=None, port=None, + *, + family=socket.AF_UNSPEC, + flags=socket.AI_PASSIVE, + sock=None, + backlog=100, + ssl=None, + reuse_address=None, + reuse_port=None): """Create a TCP server. The host parameter can be a string, in that case the TCP server is bound @@ -1011,7 +1002,7 @@ class BaseEventLoop(events.AbstractEventLoop): fs = [self._create_server_getaddrinfo(host, port, family=family, flags=flags) for host in hosts] - infos = yield from tasks.gather(*fs, loop=self) + infos = await tasks.gather(*fs, loop=self) infos = set(itertools.chain.from_iterable(infos)) completed = False @@ -1068,8 +1059,8 @@ class BaseEventLoop(events.AbstractEventLoop): logger.info("%r is serving", server) return server - @coroutine - def connect_accepted_socket(self, protocol_factory, sock, *, ssl=None): + async def connect_accepted_socket(self, protocol_factory, sock, + *, ssl=None): """Handle an accepted connection. This is used by servers that accept connections outside of @@ -1082,7 +1073,7 @@ class BaseEventLoop(events.AbstractEventLoop): raise ValueError( 'A Stream Socket was expected, got {!r}'.format(sock)) - transport, protocol = yield from self._create_connection_transport( + transport, protocol = await self._create_connection_transport( sock, protocol_factory, ssl, '', server_side=True) if self._debug: # Get the socket from the transport because SSL transport closes @@ -1091,14 +1082,13 @@ class BaseEventLoop(events.AbstractEventLoop): logger.debug("%r handled: (%r, %r)", sock, transport, protocol) return transport, protocol - @coroutine - def connect_read_pipe(self, protocol_factory, pipe): + async def connect_read_pipe(self, protocol_factory, pipe): protocol = protocol_factory() waiter = self.create_future() transport = self._make_read_pipe_transport(pipe, protocol, waiter) try: - yield from waiter + await waiter except: transport.close() raise @@ -1108,14 +1098,13 @@ class BaseEventLoop(events.AbstractEventLoop): pipe.fileno(), transport, protocol) return transport, protocol - @coroutine - def connect_write_pipe(self, protocol_factory, pipe): + async def connect_write_pipe(self, protocol_factory, pipe): protocol = protocol_factory() waiter = self.create_future() transport = self._make_write_pipe_transport(pipe, protocol, waiter) try: - yield from waiter + await waiter except: transport.close() raise @@ -1138,11 +1127,13 @@ class BaseEventLoop(events.AbstractEventLoop): info.append('stderr=%s' % _format_pipe(stderr)) logger.debug(' '.join(info)) - @coroutine - def subprocess_shell(self, protocol_factory, cmd, *, stdin=subprocess.PIPE, - stdout=subprocess.PIPE, stderr=subprocess.PIPE, - universal_newlines=False, shell=True, bufsize=0, - **kwargs): + async def subprocess_shell(self, protocol_factory, cmd, *, + stdin=subprocess.PIPE, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + universal_newlines=False, + shell=True, bufsize=0, + **kwargs): if not isinstance(cmd, (bytes, str)): raise ValueError("cmd must be a string") if universal_newlines: @@ -1157,17 +1148,16 @@ class BaseEventLoop(events.AbstractEventLoop): # (password) and may be too long debug_log = 'run shell command %r' % cmd self._log_subprocess(debug_log, stdin, stdout, stderr) - transport = yield from self._make_subprocess_transport( + transport = await self._make_subprocess_transport( protocol, cmd, True, stdin, stdout, stderr, bufsize, **kwargs) if self._debug: logger.info('%s: %r', debug_log, transport) return transport, protocol - @coroutine - def subprocess_exec(self, protocol_factory, program, *args, - stdin=subprocess.PIPE, stdout=subprocess.PIPE, - stderr=subprocess.PIPE, universal_newlines=False, - shell=False, bufsize=0, **kwargs): + async def subprocess_exec(self, protocol_factory, program, *args, + stdin=subprocess.PIPE, stdout=subprocess.PIPE, + stderr=subprocess.PIPE, universal_newlines=False, + shell=False, bufsize=0, **kwargs): if universal_newlines: raise ValueError("universal_newlines must be False") if shell: @@ -1186,7 +1176,7 @@ class BaseEventLoop(events.AbstractEventLoop): # (password) and may be too long debug_log = 'execute program %r' % program self._log_subprocess(debug_log, stdin, stdout, stderr) - transport = yield from self._make_subprocess_transport( + transport = await self._make_subprocess_transport( protocol, popen_args, False, stdin, stdout, stderr, bufsize, **kwargs) if self._debug: |