summaryrefslogtreecommitdiffstats
path: root/Lib/asyncio/selector_events.py
diff options
context:
space:
mode:
authorAlex Grönholm <alex.gronholm@nextday.fi>2022-03-13 16:42:29 (GMT)
committerGitHub <noreply@github.com>2022-03-13 16:42:29 (GMT)
commit9f04ee569cebb8b4c6f04bea95d91a19c5403806 (patch)
treeabd4626ee19d394f35109b4cd2830d50a4a4f9d9 /Lib/asyncio/selector_events.py
parent7e473e94a52024ac821dd2f206290423e4987ead (diff)
downloadcpython-9f04ee569cebb8b4c6f04bea95d91a19c5403806.zip
cpython-9f04ee569cebb8b4c6f04bea95d91a19c5403806.tar.gz
cpython-9f04ee569cebb8b4c6f04bea95d91a19c5403806.tar.bz2
bpo-46805: Add low level UDP socket functions to asyncio (GH-31455)
Diffstat (limited to 'Lib/asyncio/selector_events.py')
-rw-r--r--Lib/asyncio/selector_events.py124
1 files changed, 124 insertions, 0 deletions
diff --git a/Lib/asyncio/selector_events.py b/Lib/asyncio/selector_events.py
index c3c2ec1..bfd8019 100644
--- a/Lib/asyncio/selector_events.py
+++ b/Lib/asyncio/selector_events.py
@@ -434,6 +434,88 @@ class BaseSelectorEventLoop(base_events.BaseEventLoop):
else:
fut.set_result(nbytes)
+ async def sock_recvfrom(self, sock, bufsize):
+ """Receive a datagram from a datagram socket.
+
+ The return value is a tuple of (bytes, address) representing the
+ datagram received and the address it came from.
+ The maximum amount of data to be received at once is specified by
+ nbytes.
+ """
+ base_events._check_ssl_socket(sock)
+ if self._debug and sock.gettimeout() != 0:
+ raise ValueError("the socket must be non-blocking")
+ try:
+ return sock.recvfrom(bufsize)
+ except (BlockingIOError, InterruptedError):
+ pass
+ fut = self.create_future()
+ fd = sock.fileno()
+ self._ensure_fd_no_transport(fd)
+ handle = self._add_reader(fd, self._sock_recvfrom, fut, sock, bufsize)
+ fut.add_done_callback(
+ functools.partial(self._sock_read_done, fd, handle=handle))
+ return await fut
+
+ def _sock_recvfrom(self, fut, sock, bufsize):
+ # _sock_recvfrom() can add itself as an I/O callback if the operation
+ # can't be done immediately. Don't use it directly, call
+ # sock_recvfrom().
+ if fut.done():
+ return
+ try:
+ result = sock.recvfrom(bufsize)
+ except (BlockingIOError, InterruptedError):
+ return # try again next time
+ except (SystemExit, KeyboardInterrupt):
+ raise
+ except BaseException as exc:
+ fut.set_exception(exc)
+ else:
+ fut.set_result(result)
+
+ async def sock_recvfrom_into(self, sock, buf, nbytes=0):
+ """Receive data from the socket.
+
+ The received data is written into *buf* (a writable buffer).
+ The return value is a tuple of (number of bytes written, address).
+ """
+ base_events._check_ssl_socket(sock)
+ if self._debug and sock.gettimeout() != 0:
+ raise ValueError("the socket must be non-blocking")
+ if not nbytes:
+ nbytes = len(buf)
+
+ try:
+ return sock.recvfrom_into(buf, nbytes)
+ except (BlockingIOError, InterruptedError):
+ pass
+ fut = self.create_future()
+ fd = sock.fileno()
+ self._ensure_fd_no_transport(fd)
+ handle = self._add_reader(fd, self._sock_recvfrom_into, fut, sock, buf,
+ nbytes)
+ fut.add_done_callback(
+ functools.partial(self._sock_read_done, fd, handle=handle))
+ return await fut
+
+ def _sock_recvfrom_into(self, fut, sock, buf, bufsize):
+ # _sock_recv_into() can add itself as an I/O callback if the operation
+ # can't be done immediately. Don't use it directly, call
+ # sock_recv_into().
+ if fut.done():
+ return
+ try:
+ result = sock.recvfrom_into(buf, bufsize)
+ except (BlockingIOError, InterruptedError):
+ return # try again next time
+ except (SystemExit, KeyboardInterrupt):
+ raise
+ except BaseException as exc:
+ fut.set_exception(exc)
+ else:
+ fut.set_result(result)
+
async def sock_sendall(self, sock, data):
"""Send data to the socket.
@@ -487,6 +569,48 @@ class BaseSelectorEventLoop(base_events.BaseEventLoop):
else:
pos[0] = start
+ async def sock_sendto(self, sock, data, address):
+ """Send data to the socket.
+
+ The socket must be connected to a remote socket. This method continues
+ to send data from data until either all data has been sent or an
+ error occurs. None is returned on success. On error, an exception is
+ raised, and there is no way to determine how much data, if any, was
+ successfully processed by the receiving end of the connection.
+ """
+ base_events._check_ssl_socket(sock)
+ if self._debug and sock.gettimeout() != 0:
+ raise ValueError("the socket must be non-blocking")
+ try:
+ return sock.sendto(data, address)
+ except (BlockingIOError, InterruptedError):
+ pass
+
+ fut = self.create_future()
+ fd = sock.fileno()
+ self._ensure_fd_no_transport(fd)
+ # use a trick with a list in closure to store a mutable state
+ handle = self._add_writer(fd, self._sock_sendto, fut, sock, data,
+ address)
+ fut.add_done_callback(
+ functools.partial(self._sock_write_done, fd, handle=handle))
+ return await fut
+
+ def _sock_sendto(self, fut, sock, data, address):
+ if fut.done():
+ # Future cancellation can be scheduled on previous loop iteration
+ return
+ try:
+ n = sock.sendto(data, 0, address)
+ except (BlockingIOError, InterruptedError):
+ return
+ except (SystemExit, KeyboardInterrupt):
+ raise
+ except BaseException as exc:
+ fut.set_exception(exc)
+ else:
+ fut.set_result(n)
+
async def sock_connect(self, sock, address):
"""Connect to a remote socket at address.