summaryrefslogtreecommitdiffstats
path: root/Lib
diff options
context:
space:
mode:
authorCharles-François Natali <neologix@free.fr>2011-10-06 17:47:44 (GMT)
committerCharles-François Natali <neologix@free.fr>2011-10-06 17:47:44 (GMT)
commit47413c117145c3da7cdb0ded5c05d0d540a26a4a (patch)
tree8b216bb9a929129d5a9a457c3e0943f551d7b4a4 /Lib
parent90c30e87be7df19604c920ee24b6888afa91a093 (diff)
downloadcpython-47413c117145c3da7cdb0ded5c05d0d540a26a4a.zip
cpython-47413c117145c3da7cdb0ded5c05d0d540a26a4a.tar.gz
cpython-47413c117145c3da7cdb0ded5c05d0d540a26a4a.tar.bz2
Issue #10141: socket: add SocketCAN (PF_CAN) support. Initial patch by Matthias
Fuchs, updated by Tiago Gonçalves.
Diffstat (limited to 'Lib')
-rw-r--r--Lib/test/test_socket.py164
1 files changed, 164 insertions, 0 deletions
diff --git a/Lib/test/test_socket.py b/Lib/test/test_socket.py
index 79160f4..1558570 100644
--- a/Lib/test/test_socket.py
+++ b/Lib/test/test_socket.py
@@ -21,6 +21,7 @@ from weakref import proxy
import signal
import math
import pickle
+import struct
try:
import fcntl
except ImportError:
@@ -36,6 +37,18 @@ except ImportError:
thread = None
threading = None
+def _have_socket_can():
+ """Check whether CAN sockets are supported on this host."""
+ try:
+ s = socket.socket(socket.PF_CAN, socket.SOCK_RAW, socket.CAN_RAW)
+ except (AttributeError, socket.error, OSError):
+ return False
+ else:
+ s.close()
+ return True
+
+HAVE_SOCKET_CAN = _have_socket_can()
+
# Size in bytes of the int type
SIZEOF_INT = array.array("i").itemsize
@@ -80,6 +93,30 @@ class ThreadSafeCleanupTestCase(unittest.TestCase):
with self._cleanup_lock:
return super().doCleanups(*args, **kwargs)
+class SocketCANTest(unittest.TestCase):
+
+ """To be able to run this test, a `vcan0` CAN interface can be created with
+ the following commands:
+ # modprobe vcan
+ # ip link add dev vcan0 type vcan
+ # ifconfig vcan0 up
+ """
+ interface = 'vcan0'
+ bufsize = 128
+
+ def setUp(self):
+ self.s = socket.socket(socket.PF_CAN, socket.SOCK_RAW, socket.CAN_RAW)
+ try:
+ self.s.bind((self.interface,))
+ except socket.error:
+ self.skipTest('network interface `%s` does not exist' %
+ self.interface)
+ self.s.close()
+
+ def tearDown(self):
+ self.s.close()
+ self.s = None
+
class ThreadableTest:
"""Threadable Test class
@@ -210,6 +247,26 @@ class ThreadedUDPSocketTest(SocketUDPTest, ThreadableTest):
self.cli = None
ThreadableTest.clientTearDown(self)
+class ThreadedCANSocketTest(SocketCANTest, ThreadableTest):
+
+ def __init__(self, methodName='runTest'):
+ SocketCANTest.__init__(self, methodName=methodName)
+ ThreadableTest.__init__(self)
+
+ def clientSetUp(self):
+ self.cli = socket.socket(socket.PF_CAN, socket.SOCK_RAW, socket.CAN_RAW)
+ try:
+ self.cli.bind((self.interface,))
+ except socket.error:
+ self.skipTest('network interface `%s` does not exist' %
+ self.interface)
+ self.cli.close()
+
+ def clientTearDown(self):
+ self.cli.close()
+ self.cli = None
+ ThreadableTest.clientTearDown(self)
+
class SocketConnectedTest(ThreadedTCPSocketTest):
"""Socket tests for client-server connection.
@@ -1072,6 +1129,112 @@ class GeneralModuleTests(unittest.TestCase):
srv.close()
+@unittest.skipUnless(HAVE_SOCKET_CAN, 'SocketCan required for this test.')
+class BasicCANTest(unittest.TestCase):
+
+ def testCrucialConstants(self):
+ socket.AF_CAN
+ socket.PF_CAN
+ socket.CAN_RAW
+
+ def testCreateSocket(self):
+ with socket.socket(socket.PF_CAN, socket.SOCK_RAW, socket.CAN_RAW) as s:
+ pass
+
+ def testBindAny(self):
+ with socket.socket(socket.PF_CAN, socket.SOCK_RAW, socket.CAN_RAW) as s:
+ s.bind(('', ))
+
+ def testTooLongInterfaceName(self):
+ # most systems limit IFNAMSIZ to 16, take 1024 to be sure
+ with socket.socket(socket.PF_CAN, socket.SOCK_RAW, socket.CAN_RAW) as s:
+ self.assertRaisesRegexp(socket.error, 'interface name too long',
+ s.bind, ('x' * 1024,))
+
+ @unittest.skipUnless(hasattr(socket, "CAN_RAW_LOOPBACK"),
+ 'socket.CAN_RAW_LOOPBACK required for this test.')
+ def testLoopback(self):
+ with socket.socket(socket.PF_CAN, socket.SOCK_RAW, socket.CAN_RAW) as s:
+ for loopback in (0, 1):
+ s.setsockopt(socket.SOL_CAN_RAW, socket.CAN_RAW_LOOPBACK,
+ loopback)
+ self.assertEqual(loopback,
+ s.getsockopt(socket.SOL_CAN_RAW, socket.CAN_RAW_LOOPBACK))
+
+ @unittest.skipUnless(hasattr(socket, "CAN_RAW_FILTER"),
+ 'socket.CAN_RAW_FILTER required for this test.')
+ def testFilter(self):
+ can_id, can_mask = 0x200, 0x700
+ can_filter = struct.pack("=II", can_id, can_mask)
+ with socket.socket(socket.PF_CAN, socket.SOCK_RAW, socket.CAN_RAW) as s:
+ s.setsockopt(socket.SOL_CAN_RAW, socket.CAN_RAW_FILTER, can_filter)
+ self.assertEqual(can_filter,
+ s.getsockopt(socket.SOL_CAN_RAW, socket.CAN_RAW_FILTER, 8))
+
+
+@unittest.skipUnless(HAVE_SOCKET_CAN, 'SocketCan required for this test.')
+@unittest.skipUnless(thread, 'Threading required for this test.')
+class CANTest(ThreadedCANSocketTest):
+
+ """The CAN frame structure is defined in <linux/can.h>:
+
+ struct can_frame {
+ canid_t can_id; /* 32 bit CAN_ID + EFF/RTR/ERR flags */
+ __u8 can_dlc; /* data length code: 0 .. 8 */
+ __u8 data[8] __attribute__((aligned(8)));
+ };
+ """
+ can_frame_fmt = "=IB3x8s"
+
+ def __init__(self, methodName='runTest'):
+ ThreadedCANSocketTest.__init__(self, methodName=methodName)
+
+ @classmethod
+ def build_can_frame(cls, can_id, data):
+ """Build a CAN frame."""
+ can_dlc = len(data)
+ data = data.ljust(8, b'\x00')
+ return struct.pack(cls.can_frame_fmt, can_id, can_dlc, data)
+
+ @classmethod
+ def dissect_can_frame(cls, frame):
+ """Dissect a CAN frame."""
+ can_id, can_dlc, data = struct.unpack(cls.can_frame_fmt, frame)
+ return (can_id, can_dlc, data[:can_dlc])
+
+ def testSendFrame(self):
+ cf, addr = self.s.recvfrom(self.bufsize)
+ self.assertEqual(self.cf, cf)
+ self.assertEqual(addr[0], self.interface)
+ self.assertEqual(addr[1], socket.AF_CAN)
+
+ def _testSendFrame(self):
+ self.cf = self.build_can_frame(0x00, b'\x01\x02\x03\x04\x05')
+ self.cli.send(self.cf)
+
+ def testSendMaxFrame(self):
+ cf, addr = self.s.recvfrom(self.bufsize)
+ self.assertEqual(self.cf, cf)
+
+ def _testSendMaxFrame(self):
+ self.cf = self.build_can_frame(0x00, b'\x07' * 8)
+ self.cli.send(self.cf)
+
+ def testSendMultiFrames(self):
+ cf, addr = self.s.recvfrom(self.bufsize)
+ self.assertEqual(self.cf1, cf)
+
+ cf, addr = self.s.recvfrom(self.bufsize)
+ self.assertEqual(self.cf2, cf)
+
+ def _testSendMultiFrames(self):
+ self.cf1 = self.build_can_frame(0x07, b'\x44\x33\x22\x11')
+ self.cli.send(self.cf1)
+
+ self.cf2 = self.build_can_frame(0x12, b'\x99\x22\x33')
+ self.cli.send(self.cf2)
+
+
@unittest.skipUnless(thread, 'Threading required for this test.')
class BasicTCPTest(SocketConnectedTest):
@@ -4194,6 +4357,7 @@ def test_main():
if isTipcAvailable():
tests.append(TIPCTest)
tests.append(TIPCThreadableTest)
+ tests.extend([BasicCANTest, CANTest])
tests.extend([
CmsgMacroTests,
SendmsgUDPTest,