diff options
author | Yury Selivanov <yury@magic.io> | 2017-12-15 01:53:26 (GMT) |
---|---|---|
committer | GitHub <noreply@github.com> | 2017-12-15 01:53:26 (GMT) |
commit | 19a44f63c738388ef3c8515348b4ffc061dfd627 (patch) | |
tree | 6de5ddd62a1bdee7a90e5fe8fc59348fe7c4f4f8 /Lib/asyncio | |
parent | 41264f1cd4d6066b2797ff07cae465c1e06ff3b2 (diff) | |
download | cpython-19a44f63c738388ef3c8515348b4ffc061dfd627.zip cpython-19a44f63c738388ef3c8515348b4ffc061dfd627.tar.gz cpython-19a44f63c738388ef3c8515348b4ffc061dfd627.tar.bz2 |
bpo-32327: Convert asyncio functions documented as coroutines to coroutines. (#4872)
Diffstat (limited to 'Lib/asyncio')
-rw-r--r-- | Lib/asyncio/base_events.py | 85 | ||||
-rw-r--r-- | Lib/asyncio/proactor_events.py | 20 | ||||
-rw-r--r-- | Lib/asyncio/selector_events.py | 30 |
3 files changed, 59 insertions, 76 deletions
diff --git a/Lib/asyncio/base_events.py b/Lib/asyncio/base_events.py index 9584d63..80d2b69 100644 --- a/Lib/asyncio/base_events.py +++ b/Lib/asyncio/base_events.py @@ -157,20 +157,6 @@ def _ipaddr_info(host, port, family, type, proto): return None -def _ensure_resolved(address, *, family=0, type=socket.SOCK_STREAM, proto=0, - flags=0, loop): - host, port = address[:2] - info = _ipaddr_info(host, port, family, type, proto) - if info is not None: - # "host" is already a resolved IP. - fut = loop.create_future() - fut.set_result([info]) - return fut - else: - return loop.getaddrinfo(host, port, family=family, type=type, - proto=proto, flags=flags) - - def _run_until_complete_cb(fut): exc = fut._exception if isinstance(exc, BaseException) and not isinstance(exc, Exception): @@ -614,7 +600,7 @@ class BaseEventLoop(events.AbstractEventLoop): self._write_to_self() return handle - def run_in_executor(self, executor, func, *args): + async def run_in_executor(self, executor, func, *args): self._check_closed() if self._debug: self._check_callback(func, 'run_in_executor') @@ -623,7 +609,8 @@ class BaseEventLoop(events.AbstractEventLoop): if executor is None: executor = concurrent.futures.ThreadPoolExecutor() self._default_executor = executor - return futures.wrap_future(executor.submit(func, *args), loop=self) + return await futures.wrap_future( + executor.submit(func, *args), loop=self) def set_default_executor(self, executor): self._default_executor = executor @@ -652,17 +639,19 @@ class BaseEventLoop(events.AbstractEventLoop): logger.debug(msg) return addrinfo - def getaddrinfo(self, host, port, *, - family=0, type=0, proto=0, flags=0): + async def getaddrinfo(self, host, port, *, + family=0, type=0, proto=0, flags=0): if self._debug: - return self.run_in_executor(None, self._getaddrinfo_debug, - host, port, family, type, proto, flags) + getaddr_func = self._getaddrinfo_debug else: - return self.run_in_executor(None, socket.getaddrinfo, - host, port, family, type, proto, flags) + getaddr_func = socket.getaddrinfo - def getnameinfo(self, sockaddr, flags=0): - return self.run_in_executor(None, socket.getnameinfo, sockaddr, flags) + return await self.run_in_executor( + None, getaddr_func, host, port, family, type, proto, flags) + + async def getnameinfo(self, sockaddr, flags=0): + return await self.run_in_executor( + None, socket.getnameinfo, sockaddr, flags) async def create_connection(self, protocol_factory, host=None, port=None, *, ssl=None, family=0, @@ -703,25 +692,17 @@ class BaseEventLoop(events.AbstractEventLoop): raise ValueError( 'host/port and sock can not be specified at the same time') - f1 = _ensure_resolved((host, port), family=family, - type=socket.SOCK_STREAM, proto=proto, - flags=flags, loop=self) - fs = [f1] - if local_addr is not None: - f2 = _ensure_resolved(local_addr, family=family, - type=socket.SOCK_STREAM, proto=proto, - flags=flags, loop=self) - fs.append(f2) - else: - f2 = None - - await tasks.wait(fs, loop=self) - - infos = f1.result() + infos = await self._ensure_resolved( + (host, port), family=family, + type=socket.SOCK_STREAM, proto=proto, flags=flags, loop=self) if not infos: raise OSError('getaddrinfo() returned empty list') - if f2 is not None: - laddr_infos = f2.result() + + if local_addr is not None: + laddr_infos = await self._ensure_resolved( + local_addr, family=family, + type=socket.SOCK_STREAM, proto=proto, + flags=flags, loop=self) if not laddr_infos: raise OSError('getaddrinfo() returned empty list') @@ -730,7 +711,7 @@ class BaseEventLoop(events.AbstractEventLoop): try: sock = socket.socket(family=family, type=type, proto=proto) sock.setblocking(False) - if f2 is not None: + if local_addr is not None: for _, _, _, _, laddr in laddr_infos: try: sock.bind(laddr) @@ -863,7 +844,7 @@ class BaseEventLoop(events.AbstractEventLoop): assert isinstance(addr, tuple) and len(addr) == 2, ( '2-tuple is expected') - infos = await _ensure_resolved( + infos = await self._ensure_resolved( addr, family=family, type=socket.SOCK_DGRAM, proto=proto, flags=flags, loop=self) if not infos: @@ -946,10 +927,22 @@ class BaseEventLoop(events.AbstractEventLoop): return transport, protocol + async def _ensure_resolved(self, address, *, + family=0, type=socket.SOCK_STREAM, + proto=0, flags=0, loop): + host, port = address[:2] + info = _ipaddr_info(host, port, family, type, proto) + if info is not None: + # "host" is already a resolved IP. + return [info] + else: + return await loop.getaddrinfo(host, port, family=family, type=type, + proto=proto, flags=flags) + async def _create_server_getaddrinfo(self, host, port, family, flags): - infos = await _ensure_resolved((host, port), family=family, - type=socket.SOCK_STREAM, - flags=flags, loop=self) + infos = await self._ensure_resolved((host, port), family=family, + type=socket.SOCK_STREAM, + flags=flags, loop=self) if not infos: raise OSError(f'getaddrinfo({host!r}) returned empty list') return infos diff --git a/Lib/asyncio/proactor_events.py b/Lib/asyncio/proactor_events.py index 3d48a2c..291d989 100644 --- a/Lib/asyncio/proactor_events.py +++ b/Lib/asyncio/proactor_events.py @@ -432,20 +432,20 @@ class BaseProactorEventLoop(base_events.BaseEventLoop): # Close the event loop super().close() - def sock_recv(self, sock, n): - return self._proactor.recv(sock, n) + async def sock_recv(self, sock, n): + return await self._proactor.recv(sock, n) - def sock_recv_into(self, sock, buf): - return self._proactor.recv_into(sock, buf) + async def sock_recv_into(self, sock, buf): + return await self._proactor.recv_into(sock, buf) - def sock_sendall(self, sock, data): - return self._proactor.send(sock, data) + async def sock_sendall(self, sock, data): + return await self._proactor.send(sock, data) - def sock_connect(self, sock, address): - return self._proactor.connect(sock, address) + async def sock_connect(self, sock, address): + return await self._proactor.connect(sock, address) - def sock_accept(self, sock): - return self._proactor.accept(sock) + async def sock_accept(self, sock): + return await self._proactor.accept(sock) def _close_self_pipe(self): if self._self_reading_future is not None: diff --git a/Lib/asyncio/selector_events.py b/Lib/asyncio/selector_events.py index 2467e23..78ebf3e 100644 --- a/Lib/asyncio/selector_events.py +++ b/Lib/asyncio/selector_events.py @@ -336,20 +336,18 @@ class BaseSelectorEventLoop(base_events.BaseEventLoop): self._ensure_fd_no_transport(fd) return self._remove_writer(fd) - def sock_recv(self, sock, n): + async def sock_recv(self, sock, n): """Receive data from the socket. The return value is a bytes object representing the data received. The maximum amount of data to be received at once is specified by nbytes. - - This method is a coroutine. """ if self._debug and sock.gettimeout() != 0: raise ValueError("the socket must be non-blocking") fut = self.create_future() self._sock_recv(fut, None, sock, n) - return fut + return await fut def _sock_recv(self, fut, registered_fd, sock, n): # _sock_recv() can add itself as an I/O callback if the operation can't @@ -372,19 +370,17 @@ class BaseSelectorEventLoop(base_events.BaseEventLoop): else: fut.set_result(data) - def sock_recv_into(self, sock, buf): + async def sock_recv_into(self, sock, buf): """Receive data from the socket. The received data is written into *buf* (a writable buffer). The return value is the number of bytes written. - - This method is a coroutine. """ if self._debug and sock.gettimeout() != 0: raise ValueError("the socket must be non-blocking") fut = self.create_future() self._sock_recv_into(fut, None, sock, buf) - return fut + return await fut def _sock_recv_into(self, fut, registered_fd, sock, buf): # _sock_recv_into() can add itself as an I/O callback if the operation @@ -408,7 +404,7 @@ class BaseSelectorEventLoop(base_events.BaseEventLoop): else: fut.set_result(nbytes) - def sock_sendall(self, sock, data): + async def sock_sendall(self, sock, data): """Send data to the socket. The socket must be connected to a remote socket. This method continues @@ -416,8 +412,6 @@ class BaseSelectorEventLoop(base_events.BaseEventLoop): error occurs. None is returned on success. On error, an exception is raised, and there is no way to determine how much data, if any, was successfully processed by the receiving end of the connection. - - This method is a coroutine. """ if self._debug and sock.gettimeout() != 0: raise ValueError("the socket must be non-blocking") @@ -426,7 +420,7 @@ class BaseSelectorEventLoop(base_events.BaseEventLoop): self._sock_sendall(fut, None, sock, data) else: fut.set_result(None) - return fut + return await fut def _sock_sendall(self, fut, registered_fd, sock, data): if registered_fd is not None: @@ -459,11 +453,9 @@ class BaseSelectorEventLoop(base_events.BaseEventLoop): raise ValueError("the socket must be non-blocking") if not hasattr(socket, 'AF_UNIX') or sock.family != socket.AF_UNIX: - resolved = base_events._ensure_resolved( + resolved = await self._ensure_resolved( address, family=sock.family, proto=sock.proto, loop=self) - if not resolved.done(): - await resolved - _, _, _, _, address = resolved.result()[0] + _, _, _, _, address = resolved[0] fut = self.create_future() self._sock_connect(fut, sock, address) @@ -506,21 +498,19 @@ class BaseSelectorEventLoop(base_events.BaseEventLoop): else: fut.set_result(None) - def sock_accept(self, sock): + async def sock_accept(self, sock): """Accept a connection. The socket must be bound to an address and listening for connections. The return value is a pair (conn, address) where conn is a new socket object usable to send and receive data on the connection, and address is the address bound to the socket on the other end of the connection. - - This method is a coroutine. """ if self._debug and sock.gettimeout() != 0: raise ValueError("the socket must be non-blocking") fut = self.create_future() self._sock_accept(fut, False, sock) - return fut + return await fut def _sock_accept(self, fut, registered, sock): fd = sock.fileno() |