summaryrefslogtreecommitdiffstats
path: root/Lib/test/test_socket.py
diff options
context:
space:
mode:
Diffstat (limited to 'Lib/test/test_socket.py')
-rw-r--r--Lib/test/test_socket.py112
1 files changed, 56 insertions, 56 deletions
diff --git a/Lib/test/test_socket.py b/Lib/test/test_socket.py
index 0ce9729..c0ccb5d 100644
--- a/Lib/test/test_socket.py
+++ b/Lib/test/test_socket.py
@@ -46,7 +46,7 @@ def _have_socket_can():
"""Check whether CAN sockets are supported on this host."""
try:
s = socket.socket(socket.PF_CAN, socket.SOCK_RAW, socket.CAN_RAW)
- except (AttributeError, socket.error, OSError):
+ except (AttributeError, OSError):
return False
else:
s.close()
@@ -126,7 +126,7 @@ class SocketCANTest(unittest.TestCase):
self.addCleanup(self.s.close)
try:
self.s.bind((self.interface,))
- except socket.error:
+ except OSError:
self.skipTest('network interface `%s` does not exist' %
self.interface)
@@ -295,7 +295,7 @@ class ThreadedCANSocketTest(SocketCANTest, ThreadableTest):
self.cli = socket.socket(socket.PF_CAN, socket.SOCK_RAW, socket.CAN_RAW)
try:
self.cli.bind((self.interface,))
- except socket.error:
+ except OSError:
# skipTest should not be called here, and will be called in the
# server instead
pass
@@ -608,7 +608,7 @@ def requireSocket(*args):
for obj in args]
try:
s = socket.socket(*callargs)
- except socket.error as e:
+ except OSError as e:
# XXX: check errno?
err = str(e)
else:
@@ -645,11 +645,11 @@ class GeneralModuleTests(unittest.TestCase):
def testSocketError(self):
# Testing socket module exceptions
msg = "Error raising socket exception (%s)."
- with self.assertRaises(socket.error, msg=msg % 'socket.error'):
- raise socket.error
- with self.assertRaises(socket.error, msg=msg % 'socket.herror'):
+ with self.assertRaises(OSError, msg=msg % 'OSError'):
+ raise OSError
+ with self.assertRaises(OSError, msg=msg % 'socket.herror'):
raise socket.herror
- with self.assertRaises(socket.error, msg=msg % 'socket.gaierror'):
+ with self.assertRaises(OSError, msg=msg % 'socket.gaierror'):
raise socket.gaierror
def testSendtoErrors(self):
@@ -712,13 +712,13 @@ class GeneralModuleTests(unittest.TestCase):
hostname = socket.gethostname()
try:
ip = socket.gethostbyname(hostname)
- except socket.error:
+ except OSError:
# Probably name lookup wasn't set up right; skip this test
return
self.assertTrue(ip.find('.') >= 0, "Error resolving host to ip.")
try:
hname, aliases, ipaddrs = socket.gethostbyaddr(ip)
- except socket.error:
+ except OSError:
# Probably a similar problem as above; skip this test
return
all_host_names = [hostname, hname] + aliases
@@ -732,7 +732,7 @@ class GeneralModuleTests(unittest.TestCase):
oldhn = socket.gethostname()
try:
socket.sethostname('new')
- except socket.error as e:
+ except OSError as e:
if e.errno == errno.EPERM:
self.skipTest("test should be run as root")
else:
@@ -766,8 +766,8 @@ class GeneralModuleTests(unittest.TestCase):
'socket.if_nameindex() not available.')
def testInvalidInterfaceNameIndex(self):
# test nonexistent interface index/name
- self.assertRaises(socket.error, socket.if_indextoname, 0)
- self.assertRaises(socket.error, socket.if_nametoindex, '_DEADBEEF')
+ self.assertRaises(OSError, socket.if_indextoname, 0)
+ self.assertRaises(OSError, socket.if_nametoindex, '_DEADBEEF')
# test with invalid values
self.assertRaises(TypeError, socket.if_nametoindex, 0)
self.assertRaises(TypeError, socket.if_indextoname, '_DEADBEEF')
@@ -788,7 +788,7 @@ class GeneralModuleTests(unittest.TestCase):
try:
# On some versions, this crashes the interpreter.
socket.getnameinfo(('x', 0, 0, 0), 0)
- except socket.error:
+ except OSError:
pass
def testNtoH(self):
@@ -835,17 +835,17 @@ class GeneralModuleTests(unittest.TestCase):
try:
port = socket.getservbyname(service, 'tcp')
break
- except socket.error:
+ except OSError:
pass
else:
- raise socket.error
+ raise OSError
# Try same call with optional protocol omitted
port2 = socket.getservbyname(service)
eq(port, port2)
# Try udp, but don't barf it it doesn't exist
try:
udpport = socket.getservbyname(service, 'udp')
- except socket.error:
+ except OSError:
udpport = None
else:
eq(udpport, port)
@@ -901,7 +901,7 @@ class GeneralModuleTests(unittest.TestCase):
g = lambda a: inet_pton(AF_INET, a)
assertInvalid = lambda func,a: self.assertRaises(
- (socket.error, ValueError), func, a
+ (OSError, ValueError), func, a
)
self.assertEqual(b'\x00\x00\x00\x00', f('0.0.0.0'))
@@ -936,7 +936,7 @@ class GeneralModuleTests(unittest.TestCase):
return
f = lambda a: inet_pton(AF_INET6, a)
assertInvalid = lambda a: self.assertRaises(
- (socket.error, ValueError), f, a
+ (OSError, ValueError), f, a
)
self.assertEqual(b'\x00' * 16, f('::'))
@@ -985,7 +985,7 @@ class GeneralModuleTests(unittest.TestCase):
from socket import inet_ntoa as f, inet_ntop, AF_INET
g = lambda a: inet_ntop(AF_INET, a)
assertInvalid = lambda func,a: self.assertRaises(
- (socket.error, ValueError), func, a
+ (OSError, ValueError), func, a
)
self.assertEqual('1.0.1.0', f(b'\x01\x00\x01\x00'))
@@ -1014,7 +1014,7 @@ class GeneralModuleTests(unittest.TestCase):
return
f = lambda a: inet_ntop(AF_INET6, a)
assertInvalid = lambda a: self.assertRaises(
- (socket.error, ValueError), f, a
+ (OSError, ValueError), f, a
)
self.assertEqual('::', f(b'\x00' * 16))
@@ -1042,7 +1042,7 @@ class GeneralModuleTests(unittest.TestCase):
# At least for eCos. This is required for the S/390 to pass.
try:
my_ip_addr = socket.gethostbyname(socket.gethostname())
- except socket.error:
+ except OSError:
# Probably name lookup wasn't set up right; skip this test
return
self.assertIn(name[0], ("0.0.0.0", my_ip_addr), '%s invalid' % name[0])
@@ -1069,7 +1069,7 @@ class GeneralModuleTests(unittest.TestCase):
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.settimeout(1)
sock.close()
- self.assertRaises(socket.error, sock.send, b"spam")
+ self.assertRaises(OSError, sock.send, b"spam")
def testNewAttributes(self):
# testing .family, .type and .protocol
@@ -1168,7 +1168,7 @@ class GeneralModuleTests(unittest.TestCase):
def test_getnameinfo(self):
# only IP addresses are allowed
- self.assertRaises(socket.error, socket.getnameinfo, ('mail.python.org',0), 0)
+ self.assertRaises(OSError, socket.getnameinfo, ('mail.python.org',0), 0)
@unittest.skipUnless(support.is_resource_enabled('network'),
'network is not enabled')
@@ -1296,7 +1296,7 @@ class BasicCANTest(unittest.TestCase):
def testTooLongInterfaceName(self):
# most systems limit IFNAMSIZ to 16, take 1024 to be sure
with socket.socket(socket.PF_CAN, socket.SOCK_RAW, socket.CAN_RAW) as s:
- self.assertRaisesRegex(socket.error, 'interface name too long',
+ self.assertRaisesRegex(OSError, 'interface name too long',
s.bind, ('x' * 1024,))
@unittest.skipUnless(hasattr(socket, "CAN_RAW_LOOPBACK"),
@@ -1591,7 +1591,7 @@ class BasicTCPTest(SocketConnectedTest):
self.assertEqual(f, fileno)
# cli_conn cannot be used anymore...
self.assertTrue(self.cli_conn._closed)
- self.assertRaises(socket.error, self.cli_conn.recv, 1024)
+ self.assertRaises(OSError, self.cli_conn.recv, 1024)
self.cli_conn.close()
# ...but we can create another socket using the (still open)
# file descriptor
@@ -1960,7 +1960,7 @@ class SendmsgTests(SendrecvmsgServerTimeoutBase):
def _testSendmsgExcessCmsgReject(self):
if not hasattr(socket, "CMSG_SPACE"):
# Can only send one item
- with self.assertRaises(socket.error) as cm:
+ with self.assertRaises(OSError) as cm:
self.sendmsgToServer([MSG], [(0, 0, b""), (0, 0, b"")])
self.assertIsNone(cm.exception.errno)
self.sendToServer(b"done")
@@ -1971,7 +1971,7 @@ class SendmsgTests(SendrecvmsgServerTimeoutBase):
def _testSendmsgAfterClose(self):
self.cli_sock.close()
- self.assertRaises(socket.error, self.sendmsgToServer, [MSG])
+ self.assertRaises(OSError, self.sendmsgToServer, [MSG])
class SendmsgStreamTests(SendmsgTests):
@@ -2015,7 +2015,7 @@ class SendmsgStreamTests(SendmsgTests):
@testSendmsgDontWait.client_skip
def _testSendmsgDontWait(self):
try:
- with self.assertRaises(socket.error) as cm:
+ with self.assertRaises(OSError) as cm:
while True:
self.sendmsgToServer([b"a"*512], [], socket.MSG_DONTWAIT)
self.assertIn(cm.exception.errno,
@@ -2035,9 +2035,9 @@ class SendmsgConnectionlessTests(SendmsgTests):
pass
def _testSendmsgNoDestAddr(self):
- self.assertRaises(socket.error, self.cli_sock.sendmsg,
+ self.assertRaises(OSError, self.cli_sock.sendmsg,
[MSG])
- self.assertRaises(socket.error, self.cli_sock.sendmsg,
+ self.assertRaises(OSError, self.cli_sock.sendmsg,
[MSG], [], 0, None)
@@ -2123,7 +2123,7 @@ class RecvmsgGenericTests(SendrecvmsgBase):
def testRecvmsgAfterClose(self):
# Check that recvmsg[_into]() fails on a closed socket.
self.serv_sock.close()
- self.assertRaises(socket.error, self.doRecvmsg, self.serv_sock, 1024)
+ self.assertRaises(OSError, self.doRecvmsg, self.serv_sock, 1024)
def _testRecvmsgAfterClose(self):
pass
@@ -2571,7 +2571,7 @@ class SCMRightsTest(SendrecvmsgServerTimeoutBase):
# call fails, just send msg with no ancillary data.
try:
nbytes = self.sendmsgToServer([msg], ancdata)
- except socket.error as e:
+ except OSError as e:
# Check that it was the system call that failed
self.assertIsInstance(e.errno, int)
nbytes = self.sendmsgToServer([msg])
@@ -2949,7 +2949,7 @@ class RFC3542AncillaryTest(SendrecvmsgServerTimeoutBase):
array.array("i", [self.traffic_class]).tobytes() + b"\x00"),
(socket.IPPROTO_IPV6, socket.IPV6_HOPLIMIT,
array.array("i", [self.hop_limit]))])
- except socket.error as e:
+ except OSError as e:
self.assertIsInstance(e.errno, int)
nbytes = self.sendmsgToServer(
[MSG],
@@ -3396,10 +3396,10 @@ class InterruptedRecvTimeoutTest(InterruptedTimeoutBase, UDPTestBase):
self.serv.settimeout(self.timeout)
def checkInterruptedRecv(self, func, *args, **kwargs):
- # Check that func(*args, **kwargs) raises socket.error with an
+ # Check that func(*args, **kwargs) raises OSError with an
# errno of EINTR when interrupted by a signal.
self.setAlarm(self.alarm_time)
- with self.assertRaises(socket.error) as cm:
+ with self.assertRaises(OSError) as cm:
func(*args, **kwargs)
self.assertNotIsInstance(cm.exception, socket.timeout)
self.assertEqual(cm.exception.errno, errno.EINTR)
@@ -3456,9 +3456,9 @@ class InterruptedSendTimeoutTest(InterruptedTimeoutBase,
def checkInterruptedSend(self, func, *args, **kwargs):
# Check that func(*args, **kwargs), run in a loop, raises
- # socket.error with an errno of EINTR when interrupted by a
+ # OSError with an errno of EINTR when interrupted by a
# signal.
- with self.assertRaises(socket.error) as cm:
+ with self.assertRaises(OSError) as cm:
while True:
self.setAlarm(self.alarm_time)
func(*args, **kwargs)
@@ -3552,7 +3552,7 @@ class NonBlockingTCPTests(ThreadedTCPSocketTest):
start = time.time()
try:
self.serv.accept()
- except socket.error:
+ except OSError:
pass
end = time.time()
self.assertTrue((end - start) < 1.0, "Error setting non-blocking mode.")
@@ -3573,7 +3573,7 @@ class NonBlockingTCPTests(ThreadedTCPSocketTest):
start = time.time()
try:
self.serv.accept()
- except socket.error:
+ except OSError:
pass
end = time.time()
self.assertTrue((end - start) < 1.0, "Error creating with non-blocking mode.")
@@ -3603,7 +3603,7 @@ class NonBlockingTCPTests(ThreadedTCPSocketTest):
self.serv.setblocking(0)
try:
conn, addr = self.serv.accept()
- except socket.error:
+ except OSError:
pass
else:
self.fail("Error trying to do non-blocking accept.")
@@ -3633,7 +3633,7 @@ class NonBlockingTCPTests(ThreadedTCPSocketTest):
conn.setblocking(0)
try:
msg = conn.recv(len(MSG))
- except socket.error:
+ except OSError:
pass
else:
self.fail("Error trying to do non-blocking recv.")
@@ -3808,7 +3808,7 @@ class FileObjectClassTestCase(SocketConnectedTest):
self.read_file.close()
self.assertRaises(ValueError, self.read_file.fileno)
self.cli_conn.close()
- self.assertRaises(socket.error, self.cli_conn.getsockname)
+ self.assertRaises(OSError, self.cli_conn.getsockname)
def _testRealClose(self):
pass
@@ -3845,7 +3845,7 @@ class FileObjectInterruptedTestCase(unittest.TestCase):
@staticmethod
def _raise_eintr():
- raise socket.error(errno.EINTR, "interrupted")
+ raise OSError(errno.EINTR, "interrupted")
def _textiowrap_mock_socket(self, mock, buffering=-1):
raw = socket.SocketIO(mock, "r")
@@ -3957,7 +3957,7 @@ class UnbufferedFileObjectClassTestCase(FileObjectClassTestCase):
self.assertEqual(msg, self.read_msg)
# ...until the file is itself closed
self.read_file.close()
- self.assertRaises(socket.error, self.cli_conn.recv, 1024)
+ self.assertRaises(OSError, self.cli_conn.recv, 1024)
def _testMakefileClose(self):
self.write_file.write(self.write_msg)
@@ -4106,7 +4106,7 @@ class NetworkConnectionNoServer(unittest.TestCase):
port = support.find_unused_port()
cli = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.addCleanup(cli.close)
- with self.assertRaises(socket.error) as cm:
+ with self.assertRaises(OSError) as cm:
cli.connect((HOST, port))
self.assertEqual(cm.exception.errno, errno.ECONNREFUSED)
@@ -4114,7 +4114,7 @@ class NetworkConnectionNoServer(unittest.TestCase):
# Issue #9792: errors raised by create_connection() should have
# a proper errno attribute.
port = support.find_unused_port()
- with self.assertRaises(socket.error) as cm:
+ with self.assertRaises(OSError) as cm:
socket.create_connection((HOST, port))
# Issue #16257: create_connection() calls getaddrinfo() against
@@ -4262,7 +4262,7 @@ class TCPTimeoutTest(SocketTCPTest):
foo = self.serv.accept()
except socket.timeout:
self.fail("caught timeout instead of error (TCP)")
- except socket.error:
+ except OSError:
ok = True
except:
self.fail("caught unexpected exception (TCP)")
@@ -4319,7 +4319,7 @@ class UDPTimeoutTest(SocketUDPTest):
foo = self.serv.recv(1024)
except socket.timeout:
self.fail("caught timeout instead of error (UDP)")
- except socket.error:
+ except OSError:
ok = True
except:
self.fail("caught unexpected exception (UDP)")
@@ -4329,10 +4329,10 @@ class UDPTimeoutTest(SocketUDPTest):
class TestExceptions(unittest.TestCase):
def testExceptionTree(self):
- self.assertTrue(issubclass(socket.error, Exception))
- self.assertTrue(issubclass(socket.herror, socket.error))
- self.assertTrue(issubclass(socket.gaierror, socket.error))
- self.assertTrue(issubclass(socket.timeout, socket.error))
+ self.assertTrue(issubclass(OSError, Exception))
+ self.assertTrue(issubclass(socket.herror, OSError))
+ self.assertTrue(issubclass(socket.gaierror, OSError))
+ self.assertTrue(issubclass(socket.timeout, OSError))
class TestLinuxAbstractNamespace(unittest.TestCase):
@@ -4358,7 +4358,7 @@ class TestLinuxAbstractNamespace(unittest.TestCase):
def testNameOverflow(self):
address = "\x00" + "h" * self.UNIX_PATH_MAX
with socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) as s:
- self.assertRaises(socket.error, s.bind, address)
+ self.assertRaises(OSError, s.bind, address)
def testStrName(self):
# Check that an abstract name can be passed as a string.
@@ -4597,7 +4597,7 @@ class ContextManagersTest(ThreadedTCPSocketTest):
self.assertTrue(sock._closed)
# exception inside with block
with socket.socket() as sock:
- self.assertRaises(socket.error, sock.sendall, b'foo')
+ self.assertRaises(OSError, sock.sendall, b'foo')
self.assertTrue(sock._closed)
def testCreateConnectionBase(self):
@@ -4625,7 +4625,7 @@ class ContextManagersTest(ThreadedTCPSocketTest):
with socket.create_connection(address) as sock:
sock.close()
self.assertTrue(sock._closed)
- self.assertRaises(socket.error, sock.sendall, b'foo')
+ self.assertRaises(OSError, sock.sendall, b'foo')
@unittest.skipUnless(hasattr(socket, "SOCK_CLOEXEC"),