summaryrefslogtreecommitdiffstats
path: root/Lib/test
diff options
context:
space:
mode:
authorCharles-François Natali <neologix@free.fr>2011-11-10 18:21:37 (GMT)
committerCharles-François Natali <neologix@free.fr>2011-11-10 18:21:37 (GMT)
commit10b8cf4455f48eba93077535e4d03eba2ee4bbfb (patch)
tree71ffb167f19f8c96177e9840d413c9d27d9a38da /Lib/test
parent0c929d9d397c2a2578a2b016ae9b3d258ab513aa (diff)
downloadcpython-10b8cf4455f48eba93077535e4d03eba2ee4bbfb.zip
cpython-10b8cf4455f48eba93077535e4d03eba2ee4bbfb.tar.gz
cpython-10b8cf4455f48eba93077535e4d03eba2ee4bbfb.tar.bz2
Issue #7777: socket: Add Reliable Datagram Sockets (PF_RDS) support.
Diffstat (limited to 'Lib/test')
-rw-r--r--Lib/test/test_socket.py159
1 files changed, 159 insertions, 0 deletions
diff --git a/Lib/test/test_socket.py b/Lib/test/test_socket.py
index d7a521c..b5e0dc2 100644
--- a/Lib/test/test_socket.py
+++ b/Lib/test/test_socket.py
@@ -47,8 +47,20 @@ def _have_socket_can():
s.close()
return True
+def _have_socket_rds():
+ """Check whether RDS sockets are supported on this host."""
+ try:
+ s = socket.socket(socket.PF_RDS, socket.SOCK_SEQPACKET, 0)
+ except (AttributeError, OSError):
+ return False
+ else:
+ s.close()
+ return True
+
HAVE_SOCKET_CAN = _have_socket_can()
+HAVE_SOCKET_RDS = _have_socket_rds()
+
# Size in bytes of the int type
SIZEOF_INT = array.array("i").itemsize
@@ -113,6 +125,23 @@ class SocketCANTest(unittest.TestCase):
self.skipTest('network interface `%s` does not exist' %
self.interface)
+
+class SocketRDSTest(unittest.TestCase):
+
+ """To be able to run this test, the `rds` kernel module must be loaded:
+ # modprobe rds
+ """
+ bufsize = 8192
+
+ def setUp(self):
+ self.serv = socket.socket(socket.PF_RDS, socket.SOCK_SEQPACKET, 0)
+ self.addCleanup(self.serv.close)
+ try:
+ self.port = support.bind_port(self.serv)
+ except OSError:
+ self.skipTest('unable to bind RDS socket')
+
+
class ThreadableTest:
"""Threadable Test class
@@ -271,6 +300,29 @@ class ThreadedCANSocketTest(SocketCANTest, ThreadableTest):
self.cli = None
ThreadableTest.clientTearDown(self)
+class ThreadedRDSSocketTest(SocketRDSTest, ThreadableTest):
+
+ def __init__(self, methodName='runTest'):
+ SocketRDSTest.__init__(self, methodName=methodName)
+ ThreadableTest.__init__(self)
+ self.evt = threading.Event()
+
+ def clientSetUp(self):
+ self.cli = socket.socket(socket.PF_RDS, socket.SOCK_SEQPACKET, 0)
+ try:
+ # RDS sockets must be bound explicitly to send or receive data
+ self.cli.bind((HOST, 0))
+ self.cli_addr = self.cli.getsockname()
+ except OSError:
+ # skipTest should not be called here, and will be called in the
+ # server instead
+ pass
+
+ def clientTearDown(self):
+ self.cli.close()
+ self.cli = None
+ ThreadableTest.clientTearDown(self)
+
class SocketConnectedTest(ThreadedTCPSocketTest):
"""Socket tests for client-server connection.
@@ -1239,6 +1291,112 @@ class CANTest(ThreadedCANSocketTest):
self.cli.send(self.cf2)
+@unittest.skipUnless(HAVE_SOCKET_RDS, 'RDS sockets required for this test.')
+class BasicRDSTest(unittest.TestCase):
+
+ def testCrucialConstants(self):
+ socket.AF_RDS
+ socket.PF_RDS
+
+ def testCreateSocket(self):
+ with socket.socket(socket.PF_RDS, socket.SOCK_SEQPACKET, 0) as s:
+ pass
+
+ def testSocketBufferSize(self):
+ bufsize = 16384
+ with socket.socket(socket.PF_RDS, socket.SOCK_SEQPACKET, 0) as s:
+ s.setsockopt(socket.SOL_SOCKET, socket.SO_RCVBUF, bufsize)
+ s.setsockopt(socket.SOL_SOCKET, socket.SO_SNDBUF, bufsize)
+
+
+@unittest.skipUnless(HAVE_SOCKET_RDS, 'RDS sockets required for this test.')
+@unittest.skipUnless(thread, 'Threading required for this test.')
+class RDSTest(ThreadedRDSSocketTest):
+
+ def __init__(self, methodName='runTest'):
+ ThreadedRDSSocketTest.__init__(self, methodName=methodName)
+
+ def testSendAndRecv(self):
+ data, addr = self.serv.recvfrom(self.bufsize)
+ self.assertEqual(self.data, data)
+ self.assertEqual(self.cli_addr, addr)
+
+ def _testSendAndRecv(self):
+ self.data = b'spam'
+ self.cli.sendto(self.data, 0, (HOST, self.port))
+
+ def testPeek(self):
+ data, addr = self.serv.recvfrom(self.bufsize, socket.MSG_PEEK)
+ self.assertEqual(self.data, data)
+ data, addr = self.serv.recvfrom(self.bufsize)
+ self.assertEqual(self.data, data)
+
+ def _testPeek(self):
+ self.data = b'spam'
+ self.cli.sendto(self.data, 0, (HOST, self.port))
+
+ @requireAttrs(socket.socket, 'recvmsg')
+ def testSendAndRecvMsg(self):
+ data, ancdata, msg_flags, addr = self.serv.recvmsg(self.bufsize)
+ self.assertEqual(self.data, data)
+
+ @requireAttrs(socket.socket, 'sendmsg')
+ def _testSendAndRecvMsg(self):
+ self.data = b'hello ' * 10
+ self.cli.sendmsg([self.data], (), 0, (HOST, self.port))
+
+ def testSendAndRecvMulti(self):
+ data, addr = self.serv.recvfrom(self.bufsize)
+ self.assertEqual(self.data1, data)
+
+ data, addr = self.serv.recvfrom(self.bufsize)
+ self.assertEqual(self.data2, data)
+
+ def _testSendAndRecvMulti(self):
+ self.data1 = b'bacon'
+ self.cli.sendto(self.data1, 0, (HOST, self.port))
+
+ self.data2 = b'egg'
+ self.cli.sendto(self.data2, 0, (HOST, self.port))
+
+ def testSelect(self):
+ r, w, x = select.select([self.serv], [], [], 3.0)
+ self.assertIn(self.serv, r)
+ data, addr = self.serv.recvfrom(self.bufsize)
+ self.assertEqual(self.data, data)
+
+ def _testSelect(self):
+ self.data = b'select'
+ self.cli.sendto(self.data, 0, (HOST, self.port))
+
+ def testCongestion(self):
+ # wait until the sender is done
+ self.evt.wait()
+
+ def _testCongestion(self):
+ # test the behavior in case of congestion
+ self.data = b'fill'
+ self.cli.setblocking(False)
+ try:
+ # try to lower the receiver's socket buffer size
+ self.cli.setsockopt(socket.SOL_SOCKET, socket.SO_RCVBUF, 16384)
+ except OSError:
+ pass
+ with self.assertRaises(OSError) as cm:
+ try:
+ # fill the receiver's socket buffer
+ while True:
+ self.cli.sendto(self.data, 0, (HOST, self.port))
+ finally:
+ # signal the receiver we're done
+ self.evt.set()
+ # sendto() should have failed with ENOBUFS
+ self.assertEqual(cm.exception.errno, errno.ENOBUFS)
+ # and we should have received a congestion notification through poll
+ r, w, x = select.select([self.serv], [], [], 3.0)
+ self.assertIn(self.serv, r)
+
+
@unittest.skipUnless(thread, 'Threading required for this test.')
class BasicTCPTest(SocketConnectedTest):
@@ -4362,6 +4520,7 @@ def test_main():
tests.append(TIPCTest)
tests.append(TIPCThreadableTest)
tests.extend([BasicCANTest, CANTest])
+ tests.extend([BasicRDSTest, RDSTest])
tests.extend([
CmsgMacroTests,
SendmsgUDPTest,