diff options
author | Alex Grönholm <alex.gronholm@nextday.fi> | 2022-03-13 16:42:29 (GMT) |
---|---|---|
committer | GitHub <noreply@github.com> | 2022-03-13 16:42:29 (GMT) |
commit | 9f04ee569cebb8b4c6f04bea95d91a19c5403806 (patch) | |
tree | abd4626ee19d394f35109b4cd2830d50a4a4f9d9 /Lib/test/test_asyncio/test_sock_lowlevel.py | |
parent | 7e473e94a52024ac821dd2f206290423e4987ead (diff) | |
download | cpython-9f04ee569cebb8b4c6f04bea95d91a19c5403806.zip cpython-9f04ee569cebb8b4c6f04bea95d91a19c5403806.tar.gz cpython-9f04ee569cebb8b4c6f04bea95d91a19c5403806.tar.bz2 |
bpo-46805: Add low level UDP socket functions to asyncio (GH-31455)
Diffstat (limited to 'Lib/test/test_asyncio/test_sock_lowlevel.py')
-rw-r--r-- | Lib/test/test_asyncio/test_sock_lowlevel.py | 75 |
1 files changed, 74 insertions, 1 deletions
diff --git a/Lib/test/test_asyncio/test_sock_lowlevel.py b/Lib/test/test_asyncio/test_sock_lowlevel.py index 14001a4..db47616 100644 --- a/Lib/test/test_asyncio/test_sock_lowlevel.py +++ b/Lib/test/test_asyncio/test_sock_lowlevel.py @@ -5,11 +5,11 @@ import unittest from asyncio import proactor_events from itertools import cycle, islice +from unittest.mock import patch, Mock from test.test_asyncio import utils as test_utils from test import support from test.support import socket_helper - def tearDownModule(): asyncio.set_event_loop_policy(None) @@ -380,6 +380,79 @@ class BaseSockTestsMixin: self.loop.run_until_complete( self._basetest_huge_content_recvinto(httpd.address)) + async def _basetest_datagram_recvfrom(self, server_address): + # Happy path, sock.sendto() returns immediately + data = b'\x01' * 4096 + with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as sock: + sock.setblocking(False) + await self.loop.sock_sendto(sock, data, server_address) + received_data, from_addr = await self.loop.sock_recvfrom( + sock, 4096) + self.assertEqual(received_data, data) + self.assertEqual(from_addr, server_address) + + def test_recvfrom(self): + with test_utils.run_udp_echo_server() as server_address: + self.loop.run_until_complete( + self._basetest_datagram_recvfrom(server_address)) + + async def _basetest_datagram_recvfrom_into(self, server_address): + # Happy path, sock.sendto() returns immediately + with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as sock: + sock.setblocking(False) + + buf = bytearray(4096) + data = b'\x01' * 4096 + await self.loop.sock_sendto(sock, data, server_address) + num_bytes, from_addr = await self.loop.sock_recvfrom_into( + sock, buf) + self.assertEqual(num_bytes, 4096) + self.assertEqual(buf, data) + self.assertEqual(from_addr, server_address) + + buf = bytearray(8192) + await self.loop.sock_sendto(sock, data, server_address) + num_bytes, from_addr = await self.loop.sock_recvfrom_into( + sock, buf, 4096) + self.assertEqual(num_bytes, 4096) + self.assertEqual(buf[:4096], data[:4096]) + self.assertEqual(from_addr, server_address) + + def test_recvfrom_into(self): + with test_utils.run_udp_echo_server() as server_address: + self.loop.run_until_complete( + self._basetest_datagram_recvfrom_into(server_address)) + + async def _basetest_datagram_sendto_blocking(self, server_address): + # Sad path, sock.sendto() raises BlockingIOError + # This involves patching sock.sendto() to raise BlockingIOError but + # sendto() is not used by the proactor event loop + data = b'\x01' * 4096 + with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as sock: + sock.setblocking(False) + mock_sock = Mock(sock) + mock_sock.gettimeout = sock.gettimeout + mock_sock.sendto.configure_mock(side_effect=BlockingIOError) + mock_sock.fileno = sock.fileno + self.loop.call_soon( + lambda: setattr(mock_sock, 'sendto', sock.sendto) + ) + await self.loop.sock_sendto(mock_sock, data, server_address) + + received_data, from_addr = await self.loop.sock_recvfrom( + sock, 4096) + self.assertEqual(received_data, data) + self.assertEqual(from_addr, server_address) + + def test_sendto_blocking(self): + if sys.platform == 'win32': + if isinstance(self.loop, asyncio.ProactorEventLoop): + raise unittest.SkipTest('Not relevant to ProactorEventLoop') + + with test_utils.run_udp_echo_server() as server_address: + self.loop.run_until_complete( + self._basetest_datagram_sendto_blocking(server_address)) + @socket_helper.skip_unless_bind_unix_socket def test_unix_sock_client_ops(self): with test_utils.run_test_unix_server() as httpd: |