summaryrefslogtreecommitdiffstats
path: root/Lib/test/test_socket.py
diff options
context:
space:
mode:
Diffstat (limited to 'Lib/test/test_socket.py')
-rw-r--r--Lib/test/test_socket.py455
1 files changed, 350 insertions, 105 deletions
diff --git a/Lib/test/test_socket.py b/Lib/test/test_socket.py
index ee4aea3..cc8da17 100644
--- a/Lib/test/test_socket.py
+++ b/Lib/test/test_socket.py
@@ -1,9 +1,9 @@
import unittest
from test import support
-from unittest.case import _ExpectedFailure
import errno
import io
+import itertools
import socket
import select
import tempfile
@@ -21,13 +21,13 @@ import math
import pickle
import struct
try:
- import fcntl
-except ImportError:
- fcntl = False
-try:
import multiprocessing
except ImportError:
multiprocessing = False
+try:
+ import fcntl
+except ImportError:
+ fcntl = None
HOST = support.HOST
MSG = 'Michael Gilfix was here\u1234\r\n'.encode('utf-8') ## test unicode string and carriage return
@@ -38,12 +38,17 @@ try:
except ImportError:
thread = None
threading = None
+try:
+ import _socket
+except ImportError:
+ _socket = None
+
def _have_socket_can():
"""Check whether CAN sockets are supported on this host."""
try:
s = socket.socket(socket.PF_CAN, socket.SOCK_RAW, socket.CAN_RAW)
- except (AttributeError, socket.error, OSError):
+ except (AttributeError, OSError):
return False
else:
s.close()
@@ -118,12 +123,42 @@ class SocketCANTest(unittest.TestCase):
interface = 'vcan0'
bufsize = 128
+ """The CAN frame structure is defined in <linux/can.h>:
+
+ struct can_frame {
+ canid_t can_id; /* 32 bit CAN_ID + EFF/RTR/ERR flags */
+ __u8 can_dlc; /* data length code: 0 .. 8 */
+ __u8 data[8] __attribute__((aligned(8)));
+ };
+ """
+ can_frame_fmt = "=IB3x8s"
+ can_frame_size = struct.calcsize(can_frame_fmt)
+
+ """The Broadcast Management Command frame structure is defined
+ in <linux/can/bcm.h>:
+
+ struct bcm_msg_head {
+ __u32 opcode;
+ __u32 flags;
+ __u32 count;
+ struct timeval ival1, ival2;
+ canid_t can_id;
+ __u32 nframes;
+ struct can_frame frames[0];
+ }
+
+ `bcm_msg_head` must be 8 bytes aligned because of the `frames` member (see
+ `struct can_frame` definition). Must use native not standard types for packing.
+ """
+ bcm_cmd_msg_fmt = "@3I4l2I"
+ bcm_cmd_msg_fmt += "x" * (struct.calcsize(bcm_cmd_msg_fmt) % 8)
+
def setUp(self):
self.s = socket.socket(socket.PF_CAN, socket.SOCK_RAW, socket.CAN_RAW)
self.addCleanup(self.s.close)
try:
self.s.bind((self.interface,))
- except socket.error:
+ except OSError:
self.skipTest('network interface `%s` does not exist' %
self.interface)
@@ -239,9 +274,6 @@ class ThreadableTest:
raise TypeError("test_func must be a callable function")
try:
test_func()
- except _ExpectedFailure:
- # We deliberately ignore expected failures
- pass
except BaseException as e:
self.queue.put(e)
finally:
@@ -292,7 +324,7 @@ class ThreadedCANSocketTest(SocketCANTest, ThreadableTest):
self.cli = socket.socket(socket.PF_CAN, socket.SOCK_RAW, socket.CAN_RAW)
try:
self.cli.bind((self.interface,))
- except socket.error:
+ except OSError:
# skipTest should not be called here, and will be called in the
# server instead
pass
@@ -540,11 +572,7 @@ class SCTPStreamBase(InetTestBase):
class Inet6TestBase(InetTestBase):
"""Base class for IPv6 socket tests."""
- # Don't use "localhost" here - it may not have an IPv6 address
- # assigned to it by default (e.g. in /etc/hosts), and if someone
- # has assigned it an IPv4-mapped address, then it's unlikely to
- # work with the full IPv6 API.
- host = "::1"
+ host = support.HOSTv6
class UDP6TestBase(Inet6TestBase):
"""Base class for UDP-over-IPv6 tests."""
@@ -605,7 +633,7 @@ def requireSocket(*args):
for obj in args]
try:
s = socket.socket(*callargs)
- except socket.error as e:
+ except OSError as e:
# XXX: check errno?
err = str(e)
else:
@@ -621,10 +649,39 @@ def requireSocket(*args):
class GeneralModuleTests(unittest.TestCase):
+ def test_SocketType_is_socketobject(self):
+ import _socket
+ self.assertTrue(socket.SocketType is _socket.socket)
+ s = socket.socket()
+ self.assertIsInstance(s, socket.SocketType)
+ s.close()
+
def test_repr(self):
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
- self.addCleanup(s.close)
- self.assertTrue(repr(s).startswith("<socket.socket object"))
+ with s:
+ self.assertIn('fd=%i' % s.fileno(), repr(s))
+ self.assertIn('family=%s' % socket.AF_INET, repr(s))
+ self.assertIn('type=%s' % socket.SOCK_STREAM, repr(s))
+ self.assertIn('proto=0', repr(s))
+ self.assertNotIn('raddr', repr(s))
+ s.bind(('127.0.0.1', 0))
+ self.assertIn('laddr', repr(s))
+ self.assertIn(str(s.getsockname()), repr(s))
+ self.assertIn('[closed]', repr(s))
+ self.assertNotIn('laddr', repr(s))
+
+ @unittest.skipUnless(_socket is not None, 'need _socket module')
+ def test_csocket_repr(self):
+ s = _socket.socket(_socket.AF_INET, _socket.SOCK_STREAM)
+ try:
+ expected = ('<socket object, fd=%s, family=%s, type=%s, proto=%s>'
+ % (s.fileno(), s.family, s.type, s.proto))
+ self.assertEqual(repr(s), expected)
+ finally:
+ s.close()
+ expected = ('<socket object, fd=-1, family=%s, type=%s, proto=%s>'
+ % (s.family, s.type, s.proto))
+ self.assertEqual(repr(s), expected)
def test_weakref(self):
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
@@ -642,15 +699,15 @@ class GeneralModuleTests(unittest.TestCase):
def testSocketError(self):
# Testing socket module exceptions
msg = "Error raising socket exception (%s)."
- with self.assertRaises(socket.error, msg=msg % 'socket.error'):
- raise socket.error
- with self.assertRaises(socket.error, msg=msg % 'socket.herror'):
+ with self.assertRaises(OSError, msg=msg % 'OSError'):
+ raise OSError
+ with self.assertRaises(OSError, msg=msg % 'socket.herror'):
raise socket.herror
- with self.assertRaises(socket.error, msg=msg % 'socket.gaierror'):
+ with self.assertRaises(OSError, msg=msg % 'socket.gaierror'):
raise socket.gaierror
def testSendtoErrors(self):
- # Testing that sendto doens't masks failures. See #10169.
+ # Testing that sendto doesn't masks failures. See #10169.
s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
self.addCleanup(s.close)
s.bind(('', 0))
@@ -709,13 +766,13 @@ class GeneralModuleTests(unittest.TestCase):
hostname = socket.gethostname()
try:
ip = socket.gethostbyname(hostname)
- except socket.error:
+ except OSError:
# Probably name lookup wasn't set up right; skip this test
self.skipTest('name lookup failure')
self.assertTrue(ip.find('.') >= 0, "Error resolving host to ip.")
try:
hname, aliases, ipaddrs = socket.gethostbyaddr(ip)
- except socket.error:
+ except OSError:
# Probably a similar problem as above; skip this test
self.skipTest('name lookup failure')
all_host_names = [hostname, hname] + aliases
@@ -723,13 +780,27 @@ class GeneralModuleTests(unittest.TestCase):
if not fqhn in all_host_names:
self.fail("Error testing host resolution mechanisms. (fqdn: %s, all: %s)" % (fqhn, repr(all_host_names)))
+ def test_host_resolution(self):
+ for addr in ['0.1.1.~1', '1+.1.1.1', '::1q', '::1::2',
+ '1:1:1:1:1:1:1:1:1']:
+ self.assertRaises(OSError, socket.gethostbyname, addr)
+ self.assertRaises(OSError, socket.gethostbyaddr, addr)
+
+ for addr in [support.HOST, '10.0.0.1', '255.255.255.255']:
+ self.assertEqual(socket.gethostbyname(addr), addr)
+
+ # we don't test support.HOSTv6 because there's a chance it doesn't have
+ # a matching name entry (e.g. 'ip6-localhost')
+ for host in [support.HOST]:
+ self.assertIn(host, socket.gethostbyaddr(host)[2])
+
@unittest.skipUnless(hasattr(socket, 'sethostname'), "test needs socket.sethostname()")
@unittest.skipUnless(hasattr(socket, 'gethostname'), "test needs socket.gethostname()")
def test_sethostname(self):
oldhn = socket.gethostname()
try:
socket.sethostname('new')
- except socket.error as e:
+ except OSError as e:
if e.errno == errno.EPERM:
self.skipTest("test should be run as root")
else:
@@ -763,8 +834,8 @@ class GeneralModuleTests(unittest.TestCase):
'socket.if_nameindex() not available.')
def testInvalidInterfaceNameIndex(self):
# test nonexistent interface index/name
- self.assertRaises(socket.error, socket.if_indextoname, 0)
- self.assertRaises(socket.error, socket.if_nametoindex, '_DEADBEEF')
+ self.assertRaises(OSError, socket.if_indextoname, 0)
+ self.assertRaises(OSError, socket.if_nametoindex, '_DEADBEEF')
# test with invalid values
self.assertRaises(TypeError, socket.if_nametoindex, 0)
self.assertRaises(TypeError, socket.if_indextoname, '_DEADBEEF')
@@ -786,7 +857,7 @@ class GeneralModuleTests(unittest.TestCase):
try:
# On some versions, this crashes the interpreter.
socket.getnameinfo(('x', 0, 0, 0), 0)
- except socket.error:
+ except OSError:
pass
def testNtoH(self):
@@ -822,7 +893,7 @@ class GeneralModuleTests(unittest.TestCase):
# Find one service that exists, then check all the related interfaces.
# I've ordered this by protocols that have both a tcp and udp
# protocol, at least for modern Linuxes.
- if (sys.platform.startswith(('freebsd', 'netbsd'))
+ if (sys.platform.startswith(('freebsd', 'netbsd', 'gnukfreebsd'))
or sys.platform in ('linux', 'darwin')):
# avoid the 'echo' service on this platform, as there is an
# assumption breaking non-standard port/protocol entry
@@ -833,17 +904,17 @@ class GeneralModuleTests(unittest.TestCase):
try:
port = socket.getservbyname(service, 'tcp')
break
- except socket.error:
+ except OSError:
pass
else:
- raise socket.error
+ raise OSError
# Try same call with optional protocol omitted
port2 = socket.getservbyname(service)
eq(port, port2)
# Try udp, but don't barf if it doesn't exist
try:
udpport = socket.getservbyname(service, 'udp')
- except socket.error:
+ except OSError:
udpport = None
else:
eq(udpport, port)
@@ -899,7 +970,7 @@ class GeneralModuleTests(unittest.TestCase):
g = lambda a: inet_pton(AF_INET, a)
assertInvalid = lambda func,a: self.assertRaises(
- (socket.error, ValueError), func, a
+ (OSError, ValueError), func, a
)
self.assertEqual(b'\x00\x00\x00\x00', f('0.0.0.0'))
@@ -932,9 +1003,17 @@ class GeneralModuleTests(unittest.TestCase):
self.skipTest('IPv6 not available')
except ImportError:
self.skipTest('could not import needed symbols from socket')
+
+ if sys.platform == "win32":
+ try:
+ inet_pton(AF_INET6, '::')
+ except OSError as e:
+ if e.winerror == 10022:
+ self.skipTest('IPv6 might not be supported')
+
f = lambda a: inet_pton(AF_INET6, a)
assertInvalid = lambda a: self.assertRaises(
- (socket.error, ValueError), f, a
+ (OSError, ValueError), f, a
)
self.assertEqual(b'\x00' * 16, f('::'))
@@ -983,7 +1062,7 @@ class GeneralModuleTests(unittest.TestCase):
from socket import inet_ntoa as f, inet_ntop, AF_INET
g = lambda a: inet_ntop(AF_INET, a)
assertInvalid = lambda func,a: self.assertRaises(
- (socket.error, ValueError), func, a
+ (OSError, ValueError), func, a
)
self.assertEqual('1.0.1.0', f(b'\x01\x00\x01\x00'))
@@ -1010,9 +1089,17 @@ class GeneralModuleTests(unittest.TestCase):
self.skipTest('IPv6 not available')
except ImportError:
self.skipTest('could not import needed symbols from socket')
+
+ if sys.platform == "win32":
+ try:
+ inet_ntop(AF_INET6, b'\x00' * 16)
+ except OSError as e:
+ if e.winerror == 10022:
+ self.skipTest('IPv6 might not be supported')
+
f = lambda a: inet_ntop(AF_INET6, a)
assertInvalid = lambda a: self.assertRaises(
- (socket.error, ValueError), f, a
+ (OSError, ValueError), f, a
)
self.assertEqual('::', f(b'\x00' * 16))
@@ -1040,7 +1127,7 @@ class GeneralModuleTests(unittest.TestCase):
# At least for eCos. This is required for the S/390 to pass.
try:
my_ip_addr = socket.gethostbyname(socket.gethostname())
- except socket.error:
+ except OSError:
# Probably name lookup wasn't set up right; skip this test
self.skipTest('name lookup failure')
self.assertIn(name[0], ("0.0.0.0", my_ip_addr), '%s invalid' % name[0])
@@ -1067,28 +1154,41 @@ class GeneralModuleTests(unittest.TestCase):
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.settimeout(1)
sock.close()
- self.assertRaises(socket.error, sock.send, b"spam")
+ self.assertRaises(OSError, sock.send, b"spam")
def testNewAttributes(self):
# testing .family, .type and .protocol
+
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.assertEqual(sock.family, socket.AF_INET)
- self.assertEqual(sock.type, socket.SOCK_STREAM)
+ if hasattr(socket, 'SOCK_CLOEXEC'):
+ self.assertIn(sock.type,
+ (socket.SOCK_STREAM | socket.SOCK_CLOEXEC,
+ socket.SOCK_STREAM))
+ else:
+ self.assertEqual(sock.type, socket.SOCK_STREAM)
self.assertEqual(sock.proto, 0)
sock.close()
def test_getsockaddrarg(self):
- host = '0.0.0.0'
+ sock = socket.socket()
+ self.addCleanup(sock.close)
port = support.find_unused_port()
big_port = port + 65536
neg_port = port - 65536
- sock = socket.socket()
- try:
- self.assertRaises(OverflowError, sock.bind, (host, big_port))
- self.assertRaises(OverflowError, sock.bind, (host, neg_port))
- sock.bind((host, port))
- finally:
- sock.close()
+ self.assertRaises(OverflowError, sock.bind, (HOST, big_port))
+ self.assertRaises(OverflowError, sock.bind, (HOST, neg_port))
+ # Since find_unused_port() is inherently subject to race conditions, we
+ # call it a couple times if necessary.
+ for i in itertools.count():
+ port = support.find_unused_port()
+ try:
+ sock.bind((HOST, port))
+ except OSError as e:
+ if e.errno != errno.EADDRINUSE or i == 5:
+ raise
+ else:
+ break
@unittest.skipUnless(os.name == "nt", "Windows specific")
def test_sock_ioctl(self):
@@ -1126,9 +1226,12 @@ class GeneralModuleTests(unittest.TestCase):
socket.getaddrinfo(HOST, 80)
socket.getaddrinfo(HOST, None)
# test family and socktype filters
- infos = socket.getaddrinfo(HOST, None, socket.AF_INET)
- for family, _, _, _, _ in infos:
+ infos = socket.getaddrinfo(HOST, 80, socket.AF_INET, socket.SOCK_STREAM)
+ for family, type, _, _, _ in infos:
self.assertEqual(family, socket.AF_INET)
+ self.assertEqual(str(family), 'AddressFamily.AF_INET')
+ self.assertEqual(type, socket.SOCK_STREAM)
+ self.assertEqual(str(type), 'SocketKind.SOCK_STREAM')
infos = socket.getaddrinfo(HOST, None, 0, socket.SOCK_STREAM)
for _, socktype, _, _, _ in infos:
self.assertEqual(socktype, socket.SOCK_STREAM)
@@ -1176,7 +1279,7 @@ class GeneralModuleTests(unittest.TestCase):
def test_getnameinfo(self):
# only IP addresses are allowed
- self.assertRaises(socket.error, socket.getnameinfo, ('mail.python.org',0), 0)
+ self.assertRaises(OSError, socket.getnameinfo, ('mail.python.org',0), 0)
@unittest.skipUnless(support.is_resource_enabled('network'),
'network is not enabled')
@@ -1272,6 +1375,11 @@ class GeneralModuleTests(unittest.TestCase):
with sock:
for protocol in range(pickle.HIGHEST_PROTOCOL + 1):
self.assertRaises(TypeError, pickle.dumps, sock, protocol)
+ for protocol in range(pickle.HIGHEST_PROTOCOL + 1):
+ family = pickle.loads(pickle.dumps(socket.AF_INET, protocol))
+ self.assertEqual(family, socket.AF_INET)
+ type = pickle.loads(pickle.dumps(socket.SOCK_STREAM, protocol))
+ self.assertEqual(type, socket.SOCK_STREAM)
def test_listen_backlog(self):
for backlog in 0, -1:
@@ -1292,10 +1400,31 @@ class GeneralModuleTests(unittest.TestCase):
@unittest.skipUnless(support.IPV6_ENABLED, 'IPv6 required for this test.')
def test_flowinfo(self):
self.assertRaises(OverflowError, socket.getnameinfo,
- ('::1',0, 0xffffffff), 0)
+ (support.HOSTv6, 0, 0xffffffff), 0)
with socket.socket(socket.AF_INET6, socket.SOCK_STREAM) as s:
- self.assertRaises(OverflowError, s.bind, ('::1', 0, -10))
-
+ self.assertRaises(OverflowError, s.bind, (support.HOSTv6, 0, -10))
+
+ def test_str_for_enums(self):
+ # Make sure that the AF_* and SOCK_* constants have enum-like string
+ # reprs.
+ with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
+ self.assertEqual(str(s.family), 'AddressFamily.AF_INET')
+ self.assertEqual(str(s.type), 'SocketKind.SOCK_STREAM')
+
+ @unittest.skipIf(os.name == 'nt', 'Will not work on Windows')
+ def test_uknown_socket_family_repr(self):
+ # Test that when created with a family that's not one of the known
+ # AF_*/SOCK_* constants, socket.family just returns the number.
+ #
+ # To do this we fool socket.socket into believing it already has an
+ # open fd because on this path it doesn't actually verify the family and
+ # type and populates the socket object.
+ #
+ # On Windows this trick won't work, so the test is skipped.
+ fd, _ = tempfile.mkstemp()
+ with socket.socket(family=42424, type=13331, fileno=fd) as s:
+ self.assertEqual(s.family, 42424)
+ self.assertEqual(s.type, 13331)
@unittest.skipUnless(HAVE_SOCKET_CAN, 'SocketCan required for this test.')
class BasicCANTest(unittest.TestCase):
@@ -1305,10 +1434,35 @@ class BasicCANTest(unittest.TestCase):
socket.PF_CAN
socket.CAN_RAW
+ @unittest.skipUnless(hasattr(socket, "CAN_BCM"),
+ 'socket.CAN_BCM required for this test.')
+ def testBCMConstants(self):
+ socket.CAN_BCM
+
+ # opcodes
+ socket.CAN_BCM_TX_SETUP # create (cyclic) transmission task
+ socket.CAN_BCM_TX_DELETE # remove (cyclic) transmission task
+ socket.CAN_BCM_TX_READ # read properties of (cyclic) transmission task
+ socket.CAN_BCM_TX_SEND # send one CAN frame
+ socket.CAN_BCM_RX_SETUP # create RX content filter subscription
+ socket.CAN_BCM_RX_DELETE # remove RX content filter subscription
+ socket.CAN_BCM_RX_READ # read properties of RX content filter subscription
+ socket.CAN_BCM_TX_STATUS # reply to TX_READ request
+ socket.CAN_BCM_TX_EXPIRED # notification on performed transmissions (count=0)
+ socket.CAN_BCM_RX_STATUS # reply to RX_READ request
+ socket.CAN_BCM_RX_TIMEOUT # cyclic message is absent
+ socket.CAN_BCM_RX_CHANGED # updated CAN frame (detected content change)
+
def testCreateSocket(self):
with socket.socket(socket.PF_CAN, socket.SOCK_RAW, socket.CAN_RAW) as s:
pass
+ @unittest.skipUnless(hasattr(socket, "CAN_BCM"),
+ 'socket.CAN_BCM required for this test.')
+ def testCreateBCMSocket(self):
+ with socket.socket(socket.PF_CAN, socket.SOCK_DGRAM, socket.CAN_BCM) as s:
+ pass
+
def testBindAny(self):
with socket.socket(socket.PF_CAN, socket.SOCK_RAW, socket.CAN_RAW) as s:
s.bind(('', ))
@@ -1316,7 +1470,7 @@ class BasicCANTest(unittest.TestCase):
def testTooLongInterfaceName(self):
# most systems limit IFNAMSIZ to 16, take 1024 to be sure
with socket.socket(socket.PF_CAN, socket.SOCK_RAW, socket.CAN_RAW) as s:
- self.assertRaisesRegex(socket.error, 'interface name too long',
+ self.assertRaisesRegex(OSError, 'interface name too long',
s.bind, ('x' * 1024,))
@unittest.skipUnless(hasattr(socket, "CAN_RAW_LOOPBACK"),
@@ -1344,16 +1498,6 @@ class BasicCANTest(unittest.TestCase):
@unittest.skipUnless(thread, 'Threading required for this test.')
class CANTest(ThreadedCANSocketTest):
- """The CAN frame structure is defined in <linux/can.h>:
-
- struct can_frame {
- canid_t can_id; /* 32 bit CAN_ID + EFF/RTR/ERR flags */
- __u8 can_dlc; /* data length code: 0 .. 8 */
- __u8 data[8] __attribute__((aligned(8)));
- };
- """
- can_frame_fmt = "=IB3x8s"
-
def __init__(self, methodName='runTest'):
ThreadedCANSocketTest.__init__(self, methodName=methodName)
@@ -1402,6 +1546,46 @@ class CANTest(ThreadedCANSocketTest):
self.cf2 = self.build_can_frame(0x12, b'\x99\x22\x33')
self.cli.send(self.cf2)
+ @unittest.skipUnless(hasattr(socket, "CAN_BCM"),
+ 'socket.CAN_BCM required for this test.')
+ def _testBCM(self):
+ cf, addr = self.cli.recvfrom(self.bufsize)
+ self.assertEqual(self.cf, cf)
+ can_id, can_dlc, data = self.dissect_can_frame(cf)
+ self.assertEqual(self.can_id, can_id)
+ self.assertEqual(self.data, data)
+
+ @unittest.skipUnless(hasattr(socket, "CAN_BCM"),
+ 'socket.CAN_BCM required for this test.')
+ def testBCM(self):
+ bcm = socket.socket(socket.PF_CAN, socket.SOCK_DGRAM, socket.CAN_BCM)
+ self.addCleanup(bcm.close)
+ bcm.connect((self.interface,))
+ self.can_id = 0x123
+ self.data = bytes([0xc0, 0xff, 0xee])
+ self.cf = self.build_can_frame(self.can_id, self.data)
+ opcode = socket.CAN_BCM_TX_SEND
+ flags = 0
+ count = 0
+ ival1_seconds = ival1_usec = ival2_seconds = ival2_usec = 0
+ bcm_can_id = 0x0222
+ nframes = 1
+ assert len(self.cf) == 16
+ header = struct.pack(self.bcm_cmd_msg_fmt,
+ opcode,
+ flags,
+ count,
+ ival1_seconds,
+ ival1_usec,
+ ival2_seconds,
+ ival2_usec,
+ bcm_can_id,
+ nframes,
+ )
+ header_plus_frame = header + self.cf
+ bytes_sent = bcm.send(header_plus_frame)
+ self.assertEqual(bytes_sent, len(header_plus_frame))
+
@unittest.skipUnless(HAVE_SOCKET_RDS, 'RDS sockets required for this test.')
class BasicRDSTest(unittest.TestCase):
@@ -1624,7 +1808,7 @@ class BasicTCPTest(SocketConnectedTest):
self.assertEqual(f, fileno)
# cli_conn cannot be used anymore...
self.assertTrue(self.cli_conn._closed)
- self.assertRaises(socket.error, self.cli_conn.recv, 1024)
+ self.assertRaises(OSError, self.cli_conn.recv, 1024)
self.cli_conn.close()
# ...but we can create another socket using the (still open)
# file descriptor
@@ -1993,7 +2177,7 @@ class SendmsgTests(SendrecvmsgServerTimeoutBase):
def _testSendmsgExcessCmsgReject(self):
if not hasattr(socket, "CMSG_SPACE"):
# Can only send one item
- with self.assertRaises(socket.error) as cm:
+ with self.assertRaises(OSError) as cm:
self.sendmsgToServer([MSG], [(0, 0, b""), (0, 0, b"")])
self.assertIsNone(cm.exception.errno)
self.sendToServer(b"done")
@@ -2004,7 +2188,7 @@ class SendmsgTests(SendrecvmsgServerTimeoutBase):
def _testSendmsgAfterClose(self):
self.cli_sock.close()
- self.assertRaises(socket.error, self.sendmsgToServer, [MSG])
+ self.assertRaises(OSError, self.sendmsgToServer, [MSG])
class SendmsgStreamTests(SendmsgTests):
@@ -2037,7 +2221,7 @@ class SendmsgStreamTests(SendmsgTests):
# Linux supports MSG_DONTWAIT when sending, but in general, it
# only works when receiving. Could add other platforms if they
# support it too.
- @skipWithClientIf(sys.platform not in {"linux2"},
+ @skipWithClientIf(sys.platform not in {"linux"},
"MSG_DONTWAIT not known to work on this platform when "
"sending")
def testSendmsgDontWait(self):
@@ -2048,7 +2232,7 @@ class SendmsgStreamTests(SendmsgTests):
@testSendmsgDontWait.client_skip
def _testSendmsgDontWait(self):
try:
- with self.assertRaises(socket.error) as cm:
+ with self.assertRaises(OSError) as cm:
while True:
self.sendmsgToServer([b"a"*512], [], socket.MSG_DONTWAIT)
self.assertIn(cm.exception.errno,
@@ -2068,9 +2252,9 @@ class SendmsgConnectionlessTests(SendmsgTests):
pass
def _testSendmsgNoDestAddr(self):
- self.assertRaises(socket.error, self.cli_sock.sendmsg,
+ self.assertRaises(OSError, self.cli_sock.sendmsg,
[MSG])
- self.assertRaises(socket.error, self.cli_sock.sendmsg,
+ self.assertRaises(OSError, self.cli_sock.sendmsg,
[MSG], [], 0, None)
@@ -2156,7 +2340,7 @@ class RecvmsgGenericTests(SendrecvmsgBase):
def testRecvmsgAfterClose(self):
# Check that recvmsg[_into]() fails on a closed socket.
self.serv_sock.close()
- self.assertRaises(socket.error, self.doRecvmsg, self.serv_sock, 1024)
+ self.assertRaises(OSError, self.doRecvmsg, self.serv_sock, 1024)
def _testRecvmsgAfterClose(self):
pass
@@ -2557,6 +2741,7 @@ class SCMRightsTest(SendrecvmsgServerTimeoutBase):
self.createAndSendFDs(1)
@unittest.skipIf(sys.platform == "darwin", "skipping, see issue #12958")
+ @unittest.skipIf(sys.platform.startswith("aix"), "skipping, see issue #22397")
@requireAttrs(socket, "CMSG_SPACE")
def testFDPassSeparate(self):
# Pass two FDs in two separate arrays. Arrays may be combined
@@ -2567,6 +2752,7 @@ class SCMRightsTest(SendrecvmsgServerTimeoutBase):
@testFDPassSeparate.client_skip
@unittest.skipIf(sys.platform == "darwin", "skipping, see issue #12958")
+ @unittest.skipIf(sys.platform.startswith("aix"), "skipping, see issue #22397")
def _testFDPassSeparate(self):
fd0, fd1 = self.newFDs(2)
self.assertEqual(
@@ -2579,6 +2765,7 @@ class SCMRightsTest(SendrecvmsgServerTimeoutBase):
len(MSG))
@unittest.skipIf(sys.platform == "darwin", "skipping, see issue #12958")
+ @unittest.skipIf(sys.platform.startswith("aix"), "skipping, see issue #22397")
@requireAttrs(socket, "CMSG_SPACE")
def testFDPassSeparateMinSpace(self):
# Pass two FDs in two separate arrays, receiving them into the
@@ -2591,6 +2778,7 @@ class SCMRightsTest(SendrecvmsgServerTimeoutBase):
@testFDPassSeparateMinSpace.client_skip
@unittest.skipIf(sys.platform == "darwin", "skipping, see issue #12958")
+ @unittest.skipIf(sys.platform.startswith("aix"), "skipping, see issue #22397")
def _testFDPassSeparateMinSpace(self):
fd0, fd1 = self.newFDs(2)
self.assertEqual(
@@ -2607,7 +2795,7 @@ class SCMRightsTest(SendrecvmsgServerTimeoutBase):
# call fails, just send msg with no ancillary data.
try:
nbytes = self.sendmsgToServer([msg], ancdata)
- except socket.error as e:
+ except OSError as e:
# Check that it was the system call that failed
self.assertIsInstance(e.errno, int)
nbytes = self.sendmsgToServer([msg])
@@ -2985,7 +3173,7 @@ class RFC3542AncillaryTest(SendrecvmsgServerTimeoutBase):
array.array("i", [self.traffic_class]).tobytes() + b"\x00"),
(socket.IPPROTO_IPV6, socket.IPV6_HOPLIMIT,
array.array("i", [self.hop_limit]))])
- except socket.error as e:
+ except OSError as e:
self.assertIsInstance(e.errno, int)
nbytes = self.sendmsgToServer(
[MSG],
@@ -3443,10 +3631,10 @@ class InterruptedRecvTimeoutTest(InterruptedTimeoutBase, UDPTestBase):
self.serv.settimeout(self.timeout)
def checkInterruptedRecv(self, func, *args, **kwargs):
- # Check that func(*args, **kwargs) raises socket.error with an
+ # Check that func(*args, **kwargs) raises OSError with an
# errno of EINTR when interrupted by a signal.
self.setAlarm(self.alarm_time)
- with self.assertRaises(socket.error) as cm:
+ with self.assertRaises(OSError) as cm:
func(*args, **kwargs)
self.assertNotIsInstance(cm.exception, socket.timeout)
self.assertEqual(cm.exception.errno, errno.EINTR)
@@ -3503,9 +3691,9 @@ class InterruptedSendTimeoutTest(InterruptedTimeoutBase,
def checkInterruptedSend(self, func, *args, **kwargs):
# Check that func(*args, **kwargs), run in a loop, raises
- # socket.error with an errno of EINTR when interrupted by a
+ # OSError with an errno of EINTR when interrupted by a
# signal.
- with self.assertRaises(socket.error) as cm:
+ with self.assertRaises(OSError) as cm:
while True:
self.setAlarm(self.alarm_time)
func(*args, **kwargs)
@@ -3604,7 +3792,7 @@ class NonBlockingTCPTests(ThreadedTCPSocketTest):
start = time.time()
try:
self.serv.accept()
- except socket.error:
+ except OSError:
pass
end = time.time()
self.assertTrue((end - start) < 1.0, "Error setting non-blocking mode.")
@@ -3639,7 +3827,7 @@ class NonBlockingTCPTests(ThreadedTCPSocketTest):
start = time.time()
try:
self.serv.accept()
- except socket.error:
+ except OSError:
pass
end = time.time()
self.assertTrue((end - start) < 1.0, "Error creating with non-blocking mode.")
@@ -3669,7 +3857,7 @@ class NonBlockingTCPTests(ThreadedTCPSocketTest):
self.serv.setblocking(0)
try:
conn, addr = self.serv.accept()
- except socket.error:
+ except OSError:
pass
else:
self.fail("Error trying to do non-blocking accept.")
@@ -3699,7 +3887,7 @@ class NonBlockingTCPTests(ThreadedTCPSocketTest):
conn.setblocking(0)
try:
msg = conn.recv(len(MSG))
- except socket.error:
+ except OSError:
pass
else:
self.fail("Error trying to do non-blocking recv.")
@@ -3782,7 +3970,7 @@ class FileObjectClassTestCase(SocketConnectedTest):
# First read raises a timeout
self.assertRaises(socket.timeout, self.read_file.read, 1)
# Second read is disallowed
- with self.assertRaises(IOError) as ctx:
+ with self.assertRaises(OSError) as ctx:
self.read_file.read(1)
self.assertIn("cannot read from timed out object", str(ctx.exception))
@@ -3874,7 +4062,7 @@ class FileObjectClassTestCase(SocketConnectedTest):
self.read_file.close()
self.assertRaises(ValueError, self.read_file.fileno)
self.cli_conn.close()
- self.assertRaises(socket.error, self.cli_conn.getsockname)
+ self.assertRaises(OSError, self.cli_conn.getsockname)
def _testRealClose(self):
pass
@@ -3911,7 +4099,7 @@ class FileObjectInterruptedTestCase(unittest.TestCase):
@staticmethod
def _raise_eintr():
- raise socket.error(errno.EINTR, "interrupted")
+ raise OSError(errno.EINTR, "interrupted")
def _textiowrap_mock_socket(self, mock, buffering=-1):
raw = socket.SocketIO(mock, "r")
@@ -4023,7 +4211,7 @@ class UnbufferedFileObjectClassTestCase(FileObjectClassTestCase):
self.assertEqual(msg, self.read_msg)
# ...until the file is itself closed
self.read_file.close()
- self.assertRaises(socket.error, self.cli_conn.recv, 1024)
+ self.assertRaises(OSError, self.cli_conn.recv, 1024)
def _testMakefileClose(self):
self.write_file.write(self.write_msg)
@@ -4172,7 +4360,7 @@ class NetworkConnectionNoServer(unittest.TestCase):
port = support.find_unused_port()
cli = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.addCleanup(cli.close)
- with self.assertRaises(socket.error) as cm:
+ with self.assertRaises(OSError) as cm:
cli.connect((HOST, port))
self.assertEqual(cm.exception.errno, errno.ECONNREFUSED)
@@ -4180,7 +4368,7 @@ class NetworkConnectionNoServer(unittest.TestCase):
# Issue #9792: errors raised by create_connection() should have
# a proper errno attribute.
port = support.find_unused_port()
- with self.assertRaises(socket.error) as cm:
+ with self.assertRaises(OSError) as cm:
socket.create_connection((HOST, port))
# Issue #16257: create_connection() calls getaddrinfo() against
@@ -4328,7 +4516,7 @@ class TCPTimeoutTest(SocketTCPTest):
foo = self.serv.accept()
except socket.timeout:
self.fail("caught timeout instead of error (TCP)")
- except socket.error:
+ except OSError:
ok = True
except:
self.fail("caught unexpected exception (TCP)")
@@ -4385,7 +4573,7 @@ class UDPTimeoutTest(SocketUDPTest):
foo = self.serv.recv(1024)
except socket.timeout:
self.fail("caught timeout instead of error (UDP)")
- except socket.error:
+ except OSError:
ok = True
except:
self.fail("caught unexpected exception (UDP)")
@@ -4395,10 +4583,10 @@ class UDPTimeoutTest(SocketUDPTest):
class TestExceptions(unittest.TestCase):
def testExceptionTree(self):
- self.assertTrue(issubclass(socket.error, Exception))
- self.assertTrue(issubclass(socket.herror, socket.error))
- self.assertTrue(issubclass(socket.gaierror, socket.error))
- self.assertTrue(issubclass(socket.timeout, socket.error))
+ self.assertTrue(issubclass(OSError, Exception))
+ self.assertTrue(issubclass(socket.herror, OSError))
+ self.assertTrue(issubclass(socket.gaierror, OSError))
+ self.assertTrue(issubclass(socket.timeout, OSError))
@unittest.skipUnless(sys.platform == 'linux', 'Linux specific test')
class TestLinuxAbstractNamespace(unittest.TestCase):
@@ -4425,7 +4613,7 @@ class TestLinuxAbstractNamespace(unittest.TestCase):
def testNameOverflow(self):
address = "\x00" + "h" * self.UNIX_PATH_MAX
with socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) as s:
- self.assertRaises(socket.error, s.bind, address)
+ self.assertRaises(OSError, s.bind, address)
def testStrName(self):
# Check that an abstract name can be passed as a string.
@@ -4682,7 +4870,7 @@ class ContextManagersTest(ThreadedTCPSocketTest):
self.assertTrue(sock._closed)
# exception inside with block
with socket.socket() as sock:
- self.assertRaises(socket.error, sock.sendall, b'foo')
+ self.assertRaises(OSError, sock.sendall, b'foo')
self.assertTrue(sock._closed)
def testCreateConnectionBase(self):
@@ -4710,19 +4898,76 @@ class ContextManagersTest(ThreadedTCPSocketTest):
with socket.create_connection(address) as sock:
sock.close()
self.assertTrue(sock._closed)
- self.assertRaises(socket.error, sock.sendall, b'foo')
+ self.assertRaises(OSError, sock.sendall, b'foo')
-@unittest.skipUnless(hasattr(socket, "SOCK_CLOEXEC"),
- "SOCK_CLOEXEC not defined")
-@unittest.skipUnless(fcntl, "module fcntl not available")
-class CloexecConstantTest(unittest.TestCase):
+class InheritanceTest(unittest.TestCase):
+ @unittest.skipUnless(hasattr(socket, "SOCK_CLOEXEC"),
+ "SOCK_CLOEXEC not defined")
@support.requires_linux_version(2, 6, 28)
def test_SOCK_CLOEXEC(self):
with socket.socket(socket.AF_INET,
socket.SOCK_STREAM | socket.SOCK_CLOEXEC) as s:
self.assertTrue(s.type & socket.SOCK_CLOEXEC)
- self.assertTrue(fcntl.fcntl(s, fcntl.F_GETFD) & fcntl.FD_CLOEXEC)
+ self.assertFalse(s.get_inheritable())
+
+ def test_default_inheritable(self):
+ sock = socket.socket()
+ with sock:
+ self.assertEqual(sock.get_inheritable(), False)
+
+ def test_dup(self):
+ sock = socket.socket()
+ with sock:
+ newsock = sock.dup()
+ sock.close()
+ with newsock:
+ self.assertEqual(newsock.get_inheritable(), False)
+
+ def test_set_inheritable(self):
+ sock = socket.socket()
+ with sock:
+ sock.set_inheritable(True)
+ self.assertEqual(sock.get_inheritable(), True)
+
+ sock.set_inheritable(False)
+ self.assertEqual(sock.get_inheritable(), False)
+
+ @unittest.skipIf(fcntl is None, "need fcntl")
+ def test_get_inheritable_cloexec(self):
+ sock = socket.socket()
+ with sock:
+ fd = sock.fileno()
+ self.assertEqual(sock.get_inheritable(), False)
+
+ # clear FD_CLOEXEC flag
+ flags = fcntl.fcntl(fd, fcntl.F_GETFD)
+ flags &= ~fcntl.FD_CLOEXEC
+ fcntl.fcntl(fd, fcntl.F_SETFD, flags)
+
+ self.assertEqual(sock.get_inheritable(), True)
+
+ @unittest.skipIf(fcntl is None, "need fcntl")
+ def test_set_inheritable_cloexec(self):
+ sock = socket.socket()
+ with sock:
+ fd = sock.fileno()
+ self.assertEqual(fcntl.fcntl(fd, fcntl.F_GETFD) & fcntl.FD_CLOEXEC,
+ fcntl.FD_CLOEXEC)
+
+ sock.set_inheritable(True)
+ self.assertEqual(fcntl.fcntl(fd, fcntl.F_GETFD) & fcntl.FD_CLOEXEC,
+ 0)
+
+
+ @unittest.skipUnless(hasattr(socket, "socketpair"),
+ "need socket.socketpair()")
+ def test_socketpair(self):
+ s1, s2 = socket.socketpair()
+ self.addCleanup(s1.close)
+ self.addCleanup(s2.close)
+ self.assertEqual(s1.get_inheritable(), False)
+ self.assertEqual(s2.get_inheritable(), False)
@unittest.skipUnless(hasattr(socket, "SOCK_NONBLOCK"),
@@ -4891,7 +5136,7 @@ def test_main():
NetworkConnectionAttributesTest,
NetworkConnectionBehaviourTest,
ContextManagersTest,
- CloexecConstantTest,
+ InheritanceTest,
NonblockConstantTest
])
tests.append(BasicSocketPairTest)