diff options
-rw-r--r-- | Lib/asyncio/base_events.py | 22 | ||||
-rw-r--r-- | Lib/test/test_asyncio/test_base_events.py | 20 |
2 files changed, 39 insertions, 3 deletions
diff --git a/Lib/asyncio/base_events.py b/Lib/asyncio/base_events.py index 41dd681..b2a412c 100644 --- a/Lib/asyncio/base_events.py +++ b/Lib/asyncio/base_events.py @@ -102,10 +102,26 @@ def _ipaddr_info(host, port, family, type, proto): else: return None - if port in {None, '', b''}: + if port is None: port = 0 - elif isinstance(port, (bytes, str)): - port = int(port) + elif isinstance(port, bytes): + if port == b'': + port = 0 + else: + try: + port = int(port) + except ValueError: + # Might be a service name like b"http". + port = socket.getservbyname(port.decode('ascii')) + elif isinstance(port, str): + if port == '': + port = 0 + else: + try: + port = int(port) + except ValueError: + # Might be a service name like "http". + port = socket.getservbyname(port) if hasattr(socket, 'inet_pton'): if family == socket.AF_UNSPEC: diff --git a/Lib/test/test_asyncio/test_base_events.py b/Lib/test/test_asyncio/test_base_events.py index 81c35c8..e800ec4 100644 --- a/Lib/test/test_asyncio/test_base_events.py +++ b/Lib/test/test_asyncio/test_base_events.py @@ -146,6 +146,26 @@ class BaseEventTests(test_utils.TestCase): (INET, STREAM, TCP, '', ('1.2.3.4', 1)), base_events._ipaddr_info('1.2.3.4', b'1', INET, STREAM, TCP)) + def test_getaddrinfo_servname(self): + INET = socket.AF_INET + STREAM = socket.SOCK_STREAM + TCP = socket.IPPROTO_TCP + + self.assertEqual( + (INET, STREAM, TCP, '', ('1.2.3.4', 80)), + base_events._ipaddr_info('1.2.3.4', 'http', INET, STREAM, TCP)) + + self.assertEqual( + (INET, STREAM, TCP, '', ('1.2.3.4', 80)), + base_events._ipaddr_info('1.2.3.4', b'http', INET, STREAM, TCP)) + + # Raises "service/proto not found". + with self.assertRaises(OSError): + base_events._ipaddr_info('1.2.3.4', 'nonsense', INET, STREAM, TCP) + + with self.assertRaises(OSError): + base_events._ipaddr_info('1.2.3.4', 'nonsense', INET, STREAM, TCP) + @patch_socket def test_ipaddr_info_no_inet_pton(self, m_socket): del m_socket.inet_pton |