diff options
author | Charles-François Natali <neologix@free.fr> | 2011-10-06 17:47:44 (GMT) |
---|---|---|
committer | Charles-François Natali <neologix@free.fr> | 2011-10-06 17:47:44 (GMT) |
commit | 47413c117145c3da7cdb0ded5c05d0d540a26a4a (patch) | |
tree | 8b216bb9a929129d5a9a457c3e0943f551d7b4a4 /Lib | |
parent | 90c30e87be7df19604c920ee24b6888afa91a093 (diff) | |
download | cpython-47413c117145c3da7cdb0ded5c05d0d540a26a4a.zip cpython-47413c117145c3da7cdb0ded5c05d0d540a26a4a.tar.gz cpython-47413c117145c3da7cdb0ded5c05d0d540a26a4a.tar.bz2 |
Issue #10141: socket: add SocketCAN (PF_CAN) support. Initial patch by Matthias
Fuchs, updated by Tiago Gonçalves.
Diffstat (limited to 'Lib')
-rw-r--r-- | Lib/test/test_socket.py | 164 |
1 files changed, 164 insertions, 0 deletions
diff --git a/Lib/test/test_socket.py b/Lib/test/test_socket.py index 79160f4..1558570 100644 --- a/Lib/test/test_socket.py +++ b/Lib/test/test_socket.py @@ -21,6 +21,7 @@ from weakref import proxy import signal import math import pickle +import struct try: import fcntl except ImportError: @@ -36,6 +37,18 @@ except ImportError: thread = None threading = None +def _have_socket_can(): + """Check whether CAN sockets are supported on this host.""" + try: + s = socket.socket(socket.PF_CAN, socket.SOCK_RAW, socket.CAN_RAW) + except (AttributeError, socket.error, OSError): + return False + else: + s.close() + return True + +HAVE_SOCKET_CAN = _have_socket_can() + # Size in bytes of the int type SIZEOF_INT = array.array("i").itemsize @@ -80,6 +93,30 @@ class ThreadSafeCleanupTestCase(unittest.TestCase): with self._cleanup_lock: return super().doCleanups(*args, **kwargs) +class SocketCANTest(unittest.TestCase): + + """To be able to run this test, a `vcan0` CAN interface can be created with + the following commands: + # modprobe vcan + # ip link add dev vcan0 type vcan + # ifconfig vcan0 up + """ + interface = 'vcan0' + bufsize = 128 + + def setUp(self): + self.s = socket.socket(socket.PF_CAN, socket.SOCK_RAW, socket.CAN_RAW) + try: + self.s.bind((self.interface,)) + except socket.error: + self.skipTest('network interface `%s` does not exist' % + self.interface) + self.s.close() + + def tearDown(self): + self.s.close() + self.s = None + class ThreadableTest: """Threadable Test class @@ -210,6 +247,26 @@ class ThreadedUDPSocketTest(SocketUDPTest, ThreadableTest): self.cli = None ThreadableTest.clientTearDown(self) +class ThreadedCANSocketTest(SocketCANTest, ThreadableTest): + + def __init__(self, methodName='runTest'): + SocketCANTest.__init__(self, methodName=methodName) + ThreadableTest.__init__(self) + + def clientSetUp(self): + self.cli = socket.socket(socket.PF_CAN, socket.SOCK_RAW, socket.CAN_RAW) + try: + self.cli.bind((self.interface,)) + except socket.error: + self.skipTest('network interface `%s` does not exist' % + self.interface) + self.cli.close() + + def clientTearDown(self): + self.cli.close() + self.cli = None + ThreadableTest.clientTearDown(self) + class SocketConnectedTest(ThreadedTCPSocketTest): """Socket tests for client-server connection. @@ -1072,6 +1129,112 @@ class GeneralModuleTests(unittest.TestCase): srv.close() +@unittest.skipUnless(HAVE_SOCKET_CAN, 'SocketCan required for this test.') +class BasicCANTest(unittest.TestCase): + + def testCrucialConstants(self): + socket.AF_CAN + socket.PF_CAN + socket.CAN_RAW + + def testCreateSocket(self): + with socket.socket(socket.PF_CAN, socket.SOCK_RAW, socket.CAN_RAW) as s: + pass + + def testBindAny(self): + with socket.socket(socket.PF_CAN, socket.SOCK_RAW, socket.CAN_RAW) as s: + s.bind(('', )) + + def testTooLongInterfaceName(self): + # most systems limit IFNAMSIZ to 16, take 1024 to be sure + with socket.socket(socket.PF_CAN, socket.SOCK_RAW, socket.CAN_RAW) as s: + self.assertRaisesRegexp(socket.error, 'interface name too long', + s.bind, ('x' * 1024,)) + + @unittest.skipUnless(hasattr(socket, "CAN_RAW_LOOPBACK"), + 'socket.CAN_RAW_LOOPBACK required for this test.') + def testLoopback(self): + with socket.socket(socket.PF_CAN, socket.SOCK_RAW, socket.CAN_RAW) as s: + for loopback in (0, 1): + s.setsockopt(socket.SOL_CAN_RAW, socket.CAN_RAW_LOOPBACK, + loopback) + self.assertEqual(loopback, + s.getsockopt(socket.SOL_CAN_RAW, socket.CAN_RAW_LOOPBACK)) + + @unittest.skipUnless(hasattr(socket, "CAN_RAW_FILTER"), + 'socket.CAN_RAW_FILTER required for this test.') + def testFilter(self): + can_id, can_mask = 0x200, 0x700 + can_filter = struct.pack("=II", can_id, can_mask) + with socket.socket(socket.PF_CAN, socket.SOCK_RAW, socket.CAN_RAW) as s: + s.setsockopt(socket.SOL_CAN_RAW, socket.CAN_RAW_FILTER, can_filter) + self.assertEqual(can_filter, + s.getsockopt(socket.SOL_CAN_RAW, socket.CAN_RAW_FILTER, 8)) + + +@unittest.skipUnless(HAVE_SOCKET_CAN, 'SocketCan required for this test.') +@unittest.skipUnless(thread, 'Threading required for this test.') +class CANTest(ThreadedCANSocketTest): + + """The CAN frame structure is defined in <linux/can.h>: + + struct can_frame { + canid_t can_id; /* 32 bit CAN_ID + EFF/RTR/ERR flags */ + __u8 can_dlc; /* data length code: 0 .. 8 */ + __u8 data[8] __attribute__((aligned(8))); + }; + """ + can_frame_fmt = "=IB3x8s" + + def __init__(self, methodName='runTest'): + ThreadedCANSocketTest.__init__(self, methodName=methodName) + + @classmethod + def build_can_frame(cls, can_id, data): + """Build a CAN frame.""" + can_dlc = len(data) + data = data.ljust(8, b'\x00') + return struct.pack(cls.can_frame_fmt, can_id, can_dlc, data) + + @classmethod + def dissect_can_frame(cls, frame): + """Dissect a CAN frame.""" + can_id, can_dlc, data = struct.unpack(cls.can_frame_fmt, frame) + return (can_id, can_dlc, data[:can_dlc]) + + def testSendFrame(self): + cf, addr = self.s.recvfrom(self.bufsize) + self.assertEqual(self.cf, cf) + self.assertEqual(addr[0], self.interface) + self.assertEqual(addr[1], socket.AF_CAN) + + def _testSendFrame(self): + self.cf = self.build_can_frame(0x00, b'\x01\x02\x03\x04\x05') + self.cli.send(self.cf) + + def testSendMaxFrame(self): + cf, addr = self.s.recvfrom(self.bufsize) + self.assertEqual(self.cf, cf) + + def _testSendMaxFrame(self): + self.cf = self.build_can_frame(0x00, b'\x07' * 8) + self.cli.send(self.cf) + + def testSendMultiFrames(self): + cf, addr = self.s.recvfrom(self.bufsize) + self.assertEqual(self.cf1, cf) + + cf, addr = self.s.recvfrom(self.bufsize) + self.assertEqual(self.cf2, cf) + + def _testSendMultiFrames(self): + self.cf1 = self.build_can_frame(0x07, b'\x44\x33\x22\x11') + self.cli.send(self.cf1) + + self.cf2 = self.build_can_frame(0x12, b'\x99\x22\x33') + self.cli.send(self.cf2) + + @unittest.skipUnless(thread, 'Threading required for this test.') class BasicTCPTest(SocketConnectedTest): @@ -4194,6 +4357,7 @@ def test_main(): if isTipcAvailable(): tests.append(TIPCTest) tests.append(TIPCThreadableTest) + tests.extend([BasicCANTest, CANTest]) tests.extend([ CmsgMacroTests, SendmsgUDPTest, |