diff options
Diffstat (limited to 'Lib/test/test_socket.py')
-rwxr-xr-x | Lib/test/test_socket.py | 74 |
1 files changed, 74 insertions, 0 deletions
diff --git a/Lib/test/test_socket.py b/Lib/test/test_socket.py index 87ae2e1..4a436cf 100755 --- a/Lib/test/test_socket.py +++ b/Lib/test/test_socket.py @@ -80,6 +80,16 @@ def _have_socket_can_isotp(): s.close() return True +def _have_socket_can_j1939(): + """Check whether CAN J1939 sockets are supported on this host.""" + try: + s = socket.socket(socket.PF_CAN, socket.SOCK_DGRAM, socket.CAN_J1939) + except (AttributeError, OSError): + return False + else: + s.close() + return True + def _have_socket_rds(): """Check whether RDS sockets are supported on this host.""" try: @@ -143,6 +153,8 @@ HAVE_SOCKET_CAN = _have_socket_can() HAVE_SOCKET_CAN_ISOTP = _have_socket_can_isotp() +HAVE_SOCKET_CAN_J1939 = _have_socket_can_j1939() + HAVE_SOCKET_RDS = _have_socket_rds() HAVE_SOCKET_ALG = _have_socket_alg() @@ -2117,6 +2129,68 @@ class ISOTPTest(unittest.TestCase): raise +@unittest.skipUnless(HAVE_SOCKET_CAN_J1939, 'CAN J1939 required for this test.') +class J1939Test(unittest.TestCase): + + def __init__(self, *args, **kwargs): + super().__init__(*args, **kwargs) + self.interface = "vcan0" + + @unittest.skipUnless(hasattr(socket, "CAN_J1939"), + 'socket.CAN_J1939 required for this test.') + def testJ1939Constants(self): + socket.CAN_J1939 + + socket.J1939_MAX_UNICAST_ADDR + socket.J1939_IDLE_ADDR + socket.J1939_NO_ADDR + socket.J1939_NO_NAME + socket.J1939_PGN_REQUEST + socket.J1939_PGN_ADDRESS_CLAIMED + socket.J1939_PGN_ADDRESS_COMMANDED + socket.J1939_PGN_PDU1_MAX + socket.J1939_PGN_MAX + socket.J1939_NO_PGN + + # J1939 socket options + socket.SO_J1939_FILTER + socket.SO_J1939_PROMISC + socket.SO_J1939_SEND_PRIO + socket.SO_J1939_ERRQUEUE + + socket.SCM_J1939_DEST_ADDR + socket.SCM_J1939_DEST_NAME + socket.SCM_J1939_PRIO + socket.SCM_J1939_ERRQUEUE + + socket.J1939_NLA_PAD + socket.J1939_NLA_BYTES_ACKED + + socket.J1939_EE_INFO_NONE + socket.J1939_EE_INFO_TX_ABORT + + socket.J1939_FILTER_MAX + + @unittest.skipUnless(hasattr(socket, "CAN_J1939"), + 'socket.CAN_J1939 required for this test.') + def testCreateJ1939Socket(self): + with socket.socket(socket.PF_CAN, socket.SOCK_DGRAM, socket.CAN_J1939) as s: + pass + + def testBind(self): + try: + with socket.socket(socket.PF_CAN, socket.SOCK_DGRAM, socket.CAN_J1939) as s: + addr = self.interface, socket.J1939_NO_NAME, socket.J1939_NO_PGN, socket.J1939_NO_ADDR + s.bind(addr) + self.assertEqual(s.getsockname(), addr) + except OSError as e: + if e.errno == errno.ENODEV: + self.skipTest('network interface `%s` does not exist' % + self.interface) + else: + raise + + @unittest.skipUnless(HAVE_SOCKET_RDS, 'RDS sockets required for this test.') class BasicRDSTest(unittest.TestCase): |