summaryrefslogtreecommitdiffstats
path: root/Lib/test/test_socket.py
diff options
context:
space:
mode:
Diffstat (limited to 'Lib/test/test_socket.py')
-rw-r--r--Lib/test/test_socket.py247
1 files changed, 167 insertions, 80 deletions
diff --git a/Lib/test/test_socket.py b/Lib/test/test_socket.py
index 0048e3d..54fb9a1 100644
--- a/Lib/test/test_socket.py
+++ b/Lib/test/test_socket.py
@@ -2,7 +2,6 @@
import unittest
from test import support
-from unittest.case import _ExpectedFailure
import errno
import io
@@ -46,7 +45,7 @@ def _have_socket_can():
"""Check whether CAN sockets are supported on this host."""
try:
s = socket.socket(socket.PF_CAN, socket.SOCK_RAW, socket.CAN_RAW)
- except (AttributeError, socket.error, OSError):
+ except (AttributeError, OSError):
return False
else:
s.close()
@@ -121,12 +120,42 @@ class SocketCANTest(unittest.TestCase):
interface = 'vcan0'
bufsize = 128
+ """The CAN frame structure is defined in <linux/can.h>:
+
+ struct can_frame {
+ canid_t can_id; /* 32 bit CAN_ID + EFF/RTR/ERR flags */
+ __u8 can_dlc; /* data length code: 0 .. 8 */
+ __u8 data[8] __attribute__((aligned(8)));
+ };
+ """
+ can_frame_fmt = "=IB3x8s"
+ can_frame_size = struct.calcsize(can_frame_fmt)
+
+ """The Broadcast Management Command frame structure is defined
+ in <linux/can/bcm.h>:
+
+ struct bcm_msg_head {
+ __u32 opcode;
+ __u32 flags;
+ __u32 count;
+ struct timeval ival1, ival2;
+ canid_t can_id;
+ __u32 nframes;
+ struct can_frame frames[0];
+ }
+
+ `bcm_msg_head` must be 8 bytes aligned because of the `frames` member (see
+ `struct can_frame` definition). Must use native not standard types for packing.
+ """
+ bcm_cmd_msg_fmt = "@3I4l2I"
+ bcm_cmd_msg_fmt += "x" * (struct.calcsize(bcm_cmd_msg_fmt) % 8)
+
def setUp(self):
self.s = socket.socket(socket.PF_CAN, socket.SOCK_RAW, socket.CAN_RAW)
self.addCleanup(self.s.close)
try:
self.s.bind((self.interface,))
- except socket.error:
+ except OSError:
self.skipTest('network interface `%s` does not exist' %
self.interface)
@@ -242,9 +271,6 @@ class ThreadableTest:
raise TypeError("test_func must be a callable function")
try:
test_func()
- except _ExpectedFailure:
- # We deliberately ignore expected failures
- pass
except BaseException as e:
self.queue.put(e)
finally:
@@ -295,7 +321,7 @@ class ThreadedCANSocketTest(SocketCANTest, ThreadableTest):
self.cli = socket.socket(socket.PF_CAN, socket.SOCK_RAW, socket.CAN_RAW)
try:
self.cli.bind((self.interface,))
- except socket.error:
+ except OSError:
# skipTest should not be called here, and will be called in the
# server instead
pass
@@ -608,7 +634,7 @@ def requireSocket(*args):
for obj in args]
try:
s = socket.socket(*callargs)
- except socket.error as e:
+ except OSError as e:
# XXX: check errno?
err = str(e)
else:
@@ -626,8 +652,17 @@ class GeneralModuleTests(unittest.TestCase):
def test_repr(self):
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
- self.addCleanup(s.close)
- self.assertTrue(repr(s).startswith("<socket.socket object"))
+ with s:
+ self.assertIn('fd=%i' % s.fileno(), repr(s))
+ self.assertIn('family=%i' % socket.AF_INET, repr(s))
+ self.assertIn('type=%i' % socket.SOCK_STREAM, repr(s))
+ self.assertIn('proto=0', repr(s))
+ self.assertNotIn('raddr', repr(s))
+ s.bind(('127.0.0.1', 0))
+ self.assertIn('laddr', repr(s))
+ self.assertIn(str(s.getsockname()), repr(s))
+ self.assertIn('[closed]', repr(s))
+ self.assertNotIn('laddr', repr(s))
def test_weakref(self):
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
@@ -645,11 +680,11 @@ class GeneralModuleTests(unittest.TestCase):
def testSocketError(self):
# Testing socket module exceptions
msg = "Error raising socket exception (%s)."
- with self.assertRaises(socket.error, msg=msg % 'socket.error'):
- raise socket.error
- with self.assertRaises(socket.error, msg=msg % 'socket.herror'):
+ with self.assertRaises(OSError, msg=msg % 'OSError'):
+ raise OSError
+ with self.assertRaises(OSError, msg=msg % 'socket.herror'):
raise socket.herror
- with self.assertRaises(socket.error, msg=msg % 'socket.gaierror'):
+ with self.assertRaises(OSError, msg=msg % 'socket.gaierror'):
raise socket.gaierror
def testSendtoErrors(self):
@@ -712,13 +747,13 @@ class GeneralModuleTests(unittest.TestCase):
hostname = socket.gethostname()
try:
ip = socket.gethostbyname(hostname)
- except socket.error:
+ except OSError:
# Probably name lookup wasn't set up right; skip this test
return
self.assertTrue(ip.find('.') >= 0, "Error resolving host to ip.")
try:
hname, aliases, ipaddrs = socket.gethostbyaddr(ip)
- except socket.error:
+ except OSError:
# Probably a similar problem as above; skip this test
return
all_host_names = [hostname, hname] + aliases
@@ -732,7 +767,7 @@ class GeneralModuleTests(unittest.TestCase):
oldhn = socket.gethostname()
try:
socket.sethostname('new')
- except socket.error as e:
+ except OSError as e:
if e.errno == errno.EPERM:
self.skipTest("test should be run as root")
else:
@@ -766,8 +801,8 @@ class GeneralModuleTests(unittest.TestCase):
'socket.if_nameindex() not available.')
def testInvalidInterfaceNameIndex(self):
# test nonexistent interface index/name
- self.assertRaises(socket.error, socket.if_indextoname, 0)
- self.assertRaises(socket.error, socket.if_nametoindex, '_DEADBEEF')
+ self.assertRaises(OSError, socket.if_indextoname, 0)
+ self.assertRaises(OSError, socket.if_nametoindex, '_DEADBEEF')
# test with invalid values
self.assertRaises(TypeError, socket.if_nametoindex, 0)
self.assertRaises(TypeError, socket.if_indextoname, '_DEADBEEF')
@@ -788,7 +823,7 @@ class GeneralModuleTests(unittest.TestCase):
try:
# On some versions, this crashes the interpreter.
socket.getnameinfo(('x', 0, 0, 0), 0)
- except socket.error:
+ except OSError:
pass
def testNtoH(self):
@@ -835,17 +870,17 @@ class GeneralModuleTests(unittest.TestCase):
try:
port = socket.getservbyname(service, 'tcp')
break
- except socket.error:
+ except OSError:
pass
else:
- raise socket.error
+ raise OSError
# Try same call with optional protocol omitted
port2 = socket.getservbyname(service)
eq(port, port2)
# Try udp, but don't barf if it doesn't exist
try:
udpport = socket.getservbyname(service, 'udp')
- except socket.error:
+ except OSError:
udpport = None
else:
eq(udpport, port)
@@ -901,7 +936,7 @@ class GeneralModuleTests(unittest.TestCase):
g = lambda a: inet_pton(AF_INET, a)
assertInvalid = lambda func,a: self.assertRaises(
- (socket.error, ValueError), func, a
+ (OSError, ValueError), func, a
)
self.assertEqual(b'\x00\x00\x00\x00', f('0.0.0.0'))
@@ -936,7 +971,7 @@ class GeneralModuleTests(unittest.TestCase):
return
f = lambda a: inet_pton(AF_INET6, a)
assertInvalid = lambda a: self.assertRaises(
- (socket.error, ValueError), f, a
+ (OSError, ValueError), f, a
)
self.assertEqual(b'\x00' * 16, f('::'))
@@ -985,7 +1020,7 @@ class GeneralModuleTests(unittest.TestCase):
from socket import inet_ntoa as f, inet_ntop, AF_INET
g = lambda a: inet_ntop(AF_INET, a)
assertInvalid = lambda func,a: self.assertRaises(
- (socket.error, ValueError), func, a
+ (OSError, ValueError), func, a
)
self.assertEqual('1.0.1.0', f(b'\x01\x00\x01\x00'))
@@ -1014,7 +1049,7 @@ class GeneralModuleTests(unittest.TestCase):
return
f = lambda a: inet_ntop(AF_INET6, a)
assertInvalid = lambda a: self.assertRaises(
- (socket.error, ValueError), f, a
+ (OSError, ValueError), f, a
)
self.assertEqual('::', f(b'\x00' * 16))
@@ -1042,7 +1077,7 @@ class GeneralModuleTests(unittest.TestCase):
# At least for eCos. This is required for the S/390 to pass.
try:
my_ip_addr = socket.gethostbyname(socket.gethostname())
- except socket.error:
+ except OSError:
# Probably name lookup wasn't set up right; skip this test
return
self.assertIn(name[0], ("0.0.0.0", my_ip_addr), '%s invalid' % name[0])
@@ -1069,7 +1104,7 @@ class GeneralModuleTests(unittest.TestCase):
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.settimeout(1)
sock.close()
- self.assertRaises(socket.error, sock.send, b"spam")
+ self.assertRaises(OSError, sock.send, b"spam")
def testNewAttributes(self):
# testing .family, .type and .protocol
@@ -1172,7 +1207,7 @@ class GeneralModuleTests(unittest.TestCase):
def test_getnameinfo(self):
# only IP addresses are allowed
- self.assertRaises(socket.error, socket.getnameinfo, ('mail.python.org',0), 0)
+ self.assertRaises(OSError, socket.getnameinfo, ('mail.python.org',0), 0)
@unittest.skipUnless(support.is_resource_enabled('network'),
'network is not enabled')
@@ -1295,10 +1330,35 @@ class BasicCANTest(unittest.TestCase):
socket.PF_CAN
socket.CAN_RAW
+ @unittest.skipUnless(hasattr(socket, "CAN_BCM"),
+ 'socket.CAN_BCM required for this test.')
+ def testBCMConstants(self):
+ socket.CAN_BCM
+
+ # opcodes
+ socket.CAN_BCM_TX_SETUP # create (cyclic) transmission task
+ socket.CAN_BCM_TX_DELETE # remove (cyclic) transmission task
+ socket.CAN_BCM_TX_READ # read properties of (cyclic) transmission task
+ socket.CAN_BCM_TX_SEND # send one CAN frame
+ socket.CAN_BCM_RX_SETUP # create RX content filter subscription
+ socket.CAN_BCM_RX_DELETE # remove RX content filter subscription
+ socket.CAN_BCM_RX_READ # read properties of RX content filter subscription
+ socket.CAN_BCM_TX_STATUS # reply to TX_READ request
+ socket.CAN_BCM_TX_EXPIRED # notification on performed transmissions (count=0)
+ socket.CAN_BCM_RX_STATUS # reply to RX_READ request
+ socket.CAN_BCM_RX_TIMEOUT # cyclic message is absent
+ socket.CAN_BCM_RX_CHANGED # updated CAN frame (detected content change)
+
def testCreateSocket(self):
with socket.socket(socket.PF_CAN, socket.SOCK_RAW, socket.CAN_RAW) as s:
pass
+ @unittest.skipUnless(hasattr(socket, "CAN_BCM"),
+ 'socket.CAN_BCM required for this test.')
+ def testCreateBCMSocket(self):
+ with socket.socket(socket.PF_CAN, socket.SOCK_DGRAM, socket.CAN_BCM) as s:
+ pass
+
def testBindAny(self):
with socket.socket(socket.PF_CAN, socket.SOCK_RAW, socket.CAN_RAW) as s:
s.bind(('', ))
@@ -1306,7 +1366,7 @@ class BasicCANTest(unittest.TestCase):
def testTooLongInterfaceName(self):
# most systems limit IFNAMSIZ to 16, take 1024 to be sure
with socket.socket(socket.PF_CAN, socket.SOCK_RAW, socket.CAN_RAW) as s:
- self.assertRaisesRegex(socket.error, 'interface name too long',
+ self.assertRaisesRegex(OSError, 'interface name too long',
s.bind, ('x' * 1024,))
@unittest.skipUnless(hasattr(socket, "CAN_RAW_LOOPBACK"),
@@ -1331,19 +1391,8 @@ class BasicCANTest(unittest.TestCase):
@unittest.skipUnless(HAVE_SOCKET_CAN, 'SocketCan required for this test.')
-@unittest.skipUnless(thread, 'Threading required for this test.')
class CANTest(ThreadedCANSocketTest):
- """The CAN frame structure is defined in <linux/can.h>:
-
- struct can_frame {
- canid_t can_id; /* 32 bit CAN_ID + EFF/RTR/ERR flags */
- __u8 can_dlc; /* data length code: 0 .. 8 */
- __u8 data[8] __attribute__((aligned(8)));
- };
- """
- can_frame_fmt = "=IB3x8s"
-
def __init__(self, methodName='runTest'):
ThreadedCANSocketTest.__init__(self, methodName=methodName)
@@ -1392,6 +1441,46 @@ class CANTest(ThreadedCANSocketTest):
self.cf2 = self.build_can_frame(0x12, b'\x99\x22\x33')
self.cli.send(self.cf2)
+ @unittest.skipUnless(hasattr(socket, "CAN_BCM"),
+ 'socket.CAN_BCM required for this test.')
+ def _testBCM(self):
+ cf, addr = self.cli.recvfrom(self.bufsize)
+ self.assertEqual(self.cf, cf)
+ can_id, can_dlc, data = self.dissect_can_frame(cf)
+ self.assertEqual(self.can_id, can_id)
+ self.assertEqual(self.data, data)
+
+ @unittest.skipUnless(hasattr(socket, "CAN_BCM"),
+ 'socket.CAN_BCM required for this test.')
+ def testBCM(self):
+ bcm = socket.socket(socket.PF_CAN, socket.SOCK_DGRAM, socket.CAN_BCM)
+ self.addCleanup(bcm.close)
+ bcm.connect((self.interface,))
+ self.can_id = 0x123
+ self.data = bytes([0xc0, 0xff, 0xee])
+ self.cf = self.build_can_frame(self.can_id, self.data)
+ opcode = socket.CAN_BCM_TX_SEND
+ flags = 0
+ count = 0
+ ival1_seconds = ival1_usec = ival2_seconds = ival2_usec = 0
+ bcm_can_id = 0x0222
+ nframes = 1
+ assert len(self.cf) == 16
+ header = struct.pack(self.bcm_cmd_msg_fmt,
+ opcode,
+ flags,
+ count,
+ ival1_seconds,
+ ival1_usec,
+ ival2_seconds,
+ ival2_usec,
+ bcm_can_id,
+ nframes,
+ )
+ header_plus_frame = header + self.cf
+ bytes_sent = bcm.send(header_plus_frame)
+ self.assertEqual(bytes_sent, len(header_plus_frame))
+
@unittest.skipUnless(HAVE_SOCKET_RDS, 'RDS sockets required for this test.')
class BasicRDSTest(unittest.TestCase):
@@ -1606,7 +1695,7 @@ class BasicTCPTest(SocketConnectedTest):
self.assertEqual(f, fileno)
# cli_conn cannot be used anymore...
self.assertTrue(self.cli_conn._closed)
- self.assertRaises(socket.error, self.cli_conn.recv, 1024)
+ self.assertRaises(OSError, self.cli_conn.recv, 1024)
self.cli_conn.close()
# ...but we can create another socket using the (still open)
# file descriptor
@@ -1975,7 +2064,7 @@ class SendmsgTests(SendrecvmsgServerTimeoutBase):
def _testSendmsgExcessCmsgReject(self):
if not hasattr(socket, "CMSG_SPACE"):
# Can only send one item
- with self.assertRaises(socket.error) as cm:
+ with self.assertRaises(OSError) as cm:
self.sendmsgToServer([MSG], [(0, 0, b""), (0, 0, b"")])
self.assertIsNone(cm.exception.errno)
self.sendToServer(b"done")
@@ -1986,7 +2075,7 @@ class SendmsgTests(SendrecvmsgServerTimeoutBase):
def _testSendmsgAfterClose(self):
self.cli_sock.close()
- self.assertRaises(socket.error, self.sendmsgToServer, [MSG])
+ self.assertRaises(OSError, self.sendmsgToServer, [MSG])
class SendmsgStreamTests(SendmsgTests):
@@ -2030,7 +2119,7 @@ class SendmsgStreamTests(SendmsgTests):
@testSendmsgDontWait.client_skip
def _testSendmsgDontWait(self):
try:
- with self.assertRaises(socket.error) as cm:
+ with self.assertRaises(OSError) as cm:
while True:
self.sendmsgToServer([b"a"*512], [], socket.MSG_DONTWAIT)
self.assertIn(cm.exception.errno,
@@ -2050,9 +2139,9 @@ class SendmsgConnectionlessTests(SendmsgTests):
pass
def _testSendmsgNoDestAddr(self):
- self.assertRaises(socket.error, self.cli_sock.sendmsg,
+ self.assertRaises(OSError, self.cli_sock.sendmsg,
[MSG])
- self.assertRaises(socket.error, self.cli_sock.sendmsg,
+ self.assertRaises(OSError, self.cli_sock.sendmsg,
[MSG], [], 0, None)
@@ -2138,7 +2227,7 @@ class RecvmsgGenericTests(SendrecvmsgBase):
def testRecvmsgAfterClose(self):
# Check that recvmsg[_into]() fails on a closed socket.
self.serv_sock.close()
- self.assertRaises(socket.error, self.doRecvmsg, self.serv_sock, 1024)
+ self.assertRaises(OSError, self.doRecvmsg, self.serv_sock, 1024)
def _testRecvmsgAfterClose(self):
pass
@@ -2533,8 +2622,7 @@ class SCMRightsTest(SendrecvmsgServerTimeoutBase):
def _testFDPassCMSG_LEN(self):
self.createAndSendFDs(1)
- # Issue #12958: The following test has problems on Mac OS X
- @support.anticipate_failure(sys.platform == "darwin")
+ @unittest.skipIf(sys.platform == "darwin", "skipping, see issue #12958")
@requireAttrs(socket, "CMSG_SPACE")
def testFDPassSeparate(self):
# Pass two FDs in two separate arrays. Arrays may be combined
@@ -2544,7 +2632,7 @@ class SCMRightsTest(SendrecvmsgServerTimeoutBase):
maxcmsgs=2)
@testFDPassSeparate.client_skip
- @support.anticipate_failure(sys.platform == "darwin")
+ @unittest.skipIf(sys.platform == "darwin", "skipping, see issue #12958")
def _testFDPassSeparate(self):
fd0, fd1 = self.newFDs(2)
self.assertEqual(
@@ -2556,8 +2644,7 @@ class SCMRightsTest(SendrecvmsgServerTimeoutBase):
array.array("i", [fd1]))]),
len(MSG))
- # Issue #12958: The following test has problems on Mac OS X
- @support.anticipate_failure(sys.platform == "darwin")
+ @unittest.skipIf(sys.platform == "darwin", "skipping, see issue #12958")
@requireAttrs(socket, "CMSG_SPACE")
def testFDPassSeparateMinSpace(self):
# Pass two FDs in two separate arrays, receiving them into the
@@ -2569,7 +2656,7 @@ class SCMRightsTest(SendrecvmsgServerTimeoutBase):
maxcmsgs=2, ignoreflags=socket.MSG_CTRUNC)
@testFDPassSeparateMinSpace.client_skip
- @support.anticipate_failure(sys.platform == "darwin")
+ @unittest.skipIf(sys.platform == "darwin", "skipping, see issue #12958")
def _testFDPassSeparateMinSpace(self):
fd0, fd1 = self.newFDs(2)
self.assertEqual(
@@ -2586,7 +2673,7 @@ class SCMRightsTest(SendrecvmsgServerTimeoutBase):
# call fails, just send msg with no ancillary data.
try:
nbytes = self.sendmsgToServer([msg], ancdata)
- except socket.error as e:
+ except OSError as e:
# Check that it was the system call that failed
self.assertIsInstance(e.errno, int)
nbytes = self.sendmsgToServer([msg])
@@ -2964,7 +3051,7 @@ class RFC3542AncillaryTest(SendrecvmsgServerTimeoutBase):
array.array("i", [self.traffic_class]).tobytes() + b"\x00"),
(socket.IPPROTO_IPV6, socket.IPV6_HOPLIMIT,
array.array("i", [self.hop_limit]))])
- except socket.error as e:
+ except OSError as e:
self.assertIsInstance(e.errno, int)
nbytes = self.sendmsgToServer(
[MSG],
@@ -3422,10 +3509,10 @@ class InterruptedRecvTimeoutTest(InterruptedTimeoutBase, UDPTestBase):
self.serv.settimeout(self.timeout)
def checkInterruptedRecv(self, func, *args, **kwargs):
- # Check that func(*args, **kwargs) raises socket.error with an
+ # Check that func(*args, **kwargs) raises OSError with an
# errno of EINTR when interrupted by a signal.
self.setAlarm(self.alarm_time)
- with self.assertRaises(socket.error) as cm:
+ with self.assertRaises(OSError) as cm:
func(*args, **kwargs)
self.assertNotIsInstance(cm.exception, socket.timeout)
self.assertEqual(cm.exception.errno, errno.EINTR)
@@ -3482,9 +3569,9 @@ class InterruptedSendTimeoutTest(InterruptedTimeoutBase,
def checkInterruptedSend(self, func, *args, **kwargs):
# Check that func(*args, **kwargs), run in a loop, raises
- # socket.error with an errno of EINTR when interrupted by a
+ # OSError with an errno of EINTR when interrupted by a
# signal.
- with self.assertRaises(socket.error) as cm:
+ with self.assertRaises(OSError) as cm:
while True:
self.setAlarm(self.alarm_time)
func(*args, **kwargs)
@@ -3581,7 +3668,7 @@ class NonBlockingTCPTests(ThreadedTCPSocketTest):
start = time.time()
try:
self.serv.accept()
- except socket.error:
+ except OSError:
pass
end = time.time()
self.assertTrue((end - start) < 1.0, "Error setting non-blocking mode.")
@@ -3606,7 +3693,7 @@ class NonBlockingTCPTests(ThreadedTCPSocketTest):
start = time.time()
try:
self.serv.accept()
- except socket.error:
+ except OSError:
pass
end = time.time()
self.assertTrue((end - start) < 1.0, "Error creating with non-blocking mode.")
@@ -3636,7 +3723,7 @@ class NonBlockingTCPTests(ThreadedTCPSocketTest):
self.serv.setblocking(0)
try:
conn, addr = self.serv.accept()
- except socket.error:
+ except OSError:
pass
else:
self.fail("Error trying to do non-blocking accept.")
@@ -3666,7 +3753,7 @@ class NonBlockingTCPTests(ThreadedTCPSocketTest):
conn.setblocking(0)
try:
msg = conn.recv(len(MSG))
- except socket.error:
+ except OSError:
pass
else:
self.fail("Error trying to do non-blocking recv.")
@@ -3749,7 +3836,7 @@ class FileObjectClassTestCase(SocketConnectedTest):
# First read raises a timeout
self.assertRaises(socket.timeout, self.read_file.read, 1)
# Second read is disallowed
- with self.assertRaises(IOError) as ctx:
+ with self.assertRaises(OSError) as ctx:
self.read_file.read(1)
self.assertIn("cannot read from timed out object", str(ctx.exception))
@@ -3841,7 +3928,7 @@ class FileObjectClassTestCase(SocketConnectedTest):
self.read_file.close()
self.assertRaises(ValueError, self.read_file.fileno)
self.cli_conn.close()
- self.assertRaises(socket.error, self.cli_conn.getsockname)
+ self.assertRaises(OSError, self.cli_conn.getsockname)
def _testRealClose(self):
pass
@@ -3878,7 +3965,7 @@ class FileObjectInterruptedTestCase(unittest.TestCase):
@staticmethod
def _raise_eintr():
- raise socket.error(errno.EINTR, "interrupted")
+ raise OSError(errno.EINTR, "interrupted")
def _textiowrap_mock_socket(self, mock, buffering=-1):
raw = socket.SocketIO(mock, "r")
@@ -3990,7 +4077,7 @@ class UnbufferedFileObjectClassTestCase(FileObjectClassTestCase):
self.assertEqual(msg, self.read_msg)
# ...until the file is itself closed
self.read_file.close()
- self.assertRaises(socket.error, self.cli_conn.recv, 1024)
+ self.assertRaises(OSError, self.cli_conn.recv, 1024)
def _testMakefileClose(self):
self.write_file.write(self.write_msg)
@@ -4139,7 +4226,7 @@ class NetworkConnectionNoServer(unittest.TestCase):
port = support.find_unused_port()
cli = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.addCleanup(cli.close)
- with self.assertRaises(socket.error) as cm:
+ with self.assertRaises(OSError) as cm:
cli.connect((HOST, port))
self.assertEqual(cm.exception.errno, errno.ECONNREFUSED)
@@ -4147,7 +4234,7 @@ class NetworkConnectionNoServer(unittest.TestCase):
# Issue #9792: errors raised by create_connection() should have
# a proper errno attribute.
port = support.find_unused_port()
- with self.assertRaises(socket.error) as cm:
+ with self.assertRaises(OSError) as cm:
socket.create_connection((HOST, port))
# Issue #16257: create_connection() calls getaddrinfo() against
@@ -4295,7 +4382,7 @@ class TCPTimeoutTest(SocketTCPTest):
foo = self.serv.accept()
except socket.timeout:
self.fail("caught timeout instead of error (TCP)")
- except socket.error:
+ except OSError:
ok = True
except:
self.fail("caught unexpected exception (TCP)")
@@ -4352,7 +4439,7 @@ class UDPTimeoutTest(SocketUDPTest):
foo = self.serv.recv(1024)
except socket.timeout:
self.fail("caught timeout instead of error (UDP)")
- except socket.error:
+ except OSError:
ok = True
except:
self.fail("caught unexpected exception (UDP)")
@@ -4362,10 +4449,10 @@ class UDPTimeoutTest(SocketUDPTest):
class TestExceptions(unittest.TestCase):
def testExceptionTree(self):
- self.assertTrue(issubclass(socket.error, Exception))
- self.assertTrue(issubclass(socket.herror, socket.error))
- self.assertTrue(issubclass(socket.gaierror, socket.error))
- self.assertTrue(issubclass(socket.timeout, socket.error))
+ self.assertTrue(issubclass(OSError, Exception))
+ self.assertTrue(issubclass(socket.herror, OSError))
+ self.assertTrue(issubclass(socket.gaierror, OSError))
+ self.assertTrue(issubclass(socket.timeout, OSError))
class TestLinuxAbstractNamespace(unittest.TestCase):
@@ -4391,7 +4478,7 @@ class TestLinuxAbstractNamespace(unittest.TestCase):
def testNameOverflow(self):
address = "\x00" + "h" * self.UNIX_PATH_MAX
with socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) as s:
- self.assertRaises(socket.error, s.bind, address)
+ self.assertRaises(OSError, s.bind, address)
def testStrName(self):
# Check that an abstract name can be passed as a string.
@@ -4630,7 +4717,7 @@ class ContextManagersTest(ThreadedTCPSocketTest):
self.assertTrue(sock._closed)
# exception inside with block
with socket.socket() as sock:
- self.assertRaises(socket.error, sock.sendall, b'foo')
+ self.assertRaises(OSError, sock.sendall, b'foo')
self.assertTrue(sock._closed)
def testCreateConnectionBase(self):
@@ -4658,7 +4745,7 @@ class ContextManagersTest(ThreadedTCPSocketTest):
with socket.create_connection(address) as sock:
sock.close()
self.assertTrue(sock._closed)
- self.assertRaises(socket.error, sock.sendall, b'foo')
+ self.assertRaises(OSError, sock.sendall, b'foo')
@unittest.skipUnless(hasattr(socket, "SOCK_CLOEXEC"),