summaryrefslogtreecommitdiffstats
path: root/Lib/asyncio
diff options
context:
space:
mode:
authorAlex Grönholm <alex.gronholm@nextday.fi>2022-03-13 16:42:29 (GMT)
committerGitHub <noreply@github.com>2022-03-13 16:42:29 (GMT)
commit9f04ee569cebb8b4c6f04bea95d91a19c5403806 (patch)
treeabd4626ee19d394f35109b4cd2830d50a4a4f9d9 /Lib/asyncio
parent7e473e94a52024ac821dd2f206290423e4987ead (diff)
downloadcpython-9f04ee569cebb8b4c6f04bea95d91a19c5403806.zip
cpython-9f04ee569cebb8b4c6f04bea95d91a19c5403806.tar.gz
cpython-9f04ee569cebb8b4c6f04bea95d91a19c5403806.tar.bz2
bpo-46805: Add low level UDP socket functions to asyncio (GH-31455)
Diffstat (limited to 'Lib/asyncio')
-rw-r--r--Lib/asyncio/events.py9
-rw-r--r--Lib/asyncio/proactor_events.py12
-rw-r--r--Lib/asyncio/selector_events.py124
-rw-r--r--Lib/asyncio/windows_events.py20
4 files changed, 165 insertions, 0 deletions
diff --git a/Lib/asyncio/events.py b/Lib/asyncio/events.py
index 1d305e3..e682a19 100644
--- a/Lib/asyncio/events.py
+++ b/Lib/asyncio/events.py
@@ -546,9 +546,18 @@ class AbstractEventLoop:
async def sock_recv_into(self, sock, buf):
raise NotImplementedError
+ async def sock_recvfrom(self, sock, bufsize):
+ raise NotImplementedError
+
+ async def sock_recvfrom_into(self, sock, buf, nbytes=0):
+ raise NotImplementedError
+
async def sock_sendall(self, sock, data):
raise NotImplementedError
+ async def sock_sendto(self, sock, data, address):
+ raise NotImplementedError
+
async def sock_connect(self, sock, address):
raise NotImplementedError
diff --git a/Lib/asyncio/proactor_events.py b/Lib/asyncio/proactor_events.py
index 43d5e70..087f095 100644
--- a/Lib/asyncio/proactor_events.py
+++ b/Lib/asyncio/proactor_events.py
@@ -700,9 +700,21 @@ class BaseProactorEventLoop(base_events.BaseEventLoop):
async def sock_recv_into(self, sock, buf):
return await self._proactor.recv_into(sock, buf)
+ async def sock_recvfrom(self, sock, bufsize):
+ return await self._proactor.recvfrom(sock, bufsize)
+
+ async def sock_recvfrom_into(self, sock, buf, nbytes=0):
+ if not nbytes:
+ nbytes = len(buf)
+
+ return await self._proactor.recvfrom_into(sock, buf, nbytes)
+
async def sock_sendall(self, sock, data):
return await self._proactor.send(sock, data)
+ async def sock_sendto(self, sock, data, address):
+ return await self._proactor.sendto(sock, data, 0, address)
+
async def sock_connect(self, sock, address):
return await self._proactor.connect(sock, address)
diff --git a/Lib/asyncio/selector_events.py b/Lib/asyncio/selector_events.py
index c3c2ec1..bfd8019 100644
--- a/Lib/asyncio/selector_events.py
+++ b/Lib/asyncio/selector_events.py
@@ -434,6 +434,88 @@ class BaseSelectorEventLoop(base_events.BaseEventLoop):
else:
fut.set_result(nbytes)
+ async def sock_recvfrom(self, sock, bufsize):
+ """Receive a datagram from a datagram socket.
+
+ The return value is a tuple of (bytes, address) representing the
+ datagram received and the address it came from.
+ The maximum amount of data to be received at once is specified by
+ nbytes.
+ """
+ base_events._check_ssl_socket(sock)
+ if self._debug and sock.gettimeout() != 0:
+ raise ValueError("the socket must be non-blocking")
+ try:
+ return sock.recvfrom(bufsize)
+ except (BlockingIOError, InterruptedError):
+ pass
+ fut = self.create_future()
+ fd = sock.fileno()
+ self._ensure_fd_no_transport(fd)
+ handle = self._add_reader(fd, self._sock_recvfrom, fut, sock, bufsize)
+ fut.add_done_callback(
+ functools.partial(self._sock_read_done, fd, handle=handle))
+ return await fut
+
+ def _sock_recvfrom(self, fut, sock, bufsize):
+ # _sock_recvfrom() can add itself as an I/O callback if the operation
+ # can't be done immediately. Don't use it directly, call
+ # sock_recvfrom().
+ if fut.done():
+ return
+ try:
+ result = sock.recvfrom(bufsize)
+ except (BlockingIOError, InterruptedError):
+ return # try again next time
+ except (SystemExit, KeyboardInterrupt):
+ raise
+ except BaseException as exc:
+ fut.set_exception(exc)
+ else:
+ fut.set_result(result)
+
+ async def sock_recvfrom_into(self, sock, buf, nbytes=0):
+ """Receive data from the socket.
+
+ The received data is written into *buf* (a writable buffer).
+ The return value is a tuple of (number of bytes written, address).
+ """
+ base_events._check_ssl_socket(sock)
+ if self._debug and sock.gettimeout() != 0:
+ raise ValueError("the socket must be non-blocking")
+ if not nbytes:
+ nbytes = len(buf)
+
+ try:
+ return sock.recvfrom_into(buf, nbytes)
+ except (BlockingIOError, InterruptedError):
+ pass
+ fut = self.create_future()
+ fd = sock.fileno()
+ self._ensure_fd_no_transport(fd)
+ handle = self._add_reader(fd, self._sock_recvfrom_into, fut, sock, buf,
+ nbytes)
+ fut.add_done_callback(
+ functools.partial(self._sock_read_done, fd, handle=handle))
+ return await fut
+
+ def _sock_recvfrom_into(self, fut, sock, buf, bufsize):
+ # _sock_recv_into() can add itself as an I/O callback if the operation
+ # can't be done immediately. Don't use it directly, call
+ # sock_recv_into().
+ if fut.done():
+ return
+ try:
+ result = sock.recvfrom_into(buf, bufsize)
+ except (BlockingIOError, InterruptedError):
+ return # try again next time
+ except (SystemExit, KeyboardInterrupt):
+ raise
+ except BaseException as exc:
+ fut.set_exception(exc)
+ else:
+ fut.set_result(result)
+
async def sock_sendall(self, sock, data):
"""Send data to the socket.
@@ -487,6 +569,48 @@ class BaseSelectorEventLoop(base_events.BaseEventLoop):
else:
pos[0] = start
+ async def sock_sendto(self, sock, data, address):
+ """Send data to the socket.
+
+ The socket must be connected to a remote socket. This method continues
+ to send data from data until either all data has been sent or an
+ error occurs. None is returned on success. On error, an exception is
+ raised, and there is no way to determine how much data, if any, was
+ successfully processed by the receiving end of the connection.
+ """
+ base_events._check_ssl_socket(sock)
+ if self._debug and sock.gettimeout() != 0:
+ raise ValueError("the socket must be non-blocking")
+ try:
+ return sock.sendto(data, address)
+ except (BlockingIOError, InterruptedError):
+ pass
+
+ fut = self.create_future()
+ fd = sock.fileno()
+ self._ensure_fd_no_transport(fd)
+ # use a trick with a list in closure to store a mutable state
+ handle = self._add_writer(fd, self._sock_sendto, fut, sock, data,
+ address)
+ fut.add_done_callback(
+ functools.partial(self._sock_write_done, fd, handle=handle))
+ return await fut
+
+ def _sock_sendto(self, fut, sock, data, address):
+ if fut.done():
+ # Future cancellation can be scheduled on previous loop iteration
+ return
+ try:
+ n = sock.sendto(data, 0, address)
+ except (BlockingIOError, InterruptedError):
+ return
+ except (SystemExit, KeyboardInterrupt):
+ raise
+ except BaseException as exc:
+ fut.set_exception(exc)
+ else:
+ fut.set_result(n)
+
async def sock_connect(self, sock, address):
"""Connect to a remote socket at address.
diff --git a/Lib/asyncio/windows_events.py b/Lib/asyncio/windows_events.py
index 0d9a07e..90b259c 100644
--- a/Lib/asyncio/windows_events.py
+++ b/Lib/asyncio/windows_events.py
@@ -512,6 +512,26 @@ class IocpProactor:
return self._register(ov, conn, finish_recv)
+ def recvfrom_into(self, conn, buf, flags=0):
+ self._register_with_iocp(conn)
+ ov = _overlapped.Overlapped(NULL)
+ try:
+ ov.WSARecvFromInto(conn.fileno(), buf, flags)
+ except BrokenPipeError:
+ return self._result((0, None))
+
+ def finish_recv(trans, key, ov):
+ try:
+ return ov.getresult()
+ except OSError as exc:
+ if exc.winerror in (_overlapped.ERROR_NETNAME_DELETED,
+ _overlapped.ERROR_OPERATION_ABORTED):
+ raise ConnectionResetError(*exc.args)
+ else:
+ raise
+
+ return self._register(ov, conn, finish_recv)
+
def sendto(self, conn, buf, flags=0, addr=None):
self._register_with_iocp(conn)
ov = _overlapped.Overlapped(NULL)