diff options
Diffstat (limited to 'Lib/test/test_socket.py')
| -rw-r--r-- | Lib/test/test_socket.py | 220 | 
1 files changed, 152 insertions, 68 deletions
diff --git a/Lib/test/test_socket.py b/Lib/test/test_socket.py index da6ef05..19377d0 100644 --- a/Lib/test/test_socket.py +++ b/Lib/test/test_socket.py @@ -46,7 +46,7 @@ def _have_socket_can():      """Check whether CAN sockets are supported on this host."""      try:          s = socket.socket(socket.PF_CAN, socket.SOCK_RAW, socket.CAN_RAW) -    except (AttributeError, socket.error, OSError): +    except (AttributeError, OSError):          return False      else:          s.close() @@ -121,12 +121,42 @@ class SocketCANTest(unittest.TestCase):      interface = 'vcan0'      bufsize = 128 +    """The CAN frame structure is defined in <linux/can.h>: + +    struct can_frame { +        canid_t can_id;  /* 32 bit CAN_ID + EFF/RTR/ERR flags */ +        __u8    can_dlc; /* data length code: 0 .. 8 */ +        __u8    data[8] __attribute__((aligned(8))); +    }; +    """ +    can_frame_fmt = "=IB3x8s" +    can_frame_size = struct.calcsize(can_frame_fmt) + +    """The Broadcast Management Command frame structure is defined +    in <linux/can/bcm.h>: + +    struct bcm_msg_head { +        __u32 opcode; +        __u32 flags; +        __u32 count; +        struct timeval ival1, ival2; +        canid_t can_id; +        __u32 nframes; +        struct can_frame frames[0]; +    } + +    `bcm_msg_head` must be 8 bytes aligned because of the `frames` member (see +    `struct can_frame` definition). Must use native not standard types for packing. +    """ +    bcm_cmd_msg_fmt = "@3I4l2I" +    bcm_cmd_msg_fmt += "x" * (struct.calcsize(bcm_cmd_msg_fmt) % 8) +      def setUp(self):          self.s = socket.socket(socket.PF_CAN, socket.SOCK_RAW, socket.CAN_RAW)          self.addCleanup(self.s.close)          try:              self.s.bind((self.interface,)) -        except socket.error: +        except OSError:              self.skipTest('network interface `%s` does not exist' %                             self.interface) @@ -295,7 +325,7 @@ class ThreadedCANSocketTest(SocketCANTest, ThreadableTest):          self.cli = socket.socket(socket.PF_CAN, socket.SOCK_RAW, socket.CAN_RAW)          try:              self.cli.bind((self.interface,)) -        except socket.error: +        except OSError:              # skipTest should not be called here, and will be called in the              # server instead              pass @@ -608,7 +638,7 @@ def requireSocket(*args):                      for obj in args]          try:              s = socket.socket(*callargs) -        except socket.error as e: +        except OSError as e:              # XXX: check errno?              err = str(e)          else: @@ -645,11 +675,11 @@ class GeneralModuleTests(unittest.TestCase):      def testSocketError(self):          # Testing socket module exceptions          msg = "Error raising socket exception (%s)." -        with self.assertRaises(socket.error, msg=msg % 'socket.error'): -            raise socket.error -        with self.assertRaises(socket.error, msg=msg % 'socket.herror'): +        with self.assertRaises(OSError, msg=msg % 'OSError'): +            raise OSError +        with self.assertRaises(OSError, msg=msg % 'socket.herror'):              raise socket.herror -        with self.assertRaises(socket.error, msg=msg % 'socket.gaierror'): +        with self.assertRaises(OSError, msg=msg % 'socket.gaierror'):              raise socket.gaierror      def testSendtoErrors(self): @@ -712,13 +742,13 @@ class GeneralModuleTests(unittest.TestCase):          hostname = socket.gethostname()          try:              ip = socket.gethostbyname(hostname) -        except socket.error: +        except OSError:              # Probably name lookup wasn't set up right; skip this test              return          self.assertTrue(ip.find('.') >= 0, "Error resolving host to ip.")          try:              hname, aliases, ipaddrs = socket.gethostbyaddr(ip) -        except socket.error: +        except OSError:              # Probably a similar problem as above; skip this test              return          all_host_names = [hostname, hname] + aliases @@ -732,7 +762,7 @@ class GeneralModuleTests(unittest.TestCase):          oldhn = socket.gethostname()          try:              socket.sethostname('new') -        except socket.error as e: +        except OSError as e:              if e.errno == errno.EPERM:                  self.skipTest("test should be run as root")              else: @@ -766,8 +796,8 @@ class GeneralModuleTests(unittest.TestCase):                           'socket.if_nameindex() not available.')      def testInvalidInterfaceNameIndex(self):          # test nonexistent interface index/name -        self.assertRaises(socket.error, socket.if_indextoname, 0) -        self.assertRaises(socket.error, socket.if_nametoindex, '_DEADBEEF') +        self.assertRaises(OSError, socket.if_indextoname, 0) +        self.assertRaises(OSError, socket.if_nametoindex, '_DEADBEEF')          # test with invalid values          self.assertRaises(TypeError, socket.if_nametoindex, 0)          self.assertRaises(TypeError, socket.if_indextoname, '_DEADBEEF') @@ -788,7 +818,7 @@ class GeneralModuleTests(unittest.TestCase):          try:              # On some versions, this crashes the interpreter.              socket.getnameinfo(('x', 0, 0, 0), 0) -        except socket.error: +        except OSError:              pass      def testNtoH(self): @@ -835,17 +865,17 @@ class GeneralModuleTests(unittest.TestCase):              try:                  port = socket.getservbyname(service, 'tcp')                  break -            except socket.error: +            except OSError:                  pass          else: -            raise socket.error +            raise OSError          # Try same call with optional protocol omitted          port2 = socket.getservbyname(service)          eq(port, port2)          # Try udp, but don't barf it it doesn't exist          try:              udpport = socket.getservbyname(service, 'udp') -        except socket.error: +        except OSError:              udpport = None          else:              eq(udpport, port) @@ -901,7 +931,7 @@ class GeneralModuleTests(unittest.TestCase):          g = lambda a: inet_pton(AF_INET, a)          assertInvalid = lambda func,a: self.assertRaises( -            (socket.error, ValueError), func, a +            (OSError, ValueError), func, a          )          self.assertEqual(b'\x00\x00\x00\x00', f('0.0.0.0')) @@ -936,7 +966,7 @@ class GeneralModuleTests(unittest.TestCase):              return          f = lambda a: inet_pton(AF_INET6, a)          assertInvalid = lambda a: self.assertRaises( -            (socket.error, ValueError), f, a +            (OSError, ValueError), f, a          )          self.assertEqual(b'\x00' * 16, f('::')) @@ -985,7 +1015,7 @@ class GeneralModuleTests(unittest.TestCase):          from socket import inet_ntoa as f, inet_ntop, AF_INET          g = lambda a: inet_ntop(AF_INET, a)          assertInvalid = lambda func,a: self.assertRaises( -            (socket.error, ValueError), func, a +            (OSError, ValueError), func, a          )          self.assertEqual('1.0.1.0', f(b'\x01\x00\x01\x00')) @@ -1014,7 +1044,7 @@ class GeneralModuleTests(unittest.TestCase):              return          f = lambda a: inet_ntop(AF_INET6, a)          assertInvalid = lambda a: self.assertRaises( -            (socket.error, ValueError), f, a +            (OSError, ValueError), f, a          )          self.assertEqual('::', f(b'\x00' * 16)) @@ -1042,7 +1072,7 @@ class GeneralModuleTests(unittest.TestCase):          # At least for eCos.  This is required for the S/390 to pass.          try:              my_ip_addr = socket.gethostbyname(socket.gethostname()) -        except socket.error: +        except OSError:              # Probably name lookup wasn't set up right; skip this test              return          self.assertIn(name[0], ("0.0.0.0", my_ip_addr), '%s invalid' % name[0]) @@ -1069,7 +1099,7 @@ class GeneralModuleTests(unittest.TestCase):          sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)          sock.settimeout(1)          sock.close() -        self.assertRaises(socket.error, sock.send, b"spam") +        self.assertRaises(OSError, sock.send, b"spam")      def testNewAttributes(self):          # testing .family, .type and .protocol @@ -1168,7 +1198,7 @@ class GeneralModuleTests(unittest.TestCase):      def test_getnameinfo(self):          # only IP addresses are allowed -        self.assertRaises(socket.error, socket.getnameinfo, ('mail.python.org',0), 0) +        self.assertRaises(OSError, socket.getnameinfo, ('mail.python.org',0), 0)      @unittest.skipUnless(support.is_resource_enabled('network'),                           'network is not enabled') @@ -1291,10 +1321,35 @@ class BasicCANTest(unittest.TestCase):          socket.PF_CAN          socket.CAN_RAW +    @unittest.skipUnless(hasattr(socket, "CAN_BCM"), +                         'socket.CAN_BCM required for this test.') +    def testBCMConstants(self): +        socket.CAN_BCM + +        # opcodes +        socket.CAN_BCM_TX_SETUP     # create (cyclic) transmission task +        socket.CAN_BCM_TX_DELETE    # remove (cyclic) transmission task +        socket.CAN_BCM_TX_READ      # read properties of (cyclic) transmission task +        socket.CAN_BCM_TX_SEND      # send one CAN frame +        socket.CAN_BCM_RX_SETUP     # create RX content filter subscription +        socket.CAN_BCM_RX_DELETE    # remove RX content filter subscription +        socket.CAN_BCM_RX_READ      # read properties of RX content filter subscription +        socket.CAN_BCM_TX_STATUS    # reply to TX_READ request +        socket.CAN_BCM_TX_EXPIRED   # notification on performed transmissions (count=0) +        socket.CAN_BCM_RX_STATUS    # reply to RX_READ request +        socket.CAN_BCM_RX_TIMEOUT   # cyclic message is absent +        socket.CAN_BCM_RX_CHANGED   # updated CAN frame (detected content change) +      def testCreateSocket(self):          with socket.socket(socket.PF_CAN, socket.SOCK_RAW, socket.CAN_RAW) as s:              pass +    @unittest.skipUnless(hasattr(socket, "CAN_BCM"), +                         'socket.CAN_BCM required for this test.') +    def testCreateBCMSocket(self): +        with socket.socket(socket.PF_CAN, socket.SOCK_DGRAM, socket.CAN_BCM) as s: +            pass +      def testBindAny(self):          with socket.socket(socket.PF_CAN, socket.SOCK_RAW, socket.CAN_RAW) as s:              s.bind(('', )) @@ -1302,7 +1357,7 @@ class BasicCANTest(unittest.TestCase):      def testTooLongInterfaceName(self):          # most systems limit IFNAMSIZ to 16, take 1024 to be sure          with socket.socket(socket.PF_CAN, socket.SOCK_RAW, socket.CAN_RAW) as s: -            self.assertRaisesRegex(socket.error, 'interface name too long', +            self.assertRaisesRegex(OSError, 'interface name too long',                                     s.bind, ('x' * 1024,))      @unittest.skipUnless(hasattr(socket, "CAN_RAW_LOOPBACK"), @@ -1327,19 +1382,8 @@ class BasicCANTest(unittest.TestCase):  @unittest.skipUnless(HAVE_SOCKET_CAN, 'SocketCan required for this test.') -@unittest.skipUnless(thread, 'Threading required for this test.')  class CANTest(ThreadedCANSocketTest): -    """The CAN frame structure is defined in <linux/can.h>: - -    struct can_frame { -        canid_t can_id;  /* 32 bit CAN_ID + EFF/RTR/ERR flags */ -        __u8    can_dlc; /* data length code: 0 .. 8 */ -        __u8    data[8] __attribute__((aligned(8))); -    }; -    """ -    can_frame_fmt = "=IB3x8s" -      def __init__(self, methodName='runTest'):          ThreadedCANSocketTest.__init__(self, methodName=methodName) @@ -1388,6 +1432,46 @@ class CANTest(ThreadedCANSocketTest):          self.cf2 = self.build_can_frame(0x12, b'\x99\x22\x33')          self.cli.send(self.cf2) +    @unittest.skipUnless(hasattr(socket, "CAN_BCM"), +                         'socket.CAN_BCM required for this test.') +    def _testBCM(self): +        cf, addr = self.cli.recvfrom(self.bufsize) +        self.assertEqual(self.cf, cf) +        can_id, can_dlc, data = self.dissect_can_frame(cf) +        self.assertEqual(self.can_id, can_id) +        self.assertEqual(self.data, data) + +    @unittest.skipUnless(hasattr(socket, "CAN_BCM"), +                         'socket.CAN_BCM required for this test.') +    def testBCM(self): +        bcm = socket.socket(socket.PF_CAN, socket.SOCK_DGRAM, socket.CAN_BCM) +        self.addCleanup(bcm.close) +        bcm.connect((self.interface,)) +        self.can_id = 0x123 +        self.data = bytes([0xc0, 0xff, 0xee]) +        self.cf = self.build_can_frame(self.can_id, self.data) +        opcode = socket.CAN_BCM_TX_SEND +        flags = 0 +        count = 0 +        ival1_seconds = ival1_usec = ival2_seconds = ival2_usec = 0 +        bcm_can_id = 0x0222 +        nframes = 1 +        assert len(self.cf) == 16 +        header = struct.pack(self.bcm_cmd_msg_fmt, +                    opcode, +                    flags, +                    count, +                    ival1_seconds, +                    ival1_usec, +                    ival2_seconds, +                    ival2_usec, +                    bcm_can_id, +                    nframes, +                    ) +        header_plus_frame = header + self.cf +        bytes_sent = bcm.send(header_plus_frame) +        self.assertEqual(bytes_sent, len(header_plus_frame)) +  @unittest.skipUnless(HAVE_SOCKET_RDS, 'RDS sockets required for this test.')  class BasicRDSTest(unittest.TestCase): @@ -1602,7 +1686,7 @@ class BasicTCPTest(SocketConnectedTest):          self.assertEqual(f, fileno)          # cli_conn cannot be used anymore...          self.assertTrue(self.cli_conn._closed) -        self.assertRaises(socket.error, self.cli_conn.recv, 1024) +        self.assertRaises(OSError, self.cli_conn.recv, 1024)          self.cli_conn.close()          # ...but we can create another socket using the (still open)          # file descriptor @@ -1971,7 +2055,7 @@ class SendmsgTests(SendrecvmsgServerTimeoutBase):      def _testSendmsgExcessCmsgReject(self):          if not hasattr(socket, "CMSG_SPACE"):              # Can only send one item -            with self.assertRaises(socket.error) as cm: +            with self.assertRaises(OSError) as cm:                  self.sendmsgToServer([MSG], [(0, 0, b""), (0, 0, b"")])              self.assertIsNone(cm.exception.errno)          self.sendToServer(b"done") @@ -1982,7 +2066,7 @@ class SendmsgTests(SendrecvmsgServerTimeoutBase):      def _testSendmsgAfterClose(self):          self.cli_sock.close() -        self.assertRaises(socket.error, self.sendmsgToServer, [MSG]) +        self.assertRaises(OSError, self.sendmsgToServer, [MSG])  class SendmsgStreamTests(SendmsgTests): @@ -2026,7 +2110,7 @@ class SendmsgStreamTests(SendmsgTests):      @testSendmsgDontWait.client_skip      def _testSendmsgDontWait(self):          try: -            with self.assertRaises(socket.error) as cm: +            with self.assertRaises(OSError) as cm:                  while True:                      self.sendmsgToServer([b"a"*512], [], socket.MSG_DONTWAIT)              self.assertIn(cm.exception.errno, @@ -2046,9 +2130,9 @@ class SendmsgConnectionlessTests(SendmsgTests):          pass      def _testSendmsgNoDestAddr(self): -        self.assertRaises(socket.error, self.cli_sock.sendmsg, +        self.assertRaises(OSError, self.cli_sock.sendmsg,                            [MSG]) -        self.assertRaises(socket.error, self.cli_sock.sendmsg, +        self.assertRaises(OSError, self.cli_sock.sendmsg,                            [MSG], [], 0, None) @@ -2134,7 +2218,7 @@ class RecvmsgGenericTests(SendrecvmsgBase):      def testRecvmsgAfterClose(self):          # Check that recvmsg[_into]() fails on a closed socket.          self.serv_sock.close() -        self.assertRaises(socket.error, self.doRecvmsg, self.serv_sock, 1024) +        self.assertRaises(OSError, self.doRecvmsg, self.serv_sock, 1024)      def _testRecvmsgAfterClose(self):          pass @@ -2582,7 +2666,7 @@ class SCMRightsTest(SendrecvmsgServerTimeoutBase):          # call fails, just send msg with no ancillary data.          try:              nbytes = self.sendmsgToServer([msg], ancdata) -        except socket.error as e: +        except OSError as e:              # Check that it was the system call that failed              self.assertIsInstance(e.errno, int)              nbytes = self.sendmsgToServer([msg]) @@ -2960,7 +3044,7 @@ class RFC3542AncillaryTest(SendrecvmsgServerTimeoutBase):                    array.array("i", [self.traffic_class]).tobytes() + b"\x00"),                   (socket.IPPROTO_IPV6, socket.IPV6_HOPLIMIT,                    array.array("i", [self.hop_limit]))]) -        except socket.error as e: +        except OSError as e:              self.assertIsInstance(e.errno, int)              nbytes = self.sendmsgToServer(                  [MSG], @@ -3414,10 +3498,10 @@ class InterruptedRecvTimeoutTest(InterruptedTimeoutBase, UDPTestBase):          self.serv.settimeout(self.timeout)      def checkInterruptedRecv(self, func, *args, **kwargs): -        # Check that func(*args, **kwargs) raises socket.error with an +        # Check that func(*args, **kwargs) raises OSError with an          # errno of EINTR when interrupted by a signal.          self.setAlarm(self.alarm_time) -        with self.assertRaises(socket.error) as cm: +        with self.assertRaises(OSError) as cm:              func(*args, **kwargs)          self.assertNotIsInstance(cm.exception, socket.timeout)          self.assertEqual(cm.exception.errno, errno.EINTR) @@ -3474,9 +3558,9 @@ class InterruptedSendTimeoutTest(InterruptedTimeoutBase,      def checkInterruptedSend(self, func, *args, **kwargs):          # Check that func(*args, **kwargs), run in a loop, raises -        # socket.error with an errno of EINTR when interrupted by a +        # OSError with an errno of EINTR when interrupted by a          # signal. -        with self.assertRaises(socket.error) as cm: +        with self.assertRaises(OSError) as cm:              while True:                  self.setAlarm(self.alarm_time)                  func(*args, **kwargs) @@ -3573,7 +3657,7 @@ class NonBlockingTCPTests(ThreadedTCPSocketTest):          start = time.time()          try:              self.serv.accept() -        except socket.error: +        except OSError:              pass          end = time.time()          self.assertTrue((end - start) < 1.0, "Error setting non-blocking mode.") @@ -3598,7 +3682,7 @@ class NonBlockingTCPTests(ThreadedTCPSocketTest):              start = time.time()              try:                  self.serv.accept() -            except socket.error: +            except OSError:                  pass              end = time.time()              self.assertTrue((end - start) < 1.0, "Error creating with non-blocking mode.") @@ -3628,7 +3712,7 @@ class NonBlockingTCPTests(ThreadedTCPSocketTest):          self.serv.setblocking(0)          try:              conn, addr = self.serv.accept() -        except socket.error: +        except OSError:              pass          else:              self.fail("Error trying to do non-blocking accept.") @@ -3658,7 +3742,7 @@ class NonBlockingTCPTests(ThreadedTCPSocketTest):          conn.setblocking(0)          try:              msg = conn.recv(len(MSG)) -        except socket.error: +        except OSError:              pass          else:              self.fail("Error trying to do non-blocking recv.") @@ -3741,7 +3825,7 @@ class FileObjectClassTestCase(SocketConnectedTest):          # First read raises a timeout          self.assertRaises(socket.timeout, self.read_file.read, 1)          # Second read is disallowed -        with self.assertRaises(IOError) as ctx: +        with self.assertRaises(OSError) as ctx:              self.read_file.read(1)          self.assertIn("cannot read from timed out object", str(ctx.exception)) @@ -3833,7 +3917,7 @@ class FileObjectClassTestCase(SocketConnectedTest):          self.read_file.close()          self.assertRaises(ValueError, self.read_file.fileno)          self.cli_conn.close() -        self.assertRaises(socket.error, self.cli_conn.getsockname) +        self.assertRaises(OSError, self.cli_conn.getsockname)      def _testRealClose(self):          pass @@ -3870,7 +3954,7 @@ class FileObjectInterruptedTestCase(unittest.TestCase):      @staticmethod      def _raise_eintr(): -        raise socket.error(errno.EINTR, "interrupted") +        raise OSError(errno.EINTR, "interrupted")      def _textiowrap_mock_socket(self, mock, buffering=-1):          raw = socket.SocketIO(mock, "r") @@ -3982,7 +4066,7 @@ class UnbufferedFileObjectClassTestCase(FileObjectClassTestCase):          self.assertEqual(msg, self.read_msg)          # ...until the file is itself closed          self.read_file.close() -        self.assertRaises(socket.error, self.cli_conn.recv, 1024) +        self.assertRaises(OSError, self.cli_conn.recv, 1024)      def _testMakefileClose(self):          self.write_file.write(self.write_msg) @@ -4131,7 +4215,7 @@ class NetworkConnectionNoServer(unittest.TestCase):          port = support.find_unused_port()          cli = socket.socket(socket.AF_INET, socket.SOCK_STREAM)          self.addCleanup(cli.close) -        with self.assertRaises(socket.error) as cm: +        with self.assertRaises(OSError) as cm:              cli.connect((HOST, port))          self.assertEqual(cm.exception.errno, errno.ECONNREFUSED) @@ -4139,7 +4223,7 @@ class NetworkConnectionNoServer(unittest.TestCase):          # Issue #9792: errors raised by create_connection() should have          # a proper errno attribute.          port = support.find_unused_port() -        with self.assertRaises(socket.error) as cm: +        with self.assertRaises(OSError) as cm:              socket.create_connection((HOST, port))          # Issue #16257: create_connection() calls getaddrinfo() against @@ -4287,7 +4371,7 @@ class TCPTimeoutTest(SocketTCPTest):              foo = self.serv.accept()          except socket.timeout:              self.fail("caught timeout instead of error (TCP)") -        except socket.error: +        except OSError:              ok = True          except:              self.fail("caught unexpected exception (TCP)") @@ -4344,7 +4428,7 @@ class UDPTimeoutTest(SocketUDPTest):              foo = self.serv.recv(1024)          except socket.timeout:              self.fail("caught timeout instead of error (UDP)") -        except socket.error: +        except OSError:              ok = True          except:              self.fail("caught unexpected exception (UDP)") @@ -4354,10 +4438,10 @@ class UDPTimeoutTest(SocketUDPTest):  class TestExceptions(unittest.TestCase):      def testExceptionTree(self): -        self.assertTrue(issubclass(socket.error, Exception)) -        self.assertTrue(issubclass(socket.herror, socket.error)) -        self.assertTrue(issubclass(socket.gaierror, socket.error)) -        self.assertTrue(issubclass(socket.timeout, socket.error)) +        self.assertTrue(issubclass(OSError, Exception)) +        self.assertTrue(issubclass(socket.herror, OSError)) +        self.assertTrue(issubclass(socket.gaierror, OSError)) +        self.assertTrue(issubclass(socket.timeout, OSError))  class TestLinuxAbstractNamespace(unittest.TestCase): @@ -4383,7 +4467,7 @@ class TestLinuxAbstractNamespace(unittest.TestCase):      def testNameOverflow(self):          address = "\x00" + "h" * self.UNIX_PATH_MAX          with socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) as s: -            self.assertRaises(socket.error, s.bind, address) +            self.assertRaises(OSError, s.bind, address)      def testStrName(self):          # Check that an abstract name can be passed as a string. @@ -4622,7 +4706,7 @@ class ContextManagersTest(ThreadedTCPSocketTest):          self.assertTrue(sock._closed)          # exception inside with block          with socket.socket() as sock: -            self.assertRaises(socket.error, sock.sendall, b'foo') +            self.assertRaises(OSError, sock.sendall, b'foo')          self.assertTrue(sock._closed)      def testCreateConnectionBase(self): @@ -4650,7 +4734,7 @@ class ContextManagersTest(ThreadedTCPSocketTest):          with socket.create_connection(address) as sock:              sock.close()          self.assertTrue(sock._closed) -        self.assertRaises(socket.error, sock.sendall, b'foo') +        self.assertRaises(OSError, sock.sendall, b'foo')  @unittest.skipUnless(hasattr(socket, "SOCK_CLOEXEC"),  | 
