diff options
Diffstat (limited to 'Lib/test/test_socket.py')
-rw-r--r--[-rwxr-xr-x] | Lib/test/test_socket.py | 5758 |
1 files changed, 516 insertions, 5242 deletions
diff --git a/Lib/test/test_socket.py b/Lib/test/test_socket.py index 184c67c..988e12a 100755..100644 --- a/Lib/test/test_socket.py +++ b/Lib/test/test_socket.py @@ -1,166 +1,61 @@ import unittest -from test import support +from test import test_support import errno -import io import itertools import socket import select -import tempfile import time import traceback -import queue +import Queue import sys import os -import platform import array import contextlib -from weakref import proxy import signal import math -import pickle -import struct -import random -import shutil -import string -import _thread as thread -import threading -try: - import multiprocessing -except ImportError: - multiprocessing = False -try: - import fcntl -except ImportError: - fcntl = None - -HOST = support.HOST -# test unicode string and carriage return -MSG = 'Michael Gilfix was here\u1234\r\n'.encode('utf-8') - -VSOCKPORT = 1234 -AIX = platform.system() == "AIX" - +import weakref try: import _socket except ImportError: _socket = None -def get_cid(): - if fcntl is None: - return None - try: - with open("/dev/vsock", "rb") as f: - r = fcntl.ioctl(f, socket.IOCTL_VM_SOCKETS_GET_LOCAL_CID, " ") - except OSError: - return None - else: - return struct.unpack("I", r)[0] - -def _have_socket_can(): - """Check whether CAN sockets are supported on this host.""" - try: - s = socket.socket(socket.PF_CAN, socket.SOCK_RAW, socket.CAN_RAW) - except (AttributeError, OSError): - return False - else: - s.close() - return True - -def _have_socket_can_isotp(): - """Check whether CAN ISOTP sockets are supported on this host.""" - try: - s = socket.socket(socket.PF_CAN, socket.SOCK_DGRAM, socket.CAN_ISOTP) - except (AttributeError, OSError): - return False - else: - s.close() - return True - -def _have_socket_rds(): - """Check whether RDS sockets are supported on this host.""" - try: - s = socket.socket(socket.PF_RDS, socket.SOCK_SEQPACKET, 0) - except (AttributeError, OSError): - return False - else: - s.close() - return True - -def _have_socket_alg(): - """Check whether AF_ALG sockets are supported on this host.""" - try: - s = socket.socket(socket.AF_ALG, socket.SOCK_SEQPACKET, 0) - except (AttributeError, OSError): - return False - else: - s.close() - return True -def _have_socket_qipcrtr(): - """Check whether AF_QIPCRTR sockets are supported on this host.""" - try: - s = socket.socket(socket.AF_QIPCRTR, socket.SOCK_DGRAM, 0) - except (AttributeError, OSError): - return False - else: - s.close() - return True - -def _have_socket_vsock(): - """Check whether AF_VSOCK sockets are supported on this host.""" - ret = get_cid() is not None - return ret +MAIN_TIMEOUT = 60.0 -def _have_socket_bluetooth(): - """Check whether AF_BLUETOOTH sockets are supported on this host.""" +def try_address(host, port=0, family=socket.AF_INET): + """Try to bind a socket on the given host:port and return True + if that has been possible.""" try: - # RFCOMM is supported by all platforms with bluetooth support. Windows - # does not support omitting the protocol. - s = socket.socket(socket.AF_BLUETOOTH, socket.SOCK_STREAM, socket.BTPROTO_RFCOMM) - except (AttributeError, OSError): + sock = socket.socket(family, socket.SOCK_STREAM) + sock.bind((host, port)) + except (socket.error, socket.gaierror): return False else: - s.close() - return True - - -@contextlib.contextmanager -def socket_setdefaulttimeout(timeout): - old_timeout = socket.getdefaulttimeout() - try: - socket.setdefaulttimeout(timeout) - yield - finally: - socket.setdefaulttimeout(old_timeout) + sock.close() + return True +HOST = test_support.HOST +MSG = b'Michael Gilfix was here\n' +SUPPORTS_IPV6 = test_support.IPV6_ENABLED -HAVE_SOCKET_CAN = _have_socket_can() - -HAVE_SOCKET_CAN_ISOTP = _have_socket_can_isotp() - -HAVE_SOCKET_RDS = _have_socket_rds() - -HAVE_SOCKET_ALG = _have_socket_alg() - -HAVE_SOCKET_QIPCRTR = _have_socket_qipcrtr() - -HAVE_SOCKET_VSOCK = _have_socket_vsock() - -HAVE_SOCKET_UDPLITE = hasattr(socket, "IPPROTO_UDPLITE") - -HAVE_SOCKET_BLUETOOTH = _have_socket_bluetooth() +try: + import thread + import threading +except ImportError: + thread = None + threading = None -# Size in bytes of the int type -SIZEOF_INT = array.array("i").itemsize +HOST = test_support.HOST +MSG = 'Michael Gilfix was here\n' class SocketTCPTest(unittest.TestCase): def setUp(self): self.serv = socket.socket(socket.AF_INET, socket.SOCK_STREAM) - self.port = support.bind_port(self.serv) - self.serv.listen() + self.port = test_support.bind_port(self.serv) + self.serv.listen(1) def tearDown(self): self.serv.close() @@ -170,104 +65,12 @@ class SocketUDPTest(unittest.TestCase): def setUp(self): self.serv = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) - self.port = support.bind_port(self.serv) + self.port = test_support.bind_port(self.serv) def tearDown(self): self.serv.close() self.serv = None -class SocketUDPLITETest(SocketUDPTest): - - def setUp(self): - self.serv = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, socket.IPPROTO_UDPLITE) - self.port = support.bind_port(self.serv) - -class ThreadSafeCleanupTestCase(unittest.TestCase): - """Subclass of unittest.TestCase with thread-safe cleanup methods. - - This subclass protects the addCleanup() and doCleanups() methods - with a recursive lock. - """ - - def __init__(self, *args, **kwargs): - super().__init__(*args, **kwargs) - self._cleanup_lock = threading.RLock() - - def addCleanup(self, *args, **kwargs): - with self._cleanup_lock: - return super().addCleanup(*args, **kwargs) - - def doCleanups(self, *args, **kwargs): - with self._cleanup_lock: - return super().doCleanups(*args, **kwargs) - -class SocketCANTest(unittest.TestCase): - - """To be able to run this test, a `vcan0` CAN interface can be created with - the following commands: - # modprobe vcan - # ip link add dev vcan0 type vcan - # ifconfig vcan0 up - """ - interface = 'vcan0' - bufsize = 128 - - """The CAN frame structure is defined in <linux/can.h>: - - struct can_frame { - canid_t can_id; /* 32 bit CAN_ID + EFF/RTR/ERR flags */ - __u8 can_dlc; /* data length code: 0 .. 8 */ - __u8 data[8] __attribute__((aligned(8))); - }; - """ - can_frame_fmt = "=IB3x8s" - can_frame_size = struct.calcsize(can_frame_fmt) - - """The Broadcast Management Command frame structure is defined - in <linux/can/bcm.h>: - - struct bcm_msg_head { - __u32 opcode; - __u32 flags; - __u32 count; - struct timeval ival1, ival2; - canid_t can_id; - __u32 nframes; - struct can_frame frames[0]; - } - - `bcm_msg_head` must be 8 bytes aligned because of the `frames` member (see - `struct can_frame` definition). Must use native not standard types for packing. - """ - bcm_cmd_msg_fmt = "@3I4l2I" - bcm_cmd_msg_fmt += "x" * (struct.calcsize(bcm_cmd_msg_fmt) % 8) - - def setUp(self): - self.s = socket.socket(socket.PF_CAN, socket.SOCK_RAW, socket.CAN_RAW) - self.addCleanup(self.s.close) - try: - self.s.bind((self.interface,)) - except OSError: - self.skipTest('network interface `%s` does not exist' % - self.interface) - - -class SocketRDSTest(unittest.TestCase): - - """To be able to run this test, the `rds` kernel module must be loaded: - # modprobe rds - """ - bufsize = 8192 - - def setUp(self): - self.serv = socket.socket(socket.PF_RDS, socket.SOCK_SEQPACKET, 0) - self.addCleanup(self.serv.close) - try: - self.port = support.bind_port(self.serv) - except OSError: - self.skipTest('unable to bind RDS socket') - - class ThreadableTest: """Threadable Test class @@ -321,14 +124,10 @@ class ThreadableTest: self.server_ready.set() def _setUp(self): - self.wait_threads = support.wait_threads_exit() - self.wait_threads.__enter__() - self.server_ready = threading.Event() self.client_ready = threading.Event() self.done = threading.Event() - self.queue = queue.Queue(1) - self.server_crashed = False + self.queue = Queue.Queue(1) # Do some munging to start the client test. methodname = self.id() @@ -338,45 +137,30 @@ class ThreadableTest: self.client_thread = thread.start_new_thread( self.clientRun, (test_method,)) - try: - self.__setUp() - except: - self.server_crashed = True - raise - finally: + self.__setUp() + if not self.server_ready.is_set(): self.server_ready.set() self.client_ready.wait() def _tearDown(self): self.__tearDown() self.done.wait() - self.wait_threads.__exit__(None, None, None) - if self.queue.qsize(): - exc = self.queue.get() - raise exc + if not self.queue.empty(): + msg = self.queue.get() + self.fail(msg) def clientRun(self, test_func): self.server_ready.wait() - try: - self.clientSetUp() - except BaseException as e: - self.queue.put(e) - self.clientTearDown() - return - finally: - self.client_ready.set() - if self.server_crashed: - self.clientTearDown() - return - if not hasattr(test_func, '__call__'): - raise TypeError("test_func must be a callable function") + self.clientSetUp() + self.client_ready.set() + if not callable(test_func): + raise TypeError("test_func must be a callable function.") try: test_func() - except BaseException as e: - self.queue.put(e) - finally: - self.clientTearDown() + except Exception, strerror: + self.queue.put(strerror) + self.clientTearDown() def clientSetUp(self): raise NotImplementedError("clientSetUp must be implemented.") @@ -413,105 +197,7 @@ class ThreadedUDPSocketTest(SocketUDPTest, ThreadableTest): self.cli = None ThreadableTest.clientTearDown(self) -@unittest.skipUnless(HAVE_SOCKET_UDPLITE, - 'UDPLITE sockets required for this test.') -class ThreadedUDPLITESocketTest(SocketUDPLITETest, ThreadableTest): - - def __init__(self, methodName='runTest'): - SocketUDPLITETest.__init__(self, methodName=methodName) - ThreadableTest.__init__(self) - - def clientSetUp(self): - self.cli = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, socket.IPPROTO_UDPLITE) - - def clientTearDown(self): - self.cli.close() - self.cli = None - ThreadableTest.clientTearDown(self) - -class ThreadedCANSocketTest(SocketCANTest, ThreadableTest): - - def __init__(self, methodName='runTest'): - SocketCANTest.__init__(self, methodName=methodName) - ThreadableTest.__init__(self) - - def clientSetUp(self): - self.cli = socket.socket(socket.PF_CAN, socket.SOCK_RAW, socket.CAN_RAW) - try: - self.cli.bind((self.interface,)) - except OSError: - # skipTest should not be called here, and will be called in the - # server instead - pass - - def clientTearDown(self): - self.cli.close() - self.cli = None - ThreadableTest.clientTearDown(self) - -class ThreadedRDSSocketTest(SocketRDSTest, ThreadableTest): - - def __init__(self, methodName='runTest'): - SocketRDSTest.__init__(self, methodName=methodName) - ThreadableTest.__init__(self) - - def clientSetUp(self): - self.cli = socket.socket(socket.PF_RDS, socket.SOCK_SEQPACKET, 0) - try: - # RDS sockets must be bound explicitly to send or receive data - self.cli.bind((HOST, 0)) - self.cli_addr = self.cli.getsockname() - except OSError: - # skipTest should not be called here, and will be called in the - # server instead - pass - - def clientTearDown(self): - self.cli.close() - self.cli = None - ThreadableTest.clientTearDown(self) - -@unittest.skipIf(fcntl is None, "need fcntl") -@unittest.skipUnless(HAVE_SOCKET_VSOCK, - 'VSOCK sockets required for this test.') -@unittest.skipUnless(get_cid() != 2, - "This test can only be run on a virtual guest.") -class ThreadedVSOCKSocketStreamTest(unittest.TestCase, ThreadableTest): - - def __init__(self, methodName='runTest'): - unittest.TestCase.__init__(self, methodName=methodName) - ThreadableTest.__init__(self) - - def setUp(self): - self.serv = socket.socket(socket.AF_VSOCK, socket.SOCK_STREAM) - self.addCleanup(self.serv.close) - self.serv.bind((socket.VMADDR_CID_ANY, VSOCKPORT)) - self.serv.listen() - self.serverExplicitReady() - self.conn, self.connaddr = self.serv.accept() - self.addCleanup(self.conn.close) - - def clientSetUp(self): - time.sleep(0.1) - self.cli = socket.socket(socket.AF_VSOCK, socket.SOCK_STREAM) - self.addCleanup(self.cli.close) - cid = get_cid() - self.cli.connect((cid, VSOCKPORT)) - - def testStream(self): - msg = self.conn.recv(1024) - self.assertEqual(msg, MSG) - - def _testStream(self): - self.cli.send(MSG) - self.cli.close() - class SocketConnectedTest(ThreadedTCPSocketTest): - """Socket tests for client-server connection. - - self.cli_conn is a client socket connected to the server. The - setUp() method guarantees that it is connected to the server. - """ def __init__(self, methodName='runTest'): ThreadedTCPSocketTest.__init__(self, methodName=methodName) @@ -561,297 +247,29 @@ class SocketPairTest(unittest.TestCase, ThreadableTest): ThreadableTest.clientTearDown(self) -# The following classes are used by the sendmsg()/recvmsg() tests. -# Combining, for instance, ConnectedStreamTestMixin and TCPTestBase -# gives a drop-in replacement for SocketConnectedTest, but different -# address families can be used, and the attributes serv_addr and -# cli_addr will be set to the addresses of the endpoints. - -class SocketTestBase(unittest.TestCase): - """A base class for socket tests. - - Subclasses must provide methods newSocket() to return a new socket - and bindSock(sock) to bind it to an unused address. - - Creates a socket self.serv and sets self.serv_addr to its address. - """ - - def setUp(self): - self.serv = self.newSocket() - self.bindServer() - - def bindServer(self): - """Bind server socket and set self.serv_addr to its address.""" - self.bindSock(self.serv) - self.serv_addr = self.serv.getsockname() - - def tearDown(self): - self.serv.close() - self.serv = None - - -class SocketListeningTestMixin(SocketTestBase): - """Mixin to listen on the server socket.""" - - def setUp(self): - super().setUp() - self.serv.listen() - - -class ThreadedSocketTestMixin(ThreadSafeCleanupTestCase, SocketTestBase, - ThreadableTest): - """Mixin to add client socket and allow client/server tests. - - Client socket is self.cli and its address is self.cli_addr. See - ThreadableTest for usage information. - """ - - def __init__(self, *args, **kwargs): - super().__init__(*args, **kwargs) - ThreadableTest.__init__(self) - - def clientSetUp(self): - self.cli = self.newClientSocket() - self.bindClient() - - def newClientSocket(self): - """Return a new socket for use as client.""" - return self.newSocket() - - def bindClient(self): - """Bind client socket and set self.cli_addr to its address.""" - self.bindSock(self.cli) - self.cli_addr = self.cli.getsockname() - - def clientTearDown(self): - self.cli.close() - self.cli = None - ThreadableTest.clientTearDown(self) - - -class ConnectedStreamTestMixin(SocketListeningTestMixin, - ThreadedSocketTestMixin): - """Mixin to allow client/server stream tests with connected client. - - Server's socket representing connection to client is self.cli_conn - and client's connection to server is self.serv_conn. (Based on - SocketConnectedTest.) - """ - - def setUp(self): - super().setUp() - # Indicate explicitly we're ready for the client thread to - # proceed and then perform the blocking call to accept - self.serverExplicitReady() - conn, addr = self.serv.accept() - self.cli_conn = conn - - def tearDown(self): - self.cli_conn.close() - self.cli_conn = None - super().tearDown() - - def clientSetUp(self): - super().clientSetUp() - self.cli.connect(self.serv_addr) - self.serv_conn = self.cli - - def clientTearDown(self): - try: - self.serv_conn.close() - self.serv_conn = None - except AttributeError: - pass - super().clientTearDown() - - -class UnixSocketTestBase(SocketTestBase): - """Base class for Unix-domain socket tests.""" - - # This class is used for file descriptor passing tests, so we - # create the sockets in a private directory so that other users - # can't send anything that might be problematic for a privileged - # user running the tests. - - def setUp(self): - self.dir_path = tempfile.mkdtemp() - self.addCleanup(os.rmdir, self.dir_path) - super().setUp() - - def bindSock(self, sock): - path = tempfile.mktemp(dir=self.dir_path) - support.bind_unix_socket(sock, path) - self.addCleanup(support.unlink, path) - -class UnixStreamBase(UnixSocketTestBase): - """Base class for Unix-domain SOCK_STREAM tests.""" - - def newSocket(self): - return socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) - - -class InetTestBase(SocketTestBase): - """Base class for IPv4 socket tests.""" - - host = HOST - - def setUp(self): - super().setUp() - self.port = self.serv_addr[1] - - def bindSock(self, sock): - support.bind_port(sock, host=self.host) - -class TCPTestBase(InetTestBase): - """Base class for TCP-over-IPv4 tests.""" - - def newSocket(self): - return socket.socket(socket.AF_INET, socket.SOCK_STREAM) - -class UDPTestBase(InetTestBase): - """Base class for UDP-over-IPv4 tests.""" - - def newSocket(self): - return socket.socket(socket.AF_INET, socket.SOCK_DGRAM) - -class UDPLITETestBase(InetTestBase): - """Base class for UDPLITE-over-IPv4 tests.""" - - def newSocket(self): - return socket.socket(socket.AF_INET, socket.SOCK_DGRAM, socket.IPPROTO_UDPLITE) - -class SCTPStreamBase(InetTestBase): - """Base class for SCTP tests in one-to-one (SOCK_STREAM) mode.""" - - def newSocket(self): - return socket.socket(socket.AF_INET, socket.SOCK_STREAM, - socket.IPPROTO_SCTP) - - -class Inet6TestBase(InetTestBase): - """Base class for IPv6 socket tests.""" - - host = support.HOSTv6 - -class UDP6TestBase(Inet6TestBase): - """Base class for UDP-over-IPv6 tests.""" - - def newSocket(self): - return socket.socket(socket.AF_INET6, socket.SOCK_DGRAM) - -class UDPLITE6TestBase(Inet6TestBase): - """Base class for UDPLITE-over-IPv6 tests.""" - - def newSocket(self): - return socket.socket(socket.AF_INET6, socket.SOCK_DGRAM, socket.IPPROTO_UDPLITE) - - -# Test-skipping decorators for use with ThreadableTest. - -def skipWithClientIf(condition, reason): - """Skip decorated test if condition is true, add client_skip decorator. - - If the decorated object is not a class, sets its attribute - "client_skip" to a decorator which will return an empty function - if the test is to be skipped, or the original function if it is - not. This can be used to avoid running the client part of a - skipped test when using ThreadableTest. - """ - def client_pass(*args, **kwargs): - pass - def skipdec(obj): - retval = unittest.skip(reason)(obj) - if not isinstance(obj, type): - retval.client_skip = lambda f: client_pass - return retval - def noskipdec(obj): - if not (isinstance(obj, type) or hasattr(obj, "client_skip")): - obj.client_skip = lambda f: f - return obj - return skipdec if condition else noskipdec - - -def requireAttrs(obj, *attributes): - """Skip decorated test if obj is missing any of the given attributes. - - Sets client_skip attribute as skipWithClientIf() does. - """ - missing = [name for name in attributes if not hasattr(obj, name)] - return skipWithClientIf( - missing, "don't have " + ", ".join(name for name in missing)) - - -def requireSocket(*args): - """Skip decorated test if a socket cannot be created with given arguments. - - When an argument is given as a string, will use the value of that - attribute of the socket module, or skip the test if it doesn't - exist. Sets client_skip attribute as skipWithClientIf() does. - """ - err = None - missing = [obj for obj in args if - isinstance(obj, str) and not hasattr(socket, obj)] - if missing: - err = "don't have " + ", ".join(name for name in missing) - else: - callargs = [getattr(socket, obj) if isinstance(obj, str) else obj - for obj in args] - try: - s = socket.socket(*callargs) - except OSError as e: - # XXX: check errno? - err = str(e) - else: - s.close() - return skipWithClientIf( - err is not None, - "can't create socket({0}): {1}".format( - ", ".join(str(o) for o in args), err)) - - ####################################################################### ## Begin Tests class GeneralModuleTests(unittest.TestCase): - def test_SocketType_is_socketobject(self): - import _socket - self.assertTrue(socket.SocketType is _socket.socket) - s = socket.socket() - self.assertIsInstance(s, socket.SocketType) - s.close() - - def test_repr(self): - s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) - with s: - self.assertIn('fd=%i' % s.fileno(), repr(s)) - self.assertIn('family=%s' % socket.AF_INET, repr(s)) - self.assertIn('type=%s' % socket.SOCK_STREAM, repr(s)) - self.assertIn('proto=0', repr(s)) - self.assertNotIn('raddr', repr(s)) - s.bind(('127.0.0.1', 0)) - self.assertIn('laddr', repr(s)) - self.assertIn(str(s.getsockname()), repr(s)) - self.assertIn('[closed]', repr(s)) - self.assertNotIn('laddr', repr(s)) - @unittest.skipUnless(_socket is not None, 'need _socket module') def test_csocket_repr(self): s = _socket.socket(_socket.AF_INET, _socket.SOCK_STREAM) try: - expected = ('<socket object, fd=%s, family=%s, type=%s, proto=%s>' + expected = ('<socket object, fd=%s, family=%s, type=%s, protocol=%s>' % (s.fileno(), s.family, s.type, s.proto)) self.assertEqual(repr(s), expected) finally: s.close() - expected = ('<socket object, fd=-1, family=%s, type=%s, proto=%s>' + expected = ('<socket object, fd=-1, family=%s, type=%s, protocol=%s>' % (s.family, s.type, s.proto)) self.assertEqual(repr(s), expected) def test_weakref(self): - with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s: - p = proxy(s) - self.assertEqual(p.fileno(), s.fileno()) + s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + p = weakref.proxy(s) + self.assertEqual(p.fileno(), s.fileno()) + s.close() s = None try: p.fileno() @@ -860,15 +278,28 @@ class GeneralModuleTests(unittest.TestCase): else: self.fail('Socket proxy still exists') + def test_weakref__sock(self): + s = socket.socket()._sock + w = weakref.ref(s) + self.assertIs(w(), s) + del s + test_support.gc_collect() + self.assertIsNone(w()) + def testSocketError(self): # Testing socket module exceptions - msg = "Error raising socket exception (%s)." - with self.assertRaises(OSError, msg=msg % 'OSError'): - raise OSError - with self.assertRaises(OSError, msg=msg % 'socket.herror'): + def raise_error(*args, **kwargs): + raise socket.error + def raise_herror(*args, **kwargs): raise socket.herror - with self.assertRaises(OSError, msg=msg % 'socket.gaierror'): + def raise_gaierror(*args, **kwargs): raise socket.gaierror + self.assertRaises(socket.error, raise_error, + "Error raising socket exception.") + self.assertRaises(socket.error, raise_herror, + "Error raising socket exception.") + self.assertRaises(socket.error, raise_gaierror, + "Error raising socket exception.") def testSendtoErrors(self): # Testing that sendto doesn't mask failures. See #10169. @@ -877,48 +308,41 @@ class GeneralModuleTests(unittest.TestCase): s.bind(('', 0)) sockname = s.getsockname() # 2 args - with self.assertRaises(TypeError) as cm: - s.sendto('\u2620', sockname) - self.assertEqual(str(cm.exception), - "a bytes-like object is required, not 'str'") + with self.assertRaises(UnicodeEncodeError): + s.sendto(u'\u2620', sockname) with self.assertRaises(TypeError) as cm: s.sendto(5j, sockname) - self.assertEqual(str(cm.exception), - "a bytes-like object is required, not 'complex'") + self.assertIn('not complex', str(cm.exception)) with self.assertRaises(TypeError) as cm: - s.sendto(b'foo', None) - self.assertIn('not NoneType',str(cm.exception)) + s.sendto('foo', None) + self.assertIn('not NoneType', str(cm.exception)) # 3 args - with self.assertRaises(TypeError) as cm: - s.sendto('\u2620', 0, sockname) - self.assertEqual(str(cm.exception), - "a bytes-like object is required, not 'str'") + with self.assertRaises(UnicodeEncodeError): + s.sendto(u'\u2620', 0, sockname) with self.assertRaises(TypeError) as cm: s.sendto(5j, 0, sockname) - self.assertEqual(str(cm.exception), - "a bytes-like object is required, not 'complex'") + self.assertIn('not complex', str(cm.exception)) with self.assertRaises(TypeError) as cm: - s.sendto(b'foo', 0, None) + s.sendto('foo', 0, None) self.assertIn('not NoneType', str(cm.exception)) with self.assertRaises(TypeError) as cm: - s.sendto(b'foo', 'bar', sockname) + s.sendto('foo', 'bar', sockname) self.assertIn('an integer is required', str(cm.exception)) with self.assertRaises(TypeError) as cm: - s.sendto(b'foo', None, None) + s.sendto('foo', None, None) self.assertIn('an integer is required', str(cm.exception)) # wrong number of args with self.assertRaises(TypeError) as cm: - s.sendto(b'foo') + s.sendto('foo') self.assertIn('(1 given)', str(cm.exception)) with self.assertRaises(TypeError) as cm: - s.sendto(b'foo', 0, sockname, 4) + s.sendto('foo', 0, sockname, 4) self.assertIn('(4 given)', str(cm.exception)) + def testCrucialConstants(self): # Testing for mission critical constants socket.AF_INET - if socket.has_ipv6: - socket.AF_INET6 socket.SOCK_STREAM socket.SOCK_DGRAM socket.SOCK_RAW @@ -927,114 +351,25 @@ class GeneralModuleTests(unittest.TestCase): socket.SOL_SOCKET socket.SO_REUSEADDR - def testCrucialIpProtoConstants(self): - socket.IPPROTO_TCP - socket.IPPROTO_UDP - if socket.has_ipv6: - socket.IPPROTO_IPV6 - - @unittest.skipUnless(os.name == "nt", "Windows specific") - def testWindowsSpecificConstants(self): - socket.IPPROTO_ICLFXBM - socket.IPPROTO_ST - socket.IPPROTO_CBT - socket.IPPROTO_IGP - socket.IPPROTO_RDP - socket.IPPROTO_PGM - socket.IPPROTO_L2TP - socket.IPPROTO_SCTP - def testHostnameRes(self): # Testing hostname resolution mechanisms hostname = socket.gethostname() try: ip = socket.gethostbyname(hostname) - except OSError: + except socket.error: # Probably name lookup wasn't set up right; skip this test self.skipTest('name lookup failure') self.assertTrue(ip.find('.') >= 0, "Error resolving host to ip.") try: hname, aliases, ipaddrs = socket.gethostbyaddr(ip) - except OSError: + except socket.error: # Probably a similar problem as above; skip this test - self.skipTest('name lookup failure') + self.skipTest('address lookup failure') all_host_names = [hostname, hname] + aliases fqhn = socket.getfqdn(ip) if not fqhn in all_host_names: self.fail("Error testing host resolution mechanisms. (fqdn: %s, all: %s)" % (fqhn, repr(all_host_names))) - def test_host_resolution(self): - for addr in [support.HOSTv4, '10.0.0.1', '255.255.255.255']: - self.assertEqual(socket.gethostbyname(addr), addr) - - # we don't test support.HOSTv6 because there's a chance it doesn't have - # a matching name entry (e.g. 'ip6-localhost') - for host in [support.HOSTv4]: - self.assertIn(host, socket.gethostbyaddr(host)[2]) - - def test_host_resolution_bad_address(self): - # These are all malformed IP addresses and expected not to resolve to - # any result. But some ISPs, e.g. AWS, may successfully resolve these - # IPs. - explanation = ( - "resolving an invalid IP address did not raise OSError; " - "can be caused by a broken DNS server" - ) - for addr in ['0.1.1.~1', '1+.1.1.1', '::1q', '::1::2', - '1:1:1:1:1:1:1:1:1']: - with self.assertRaises(OSError, msg=addr): - socket.gethostbyname(addr) - with self.assertRaises(OSError, msg=explanation): - socket.gethostbyaddr(addr) - - @unittest.skipUnless(hasattr(socket, 'sethostname'), "test needs socket.sethostname()") - @unittest.skipUnless(hasattr(socket, 'gethostname'), "test needs socket.gethostname()") - def test_sethostname(self): - oldhn = socket.gethostname() - try: - socket.sethostname('new') - except OSError as e: - if e.errno == errno.EPERM: - self.skipTest("test should be run as root") - else: - raise - try: - # running test as root! - self.assertEqual(socket.gethostname(), 'new') - # Should work with bytes objects too - socket.sethostname(b'bar') - self.assertEqual(socket.gethostname(), 'bar') - finally: - socket.sethostname(oldhn) - - @unittest.skipUnless(hasattr(socket, 'if_nameindex'), - 'socket.if_nameindex() not available.') - def testInterfaceNameIndex(self): - interfaces = socket.if_nameindex() - for index, name in interfaces: - self.assertIsInstance(index, int) - self.assertIsInstance(name, str) - # interface indices are non-zero integers - self.assertGreater(index, 0) - _index = socket.if_nametoindex(name) - self.assertIsInstance(_index, int) - self.assertEqual(index, _index) - _name = socket.if_indextoname(index) - self.assertIsInstance(_name, str) - self.assertEqual(name, _name) - - @unittest.skipUnless(hasattr(socket, 'if_indextoname'), - 'socket.if_indextoname() not available.') - def testInvalidInterfaceIndexToName(self): - self.assertRaises(OSError, socket.if_indextoname, 0) - self.assertRaises(TypeError, socket.if_indextoname, '_DEADBEEF') - - @unittest.skipUnless(hasattr(socket, 'if_nametoindex'), - 'socket.if_nametoindex() not available.') - def testInvalidInterfaceNameToIndex(self): - self.assertRaises(TypeError, socket.if_nametoindex, 0) - self.assertRaises(OSError, socket.if_nametoindex, '_DEADBEEF') - @unittest.skipUnless(hasattr(sys, 'getrefcount'), 'test needs sys.getrefcount()') def testRefCountGetNameInfo(self): @@ -1044,15 +379,15 @@ class GeneralModuleTests(unittest.TestCase): orig = sys.getrefcount(__name__) socket.getnameinfo(__name__,0) except TypeError: - if sys.getrefcount(__name__) != orig: - self.fail("socket.getnameinfo loses a reference") + self.assertEqual(sys.getrefcount(__name__), orig, + "socket.getnameinfo loses a reference") def testInterpreterCrash(self): # Making sure getnameinfo doesn't crash the interpreter try: # On some versions, this crashes the interpreter. socket.getnameinfo(('x', 0, 0, 0), 0) - except OSError: + except socket.error: pass def testNtoH(self): @@ -1061,46 +396,37 @@ class GeneralModuleTests(unittest.TestCase): sizes = {socket.htonl: 32, socket.ntohl: 32, socket.htons: 16, socket.ntohs: 16} for func, size in sizes.items(): - mask = (1<<size) - 1 + mask = (1L<<size) - 1 for i in (0, 1, 0xffff, ~0xffff, 2, 0x01234567, 0x76543210): self.assertEqual(i & mask, func(func(i&mask)) & mask) swapped = func(mask) self.assertEqual(swapped & mask, mask) - self.assertRaises(OverflowError, func, 1<<34) + self.assertRaises(OverflowError, func, 1L<<34) - @support.cpython_only def testNtoHErrors(self): - import _testcapi - s_good_values = [0, 1, 2, 0xffff] - l_good_values = s_good_values + [0xffffffff] - l_bad_values = [-1, -2, 1<<32, 1<<1000] - s_bad_values = l_bad_values + [_testcapi.INT_MIN - 1, - _testcapi.INT_MAX + 1] - s_deprecated_values = [1<<16, _testcapi.INT_MAX] - for k in s_good_values: - socket.ntohs(k) - socket.htons(k) - for k in l_good_values: + good_values = [ 1, 2, 3, 1L, 2L, 3L ] + bad_values = [ -1, -2, -3, -1L, -2L, -3L ] + for k in good_values: socket.ntohl(k) + socket.ntohs(k) socket.htonl(k) - for k in s_bad_values: - self.assertRaises(OverflowError, socket.ntohs, k) - self.assertRaises(OverflowError, socket.htons, k) - for k in l_bad_values: + socket.htons(k) + for k in bad_values: self.assertRaises(OverflowError, socket.ntohl, k) + self.assertRaises(OverflowError, socket.ntohs, k) self.assertRaises(OverflowError, socket.htonl, k) - for k in s_deprecated_values: - self.assertWarns(DeprecationWarning, socket.ntohs, k) - self.assertWarns(DeprecationWarning, socket.htons, k) + self.assertRaises(OverflowError, socket.htons, k) def testGetServBy(self): eq = self.assertEqual # Find one service that exists, then check all the related interfaces. # I've ordered this by protocols that have both a tcp and udp # protocol, at least for modern Linuxes. - if (sys.platform.startswith(('freebsd', 'netbsd', 'gnukfreebsd')) - or sys.platform in ('linux', 'darwin')): + if (sys.platform.startswith('linux') or + sys.platform.startswith('freebsd') or + sys.platform.startswith('netbsd') or + sys.platform == 'darwin'): # avoid the 'echo' service on this platform, as there is an # assumption breaking non-standard port/protocol entry services = ('daytime', 'qotd', 'domain') @@ -1110,27 +436,22 @@ class GeneralModuleTests(unittest.TestCase): try: port = socket.getservbyname(service, 'tcp') break - except OSError: + except socket.error: pass else: - raise OSError + raise socket.error # Try same call with optional protocol omitted - # Issue #26936: Android getservbyname() was broken before API 23. - if (not hasattr(sys, 'getandroidapilevel') or - sys.getandroidapilevel() >= 23): - port2 = socket.getservbyname(service) - eq(port, port2) + port2 = socket.getservbyname(service) + eq(port, port2) # Try udp, but don't barf if it doesn't exist try: udpport = socket.getservbyname(service, 'udp') - except OSError: + except socket.error: udpport = None else: eq(udpport, port) # Now make sure the lookup by port returns the same service name - # Issue #26936: Android getservbyport() is broken. - if not support.is_android: - eq(socket.getservbyport(port2), service) + eq(socket.getservbyport(port2), service) eq(socket.getservbyport(port, 'tcp'), service) if udpport is not None: eq(socket.getservbyport(udpport, 'udp'), service) @@ -1142,20 +463,23 @@ class GeneralModuleTests(unittest.TestCase): # Testing default timeout # The default timeout should initially be None self.assertEqual(socket.getdefaulttimeout(), None) - with socket.socket() as s: - self.assertEqual(s.gettimeout(), None) + s = socket.socket() + self.assertEqual(s.gettimeout(), None) + s.close() # Set the default timeout to 10, and see if it propagates - with socket_setdefaulttimeout(10): - self.assertEqual(socket.getdefaulttimeout(), 10) - with socket.socket() as sock: - self.assertEqual(sock.gettimeout(), 10) + socket.setdefaulttimeout(10) + self.assertEqual(socket.getdefaulttimeout(), 10) + s = socket.socket() + self.assertEqual(s.gettimeout(), 10) + s.close() - # Reset the default timeout to None, and see if it propagates - socket.setdefaulttimeout(None) - self.assertEqual(socket.getdefaulttimeout(), None) - with socket.socket() as sock: - self.assertEqual(sock.gettimeout(), None) + # Reset the default timeout to None, and see if it propagates + socket.setdefaulttimeout(None) + self.assertEqual(socket.getdefaulttimeout(), None) + s = socket.socket() + self.assertEqual(s.gettimeout(), None) + s.close() # Check that setting it to an invalid value raises ValueError self.assertRaises(ValueError, socket.setdefaulttimeout, -1) @@ -1168,8 +492,8 @@ class GeneralModuleTests(unittest.TestCase): def testIPv4_inet_aton_fourbytes(self): # Test that issue1008086 and issue767150 are fixed. # It must return 4 bytes. - self.assertEqual(b'\x00'*4, socket.inet_aton('0.0.0.0')) - self.assertEqual(b'\xff'*4, socket.inet_aton('255.255.255.255')) + self.assertEqual('\x00'*4, socket.inet_aton('0.0.0.0')) + self.assertEqual('\xff'*4, socket.inet_aton('255.255.255.255')) @unittest.skipUnless(hasattr(socket, 'inet_pton'), 'test needs socket.inet_pton()') @@ -1177,32 +501,16 @@ class GeneralModuleTests(unittest.TestCase): from socket import inet_aton as f, inet_pton, AF_INET g = lambda a: inet_pton(AF_INET, a) - assertInvalid = lambda func,a: self.assertRaises( - (OSError, ValueError), func, a - ) + self.assertEqual('\x00\x00\x00\x00', f('0.0.0.0')) + self.assertEqual('\xff\x00\xff\x00', f('255.0.255.0')) + self.assertEqual('\xaa\xaa\xaa\xaa', f('170.170.170.170')) + self.assertEqual('\x01\x02\x03\x04', f('1.2.3.4')) + self.assertEqual('\xff\xff\xff\xff', f('255.255.255.255')) - self.assertEqual(b'\x00\x00\x00\x00', f('0.0.0.0')) - self.assertEqual(b'\xff\x00\xff\x00', f('255.0.255.0')) - self.assertEqual(b'\xaa\xaa\xaa\xaa', f('170.170.170.170')) - self.assertEqual(b'\x01\x02\x03\x04', f('1.2.3.4')) - self.assertEqual(b'\xff\xff\xff\xff', f('255.255.255.255')) - # bpo-29972: inet_pton() doesn't fail on AIX - if not AIX: - assertInvalid(f, '0.0.0.') - assertInvalid(f, '300.0.0.0') - assertInvalid(f, 'a.0.0.0') - assertInvalid(f, '1.2.3.4.5') - assertInvalid(f, '::1') - - self.assertEqual(b'\x00\x00\x00\x00', g('0.0.0.0')) - self.assertEqual(b'\xff\x00\xff\x00', g('255.0.255.0')) - self.assertEqual(b'\xaa\xaa\xaa\xaa', g('170.170.170.170')) - self.assertEqual(b'\xff\xff\xff\xff', g('255.255.255.255')) - assertInvalid(g, '0.0.0.') - assertInvalid(g, '300.0.0.0') - assertInvalid(g, 'a.0.0.0') - assertInvalid(g, '1.2.3.4.5') - assertInvalid(g, '::1') + self.assertEqual('\x00\x00\x00\x00', g('0.0.0.0')) + self.assertEqual('\xff\x00\xff\x00', g('255.0.255.0')) + self.assertEqual('\xaa\xaa\xaa\xaa', g('170.170.170.170')) + self.assertEqual('\xff\xff\xff\xff', g('255.255.255.255')) @unittest.skipUnless(hasattr(socket, 'inet_pton'), 'test needs socket.inet_pton()') @@ -1213,86 +521,30 @@ class GeneralModuleTests(unittest.TestCase): self.skipTest('IPv6 not available') except ImportError: self.skipTest('could not import needed symbols from socket') - - if sys.platform == "win32": - try: - inet_pton(AF_INET6, '::') - except OSError as e: - if e.winerror == 10022: - self.skipTest('IPv6 might not be supported') - f = lambda a: inet_pton(AF_INET6, a) - assertInvalid = lambda a: self.assertRaises( - (OSError, ValueError), f, a - ) - self.assertEqual(b'\x00' * 16, f('::')) - self.assertEqual(b'\x00' * 16, f('0::0')) - self.assertEqual(b'\x00\x01' + b'\x00' * 14, f('1::')) + self.assertEqual('\x00' * 16, f('::')) + self.assertEqual('\x00' * 16, f('0::0')) + self.assertEqual('\x00\x01' + '\x00' * 14, f('1::')) self.assertEqual( - b'\x45\xef\x76\xcb\x00\x1a\x56\xef\xaf\xeb\x0b\xac\x19\x24\xae\xae', + '\x45\xef\x76\xcb\x00\x1a\x56\xef\xaf\xeb\x0b\xac\x19\x24\xae\xae', f('45ef:76cb:1a:56ef:afeb:bac:1924:aeae') ) - self.assertEqual( - b'\xad\x42\x0a\xbc' + b'\x00' * 4 + b'\x01\x27\x00\x00\x02\x54\x00\x02', - f('ad42:abc::127:0:254:2') - ) - self.assertEqual(b'\x00\x12\x00\x0a' + b'\x00' * 12, f('12:a::')) - assertInvalid('0x20::') - assertInvalid(':::') - assertInvalid('::0::') - assertInvalid('1::abc::') - assertInvalid('1::abc::def') - assertInvalid('1:2:3:4:5:6') - assertInvalid('1:2:3:4:5:6:') - assertInvalid('1:2:3:4:5:6:7:8:0') - # bpo-29972: inet_pton() doesn't fail on AIX - if not AIX: - assertInvalid('1:2:3:4:5:6:7:8:') - - self.assertEqual(b'\x00' * 12 + b'\xfe\x2a\x17\x40', - f('::254.42.23.64') - ) - self.assertEqual( - b'\x00\x42' + b'\x00' * 8 + b'\xa2\x9b\xfe\x2a\x17\x40', - f('42::a29b:254.42.23.64') - ) - self.assertEqual( - b'\x00\x42\xa8\xb9\x00\x00\x00\x02\xff\xff\xa2\x9b\xfe\x2a\x17\x40', - f('42:a8b9:0:2:ffff:a29b:254.42.23.64') - ) - assertInvalid('255.254.253.252') - assertInvalid('1::260.2.3.0') - assertInvalid('1::0.be.e.0') - assertInvalid('1:2:3:4:5:6:7:1.2.3.4') - assertInvalid('::1.2.3.4:0') - assertInvalid('0.100.200.0:3:4:5:6:7:8') @unittest.skipUnless(hasattr(socket, 'inet_ntop'), 'test needs socket.inet_ntop()') def testStringToIPv4(self): from socket import inet_ntoa as f, inet_ntop, AF_INET g = lambda a: inet_ntop(AF_INET, a) - assertInvalid = lambda func,a: self.assertRaises( - (OSError, ValueError), func, a - ) - self.assertEqual('1.0.1.0', f(b'\x01\x00\x01\x00')) - self.assertEqual('170.85.170.85', f(b'\xaa\x55\xaa\x55')) - self.assertEqual('255.255.255.255', f(b'\xff\xff\xff\xff')) - self.assertEqual('1.2.3.4', f(b'\x01\x02\x03\x04')) - assertInvalid(f, b'\x00' * 3) - assertInvalid(f, b'\x00' * 5) - assertInvalid(f, b'\x00' * 16) - self.assertEqual('170.85.170.85', f(bytearray(b'\xaa\x55\xaa\x55'))) - - self.assertEqual('1.0.1.0', g(b'\x01\x00\x01\x00')) - self.assertEqual('170.85.170.85', g(b'\xaa\x55\xaa\x55')) - self.assertEqual('255.255.255.255', g(b'\xff\xff\xff\xff')) - assertInvalid(g, b'\x00' * 3) - assertInvalid(g, b'\x00' * 5) - assertInvalid(g, b'\x00' * 16) - self.assertEqual('170.85.170.85', g(bytearray(b'\xaa\x55\xaa\x55'))) + self.assertEqual('1.0.1.0', f('\x01\x00\x01\x00')) + self.assertEqual('170.85.170.85', f('\xaa\x55\xaa\x55')) + self.assertEqual('255.255.255.255', f('\xff\xff\xff\xff')) + self.assertEqual('1.2.3.4', f('\x01\x02\x03\x04')) + + self.assertEqual('1.0.1.0', g('\x01\x00\x01\x00')) + self.assertEqual('170.85.170.85', g('\xaa\x55\xaa\x55')) + self.assertEqual('255.255.255.255', g('\xff\xff\xff\xff')) @unittest.skipUnless(hasattr(socket, 'inet_ntop'), 'test needs socket.inet_ntop()') @@ -1303,36 +555,34 @@ class GeneralModuleTests(unittest.TestCase): self.skipTest('IPv6 not available') except ImportError: self.skipTest('could not import needed symbols from socket') - - if sys.platform == "win32": - try: - inet_ntop(AF_INET6, b'\x00' * 16) - except OSError as e: - if e.winerror == 10022: - self.skipTest('IPv6 might not be supported') - f = lambda a: inet_ntop(AF_INET6, a) - assertInvalid = lambda a: self.assertRaises( - (OSError, ValueError), f, a - ) - self.assertEqual('::', f(b'\x00' * 16)) - self.assertEqual('::1', f(b'\x00' * 15 + b'\x01')) + self.assertEqual('::', f('\x00' * 16)) + self.assertEqual('::1', f('\x00' * 15 + '\x01')) self.assertEqual( 'aef:b01:506:1001:ffff:9997:55:170', - f(b'\x0a\xef\x0b\x01\x05\x06\x10\x01\xff\xff\x99\x97\x00\x55\x01\x70') + f('\x0a\xef\x0b\x01\x05\x06\x10\x01\xff\xff\x99\x97\x00\x55\x01\x70') ) - self.assertEqual('::1', f(bytearray(b'\x00' * 15 + b'\x01'))) - - assertInvalid(b'\x12' * 15) - assertInvalid(b'\x12' * 17) - assertInvalid(b'\x12' * 4) # XXX The following don't test module-level functionality... + def _get_unused_port(self, bind_address='0.0.0.0'): + """Use a temporary socket to elicit an unused ephemeral port. + + Args: + bind_address: Hostname or IP address to search for a port on. + + Returns: A most likely to be unused port. + """ + tempsock = socket.socket() + tempsock.bind((bind_address, 0)) + host, port = tempsock.getsockname() + tempsock.close() + return port + def testSockName(self): # Testing getsockname() - port = support.find_unused_port() + port = self._get_unused_port() sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) self.addCleanup(sock.close) sock.bind(("0.0.0.0", port)) @@ -1342,7 +592,7 @@ class GeneralModuleTests(unittest.TestCase): # At least for eCos. This is required for the S/390 to pass. try: my_ip_addr = socket.gethostbyname(socket.gethostname()) - except OSError: + except socket.error: # Probably name lookup wasn't set up right; skip this test self.skipTest('name lookup failure') self.assertIn(name[0], ("0.0.0.0", my_ip_addr), '%s invalid' % name[0]) @@ -1366,39 +616,23 @@ class GeneralModuleTests(unittest.TestCase): def testSendAfterClose(self): # testing send() after close() with timeout - with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock: - sock.settimeout(1) - self.assertRaises(OSError, sock.send, b"spam") - - def testCloseException(self): - sock = socket.socket() - sock.bind((socket._LOCALHOST, 0)) - socket.socket(fileno=sock.fileno()).close() - try: - sock.close() - except OSError as err: - # Winsock apparently raises ENOTSOCK - self.assertIn(err.errno, (errno.EBADF, errno.ENOTSOCK)) - else: - self.fail("close() should raise EBADF/ENOTSOCK") + sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + sock.settimeout(1) + sock.close() + self.assertRaises(socket.error, sock.send, "spam") def testNewAttributes(self): # testing .family, .type and .protocol - - with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock: - self.assertEqual(sock.family, socket.AF_INET) - if hasattr(socket, 'SOCK_CLOEXEC'): - self.assertIn(sock.type, - (socket.SOCK_STREAM | socket.SOCK_CLOEXEC, - socket.SOCK_STREAM)) - else: - self.assertEqual(sock.type, socket.SOCK_STREAM) - self.assertEqual(sock.proto, 0) + sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + self.assertEqual(sock.family, socket.AF_INET) + self.assertEqual(sock.type, socket.SOCK_STREAM) + self.assertEqual(sock.proto, 0) + sock.close() def test_getsockaddrarg(self): sock = socket.socket() self.addCleanup(sock.close) - port = support.find_unused_port() + port = test_support.find_unused_port() big_port = port + 65536 neg_port = port - 65536 self.assertRaises(OverflowError, sock.bind, (HOST, big_port)) @@ -1406,7 +640,7 @@ class GeneralModuleTests(unittest.TestCase): # Since find_unused_port() is inherently subject to race conditions, we # call it a couple times if necessary. for i in itertools.count(): - port = support.find_unused_port() + port = test_support.find_unused_port() try: sock.bind((HOST, port)) except OSError as e: @@ -1427,22 +661,6 @@ class GeneralModuleTests(unittest.TestCase): self.assertRaises(ValueError, s.ioctl, -1, None) s.ioctl(socket.SIO_KEEPALIVE_VALS, (1, 100, 100)) - @unittest.skipUnless(os.name == "nt", "Windows specific") - @unittest.skipUnless(hasattr(socket, 'SIO_LOOPBACK_FAST_PATH'), - 'Loopback fast path support required for this test') - def test_sio_loopback_fast_path(self): - s = socket.socket() - self.addCleanup(s.close) - try: - s.ioctl(socket.SIO_LOOPBACK_FAST_PATH, True) - except OSError as exc: - WSAEOPNOTSUPP = 10045 - if exc.winerror == WSAEOPNOTSUPP: - self.skipTest("SIO_LOOPBACK_FAST_PATH is defined but " - "doesn't implemented in this Windows version") - raise - self.assertRaises(TypeError, s.ioctl, socket.SIO_LOOPBACK_FAST_PATH, None) - def testGetaddrinfo(self): try: socket.getaddrinfo('localhost', 80) @@ -1459,23 +677,18 @@ class GeneralModuleTests(unittest.TestCase): socket.getaddrinfo('localhost', 80) socket.getaddrinfo('127.0.0.1', 80) socket.getaddrinfo(None, 80) - if support.IPV6_ENABLED: + if SUPPORTS_IPV6: socket.getaddrinfo('::1', 80) # port can be a string service name such as "http", a numeric - # port number or None - # Issue #26936: Android getaddrinfo() was broken before API level 23. - if (not hasattr(sys, 'getandroidapilevel') or - sys.getandroidapilevel() >= 23): - socket.getaddrinfo(HOST, "http") + # port number (int or long), or None + socket.getaddrinfo(HOST, "http") socket.getaddrinfo(HOST, 80) + socket.getaddrinfo(HOST, 80L) socket.getaddrinfo(HOST, None) # test family and socktype filters - infos = socket.getaddrinfo(HOST, 80, socket.AF_INET, socket.SOCK_STREAM) - for family, type, _, _, _ in infos: + infos = socket.getaddrinfo(HOST, None, socket.AF_INET) + for family, _, _, _, _ in infos: self.assertEqual(family, socket.AF_INET) - self.assertEqual(str(family), 'AddressFamily.AF_INET') - self.assertEqual(type, socket.SOCK_STREAM) - self.assertEqual(str(type), 'SocketKind.SOCK_STREAM') infos = socket.getaddrinfo(HOST, None, 0, socket.SOCK_STREAM) for _, socktype, _, _, _ in infos: self.assertEqual(socktype, socket.SOCK_STREAM) @@ -1486,30 +699,6 @@ class GeneralModuleTests(unittest.TestCase): # usually do this socket.getaddrinfo(None, 0, socket.AF_UNSPEC, socket.SOCK_STREAM, 0, socket.AI_PASSIVE) - # test keyword arguments - a = socket.getaddrinfo(HOST, None) - b = socket.getaddrinfo(host=HOST, port=None) - self.assertEqual(a, b) - a = socket.getaddrinfo(HOST, None, socket.AF_INET) - b = socket.getaddrinfo(HOST, None, family=socket.AF_INET) - self.assertEqual(a, b) - a = socket.getaddrinfo(HOST, None, 0, socket.SOCK_STREAM) - b = socket.getaddrinfo(HOST, None, type=socket.SOCK_STREAM) - self.assertEqual(a, b) - a = socket.getaddrinfo(HOST, None, 0, 0, socket.SOL_TCP) - b = socket.getaddrinfo(HOST, None, proto=socket.SOL_TCP) - self.assertEqual(a, b) - a = socket.getaddrinfo(HOST, None, 0, 0, 0, socket.AI_PASSIVE) - b = socket.getaddrinfo(HOST, None, flags=socket.AI_PASSIVE) - self.assertEqual(a, b) - a = socket.getaddrinfo(None, 0, socket.AF_UNSPEC, socket.SOCK_STREAM, 0, - socket.AI_PASSIVE) - b = socket.getaddrinfo(host=None, port=0, family=socket.AF_UNSPEC, - type=socket.SOCK_STREAM, proto=0, - flags=socket.AI_PASSIVE) - self.assertEqual(a, b) - # Issue #6697. - self.assertRaises(UnicodeEncodeError, socket.getaddrinfo, 'localhost', '\uD800') # Issue 17269: test workaround for OS X platform bug segfault if hasattr(socket, 'AI_NUMERICSERV'): @@ -1521,27 +710,6 @@ class GeneralModuleTests(unittest.TestCase): except socket.gaierror: pass - def test_getnameinfo(self): - # only IP addresses are allowed - self.assertRaises(OSError, socket.getnameinfo, ('mail.python.org',0), 0) - - @unittest.skipUnless(support.is_resource_enabled('network'), - 'network is not enabled') - def test_idna(self): - # Check for internet access before running test - # (issue #12804, issue #25138). - with support.transient_internet('python.org'): - socket.gethostbyname('python.org') - - # these should all be successful - domain = 'испытание.pythontest.net' - socket.gethostbyname(domain) - socket.gethostbyname_ex(domain) - socket.getaddrinfo(domain,0,socket.AF_UNSPEC,socket.SOCK_STREAM) - # this may not work if the forward lookup chooses the IPv6 address, as that doesn't - # have a reverse entry yet - # socket.gethostbyaddr('испытание.python.org') - def check_sendall_interrupted(self, with_timeout): # socketpair() is not strictly required, but it makes things easier. if not hasattr(signal, 'alarm') or not hasattr(socket, 'socketpair'): @@ -1561,12 +729,12 @@ class GeneralModuleTests(unittest.TestCase): c.settimeout(1.5) with self.assertRaises(ZeroDivisionError): signal.alarm(1) - c.sendall(b"x" * support.SOCK_MAX_SIZE) + c.sendall(b"x" * test_support.SOCK_MAX_SIZE) if with_timeout: signal.signal(signal.SIGALRM, ok_handler) signal.alarm(1) self.assertRaises(socket.timeout, c.sendall, - b"x" * support.SOCK_MAX_SIZE) + b"x" * test_support.SOCK_MAX_SIZE) finally: signal.alarm(0) signal.signal(signal.SIGALRM, old_alarm) @@ -1579,739 +747,34 @@ class GeneralModuleTests(unittest.TestCase): def test_sendall_interrupted_with_timeout(self): self.check_sendall_interrupted(True) - def test_dealloc_warn(self): - sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) - r = repr(sock) - with self.assertWarns(ResourceWarning) as cm: - sock = None - support.gc_collect() - self.assertIn(r, str(cm.warning.args[0])) - # An open socket file object gets dereferenced after the socket - sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) - f = sock.makefile('rb') - r = repr(sock) - sock = None - support.gc_collect() - with self.assertWarns(ResourceWarning): - f = None - support.gc_collect() - - def test_name_closed_socketio(self): - with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock: - fp = sock.makefile("rb") - fp.close() - self.assertEqual(repr(fp), "<_io.BufferedReader name=-1>") - - def test_unusable_closed_socketio(self): - with socket.socket() as sock: - fp = sock.makefile("rb", buffering=0) - self.assertTrue(fp.readable()) - self.assertFalse(fp.writable()) - self.assertFalse(fp.seekable()) - fp.close() - self.assertRaises(ValueError, fp.readable) - self.assertRaises(ValueError, fp.writable) - self.assertRaises(ValueError, fp.seekable) - - def test_socket_close(self): - sock = socket.socket() - try: - sock.bind((HOST, 0)) - socket.close(sock.fileno()) - with self.assertRaises(OSError): - sock.listen(1) - finally: - with self.assertRaises(OSError): - # sock.close() fails with EBADF - sock.close() - with self.assertRaises(TypeError): - socket.close(None) - with self.assertRaises(OSError): - socket.close(-1) - - def test_makefile_mode(self): - for mode in 'r', 'rb', 'rw', 'w', 'wb': - with self.subTest(mode=mode): - with socket.socket() as sock: - with sock.makefile(mode) as fp: - self.assertEqual(fp.mode, mode) - - def test_makefile_invalid_mode(self): - for mode in 'rt', 'x', '+', 'a': - with self.subTest(mode=mode): - with socket.socket() as sock: - with self.assertRaisesRegex(ValueError, 'invalid mode'): - sock.makefile(mode) - - def test_pickle(self): - sock = socket.socket() - with sock: - for protocol in range(pickle.HIGHEST_PROTOCOL + 1): - self.assertRaises(TypeError, pickle.dumps, sock, protocol) - for protocol in range(pickle.HIGHEST_PROTOCOL + 1): - family = pickle.loads(pickle.dumps(socket.AF_INET, protocol)) - self.assertEqual(family, socket.AF_INET) - type = pickle.loads(pickle.dumps(socket.SOCK_STREAM, protocol)) - self.assertEqual(type, socket.SOCK_STREAM) - def test_listen_backlog(self): for backlog in 0, -1: - with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as srv: - srv.bind((HOST, 0)) - srv.listen(backlog) - - with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as srv: + srv = socket.socket(socket.AF_INET, socket.SOCK_STREAM) srv.bind((HOST, 0)) - srv.listen() + srv.listen(backlog) + srv.close() - @support.cpython_only + @test_support.cpython_only def test_listen_backlog_overflow(self): # Issue 15989 import _testcapi - with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as srv: - srv.bind((HOST, 0)) - self.assertRaises(OverflowError, srv.listen, _testcapi.INT_MAX + 1) + srv = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + srv.bind((HOST, 0)) + self.assertRaises(OverflowError, srv.listen, _testcapi.INT_MAX + 1) + srv.close() - @unittest.skipUnless(support.IPV6_ENABLED, 'IPv6 required for this test.') + @unittest.skipUnless(SUPPORTS_IPV6, 'IPv6 required for this test.') def test_flowinfo(self): self.assertRaises(OverflowError, socket.getnameinfo, - (support.HOSTv6, 0, 0xffffffff), 0) - with socket.socket(socket.AF_INET6, socket.SOCK_STREAM) as s: - self.assertRaises(OverflowError, s.bind, (support.HOSTv6, 0, -10)) - - @unittest.skipUnless(support.IPV6_ENABLED, 'IPv6 required for this test.') - def test_getaddrinfo_ipv6_basic(self): - ((*_, sockaddr),) = socket.getaddrinfo( - 'ff02::1de:c0:face:8D', # Note capital letter `D`. - 1234, socket.AF_INET6, - socket.SOCK_DGRAM, - socket.IPPROTO_UDP - ) - self.assertEqual(sockaddr, ('ff02::1de:c0:face:8d', 1234, 0, 0)) - - @unittest.skipUnless(support.IPV6_ENABLED, 'IPv6 required for this test.') - @unittest.skipIf(sys.platform == 'win32', 'does not work on Windows') - @unittest.skipIf(AIX, 'Symbolic scope id does not work') - def test_getaddrinfo_ipv6_scopeid_symbolic(self): - # Just pick up any network interface (Linux, Mac OS X) - (ifindex, test_interface) = socket.if_nameindex()[0] - ((*_, sockaddr),) = socket.getaddrinfo( - 'ff02::1de:c0:face:8D%' + test_interface, - 1234, socket.AF_INET6, - socket.SOCK_DGRAM, - socket.IPPROTO_UDP - ) - # Note missing interface name part in IPv6 address - self.assertEqual(sockaddr, ('ff02::1de:c0:face:8d', 1234, 0, ifindex)) - - @unittest.skipUnless(support.IPV6_ENABLED, 'IPv6 required for this test.') - @unittest.skipUnless( - sys.platform == 'win32', - 'Numeric scope id does not work or undocumented') - def test_getaddrinfo_ipv6_scopeid_numeric(self): - # Also works on Linux and Mac OS X, but is not documented (?) - # Windows, Linux and Max OS X allow nonexistent interface numbers here. - ifindex = 42 - ((*_, sockaddr),) = socket.getaddrinfo( - 'ff02::1de:c0:face:8D%' + str(ifindex), - 1234, socket.AF_INET6, - socket.SOCK_DGRAM, - socket.IPPROTO_UDP - ) - # Note missing interface name part in IPv6 address - self.assertEqual(sockaddr, ('ff02::1de:c0:face:8d', 1234, 0, ifindex)) - - @unittest.skipUnless(support.IPV6_ENABLED, 'IPv6 required for this test.') - @unittest.skipIf(sys.platform == 'win32', 'does not work on Windows') - @unittest.skipIf(AIX, 'Symbolic scope id does not work') - def test_getnameinfo_ipv6_scopeid_symbolic(self): - # Just pick up any network interface. - (ifindex, test_interface) = socket.if_nameindex()[0] - sockaddr = ('ff02::1de:c0:face:8D', 1234, 0, ifindex) # Note capital letter `D`. - nameinfo = socket.getnameinfo(sockaddr, socket.NI_NUMERICHOST | socket.NI_NUMERICSERV) - self.assertEqual(nameinfo, ('ff02::1de:c0:face:8d%' + test_interface, '1234')) - - @unittest.skipUnless(support.IPV6_ENABLED, 'IPv6 required for this test.') - @unittest.skipUnless( sys.platform == 'win32', - 'Numeric scope id does not work or undocumented') - def test_getnameinfo_ipv6_scopeid_numeric(self): - # Also works on Linux (undocumented), but does not work on Mac OS X - # Windows and Linux allow nonexistent interface numbers here. - ifindex = 42 - sockaddr = ('ff02::1de:c0:face:8D', 1234, 0, ifindex) # Note capital letter `D`. - nameinfo = socket.getnameinfo(sockaddr, socket.NI_NUMERICHOST | socket.NI_NUMERICSERV) - self.assertEqual(nameinfo, ('ff02::1de:c0:face:8d%' + str(ifindex), '1234')) - - def test_str_for_enums(self): - # Make sure that the AF_* and SOCK_* constants have enum-like string - # reprs. - with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s: - self.assertEqual(str(s.family), 'AddressFamily.AF_INET') - self.assertEqual(str(s.type), 'SocketKind.SOCK_STREAM') - - def test_socket_consistent_sock_type(self): - SOCK_NONBLOCK = getattr(socket, 'SOCK_NONBLOCK', 0) - SOCK_CLOEXEC = getattr(socket, 'SOCK_CLOEXEC', 0) - sock_type = socket.SOCK_STREAM | SOCK_NONBLOCK | SOCK_CLOEXEC - - with socket.socket(socket.AF_INET, sock_type) as s: - self.assertEqual(s.type, socket.SOCK_STREAM) - s.settimeout(1) - self.assertEqual(s.type, socket.SOCK_STREAM) - s.settimeout(0) - self.assertEqual(s.type, socket.SOCK_STREAM) - s.setblocking(True) - self.assertEqual(s.type, socket.SOCK_STREAM) - s.setblocking(False) - self.assertEqual(s.type, socket.SOCK_STREAM) - - def test_unknown_socket_family_repr(self): - # Test that when created with a family that's not one of the known - # AF_*/SOCK_* constants, socket.family just returns the number. - # - # To do this we fool socket.socket into believing it already has an - # open fd because on this path it doesn't actually verify the family and - # type and populates the socket object. - sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) - fd = sock.detach() - unknown_family = max(socket.AddressFamily.__members__.values()) + 1 - - unknown_type = max( - kind - for name, kind in socket.SocketKind.__members__.items() - if name not in {'SOCK_NONBLOCK', 'SOCK_CLOEXEC'} - ) + 1 - - with socket.socket( - family=unknown_family, type=unknown_type, proto=23, - fileno=fd) as s: - self.assertEqual(s.family, unknown_family) - self.assertEqual(s.type, unknown_type) - # some OS like macOS ignore proto - self.assertIn(s.proto, {0, 23}) - - @unittest.skipUnless(hasattr(os, 'sendfile'), 'test needs os.sendfile()') - def test__sendfile_use_sendfile(self): - class File: - def __init__(self, fd): - self.fd = fd - - def fileno(self): - return self.fd - with socket.socket() as sock: - fd = os.open(os.curdir, os.O_RDONLY) - os.close(fd) - with self.assertRaises(socket._GiveupOnSendfile): - sock._sendfile_use_sendfile(File(fd)) - with self.assertRaises(OverflowError): - sock._sendfile_use_sendfile(File(2**1000)) - with self.assertRaises(TypeError): - sock._sendfile_use_sendfile(File(None)) - - def _test_socket_fileno(self, s, family, stype): - self.assertEqual(s.family, family) - self.assertEqual(s.type, stype) - - fd = s.fileno() - s2 = socket.socket(fileno=fd) - self.addCleanup(s2.close) - # detach old fd to avoid double close - s.detach() - self.assertEqual(s2.family, family) - self.assertEqual(s2.type, stype) - self.assertEqual(s2.fileno(), fd) - - def test_socket_fileno(self): - s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) - self.addCleanup(s.close) - s.bind((support.HOST, 0)) - self._test_socket_fileno(s, socket.AF_INET, socket.SOCK_STREAM) - - if hasattr(socket, "SOCK_DGRAM"): - s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) - self.addCleanup(s.close) - s.bind((support.HOST, 0)) - self._test_socket_fileno(s, socket.AF_INET, socket.SOCK_DGRAM) - - if support.IPV6_ENABLED: - s = socket.socket(socket.AF_INET6, socket.SOCK_STREAM) - self.addCleanup(s.close) - s.bind((support.HOSTv6, 0, 0, 0)) - self._test_socket_fileno(s, socket.AF_INET6, socket.SOCK_STREAM) - - if hasattr(socket, "AF_UNIX"): - tmpdir = tempfile.mkdtemp() - self.addCleanup(shutil.rmtree, tmpdir) - s = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) - self.addCleanup(s.close) - try: - s.bind(os.path.join(tmpdir, 'socket')) - except PermissionError: - pass - else: - self._test_socket_fileno(s, socket.AF_UNIX, - socket.SOCK_STREAM) - - def test_socket_fileno_rejects_float(self): - with self.assertRaisesRegex(TypeError, "integer argument expected"): - socket.socket(socket.AF_INET, socket.SOCK_STREAM, fileno=42.5) - - def test_socket_fileno_rejects_other_types(self): - with self.assertRaisesRegex(TypeError, "integer is required"): - socket.socket(socket.AF_INET, socket.SOCK_STREAM, fileno="foo") - - def test_socket_fileno_rejects_invalid_socket(self): - with self.assertRaisesRegex(ValueError, "negative file descriptor"): - socket.socket(socket.AF_INET, socket.SOCK_STREAM, fileno=-1) - - @unittest.skipIf(os.name == "nt", "Windows disallows -1 only") - def test_socket_fileno_rejects_negative(self): - with self.assertRaisesRegex(ValueError, "negative file descriptor"): - socket.socket(socket.AF_INET, socket.SOCK_STREAM, fileno=-42) - - def test_socket_fileno_requires_valid_fd(self): - WSAENOTSOCK = 10038 - with self.assertRaises(OSError) as cm: - socket.socket(fileno=support.make_bad_fd()) - self.assertIn(cm.exception.errno, (errno.EBADF, WSAENOTSOCK)) - - with self.assertRaises(OSError) as cm: - socket.socket( - socket.AF_INET, - socket.SOCK_STREAM, - fileno=support.make_bad_fd()) - self.assertIn(cm.exception.errno, (errno.EBADF, WSAENOTSOCK)) - - def test_socket_fileno_requires_socket_fd(self): - with tempfile.NamedTemporaryFile() as afile: - with self.assertRaises(OSError): - socket.socket(fileno=afile.fileno()) - - with self.assertRaises(OSError) as cm: - socket.socket( - socket.AF_INET, - socket.SOCK_STREAM, - fileno=afile.fileno()) - self.assertEqual(cm.exception.errno, errno.ENOTSOCK) - - -@unittest.skipUnless(HAVE_SOCKET_CAN, 'SocketCan required for this test.') -class BasicCANTest(unittest.TestCase): - - def testCrucialConstants(self): - socket.AF_CAN - socket.PF_CAN - socket.CAN_RAW - - @unittest.skipUnless(hasattr(socket, "CAN_BCM"), - 'socket.CAN_BCM required for this test.') - def testBCMConstants(self): - socket.CAN_BCM - - # opcodes - socket.CAN_BCM_TX_SETUP # create (cyclic) transmission task - socket.CAN_BCM_TX_DELETE # remove (cyclic) transmission task - socket.CAN_BCM_TX_READ # read properties of (cyclic) transmission task - socket.CAN_BCM_TX_SEND # send one CAN frame - socket.CAN_BCM_RX_SETUP # create RX content filter subscription - socket.CAN_BCM_RX_DELETE # remove RX content filter subscription - socket.CAN_BCM_RX_READ # read properties of RX content filter subscription - socket.CAN_BCM_TX_STATUS # reply to TX_READ request - socket.CAN_BCM_TX_EXPIRED # notification on performed transmissions (count=0) - socket.CAN_BCM_RX_STATUS # reply to RX_READ request - socket.CAN_BCM_RX_TIMEOUT # cyclic message is absent - socket.CAN_BCM_RX_CHANGED # updated CAN frame (detected content change) - - # flags - socket.CAN_BCM_SETTIMER - socket.CAN_BCM_STARTTIMER - socket.CAN_BCM_TX_COUNTEVT - socket.CAN_BCM_TX_ANNOUNCE - socket.CAN_BCM_TX_CP_CAN_ID - socket.CAN_BCM_RX_FILTER_ID - socket.CAN_BCM_RX_CHECK_DLC - socket.CAN_BCM_RX_NO_AUTOTIMER - socket.CAN_BCM_RX_ANNOUNCE_RESUME - socket.CAN_BCM_TX_RESET_MULTI_IDX - socket.CAN_BCM_RX_RTR_FRAME - - def testCreateSocket(self): - with socket.socket(socket.PF_CAN, socket.SOCK_RAW, socket.CAN_RAW) as s: - pass - - @unittest.skipUnless(hasattr(socket, "CAN_BCM"), - 'socket.CAN_BCM required for this test.') - def testCreateBCMSocket(self): - with socket.socket(socket.PF_CAN, socket.SOCK_DGRAM, socket.CAN_BCM) as s: - pass - - def testBindAny(self): - with socket.socket(socket.PF_CAN, socket.SOCK_RAW, socket.CAN_RAW) as s: - address = ('', ) - s.bind(address) - self.assertEqual(s.getsockname(), address) - - def testTooLongInterfaceName(self): - # most systems limit IFNAMSIZ to 16, take 1024 to be sure - with socket.socket(socket.PF_CAN, socket.SOCK_RAW, socket.CAN_RAW) as s: - self.assertRaisesRegex(OSError, 'interface name too long', - s.bind, ('x' * 1024,)) - - @unittest.skipUnless(hasattr(socket, "CAN_RAW_LOOPBACK"), - 'socket.CAN_RAW_LOOPBACK required for this test.') - def testLoopback(self): - with socket.socket(socket.PF_CAN, socket.SOCK_RAW, socket.CAN_RAW) as s: - for loopback in (0, 1): - s.setsockopt(socket.SOL_CAN_RAW, socket.CAN_RAW_LOOPBACK, - loopback) - self.assertEqual(loopback, - s.getsockopt(socket.SOL_CAN_RAW, socket.CAN_RAW_LOOPBACK)) - - @unittest.skipUnless(hasattr(socket, "CAN_RAW_FILTER"), - 'socket.CAN_RAW_FILTER required for this test.') - def testFilter(self): - can_id, can_mask = 0x200, 0x700 - can_filter = struct.pack("=II", can_id, can_mask) - with socket.socket(socket.PF_CAN, socket.SOCK_RAW, socket.CAN_RAW) as s: - s.setsockopt(socket.SOL_CAN_RAW, socket.CAN_RAW_FILTER, can_filter) - self.assertEqual(can_filter, - s.getsockopt(socket.SOL_CAN_RAW, socket.CAN_RAW_FILTER, 8)) - s.setsockopt(socket.SOL_CAN_RAW, socket.CAN_RAW_FILTER, bytearray(can_filter)) - - -@unittest.skipUnless(HAVE_SOCKET_CAN, 'SocketCan required for this test.') -class CANTest(ThreadedCANSocketTest): - - def __init__(self, methodName='runTest'): - ThreadedCANSocketTest.__init__(self, methodName=methodName) - - @classmethod - def build_can_frame(cls, can_id, data): - """Build a CAN frame.""" - can_dlc = len(data) - data = data.ljust(8, b'\x00') - return struct.pack(cls.can_frame_fmt, can_id, can_dlc, data) - - @classmethod - def dissect_can_frame(cls, frame): - """Dissect a CAN frame.""" - can_id, can_dlc, data = struct.unpack(cls.can_frame_fmt, frame) - return (can_id, can_dlc, data[:can_dlc]) - - def testSendFrame(self): - cf, addr = self.s.recvfrom(self.bufsize) - self.assertEqual(self.cf, cf) - self.assertEqual(addr[0], self.interface) - self.assertEqual(addr[1], socket.AF_CAN) - - def _testSendFrame(self): - self.cf = self.build_can_frame(0x00, b'\x01\x02\x03\x04\x05') - self.cli.send(self.cf) - - def testSendMaxFrame(self): - cf, addr = self.s.recvfrom(self.bufsize) - self.assertEqual(self.cf, cf) - - def _testSendMaxFrame(self): - self.cf = self.build_can_frame(0x00, b'\x07' * 8) - self.cli.send(self.cf) - - def testSendMultiFrames(self): - cf, addr = self.s.recvfrom(self.bufsize) - self.assertEqual(self.cf1, cf) - - cf, addr = self.s.recvfrom(self.bufsize) - self.assertEqual(self.cf2, cf) - - def _testSendMultiFrames(self): - self.cf1 = self.build_can_frame(0x07, b'\x44\x33\x22\x11') - self.cli.send(self.cf1) - - self.cf2 = self.build_can_frame(0x12, b'\x99\x22\x33') - self.cli.send(self.cf2) - - @unittest.skipUnless(hasattr(socket, "CAN_BCM"), - 'socket.CAN_BCM required for this test.') - def _testBCM(self): - cf, addr = self.cli.recvfrom(self.bufsize) - self.assertEqual(self.cf, cf) - can_id, can_dlc, data = self.dissect_can_frame(cf) - self.assertEqual(self.can_id, can_id) - self.assertEqual(self.data, data) - - @unittest.skipUnless(hasattr(socket, "CAN_BCM"), - 'socket.CAN_BCM required for this test.') - def testBCM(self): - bcm = socket.socket(socket.PF_CAN, socket.SOCK_DGRAM, socket.CAN_BCM) - self.addCleanup(bcm.close) - bcm.connect((self.interface,)) - self.can_id = 0x123 - self.data = bytes([0xc0, 0xff, 0xee]) - self.cf = self.build_can_frame(self.can_id, self.data) - opcode = socket.CAN_BCM_TX_SEND - flags = 0 - count = 0 - ival1_seconds = ival1_usec = ival2_seconds = ival2_usec = 0 - bcm_can_id = 0x0222 - nframes = 1 - assert len(self.cf) == 16 - header = struct.pack(self.bcm_cmd_msg_fmt, - opcode, - flags, - count, - ival1_seconds, - ival1_usec, - ival2_seconds, - ival2_usec, - bcm_can_id, - nframes, - ) - header_plus_frame = header + self.cf - bytes_sent = bcm.send(header_plus_frame) - self.assertEqual(bytes_sent, len(header_plus_frame)) - - -@unittest.skipUnless(HAVE_SOCKET_CAN_ISOTP, 'CAN ISOTP required for this test.') -class ISOTPTest(unittest.TestCase): - - def __init__(self, *args, **kwargs): - super().__init__(*args, **kwargs) - self.interface = "vcan0" - - def testCrucialConstants(self): - socket.AF_CAN - socket.PF_CAN - socket.CAN_ISOTP - socket.SOCK_DGRAM - - def testCreateSocket(self): - with socket.socket(socket.PF_CAN, socket.SOCK_RAW, socket.CAN_RAW) as s: - pass - - @unittest.skipUnless(hasattr(socket, "CAN_ISOTP"), - 'socket.CAN_ISOTP required for this test.') - def testCreateISOTPSocket(self): - with socket.socket(socket.PF_CAN, socket.SOCK_DGRAM, socket.CAN_ISOTP) as s: - pass - - def testTooLongInterfaceName(self): - # most systems limit IFNAMSIZ to 16, take 1024 to be sure - with socket.socket(socket.PF_CAN, socket.SOCK_DGRAM, socket.CAN_ISOTP) as s: - with self.assertRaisesRegex(OSError, 'interface name too long'): - s.bind(('x' * 1024, 1, 2)) - - def testBind(self): + ('::1',0, 0xffffffff), 0) + s = socket.socket(socket.AF_INET6, socket.SOCK_STREAM) try: - with socket.socket(socket.PF_CAN, socket.SOCK_DGRAM, socket.CAN_ISOTP) as s: - addr = self.interface, 0x123, 0x456 - s.bind(addr) - self.assertEqual(s.getsockname(), addr) - except OSError as e: - if e.errno == errno.ENODEV: - self.skipTest('network interface `%s` does not exist' % - self.interface) - else: - raise - - -@unittest.skipUnless(HAVE_SOCKET_RDS, 'RDS sockets required for this test.') -class BasicRDSTest(unittest.TestCase): - - def testCrucialConstants(self): - socket.AF_RDS - socket.PF_RDS - - def testCreateSocket(self): - with socket.socket(socket.PF_RDS, socket.SOCK_SEQPACKET, 0) as s: - pass - - def testSocketBufferSize(self): - bufsize = 16384 - with socket.socket(socket.PF_RDS, socket.SOCK_SEQPACKET, 0) as s: - s.setsockopt(socket.SOL_SOCKET, socket.SO_RCVBUF, bufsize) - s.setsockopt(socket.SOL_SOCKET, socket.SO_SNDBUF, bufsize) - - -@unittest.skipUnless(HAVE_SOCKET_RDS, 'RDS sockets required for this test.') -class RDSTest(ThreadedRDSSocketTest): - - def __init__(self, methodName='runTest'): - ThreadedRDSSocketTest.__init__(self, methodName=methodName) - - def setUp(self): - super().setUp() - self.evt = threading.Event() - - def testSendAndRecv(self): - data, addr = self.serv.recvfrom(self.bufsize) - self.assertEqual(self.data, data) - self.assertEqual(self.cli_addr, addr) - - def _testSendAndRecv(self): - self.data = b'spam' - self.cli.sendto(self.data, 0, (HOST, self.port)) - - def testPeek(self): - data, addr = self.serv.recvfrom(self.bufsize, socket.MSG_PEEK) - self.assertEqual(self.data, data) - data, addr = self.serv.recvfrom(self.bufsize) - self.assertEqual(self.data, data) - - def _testPeek(self): - self.data = b'spam' - self.cli.sendto(self.data, 0, (HOST, self.port)) - - @requireAttrs(socket.socket, 'recvmsg') - def testSendAndRecvMsg(self): - data, ancdata, msg_flags, addr = self.serv.recvmsg(self.bufsize) - self.assertEqual(self.data, data) - - @requireAttrs(socket.socket, 'sendmsg') - def _testSendAndRecvMsg(self): - self.data = b'hello ' * 10 - self.cli.sendmsg([self.data], (), 0, (HOST, self.port)) - - def testSendAndRecvMulti(self): - data, addr = self.serv.recvfrom(self.bufsize) - self.assertEqual(self.data1, data) - - data, addr = self.serv.recvfrom(self.bufsize) - self.assertEqual(self.data2, data) - - def _testSendAndRecvMulti(self): - self.data1 = b'bacon' - self.cli.sendto(self.data1, 0, (HOST, self.port)) - - self.data2 = b'egg' - self.cli.sendto(self.data2, 0, (HOST, self.port)) - - def testSelect(self): - r, w, x = select.select([self.serv], [], [], 3.0) - self.assertIn(self.serv, r) - data, addr = self.serv.recvfrom(self.bufsize) - self.assertEqual(self.data, data) - - def _testSelect(self): - self.data = b'select' - self.cli.sendto(self.data, 0, (HOST, self.port)) - -@unittest.skipUnless(HAVE_SOCKET_QIPCRTR, - 'QIPCRTR sockets required for this test.') -class BasicQIPCRTRTest(unittest.TestCase): - - def testCrucialConstants(self): - socket.AF_QIPCRTR - - def testCreateSocket(self): - with socket.socket(socket.AF_QIPCRTR, socket.SOCK_DGRAM) as s: - pass - - def testUnbound(self): - with socket.socket(socket.AF_QIPCRTR, socket.SOCK_DGRAM) as s: - self.assertEqual(s.getsockname()[1], 0) - - def testBindSock(self): - with socket.socket(socket.AF_QIPCRTR, socket.SOCK_DGRAM) as s: - support.bind_port(s, host=s.getsockname()[0]) - self.assertNotEqual(s.getsockname()[1], 0) - - def testInvalidBindSock(self): - with socket.socket(socket.AF_QIPCRTR, socket.SOCK_DGRAM) as s: - self.assertRaises(OSError, support.bind_port, s, host=-2) - - def testAutoBindSock(self): - with socket.socket(socket.AF_QIPCRTR, socket.SOCK_DGRAM) as s: - s.connect((123, 123)) - self.assertNotEqual(s.getsockname()[1], 0) - -@unittest.skipIf(fcntl is None, "need fcntl") -@unittest.skipUnless(HAVE_SOCKET_VSOCK, - 'VSOCK sockets required for this test.') -class BasicVSOCKTest(unittest.TestCase): - - def testCrucialConstants(self): - socket.AF_VSOCK - - def testVSOCKConstants(self): - socket.SO_VM_SOCKETS_BUFFER_SIZE - socket.SO_VM_SOCKETS_BUFFER_MIN_SIZE - socket.SO_VM_SOCKETS_BUFFER_MAX_SIZE - socket.VMADDR_CID_ANY - socket.VMADDR_PORT_ANY - socket.VMADDR_CID_HOST - socket.VM_SOCKETS_INVALID_VERSION - socket.IOCTL_VM_SOCKETS_GET_LOCAL_CID - - def testCreateSocket(self): - with socket.socket(socket.AF_VSOCK, socket.SOCK_STREAM) as s: - pass - - def testSocketBufferSize(self): - with socket.socket(socket.AF_VSOCK, socket.SOCK_STREAM) as s: - orig_max = s.getsockopt(socket.AF_VSOCK, - socket.SO_VM_SOCKETS_BUFFER_MAX_SIZE) - orig = s.getsockopt(socket.AF_VSOCK, - socket.SO_VM_SOCKETS_BUFFER_SIZE) - orig_min = s.getsockopt(socket.AF_VSOCK, - socket.SO_VM_SOCKETS_BUFFER_MIN_SIZE) - - s.setsockopt(socket.AF_VSOCK, - socket.SO_VM_SOCKETS_BUFFER_MAX_SIZE, orig_max * 2) - s.setsockopt(socket.AF_VSOCK, - socket.SO_VM_SOCKETS_BUFFER_SIZE, orig * 2) - s.setsockopt(socket.AF_VSOCK, - socket.SO_VM_SOCKETS_BUFFER_MIN_SIZE, orig_min * 2) - - self.assertEqual(orig_max * 2, - s.getsockopt(socket.AF_VSOCK, - socket.SO_VM_SOCKETS_BUFFER_MAX_SIZE)) - self.assertEqual(orig * 2, - s.getsockopt(socket.AF_VSOCK, - socket.SO_VM_SOCKETS_BUFFER_SIZE)) - self.assertEqual(orig_min * 2, - s.getsockopt(socket.AF_VSOCK, - socket.SO_VM_SOCKETS_BUFFER_MIN_SIZE)) - - -@unittest.skipUnless(HAVE_SOCKET_BLUETOOTH, - 'Bluetooth sockets required for this test.') -class BasicBluetoothTest(unittest.TestCase): - - def testBluetoothConstants(self): - socket.BDADDR_ANY - socket.BDADDR_LOCAL - socket.AF_BLUETOOTH - socket.BTPROTO_RFCOMM - - if sys.platform != "win32": - socket.BTPROTO_HCI - socket.SOL_HCI - socket.BTPROTO_L2CAP - - if not sys.platform.startswith("freebsd"): - socket.BTPROTO_SCO - - def testCreateRfcommSocket(self): - with socket.socket(socket.AF_BLUETOOTH, socket.SOCK_STREAM, socket.BTPROTO_RFCOMM) as s: - pass - - @unittest.skipIf(sys.platform == "win32", "windows does not support L2CAP sockets") - def testCreateL2capSocket(self): - with socket.socket(socket.AF_BLUETOOTH, socket.SOCK_SEQPACKET, socket.BTPROTO_L2CAP) as s: - pass - - @unittest.skipIf(sys.platform == "win32", "windows does not support HCI sockets") - def testCreateHciSocket(self): - with socket.socket(socket.AF_BLUETOOTH, socket.SOCK_RAW, socket.BTPROTO_HCI) as s: - pass - - @unittest.skipIf(sys.platform == "win32" or sys.platform.startswith("freebsd"), - "windows and freebsd do not support SCO sockets") - def testCreateScoSocket(self): - with socket.socket(socket.AF_BLUETOOTH, socket.SOCK_SEQPACKET, socket.BTPROTO_SCO) as s: - pass + self.assertRaises(OverflowError, s.bind, ('::1', 0, -10)) + finally: + s.close() +@unittest.skipUnless(thread, 'Threading required for this test.') class BasicTCPTest(SocketConnectedTest): def __init__(self, methodName='runTest'): @@ -2355,24 +818,25 @@ class BasicTCPTest(SocketConnectedTest): def testSendAll(self): # Testing sendall() with a 2048 byte string over TCP - msg = b'' + msg = '' while 1: read = self.cli_conn.recv(1024) if not read: break msg += read - self.assertEqual(msg, b'f' * 2048) + self.assertEqual(msg, 'f' * 2048) def _testSendAll(self): - big_chunk = b'f' * 2048 + big_chunk = 'f' * 2048 self.serv_conn.sendall(big_chunk) + @unittest.skipUnless(hasattr(socket, 'fromfd'), + 'socket.fromfd not available') def testFromFd(self): # Testing fromfd() fd = self.cli_conn.fileno() sock = socket.fromfd(fd, socket.AF_INET, socket.SOCK_STREAM) self.addCleanup(sock.close) - self.assertIsInstance(sock, socket.socket) msg = sock.recv(1024) self.assertEqual(msg, MSG) @@ -2402,9 +866,9 @@ class BasicTCPTest(SocketConnectedTest): self.serv_conn.send(MSG) self.serv_conn.shutdown(2) - testShutdown_overflow = support.cpython_only(testShutdown) + testShutdown_overflow = test_support.cpython_only(testShutdown) - @support.cpython_only + @test_support.cpython_only def _testShutdown_overflow(self): import _testcapi self.serv_conn.send(MSG) @@ -2415,26 +879,7 @@ class BasicTCPTest(SocketConnectedTest): 2 + (_testcapi.UINT_MAX + 1)) self.serv_conn.shutdown(2) - def testDetach(self): - # Testing detach() - fileno = self.cli_conn.fileno() - f = self.cli_conn.detach() - self.assertEqual(f, fileno) - # cli_conn cannot be used anymore... - self.assertTrue(self.cli_conn._closed) - self.assertRaises(OSError, self.cli_conn.recv, 1024) - self.cli_conn.close() - # ...but we can create another socket using the (still open) - # file descriptor - sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM, fileno=f) - self.addCleanup(sock.close) - msg = sock.recv(1024) - self.assertEqual(msg, MSG) - - def _testDetach(self): - self.serv_conn.send(MSG) - - +@unittest.skipUnless(thread, 'Threading required for this test.') class BasicUDPTest(ThreadedUDPSocketTest): def __init__(self, methodName='runTest'): @@ -2463,1984 +908,7 @@ class BasicUDPTest(ThreadedUDPSocketTest): def _testRecvFromNegative(self): self.cli.sendto(MSG, 0, (HOST, self.port)) - -@unittest.skipUnless(HAVE_SOCKET_UDPLITE, - 'UDPLITE sockets required for this test.') -class BasicUDPLITETest(ThreadedUDPLITESocketTest): - - def __init__(self, methodName='runTest'): - ThreadedUDPLITESocketTest.__init__(self, methodName=methodName) - - def testSendtoAndRecv(self): - # Testing sendto() and Recv() over UDPLITE - msg = self.serv.recv(len(MSG)) - self.assertEqual(msg, MSG) - - def _testSendtoAndRecv(self): - self.cli.sendto(MSG, 0, (HOST, self.port)) - - def testRecvFrom(self): - # Testing recvfrom() over UDPLITE - msg, addr = self.serv.recvfrom(len(MSG)) - self.assertEqual(msg, MSG) - - def _testRecvFrom(self): - self.cli.sendto(MSG, 0, (HOST, self.port)) - - def testRecvFromNegative(self): - # Negative lengths passed to recvfrom should give ValueError. - self.assertRaises(ValueError, self.serv.recvfrom, -1) - - def _testRecvFromNegative(self): - self.cli.sendto(MSG, 0, (HOST, self.port)) - -# Tests for the sendmsg()/recvmsg() interface. Where possible, the -# same test code is used with different families and types of socket -# (e.g. stream, datagram), and tests using recvmsg() are repeated -# using recvmsg_into(). -# -# The generic test classes such as SendmsgTests and -# RecvmsgGenericTests inherit from SendrecvmsgBase and expect to be -# supplied with sockets cli_sock and serv_sock representing the -# client's and the server's end of the connection respectively, and -# attributes cli_addr and serv_addr holding their (numeric where -# appropriate) addresses. -# -# The final concrete test classes combine these with subclasses of -# SocketTestBase which set up client and server sockets of a specific -# type, and with subclasses of SendrecvmsgBase such as -# SendrecvmsgDgramBase and SendrecvmsgConnectedBase which map these -# sockets to cli_sock and serv_sock and override the methods and -# attributes of SendrecvmsgBase to fill in destination addresses if -# needed when sending, check for specific flags in msg_flags, etc. -# -# RecvmsgIntoMixin provides a version of doRecvmsg() implemented using -# recvmsg_into(). - -# XXX: like the other datagram (UDP) tests in this module, the code -# here assumes that datagram delivery on the local machine will be -# reliable. - -class SendrecvmsgBase(ThreadSafeCleanupTestCase): - # Base class for sendmsg()/recvmsg() tests. - - # Time in seconds to wait before considering a test failed, or - # None for no timeout. Not all tests actually set a timeout. - fail_timeout = support.LOOPBACK_TIMEOUT - - def setUp(self): - self.misc_event = threading.Event() - super().setUp() - - def sendToServer(self, msg): - # Send msg to the server. - return self.cli_sock.send(msg) - - # Tuple of alternative default arguments for sendmsg() when called - # via sendmsgToServer() (e.g. to include a destination address). - sendmsg_to_server_defaults = () - - def sendmsgToServer(self, *args): - # Call sendmsg() on self.cli_sock with the given arguments, - # filling in any arguments which are not supplied with the - # corresponding items of self.sendmsg_to_server_defaults, if - # any. - return self.cli_sock.sendmsg( - *(args + self.sendmsg_to_server_defaults[len(args):])) - - def doRecvmsg(self, sock, bufsize, *args): - # Call recvmsg() on sock with given arguments and return its - # result. Should be used for tests which can use either - # recvmsg() or recvmsg_into() - RecvmsgIntoMixin overrides - # this method with one which emulates it using recvmsg_into(), - # thus allowing the same test to be used for both methods. - result = sock.recvmsg(bufsize, *args) - self.registerRecvmsgResult(result) - return result - - def registerRecvmsgResult(self, result): - # Called by doRecvmsg() with the return value of recvmsg() or - # recvmsg_into(). Can be overridden to arrange cleanup based - # on the returned ancillary data, for instance. - pass - - def checkRecvmsgAddress(self, addr1, addr2): - # Called to compare the received address with the address of - # the peer. - self.assertEqual(addr1, addr2) - - # Flags that are normally unset in msg_flags - msg_flags_common_unset = 0 - for name in ("MSG_CTRUNC", "MSG_OOB"): - msg_flags_common_unset |= getattr(socket, name, 0) - - # Flags that are normally set - msg_flags_common_set = 0 - - # Flags set when a complete record has been received (e.g. MSG_EOR - # for SCTP) - msg_flags_eor_indicator = 0 - - # Flags set when a complete record has not been received - # (e.g. MSG_TRUNC for datagram sockets) - msg_flags_non_eor_indicator = 0 - - def checkFlags(self, flags, eor=None, checkset=0, checkunset=0, ignore=0): - # Method to check the value of msg_flags returned by recvmsg[_into](). - # - # Checks that all bits in msg_flags_common_set attribute are - # set in "flags" and all bits in msg_flags_common_unset are - # unset. - # - # The "eor" argument specifies whether the flags should - # indicate that a full record (or datagram) has been received. - # If "eor" is None, no checks are done; otherwise, checks - # that: - # - # * if "eor" is true, all bits in msg_flags_eor_indicator are - # set and all bits in msg_flags_non_eor_indicator are unset - # - # * if "eor" is false, all bits in msg_flags_non_eor_indicator - # are set and all bits in msg_flags_eor_indicator are unset - # - # If "checkset" and/or "checkunset" are supplied, they require - # the given bits to be set or unset respectively, overriding - # what the attributes require for those bits. - # - # If any bits are set in "ignore", they will not be checked, - # regardless of the other inputs. - # - # Will raise Exception if the inputs require a bit to be both - # set and unset, and it is not ignored. - - defaultset = self.msg_flags_common_set - defaultunset = self.msg_flags_common_unset - - if eor: - defaultset |= self.msg_flags_eor_indicator - defaultunset |= self.msg_flags_non_eor_indicator - elif eor is not None: - defaultset |= self.msg_flags_non_eor_indicator - defaultunset |= self.msg_flags_eor_indicator - - # Function arguments override defaults - defaultset &= ~checkunset - defaultunset &= ~checkset - - # Merge arguments with remaining defaults, and check for conflicts - checkset |= defaultset - checkunset |= defaultunset - inboth = checkset & checkunset & ~ignore - if inboth: - raise Exception("contradictory set, unset requirements for flags " - "{0:#x}".format(inboth)) - - # Compare with given msg_flags value - mask = (checkset | checkunset) & ~ignore - self.assertEqual(flags & mask, checkset & mask) - - -class RecvmsgIntoMixin(SendrecvmsgBase): - # Mixin to implement doRecvmsg() using recvmsg_into(). - - def doRecvmsg(self, sock, bufsize, *args): - buf = bytearray(bufsize) - result = sock.recvmsg_into([buf], *args) - self.registerRecvmsgResult(result) - self.assertGreaterEqual(result[0], 0) - self.assertLessEqual(result[0], bufsize) - return (bytes(buf[:result[0]]),) + result[1:] - - -class SendrecvmsgDgramFlagsBase(SendrecvmsgBase): - # Defines flags to be checked in msg_flags for datagram sockets. - - @property - def msg_flags_non_eor_indicator(self): - return super().msg_flags_non_eor_indicator | socket.MSG_TRUNC - - -class SendrecvmsgSCTPFlagsBase(SendrecvmsgBase): - # Defines flags to be checked in msg_flags for SCTP sockets. - - @property - def msg_flags_eor_indicator(self): - return super().msg_flags_eor_indicator | socket.MSG_EOR - - -class SendrecvmsgConnectionlessBase(SendrecvmsgBase): - # Base class for tests on connectionless-mode sockets. Users must - # supply sockets on attributes cli and serv to be mapped to - # cli_sock and serv_sock respectively. - - @property - def serv_sock(self): - return self.serv - - @property - def cli_sock(self): - return self.cli - - @property - def sendmsg_to_server_defaults(self): - return ([], [], 0, self.serv_addr) - - def sendToServer(self, msg): - return self.cli_sock.sendto(msg, self.serv_addr) - - -class SendrecvmsgConnectedBase(SendrecvmsgBase): - # Base class for tests on connected sockets. Users must supply - # sockets on attributes serv_conn and cli_conn (representing the - # connections *to* the server and the client), to be mapped to - # cli_sock and serv_sock respectively. - - @property - def serv_sock(self): - return self.cli_conn - - @property - def cli_sock(self): - return self.serv_conn - - def checkRecvmsgAddress(self, addr1, addr2): - # Address is currently "unspecified" for a connected socket, - # so we don't examine it - pass - - -class SendrecvmsgServerTimeoutBase(SendrecvmsgBase): - # Base class to set a timeout on server's socket. - - def setUp(self): - super().setUp() - self.serv_sock.settimeout(self.fail_timeout) - - -class SendmsgTests(SendrecvmsgServerTimeoutBase): - # Tests for sendmsg() which can use any socket type and do not - # involve recvmsg() or recvmsg_into(). - - def testSendmsg(self): - # Send a simple message with sendmsg(). - self.assertEqual(self.serv_sock.recv(len(MSG)), MSG) - - def _testSendmsg(self): - self.assertEqual(self.sendmsgToServer([MSG]), len(MSG)) - - def testSendmsgDataGenerator(self): - # Send from buffer obtained from a generator (not a sequence). - self.assertEqual(self.serv_sock.recv(len(MSG)), MSG) - - def _testSendmsgDataGenerator(self): - self.assertEqual(self.sendmsgToServer((o for o in [MSG])), - len(MSG)) - - def testSendmsgAncillaryGenerator(self): - # Gather (empty) ancillary data from a generator. - self.assertEqual(self.serv_sock.recv(len(MSG)), MSG) - - def _testSendmsgAncillaryGenerator(self): - self.assertEqual(self.sendmsgToServer([MSG], (o for o in [])), - len(MSG)) - - def testSendmsgArray(self): - # Send data from an array instead of the usual bytes object. - self.assertEqual(self.serv_sock.recv(len(MSG)), MSG) - - def _testSendmsgArray(self): - self.assertEqual(self.sendmsgToServer([array.array("B", MSG)]), - len(MSG)) - - def testSendmsgGather(self): - # Send message data from more than one buffer (gather write). - self.assertEqual(self.serv_sock.recv(len(MSG)), MSG) - - def _testSendmsgGather(self): - self.assertEqual(self.sendmsgToServer([MSG[:3], MSG[3:]]), len(MSG)) - - def testSendmsgBadArgs(self): - # Check that sendmsg() rejects invalid arguments. - self.assertEqual(self.serv_sock.recv(1000), b"done") - - def _testSendmsgBadArgs(self): - self.assertRaises(TypeError, self.cli_sock.sendmsg) - self.assertRaises(TypeError, self.sendmsgToServer, - b"not in an iterable") - self.assertRaises(TypeError, self.sendmsgToServer, - object()) - self.assertRaises(TypeError, self.sendmsgToServer, - [object()]) - self.assertRaises(TypeError, self.sendmsgToServer, - [MSG, object()]) - self.assertRaises(TypeError, self.sendmsgToServer, - [MSG], object()) - self.assertRaises(TypeError, self.sendmsgToServer, - [MSG], [], object()) - self.assertRaises(TypeError, self.sendmsgToServer, - [MSG], [], 0, object()) - self.sendToServer(b"done") - - def testSendmsgBadCmsg(self): - # Check that invalid ancillary data items are rejected. - self.assertEqual(self.serv_sock.recv(1000), b"done") - - def _testSendmsgBadCmsg(self): - self.assertRaises(TypeError, self.sendmsgToServer, - [MSG], [object()]) - self.assertRaises(TypeError, self.sendmsgToServer, - [MSG], [(object(), 0, b"data")]) - self.assertRaises(TypeError, self.sendmsgToServer, - [MSG], [(0, object(), b"data")]) - self.assertRaises(TypeError, self.sendmsgToServer, - [MSG], [(0, 0, object())]) - self.assertRaises(TypeError, self.sendmsgToServer, - [MSG], [(0, 0)]) - self.assertRaises(TypeError, self.sendmsgToServer, - [MSG], [(0, 0, b"data", 42)]) - self.sendToServer(b"done") - - @requireAttrs(socket, "CMSG_SPACE") - def testSendmsgBadMultiCmsg(self): - # Check that invalid ancillary data items are rejected when - # more than one item is present. - self.assertEqual(self.serv_sock.recv(1000), b"done") - - @testSendmsgBadMultiCmsg.client_skip - def _testSendmsgBadMultiCmsg(self): - self.assertRaises(TypeError, self.sendmsgToServer, - [MSG], [0, 0, b""]) - self.assertRaises(TypeError, self.sendmsgToServer, - [MSG], [(0, 0, b""), object()]) - self.sendToServer(b"done") - - def testSendmsgExcessCmsgReject(self): - # Check that sendmsg() rejects excess ancillary data items - # when the number that can be sent is limited. - self.assertEqual(self.serv_sock.recv(1000), b"done") - - def _testSendmsgExcessCmsgReject(self): - if not hasattr(socket, "CMSG_SPACE"): - # Can only send one item - with self.assertRaises(OSError) as cm: - self.sendmsgToServer([MSG], [(0, 0, b""), (0, 0, b"")]) - self.assertIsNone(cm.exception.errno) - self.sendToServer(b"done") - - def testSendmsgAfterClose(self): - # Check that sendmsg() fails on a closed socket. - pass - - def _testSendmsgAfterClose(self): - self.cli_sock.close() - self.assertRaises(OSError, self.sendmsgToServer, [MSG]) - - -class SendmsgStreamTests(SendmsgTests): - # Tests for sendmsg() which require a stream socket and do not - # involve recvmsg() or recvmsg_into(). - - def testSendmsgExplicitNoneAddr(self): - # Check that peer address can be specified as None. - self.assertEqual(self.serv_sock.recv(len(MSG)), MSG) - - def _testSendmsgExplicitNoneAddr(self): - self.assertEqual(self.sendmsgToServer([MSG], [], 0, None), len(MSG)) - - def testSendmsgTimeout(self): - # Check that timeout works with sendmsg(). - self.assertEqual(self.serv_sock.recv(512), b"a"*512) - self.assertTrue(self.misc_event.wait(timeout=self.fail_timeout)) - - def _testSendmsgTimeout(self): - try: - self.cli_sock.settimeout(0.03) - try: - while True: - self.sendmsgToServer([b"a"*512]) - except socket.timeout: - pass - except OSError as exc: - if exc.errno != errno.ENOMEM: - raise - # bpo-33937 the test randomly fails on Travis CI with - # "OSError: [Errno 12] Cannot allocate memory" - else: - self.fail("socket.timeout not raised") - finally: - self.misc_event.set() - - # XXX: would be nice to have more tests for sendmsg flags argument. - - # Linux supports MSG_DONTWAIT when sending, but in general, it - # only works when receiving. Could add other platforms if they - # support it too. - @skipWithClientIf(sys.platform not in {"linux"}, - "MSG_DONTWAIT not known to work on this platform when " - "sending") - def testSendmsgDontWait(self): - # Check that MSG_DONTWAIT in flags causes non-blocking behaviour. - self.assertEqual(self.serv_sock.recv(512), b"a"*512) - self.assertTrue(self.misc_event.wait(timeout=self.fail_timeout)) - - @testSendmsgDontWait.client_skip - def _testSendmsgDontWait(self): - try: - with self.assertRaises(OSError) as cm: - while True: - self.sendmsgToServer([b"a"*512], [], socket.MSG_DONTWAIT) - # bpo-33937: catch also ENOMEM, the test randomly fails on Travis CI - # with "OSError: [Errno 12] Cannot allocate memory" - self.assertIn(cm.exception.errno, - (errno.EAGAIN, errno.EWOULDBLOCK, errno.ENOMEM)) - finally: - self.misc_event.set() - - -class SendmsgConnectionlessTests(SendmsgTests): - # Tests for sendmsg() which require a connectionless-mode - # (e.g. datagram) socket, and do not involve recvmsg() or - # recvmsg_into(). - - def testSendmsgNoDestAddr(self): - # Check that sendmsg() fails when no destination address is - # given for unconnected socket. - pass - - def _testSendmsgNoDestAddr(self): - self.assertRaises(OSError, self.cli_sock.sendmsg, - [MSG]) - self.assertRaises(OSError, self.cli_sock.sendmsg, - [MSG], [], 0, None) - - -class RecvmsgGenericTests(SendrecvmsgBase): - # Tests for recvmsg() which can also be emulated using - # recvmsg_into(), and can use any socket type. - - def testRecvmsg(self): - # Receive a simple message with recvmsg[_into](). - msg, ancdata, flags, addr = self.doRecvmsg(self.serv_sock, len(MSG)) - self.assertEqual(msg, MSG) - self.checkRecvmsgAddress(addr, self.cli_addr) - self.assertEqual(ancdata, []) - self.checkFlags(flags, eor=True) - - def _testRecvmsg(self): - self.sendToServer(MSG) - - def testRecvmsgExplicitDefaults(self): - # Test recvmsg[_into]() with default arguments provided explicitly. - msg, ancdata, flags, addr = self.doRecvmsg(self.serv_sock, - len(MSG), 0, 0) - self.assertEqual(msg, MSG) - self.checkRecvmsgAddress(addr, self.cli_addr) - self.assertEqual(ancdata, []) - self.checkFlags(flags, eor=True) - - def _testRecvmsgExplicitDefaults(self): - self.sendToServer(MSG) - - def testRecvmsgShorter(self): - # Receive a message smaller than buffer. - msg, ancdata, flags, addr = self.doRecvmsg(self.serv_sock, - len(MSG) + 42) - self.assertEqual(msg, MSG) - self.checkRecvmsgAddress(addr, self.cli_addr) - self.assertEqual(ancdata, []) - self.checkFlags(flags, eor=True) - - def _testRecvmsgShorter(self): - self.sendToServer(MSG) - - def testRecvmsgTrunc(self): - # Receive part of message, check for truncation indicators. - msg, ancdata, flags, addr = self.doRecvmsg(self.serv_sock, - len(MSG) - 3) - self.assertEqual(msg, MSG[:-3]) - self.checkRecvmsgAddress(addr, self.cli_addr) - self.assertEqual(ancdata, []) - self.checkFlags(flags, eor=False) - - def _testRecvmsgTrunc(self): - self.sendToServer(MSG) - - def testRecvmsgShortAncillaryBuf(self): - # Test ancillary data buffer too small to hold any ancillary data. - msg, ancdata, flags, addr = self.doRecvmsg(self.serv_sock, - len(MSG), 1) - self.assertEqual(msg, MSG) - self.checkRecvmsgAddress(addr, self.cli_addr) - self.assertEqual(ancdata, []) - self.checkFlags(flags, eor=True) - - def _testRecvmsgShortAncillaryBuf(self): - self.sendToServer(MSG) - - def testRecvmsgLongAncillaryBuf(self): - # Test large ancillary data buffer. - msg, ancdata, flags, addr = self.doRecvmsg(self.serv_sock, - len(MSG), 10240) - self.assertEqual(msg, MSG) - self.checkRecvmsgAddress(addr, self.cli_addr) - self.assertEqual(ancdata, []) - self.checkFlags(flags, eor=True) - - def _testRecvmsgLongAncillaryBuf(self): - self.sendToServer(MSG) - - def testRecvmsgAfterClose(self): - # Check that recvmsg[_into]() fails on a closed socket. - self.serv_sock.close() - self.assertRaises(OSError, self.doRecvmsg, self.serv_sock, 1024) - - def _testRecvmsgAfterClose(self): - pass - - def testRecvmsgTimeout(self): - # Check that timeout works. - try: - self.serv_sock.settimeout(0.03) - self.assertRaises(socket.timeout, - self.doRecvmsg, self.serv_sock, len(MSG)) - finally: - self.misc_event.set() - - def _testRecvmsgTimeout(self): - self.assertTrue(self.misc_event.wait(timeout=self.fail_timeout)) - - @requireAttrs(socket, "MSG_PEEK") - def testRecvmsgPeek(self): - # Check that MSG_PEEK in flags enables examination of pending - # data without consuming it. - - # Receive part of data with MSG_PEEK. - msg, ancdata, flags, addr = self.doRecvmsg(self.serv_sock, - len(MSG) - 3, 0, - socket.MSG_PEEK) - self.assertEqual(msg, MSG[:-3]) - self.checkRecvmsgAddress(addr, self.cli_addr) - self.assertEqual(ancdata, []) - # Ignoring MSG_TRUNC here (so this test is the same for stream - # and datagram sockets). Some wording in POSIX seems to - # suggest that it needn't be set when peeking, but that may - # just be a slip. - self.checkFlags(flags, eor=False, - ignore=getattr(socket, "MSG_TRUNC", 0)) - - # Receive all data with MSG_PEEK. - msg, ancdata, flags, addr = self.doRecvmsg(self.serv_sock, - len(MSG), 0, - socket.MSG_PEEK) - self.assertEqual(msg, MSG) - self.checkRecvmsgAddress(addr, self.cli_addr) - self.assertEqual(ancdata, []) - self.checkFlags(flags, eor=True) - - # Check that the same data can still be received normally. - msg, ancdata, flags, addr = self.doRecvmsg(self.serv_sock, len(MSG)) - self.assertEqual(msg, MSG) - self.checkRecvmsgAddress(addr, self.cli_addr) - self.assertEqual(ancdata, []) - self.checkFlags(flags, eor=True) - - @testRecvmsgPeek.client_skip - def _testRecvmsgPeek(self): - self.sendToServer(MSG) - - @requireAttrs(socket.socket, "sendmsg") - def testRecvmsgFromSendmsg(self): - # Test receiving with recvmsg[_into]() when message is sent - # using sendmsg(). - self.serv_sock.settimeout(self.fail_timeout) - msg, ancdata, flags, addr = self.doRecvmsg(self.serv_sock, len(MSG)) - self.assertEqual(msg, MSG) - self.checkRecvmsgAddress(addr, self.cli_addr) - self.assertEqual(ancdata, []) - self.checkFlags(flags, eor=True) - - @testRecvmsgFromSendmsg.client_skip - def _testRecvmsgFromSendmsg(self): - self.assertEqual(self.sendmsgToServer([MSG[:3], MSG[3:]]), len(MSG)) - - -class RecvmsgGenericStreamTests(RecvmsgGenericTests): - # Tests which require a stream socket and can use either recvmsg() - # or recvmsg_into(). - - def testRecvmsgEOF(self): - # Receive end-of-stream indicator (b"", peer socket closed). - msg, ancdata, flags, addr = self.doRecvmsg(self.serv_sock, 1024) - self.assertEqual(msg, b"") - self.checkRecvmsgAddress(addr, self.cli_addr) - self.assertEqual(ancdata, []) - self.checkFlags(flags, eor=None) # Might not have end-of-record marker - - def _testRecvmsgEOF(self): - self.cli_sock.close() - - def testRecvmsgOverflow(self): - # Receive a message in more than one chunk. - seg1, ancdata, flags, addr = self.doRecvmsg(self.serv_sock, - len(MSG) - 3) - self.checkRecvmsgAddress(addr, self.cli_addr) - self.assertEqual(ancdata, []) - self.checkFlags(flags, eor=False) - - seg2, ancdata, flags, addr = self.doRecvmsg(self.serv_sock, 1024) - self.checkRecvmsgAddress(addr, self.cli_addr) - self.assertEqual(ancdata, []) - self.checkFlags(flags, eor=True) - - msg = seg1 + seg2 - self.assertEqual(msg, MSG) - - def _testRecvmsgOverflow(self): - self.sendToServer(MSG) - - -class RecvmsgTests(RecvmsgGenericTests): - # Tests for recvmsg() which can use any socket type. - - def testRecvmsgBadArgs(self): - # Check that recvmsg() rejects invalid arguments. - self.assertRaises(TypeError, self.serv_sock.recvmsg) - self.assertRaises(ValueError, self.serv_sock.recvmsg, - -1, 0, 0) - self.assertRaises(ValueError, self.serv_sock.recvmsg, - len(MSG), -1, 0) - self.assertRaises(TypeError, self.serv_sock.recvmsg, - [bytearray(10)], 0, 0) - self.assertRaises(TypeError, self.serv_sock.recvmsg, - object(), 0, 0) - self.assertRaises(TypeError, self.serv_sock.recvmsg, - len(MSG), object(), 0) - self.assertRaises(TypeError, self.serv_sock.recvmsg, - len(MSG), 0, object()) - - msg, ancdata, flags, addr = self.serv_sock.recvmsg(len(MSG), 0, 0) - self.assertEqual(msg, MSG) - self.checkRecvmsgAddress(addr, self.cli_addr) - self.assertEqual(ancdata, []) - self.checkFlags(flags, eor=True) - - def _testRecvmsgBadArgs(self): - self.sendToServer(MSG) - - -class RecvmsgIntoTests(RecvmsgIntoMixin, RecvmsgGenericTests): - # Tests for recvmsg_into() which can use any socket type. - - def testRecvmsgIntoBadArgs(self): - # Check that recvmsg_into() rejects invalid arguments. - buf = bytearray(len(MSG)) - self.assertRaises(TypeError, self.serv_sock.recvmsg_into) - self.assertRaises(TypeError, self.serv_sock.recvmsg_into, - len(MSG), 0, 0) - self.assertRaises(TypeError, self.serv_sock.recvmsg_into, - buf, 0, 0) - self.assertRaises(TypeError, self.serv_sock.recvmsg_into, - [object()], 0, 0) - self.assertRaises(TypeError, self.serv_sock.recvmsg_into, - [b"I'm not writable"], 0, 0) - self.assertRaises(TypeError, self.serv_sock.recvmsg_into, - [buf, object()], 0, 0) - self.assertRaises(ValueError, self.serv_sock.recvmsg_into, - [buf], -1, 0) - self.assertRaises(TypeError, self.serv_sock.recvmsg_into, - [buf], object(), 0) - self.assertRaises(TypeError, self.serv_sock.recvmsg_into, - [buf], 0, object()) - - nbytes, ancdata, flags, addr = self.serv_sock.recvmsg_into([buf], 0, 0) - self.assertEqual(nbytes, len(MSG)) - self.assertEqual(buf, bytearray(MSG)) - self.checkRecvmsgAddress(addr, self.cli_addr) - self.assertEqual(ancdata, []) - self.checkFlags(flags, eor=True) - - def _testRecvmsgIntoBadArgs(self): - self.sendToServer(MSG) - - def testRecvmsgIntoGenerator(self): - # Receive into buffer obtained from a generator (not a sequence). - buf = bytearray(len(MSG)) - nbytes, ancdata, flags, addr = self.serv_sock.recvmsg_into( - (o for o in [buf])) - self.assertEqual(nbytes, len(MSG)) - self.assertEqual(buf, bytearray(MSG)) - self.checkRecvmsgAddress(addr, self.cli_addr) - self.assertEqual(ancdata, []) - self.checkFlags(flags, eor=True) - - def _testRecvmsgIntoGenerator(self): - self.sendToServer(MSG) - - def testRecvmsgIntoArray(self): - # Receive into an array rather than the usual bytearray. - buf = array.array("B", [0] * len(MSG)) - nbytes, ancdata, flags, addr = self.serv_sock.recvmsg_into([buf]) - self.assertEqual(nbytes, len(MSG)) - self.assertEqual(buf.tobytes(), MSG) - self.checkRecvmsgAddress(addr, self.cli_addr) - self.assertEqual(ancdata, []) - self.checkFlags(flags, eor=True) - - def _testRecvmsgIntoArray(self): - self.sendToServer(MSG) - - def testRecvmsgIntoScatter(self): - # Receive into multiple buffers (scatter write). - b1 = bytearray(b"----") - b2 = bytearray(b"0123456789") - b3 = bytearray(b"--------------") - nbytes, ancdata, flags, addr = self.serv_sock.recvmsg_into( - [b1, memoryview(b2)[2:9], b3]) - self.assertEqual(nbytes, len(b"Mary had a little lamb")) - self.assertEqual(b1, bytearray(b"Mary")) - self.assertEqual(b2, bytearray(b"01 had a 9")) - self.assertEqual(b3, bytearray(b"little lamb---")) - self.checkRecvmsgAddress(addr, self.cli_addr) - self.assertEqual(ancdata, []) - self.checkFlags(flags, eor=True) - - def _testRecvmsgIntoScatter(self): - self.sendToServer(b"Mary had a little lamb") - - -class CmsgMacroTests(unittest.TestCase): - # Test the functions CMSG_LEN() and CMSG_SPACE(). Tests - # assumptions used by sendmsg() and recvmsg[_into](), which share - # code with these functions. - - # Match the definition in socketmodule.c - try: - import _testcapi - except ImportError: - socklen_t_limit = 0x7fffffff - else: - socklen_t_limit = min(0x7fffffff, _testcapi.INT_MAX) - - @requireAttrs(socket, "CMSG_LEN") - def testCMSG_LEN(self): - # Test CMSG_LEN() with various valid and invalid values, - # checking the assumptions used by recvmsg() and sendmsg(). - toobig = self.socklen_t_limit - socket.CMSG_LEN(0) + 1 - values = list(range(257)) + list(range(toobig - 257, toobig)) - - # struct cmsghdr has at least three members, two of which are ints - self.assertGreater(socket.CMSG_LEN(0), array.array("i").itemsize * 2) - for n in values: - ret = socket.CMSG_LEN(n) - # This is how recvmsg() calculates the data size - self.assertEqual(ret - socket.CMSG_LEN(0), n) - self.assertLessEqual(ret, self.socklen_t_limit) - - self.assertRaises(OverflowError, socket.CMSG_LEN, -1) - # sendmsg() shares code with these functions, and requires - # that it reject values over the limit. - self.assertRaises(OverflowError, socket.CMSG_LEN, toobig) - self.assertRaises(OverflowError, socket.CMSG_LEN, sys.maxsize) - - @requireAttrs(socket, "CMSG_SPACE") - def testCMSG_SPACE(self): - # Test CMSG_SPACE() with various valid and invalid values, - # checking the assumptions used by sendmsg(). - toobig = self.socklen_t_limit - socket.CMSG_SPACE(1) + 1 - values = list(range(257)) + list(range(toobig - 257, toobig)) - - last = socket.CMSG_SPACE(0) - # struct cmsghdr has at least three members, two of which are ints - self.assertGreater(last, array.array("i").itemsize * 2) - for n in values: - ret = socket.CMSG_SPACE(n) - self.assertGreaterEqual(ret, last) - self.assertGreaterEqual(ret, socket.CMSG_LEN(n)) - self.assertGreaterEqual(ret, n + socket.CMSG_LEN(0)) - self.assertLessEqual(ret, self.socklen_t_limit) - last = ret - - self.assertRaises(OverflowError, socket.CMSG_SPACE, -1) - # sendmsg() shares code with these functions, and requires - # that it reject values over the limit. - self.assertRaises(OverflowError, socket.CMSG_SPACE, toobig) - self.assertRaises(OverflowError, socket.CMSG_SPACE, sys.maxsize) - - -class SCMRightsTest(SendrecvmsgServerTimeoutBase): - # Tests for file descriptor passing on Unix-domain sockets. - - # Invalid file descriptor value that's unlikely to evaluate to a - # real FD even if one of its bytes is replaced with a different - # value (which shouldn't actually happen). - badfd = -0x5555 - - def newFDs(self, n): - # Return a list of n file descriptors for newly-created files - # containing their list indices as ASCII numbers. - fds = [] - for i in range(n): - fd, path = tempfile.mkstemp() - self.addCleanup(os.unlink, path) - self.addCleanup(os.close, fd) - os.write(fd, str(i).encode()) - fds.append(fd) - return fds - - def checkFDs(self, fds): - # Check that the file descriptors in the given list contain - # their correct list indices as ASCII numbers. - for n, fd in enumerate(fds): - os.lseek(fd, 0, os.SEEK_SET) - self.assertEqual(os.read(fd, 1024), str(n).encode()) - - def registerRecvmsgResult(self, result): - self.addCleanup(self.closeRecvmsgFDs, result) - - def closeRecvmsgFDs(self, recvmsg_result): - # Close all file descriptors specified in the ancillary data - # of the given return value from recvmsg() or recvmsg_into(). - for cmsg_level, cmsg_type, cmsg_data in recvmsg_result[1]: - if (cmsg_level == socket.SOL_SOCKET and - cmsg_type == socket.SCM_RIGHTS): - fds = array.array("i") - fds.frombytes(cmsg_data[: - len(cmsg_data) - (len(cmsg_data) % fds.itemsize)]) - for fd in fds: - os.close(fd) - - def createAndSendFDs(self, n): - # Send n new file descriptors created by newFDs() to the - # server, with the constant MSG as the non-ancillary data. - self.assertEqual( - self.sendmsgToServer([MSG], - [(socket.SOL_SOCKET, - socket.SCM_RIGHTS, - array.array("i", self.newFDs(n)))]), - len(MSG)) - - def checkRecvmsgFDs(self, numfds, result, maxcmsgs=1, ignoreflags=0): - # Check that constant MSG was received with numfds file - # descriptors in a maximum of maxcmsgs control messages (which - # must contain only complete integers). By default, check - # that MSG_CTRUNC is unset, but ignore any flags in - # ignoreflags. - msg, ancdata, flags, addr = result - self.assertEqual(msg, MSG) - self.checkRecvmsgAddress(addr, self.cli_addr) - self.checkFlags(flags, eor=True, checkunset=socket.MSG_CTRUNC, - ignore=ignoreflags) - - self.assertIsInstance(ancdata, list) - self.assertLessEqual(len(ancdata), maxcmsgs) - fds = array.array("i") - for item in ancdata: - self.assertIsInstance(item, tuple) - cmsg_level, cmsg_type, cmsg_data = item - self.assertEqual(cmsg_level, socket.SOL_SOCKET) - self.assertEqual(cmsg_type, socket.SCM_RIGHTS) - self.assertIsInstance(cmsg_data, bytes) - self.assertEqual(len(cmsg_data) % SIZEOF_INT, 0) - fds.frombytes(cmsg_data) - - self.assertEqual(len(fds), numfds) - self.checkFDs(fds) - - def testFDPassSimple(self): - # Pass a single FD (array read from bytes object). - self.checkRecvmsgFDs(1, self.doRecvmsg(self.serv_sock, - len(MSG), 10240)) - - def _testFDPassSimple(self): - self.assertEqual( - self.sendmsgToServer( - [MSG], - [(socket.SOL_SOCKET, - socket.SCM_RIGHTS, - array.array("i", self.newFDs(1)).tobytes())]), - len(MSG)) - - def testMultipleFDPass(self): - # Pass multiple FDs in a single array. - self.checkRecvmsgFDs(4, self.doRecvmsg(self.serv_sock, - len(MSG), 10240)) - - def _testMultipleFDPass(self): - self.createAndSendFDs(4) - - @requireAttrs(socket, "CMSG_SPACE") - def testFDPassCMSG_SPACE(self): - # Test using CMSG_SPACE() to calculate ancillary buffer size. - self.checkRecvmsgFDs( - 4, self.doRecvmsg(self.serv_sock, len(MSG), - socket.CMSG_SPACE(4 * SIZEOF_INT))) - - @testFDPassCMSG_SPACE.client_skip - def _testFDPassCMSG_SPACE(self): - self.createAndSendFDs(4) - - def testFDPassCMSG_LEN(self): - # Test using CMSG_LEN() to calculate ancillary buffer size. - self.checkRecvmsgFDs(1, - self.doRecvmsg(self.serv_sock, len(MSG), - socket.CMSG_LEN(4 * SIZEOF_INT)), - # RFC 3542 says implementations may set - # MSG_CTRUNC if there isn't enough space - # for trailing padding. - ignoreflags=socket.MSG_CTRUNC) - - def _testFDPassCMSG_LEN(self): - self.createAndSendFDs(1) - - @unittest.skipIf(sys.platform == "darwin", "skipping, see issue #12958") - @unittest.skipIf(AIX, "skipping, see issue #22397") - @requireAttrs(socket, "CMSG_SPACE") - def testFDPassSeparate(self): - # Pass two FDs in two separate arrays. Arrays may be combined - # into a single control message by the OS. - self.checkRecvmsgFDs(2, - self.doRecvmsg(self.serv_sock, len(MSG), 10240), - maxcmsgs=2) - - @testFDPassSeparate.client_skip - @unittest.skipIf(sys.platform == "darwin", "skipping, see issue #12958") - @unittest.skipIf(AIX, "skipping, see issue #22397") - def _testFDPassSeparate(self): - fd0, fd1 = self.newFDs(2) - self.assertEqual( - self.sendmsgToServer([MSG], [(socket.SOL_SOCKET, - socket.SCM_RIGHTS, - array.array("i", [fd0])), - (socket.SOL_SOCKET, - socket.SCM_RIGHTS, - array.array("i", [fd1]))]), - len(MSG)) - - @unittest.skipIf(sys.platform == "darwin", "skipping, see issue #12958") - @unittest.skipIf(AIX, "skipping, see issue #22397") - @requireAttrs(socket, "CMSG_SPACE") - def testFDPassSeparateMinSpace(self): - # Pass two FDs in two separate arrays, receiving them into the - # minimum space for two arrays. - num_fds = 2 - self.checkRecvmsgFDs(num_fds, - self.doRecvmsg(self.serv_sock, len(MSG), - socket.CMSG_SPACE(SIZEOF_INT) + - socket.CMSG_LEN(SIZEOF_INT * num_fds)), - maxcmsgs=2, ignoreflags=socket.MSG_CTRUNC) - - @testFDPassSeparateMinSpace.client_skip - @unittest.skipIf(sys.platform == "darwin", "skipping, see issue #12958") - @unittest.skipIf(AIX, "skipping, see issue #22397") - def _testFDPassSeparateMinSpace(self): - fd0, fd1 = self.newFDs(2) - self.assertEqual( - self.sendmsgToServer([MSG], [(socket.SOL_SOCKET, - socket.SCM_RIGHTS, - array.array("i", [fd0])), - (socket.SOL_SOCKET, - socket.SCM_RIGHTS, - array.array("i", [fd1]))]), - len(MSG)) - - def sendAncillaryIfPossible(self, msg, ancdata): - # Try to send msg and ancdata to server, but if the system - # call fails, just send msg with no ancillary data. - try: - nbytes = self.sendmsgToServer([msg], ancdata) - except OSError as e: - # Check that it was the system call that failed - self.assertIsInstance(e.errno, int) - nbytes = self.sendmsgToServer([msg]) - self.assertEqual(nbytes, len(msg)) - - @unittest.skipIf(sys.platform == "darwin", "see issue #24725") - def testFDPassEmpty(self): - # Try to pass an empty FD array. Can receive either no array - # or an empty array. - self.checkRecvmsgFDs(0, self.doRecvmsg(self.serv_sock, - len(MSG), 10240), - ignoreflags=socket.MSG_CTRUNC) - - def _testFDPassEmpty(self): - self.sendAncillaryIfPossible(MSG, [(socket.SOL_SOCKET, - socket.SCM_RIGHTS, - b"")]) - - def testFDPassPartialInt(self): - # Try to pass a truncated FD array. - msg, ancdata, flags, addr = self.doRecvmsg(self.serv_sock, - len(MSG), 10240) - self.assertEqual(msg, MSG) - self.checkRecvmsgAddress(addr, self.cli_addr) - self.checkFlags(flags, eor=True, ignore=socket.MSG_CTRUNC) - self.assertLessEqual(len(ancdata), 1) - for cmsg_level, cmsg_type, cmsg_data in ancdata: - self.assertEqual(cmsg_level, socket.SOL_SOCKET) - self.assertEqual(cmsg_type, socket.SCM_RIGHTS) - self.assertLess(len(cmsg_data), SIZEOF_INT) - - def _testFDPassPartialInt(self): - self.sendAncillaryIfPossible( - MSG, - [(socket.SOL_SOCKET, - socket.SCM_RIGHTS, - array.array("i", [self.badfd]).tobytes()[:-1])]) - - @requireAttrs(socket, "CMSG_SPACE") - def testFDPassPartialIntInMiddle(self): - # Try to pass two FD arrays, the first of which is truncated. - msg, ancdata, flags, addr = self.doRecvmsg(self.serv_sock, - len(MSG), 10240) - self.assertEqual(msg, MSG) - self.checkRecvmsgAddress(addr, self.cli_addr) - self.checkFlags(flags, eor=True, ignore=socket.MSG_CTRUNC) - self.assertLessEqual(len(ancdata), 2) - fds = array.array("i") - # Arrays may have been combined in a single control message - for cmsg_level, cmsg_type, cmsg_data in ancdata: - self.assertEqual(cmsg_level, socket.SOL_SOCKET) - self.assertEqual(cmsg_type, socket.SCM_RIGHTS) - fds.frombytes(cmsg_data[: - len(cmsg_data) - (len(cmsg_data) % fds.itemsize)]) - self.assertLessEqual(len(fds), 2) - self.checkFDs(fds) - - @testFDPassPartialIntInMiddle.client_skip - def _testFDPassPartialIntInMiddle(self): - fd0, fd1 = self.newFDs(2) - self.sendAncillaryIfPossible( - MSG, - [(socket.SOL_SOCKET, - socket.SCM_RIGHTS, - array.array("i", [fd0, self.badfd]).tobytes()[:-1]), - (socket.SOL_SOCKET, - socket.SCM_RIGHTS, - array.array("i", [fd1]))]) - - def checkTruncatedHeader(self, result, ignoreflags=0): - # Check that no ancillary data items are returned when data is - # truncated inside the cmsghdr structure. - msg, ancdata, flags, addr = result - self.assertEqual(msg, MSG) - self.checkRecvmsgAddress(addr, self.cli_addr) - self.assertEqual(ancdata, []) - self.checkFlags(flags, eor=True, checkset=socket.MSG_CTRUNC, - ignore=ignoreflags) - - def testCmsgTruncNoBufSize(self): - # Check that no ancillary data is received when no buffer size - # is specified. - self.checkTruncatedHeader(self.doRecvmsg(self.serv_sock, len(MSG)), - # BSD seems to set MSG_CTRUNC only - # if an item has been partially - # received. - ignoreflags=socket.MSG_CTRUNC) - - def _testCmsgTruncNoBufSize(self): - self.createAndSendFDs(1) - - def testCmsgTrunc0(self): - # Check that no ancillary data is received when buffer size is 0. - self.checkTruncatedHeader(self.doRecvmsg(self.serv_sock, len(MSG), 0), - ignoreflags=socket.MSG_CTRUNC) - - def _testCmsgTrunc0(self): - self.createAndSendFDs(1) - - # Check that no ancillary data is returned for various non-zero - # (but still too small) buffer sizes. - - def testCmsgTrunc1(self): - self.checkTruncatedHeader(self.doRecvmsg(self.serv_sock, len(MSG), 1)) - - def _testCmsgTrunc1(self): - self.createAndSendFDs(1) - - def testCmsgTrunc2Int(self): - # The cmsghdr structure has at least three members, two of - # which are ints, so we still shouldn't see any ancillary - # data. - self.checkTruncatedHeader(self.doRecvmsg(self.serv_sock, len(MSG), - SIZEOF_INT * 2)) - - def _testCmsgTrunc2Int(self): - self.createAndSendFDs(1) - - def testCmsgTruncLen0Minus1(self): - self.checkTruncatedHeader(self.doRecvmsg(self.serv_sock, len(MSG), - socket.CMSG_LEN(0) - 1)) - - def _testCmsgTruncLen0Minus1(self): - self.createAndSendFDs(1) - - # The following tests try to truncate the control message in the - # middle of the FD array. - - def checkTruncatedArray(self, ancbuf, maxdata, mindata=0): - # Check that file descriptor data is truncated to between - # mindata and maxdata bytes when received with buffer size - # ancbuf, and that any complete file descriptor numbers are - # valid. - msg, ancdata, flags, addr = self.doRecvmsg(self.serv_sock, - len(MSG), ancbuf) - self.assertEqual(msg, MSG) - self.checkRecvmsgAddress(addr, self.cli_addr) - self.checkFlags(flags, eor=True, checkset=socket.MSG_CTRUNC) - - if mindata == 0 and ancdata == []: - return - self.assertEqual(len(ancdata), 1) - cmsg_level, cmsg_type, cmsg_data = ancdata[0] - self.assertEqual(cmsg_level, socket.SOL_SOCKET) - self.assertEqual(cmsg_type, socket.SCM_RIGHTS) - self.assertGreaterEqual(len(cmsg_data), mindata) - self.assertLessEqual(len(cmsg_data), maxdata) - fds = array.array("i") - fds.frombytes(cmsg_data[: - len(cmsg_data) - (len(cmsg_data) % fds.itemsize)]) - self.checkFDs(fds) - - def testCmsgTruncLen0(self): - self.checkTruncatedArray(ancbuf=socket.CMSG_LEN(0), maxdata=0) - - def _testCmsgTruncLen0(self): - self.createAndSendFDs(1) - - def testCmsgTruncLen0Plus1(self): - self.checkTruncatedArray(ancbuf=socket.CMSG_LEN(0) + 1, maxdata=1) - - def _testCmsgTruncLen0Plus1(self): - self.createAndSendFDs(2) - - def testCmsgTruncLen1(self): - self.checkTruncatedArray(ancbuf=socket.CMSG_LEN(SIZEOF_INT), - maxdata=SIZEOF_INT) - - def _testCmsgTruncLen1(self): - self.createAndSendFDs(2) - - def testCmsgTruncLen2Minus1(self): - self.checkTruncatedArray(ancbuf=socket.CMSG_LEN(2 * SIZEOF_INT) - 1, - maxdata=(2 * SIZEOF_INT) - 1) - - def _testCmsgTruncLen2Minus1(self): - self.createAndSendFDs(2) - - -class RFC3542AncillaryTest(SendrecvmsgServerTimeoutBase): - # Test sendmsg() and recvmsg[_into]() using the ancillary data - # features of the RFC 3542 Advanced Sockets API for IPv6. - # Currently we can only handle certain data items (e.g. traffic - # class, hop limit, MTU discovery and fragmentation settings) - # without resorting to unportable means such as the struct module, - # but the tests here are aimed at testing the ancillary data - # handling in sendmsg() and recvmsg() rather than the IPv6 API - # itself. - - # Test value to use when setting hop limit of packet - hop_limit = 2 - - # Test value to use when setting traffic class of packet. - # -1 means "use kernel default". - traffic_class = -1 - - def ancillaryMapping(self, ancdata): - # Given ancillary data list ancdata, return a mapping from - # pairs (cmsg_level, cmsg_type) to corresponding cmsg_data. - # Check that no (level, type) pair appears more than once. - d = {} - for cmsg_level, cmsg_type, cmsg_data in ancdata: - self.assertNotIn((cmsg_level, cmsg_type), d) - d[(cmsg_level, cmsg_type)] = cmsg_data - return d - - def checkHopLimit(self, ancbufsize, maxhop=255, ignoreflags=0): - # Receive hop limit into ancbufsize bytes of ancillary data - # space. Check that data is MSG, ancillary data is not - # truncated (but ignore any flags in ignoreflags), and hop - # limit is between 0 and maxhop inclusive. - self.serv_sock.setsockopt(socket.IPPROTO_IPV6, - socket.IPV6_RECVHOPLIMIT, 1) - self.misc_event.set() - msg, ancdata, flags, addr = self.doRecvmsg(self.serv_sock, - len(MSG), ancbufsize) - - self.assertEqual(msg, MSG) - self.checkRecvmsgAddress(addr, self.cli_addr) - self.checkFlags(flags, eor=True, checkunset=socket.MSG_CTRUNC, - ignore=ignoreflags) - - self.assertEqual(len(ancdata), 1) - self.assertIsInstance(ancdata[0], tuple) - cmsg_level, cmsg_type, cmsg_data = ancdata[0] - self.assertEqual(cmsg_level, socket.IPPROTO_IPV6) - self.assertEqual(cmsg_type, socket.IPV6_HOPLIMIT) - self.assertIsInstance(cmsg_data, bytes) - self.assertEqual(len(cmsg_data), SIZEOF_INT) - a = array.array("i") - a.frombytes(cmsg_data) - self.assertGreaterEqual(a[0], 0) - self.assertLessEqual(a[0], maxhop) - - @requireAttrs(socket, "IPV6_RECVHOPLIMIT", "IPV6_HOPLIMIT") - def testRecvHopLimit(self): - # Test receiving the packet hop limit as ancillary data. - self.checkHopLimit(ancbufsize=10240) - - @testRecvHopLimit.client_skip - def _testRecvHopLimit(self): - # Need to wait until server has asked to receive ancillary - # data, as implementations are not required to buffer it - # otherwise. - self.assertTrue(self.misc_event.wait(timeout=self.fail_timeout)) - self.sendToServer(MSG) - - @requireAttrs(socket, "CMSG_SPACE", "IPV6_RECVHOPLIMIT", "IPV6_HOPLIMIT") - def testRecvHopLimitCMSG_SPACE(self): - # Test receiving hop limit, using CMSG_SPACE to calculate buffer size. - self.checkHopLimit(ancbufsize=socket.CMSG_SPACE(SIZEOF_INT)) - - @testRecvHopLimitCMSG_SPACE.client_skip - def _testRecvHopLimitCMSG_SPACE(self): - self.assertTrue(self.misc_event.wait(timeout=self.fail_timeout)) - self.sendToServer(MSG) - - # Could test receiving into buffer sized using CMSG_LEN, but RFC - # 3542 says portable applications must provide space for trailing - # padding. Implementations may set MSG_CTRUNC if there isn't - # enough space for the padding. - - @requireAttrs(socket.socket, "sendmsg") - @requireAttrs(socket, "IPV6_RECVHOPLIMIT", "IPV6_HOPLIMIT") - def testSetHopLimit(self): - # Test setting hop limit on outgoing packet and receiving it - # at the other end. - self.checkHopLimit(ancbufsize=10240, maxhop=self.hop_limit) - - @testSetHopLimit.client_skip - def _testSetHopLimit(self): - self.assertTrue(self.misc_event.wait(timeout=self.fail_timeout)) - self.assertEqual( - self.sendmsgToServer([MSG], - [(socket.IPPROTO_IPV6, socket.IPV6_HOPLIMIT, - array.array("i", [self.hop_limit]))]), - len(MSG)) - - def checkTrafficClassAndHopLimit(self, ancbufsize, maxhop=255, - ignoreflags=0): - # Receive traffic class and hop limit into ancbufsize bytes of - # ancillary data space. Check that data is MSG, ancillary - # data is not truncated (but ignore any flags in ignoreflags), - # and traffic class and hop limit are in range (hop limit no - # more than maxhop). - self.serv_sock.setsockopt(socket.IPPROTO_IPV6, - socket.IPV6_RECVHOPLIMIT, 1) - self.serv_sock.setsockopt(socket.IPPROTO_IPV6, - socket.IPV6_RECVTCLASS, 1) - self.misc_event.set() - msg, ancdata, flags, addr = self.doRecvmsg(self.serv_sock, - len(MSG), ancbufsize) - - self.assertEqual(msg, MSG) - self.checkRecvmsgAddress(addr, self.cli_addr) - self.checkFlags(flags, eor=True, checkunset=socket.MSG_CTRUNC, - ignore=ignoreflags) - self.assertEqual(len(ancdata), 2) - ancmap = self.ancillaryMapping(ancdata) - - tcdata = ancmap[(socket.IPPROTO_IPV6, socket.IPV6_TCLASS)] - self.assertEqual(len(tcdata), SIZEOF_INT) - a = array.array("i") - a.frombytes(tcdata) - self.assertGreaterEqual(a[0], 0) - self.assertLessEqual(a[0], 255) - - hldata = ancmap[(socket.IPPROTO_IPV6, socket.IPV6_HOPLIMIT)] - self.assertEqual(len(hldata), SIZEOF_INT) - a = array.array("i") - a.frombytes(hldata) - self.assertGreaterEqual(a[0], 0) - self.assertLessEqual(a[0], maxhop) - - @requireAttrs(socket, "IPV6_RECVHOPLIMIT", "IPV6_HOPLIMIT", - "IPV6_RECVTCLASS", "IPV6_TCLASS") - def testRecvTrafficClassAndHopLimit(self): - # Test receiving traffic class and hop limit as ancillary data. - self.checkTrafficClassAndHopLimit(ancbufsize=10240) - - @testRecvTrafficClassAndHopLimit.client_skip - def _testRecvTrafficClassAndHopLimit(self): - self.assertTrue(self.misc_event.wait(timeout=self.fail_timeout)) - self.sendToServer(MSG) - - @requireAttrs(socket, "CMSG_SPACE", "IPV6_RECVHOPLIMIT", "IPV6_HOPLIMIT", - "IPV6_RECVTCLASS", "IPV6_TCLASS") - def testRecvTrafficClassAndHopLimitCMSG_SPACE(self): - # Test receiving traffic class and hop limit, using - # CMSG_SPACE() to calculate buffer size. - self.checkTrafficClassAndHopLimit( - ancbufsize=socket.CMSG_SPACE(SIZEOF_INT) * 2) - - @testRecvTrafficClassAndHopLimitCMSG_SPACE.client_skip - def _testRecvTrafficClassAndHopLimitCMSG_SPACE(self): - self.assertTrue(self.misc_event.wait(timeout=self.fail_timeout)) - self.sendToServer(MSG) - - @requireAttrs(socket.socket, "sendmsg") - @requireAttrs(socket, "CMSG_SPACE", "IPV6_RECVHOPLIMIT", "IPV6_HOPLIMIT", - "IPV6_RECVTCLASS", "IPV6_TCLASS") - def testSetTrafficClassAndHopLimit(self): - # Test setting traffic class and hop limit on outgoing packet, - # and receiving them at the other end. - self.checkTrafficClassAndHopLimit(ancbufsize=10240, - maxhop=self.hop_limit) - - @testSetTrafficClassAndHopLimit.client_skip - def _testSetTrafficClassAndHopLimit(self): - self.assertTrue(self.misc_event.wait(timeout=self.fail_timeout)) - self.assertEqual( - self.sendmsgToServer([MSG], - [(socket.IPPROTO_IPV6, socket.IPV6_TCLASS, - array.array("i", [self.traffic_class])), - (socket.IPPROTO_IPV6, socket.IPV6_HOPLIMIT, - array.array("i", [self.hop_limit]))]), - len(MSG)) - - @requireAttrs(socket.socket, "sendmsg") - @requireAttrs(socket, "CMSG_SPACE", "IPV6_RECVHOPLIMIT", "IPV6_HOPLIMIT", - "IPV6_RECVTCLASS", "IPV6_TCLASS") - def testOddCmsgSize(self): - # Try to send ancillary data with first item one byte too - # long. Fall back to sending with correct size if this fails, - # and check that second item was handled correctly. - self.checkTrafficClassAndHopLimit(ancbufsize=10240, - maxhop=self.hop_limit) - - @testOddCmsgSize.client_skip - def _testOddCmsgSize(self): - self.assertTrue(self.misc_event.wait(timeout=self.fail_timeout)) - try: - nbytes = self.sendmsgToServer( - [MSG], - [(socket.IPPROTO_IPV6, socket.IPV6_TCLASS, - array.array("i", [self.traffic_class]).tobytes() + b"\x00"), - (socket.IPPROTO_IPV6, socket.IPV6_HOPLIMIT, - array.array("i", [self.hop_limit]))]) - except OSError as e: - self.assertIsInstance(e.errno, int) - nbytes = self.sendmsgToServer( - [MSG], - [(socket.IPPROTO_IPV6, socket.IPV6_TCLASS, - array.array("i", [self.traffic_class])), - (socket.IPPROTO_IPV6, socket.IPV6_HOPLIMIT, - array.array("i", [self.hop_limit]))]) - self.assertEqual(nbytes, len(MSG)) - - # Tests for proper handling of truncated ancillary data - - def checkHopLimitTruncatedHeader(self, ancbufsize, ignoreflags=0): - # Receive hop limit into ancbufsize bytes of ancillary data - # space, which should be too small to contain the ancillary - # data header (if ancbufsize is None, pass no second argument - # to recvmsg()). Check that data is MSG, MSG_CTRUNC is set - # (unless included in ignoreflags), and no ancillary data is - # returned. - self.serv_sock.setsockopt(socket.IPPROTO_IPV6, - socket.IPV6_RECVHOPLIMIT, 1) - self.misc_event.set() - args = () if ancbufsize is None else (ancbufsize,) - msg, ancdata, flags, addr = self.doRecvmsg(self.serv_sock, - len(MSG), *args) - - self.assertEqual(msg, MSG) - self.checkRecvmsgAddress(addr, self.cli_addr) - self.assertEqual(ancdata, []) - self.checkFlags(flags, eor=True, checkset=socket.MSG_CTRUNC, - ignore=ignoreflags) - - @requireAttrs(socket, "IPV6_RECVHOPLIMIT", "IPV6_HOPLIMIT") - def testCmsgTruncNoBufSize(self): - # Check that no ancillary data is received when no ancillary - # buffer size is provided. - self.checkHopLimitTruncatedHeader(ancbufsize=None, - # BSD seems to set - # MSG_CTRUNC only if an item - # has been partially - # received. - ignoreflags=socket.MSG_CTRUNC) - - @testCmsgTruncNoBufSize.client_skip - def _testCmsgTruncNoBufSize(self): - self.assertTrue(self.misc_event.wait(timeout=self.fail_timeout)) - self.sendToServer(MSG) - - @requireAttrs(socket, "IPV6_RECVHOPLIMIT", "IPV6_HOPLIMIT") - def testSingleCmsgTrunc0(self): - # Check that no ancillary data is received when ancillary - # buffer size is zero. - self.checkHopLimitTruncatedHeader(ancbufsize=0, - ignoreflags=socket.MSG_CTRUNC) - - @testSingleCmsgTrunc0.client_skip - def _testSingleCmsgTrunc0(self): - self.assertTrue(self.misc_event.wait(timeout=self.fail_timeout)) - self.sendToServer(MSG) - - # Check that no ancillary data is returned for various non-zero - # (but still too small) buffer sizes. - - @requireAttrs(socket, "IPV6_RECVHOPLIMIT", "IPV6_HOPLIMIT") - def testSingleCmsgTrunc1(self): - self.checkHopLimitTruncatedHeader(ancbufsize=1) - - @testSingleCmsgTrunc1.client_skip - def _testSingleCmsgTrunc1(self): - self.assertTrue(self.misc_event.wait(timeout=self.fail_timeout)) - self.sendToServer(MSG) - - @requireAttrs(socket, "IPV6_RECVHOPLIMIT", "IPV6_HOPLIMIT") - def testSingleCmsgTrunc2Int(self): - self.checkHopLimitTruncatedHeader(ancbufsize=2 * SIZEOF_INT) - - @testSingleCmsgTrunc2Int.client_skip - def _testSingleCmsgTrunc2Int(self): - self.assertTrue(self.misc_event.wait(timeout=self.fail_timeout)) - self.sendToServer(MSG) - - @requireAttrs(socket, "IPV6_RECVHOPLIMIT", "IPV6_HOPLIMIT") - def testSingleCmsgTruncLen0Minus1(self): - self.checkHopLimitTruncatedHeader(ancbufsize=socket.CMSG_LEN(0) - 1) - - @testSingleCmsgTruncLen0Minus1.client_skip - def _testSingleCmsgTruncLen0Minus1(self): - self.assertTrue(self.misc_event.wait(timeout=self.fail_timeout)) - self.sendToServer(MSG) - - @requireAttrs(socket, "IPV6_RECVHOPLIMIT", "IPV6_HOPLIMIT") - def testSingleCmsgTruncInData(self): - # Test truncation of a control message inside its associated - # data. The message may be returned with its data truncated, - # or not returned at all. - self.serv_sock.setsockopt(socket.IPPROTO_IPV6, - socket.IPV6_RECVHOPLIMIT, 1) - self.misc_event.set() - msg, ancdata, flags, addr = self.doRecvmsg( - self.serv_sock, len(MSG), socket.CMSG_LEN(SIZEOF_INT) - 1) - - self.assertEqual(msg, MSG) - self.checkRecvmsgAddress(addr, self.cli_addr) - self.checkFlags(flags, eor=True, checkset=socket.MSG_CTRUNC) - - self.assertLessEqual(len(ancdata), 1) - if ancdata: - cmsg_level, cmsg_type, cmsg_data = ancdata[0] - self.assertEqual(cmsg_level, socket.IPPROTO_IPV6) - self.assertEqual(cmsg_type, socket.IPV6_HOPLIMIT) - self.assertLess(len(cmsg_data), SIZEOF_INT) - - @testSingleCmsgTruncInData.client_skip - def _testSingleCmsgTruncInData(self): - self.assertTrue(self.misc_event.wait(timeout=self.fail_timeout)) - self.sendToServer(MSG) - - def checkTruncatedSecondHeader(self, ancbufsize, ignoreflags=0): - # Receive traffic class and hop limit into ancbufsize bytes of - # ancillary data space, which should be large enough to - # contain the first item, but too small to contain the header - # of the second. Check that data is MSG, MSG_CTRUNC is set - # (unless included in ignoreflags), and only one ancillary - # data item is returned. - self.serv_sock.setsockopt(socket.IPPROTO_IPV6, - socket.IPV6_RECVHOPLIMIT, 1) - self.serv_sock.setsockopt(socket.IPPROTO_IPV6, - socket.IPV6_RECVTCLASS, 1) - self.misc_event.set() - msg, ancdata, flags, addr = self.doRecvmsg(self.serv_sock, - len(MSG), ancbufsize) - - self.assertEqual(msg, MSG) - self.checkRecvmsgAddress(addr, self.cli_addr) - self.checkFlags(flags, eor=True, checkset=socket.MSG_CTRUNC, - ignore=ignoreflags) - - self.assertEqual(len(ancdata), 1) - cmsg_level, cmsg_type, cmsg_data = ancdata[0] - self.assertEqual(cmsg_level, socket.IPPROTO_IPV6) - self.assertIn(cmsg_type, {socket.IPV6_TCLASS, socket.IPV6_HOPLIMIT}) - self.assertEqual(len(cmsg_data), SIZEOF_INT) - a = array.array("i") - a.frombytes(cmsg_data) - self.assertGreaterEqual(a[0], 0) - self.assertLessEqual(a[0], 255) - - # Try the above test with various buffer sizes. - - @requireAttrs(socket, "CMSG_SPACE", "IPV6_RECVHOPLIMIT", "IPV6_HOPLIMIT", - "IPV6_RECVTCLASS", "IPV6_TCLASS") - def testSecondCmsgTrunc0(self): - self.checkTruncatedSecondHeader(socket.CMSG_SPACE(SIZEOF_INT), - ignoreflags=socket.MSG_CTRUNC) - - @testSecondCmsgTrunc0.client_skip - def _testSecondCmsgTrunc0(self): - self.assertTrue(self.misc_event.wait(timeout=self.fail_timeout)) - self.sendToServer(MSG) - - @requireAttrs(socket, "CMSG_SPACE", "IPV6_RECVHOPLIMIT", "IPV6_HOPLIMIT", - "IPV6_RECVTCLASS", "IPV6_TCLASS") - def testSecondCmsgTrunc1(self): - self.checkTruncatedSecondHeader(socket.CMSG_SPACE(SIZEOF_INT) + 1) - - @testSecondCmsgTrunc1.client_skip - def _testSecondCmsgTrunc1(self): - self.assertTrue(self.misc_event.wait(timeout=self.fail_timeout)) - self.sendToServer(MSG) - - @requireAttrs(socket, "CMSG_SPACE", "IPV6_RECVHOPLIMIT", "IPV6_HOPLIMIT", - "IPV6_RECVTCLASS", "IPV6_TCLASS") - def testSecondCmsgTrunc2Int(self): - self.checkTruncatedSecondHeader(socket.CMSG_SPACE(SIZEOF_INT) + - 2 * SIZEOF_INT) - - @testSecondCmsgTrunc2Int.client_skip - def _testSecondCmsgTrunc2Int(self): - self.assertTrue(self.misc_event.wait(timeout=self.fail_timeout)) - self.sendToServer(MSG) - - @requireAttrs(socket, "CMSG_SPACE", "IPV6_RECVHOPLIMIT", "IPV6_HOPLIMIT", - "IPV6_RECVTCLASS", "IPV6_TCLASS") - def testSecondCmsgTruncLen0Minus1(self): - self.checkTruncatedSecondHeader(socket.CMSG_SPACE(SIZEOF_INT) + - socket.CMSG_LEN(0) - 1) - - @testSecondCmsgTruncLen0Minus1.client_skip - def _testSecondCmsgTruncLen0Minus1(self): - self.assertTrue(self.misc_event.wait(timeout=self.fail_timeout)) - self.sendToServer(MSG) - - @requireAttrs(socket, "CMSG_SPACE", "IPV6_RECVHOPLIMIT", "IPV6_HOPLIMIT", - "IPV6_RECVTCLASS", "IPV6_TCLASS") - def testSecomdCmsgTruncInData(self): - # Test truncation of the second of two control messages inside - # its associated data. - self.serv_sock.setsockopt(socket.IPPROTO_IPV6, - socket.IPV6_RECVHOPLIMIT, 1) - self.serv_sock.setsockopt(socket.IPPROTO_IPV6, - socket.IPV6_RECVTCLASS, 1) - self.misc_event.set() - msg, ancdata, flags, addr = self.doRecvmsg( - self.serv_sock, len(MSG), - socket.CMSG_SPACE(SIZEOF_INT) + socket.CMSG_LEN(SIZEOF_INT) - 1) - - self.assertEqual(msg, MSG) - self.checkRecvmsgAddress(addr, self.cli_addr) - self.checkFlags(flags, eor=True, checkset=socket.MSG_CTRUNC) - - cmsg_types = {socket.IPV6_TCLASS, socket.IPV6_HOPLIMIT} - - cmsg_level, cmsg_type, cmsg_data = ancdata.pop(0) - self.assertEqual(cmsg_level, socket.IPPROTO_IPV6) - cmsg_types.remove(cmsg_type) - self.assertEqual(len(cmsg_data), SIZEOF_INT) - a = array.array("i") - a.frombytes(cmsg_data) - self.assertGreaterEqual(a[0], 0) - self.assertLessEqual(a[0], 255) - - if ancdata: - cmsg_level, cmsg_type, cmsg_data = ancdata.pop(0) - self.assertEqual(cmsg_level, socket.IPPROTO_IPV6) - cmsg_types.remove(cmsg_type) - self.assertLess(len(cmsg_data), SIZEOF_INT) - - self.assertEqual(ancdata, []) - - @testSecomdCmsgTruncInData.client_skip - def _testSecomdCmsgTruncInData(self): - self.assertTrue(self.misc_event.wait(timeout=self.fail_timeout)) - self.sendToServer(MSG) - - -# Derive concrete test classes for different socket types. - -class SendrecvmsgUDPTestBase(SendrecvmsgDgramFlagsBase, - SendrecvmsgConnectionlessBase, - ThreadedSocketTestMixin, UDPTestBase): - pass - -@requireAttrs(socket.socket, "sendmsg") -class SendmsgUDPTest(SendmsgConnectionlessTests, SendrecvmsgUDPTestBase): - pass - -@requireAttrs(socket.socket, "recvmsg") -class RecvmsgUDPTest(RecvmsgTests, SendrecvmsgUDPTestBase): - pass - -@requireAttrs(socket.socket, "recvmsg_into") -class RecvmsgIntoUDPTest(RecvmsgIntoTests, SendrecvmsgUDPTestBase): - pass - - -class SendrecvmsgUDP6TestBase(SendrecvmsgDgramFlagsBase, - SendrecvmsgConnectionlessBase, - ThreadedSocketTestMixin, UDP6TestBase): - - def checkRecvmsgAddress(self, addr1, addr2): - # Called to compare the received address with the address of - # the peer, ignoring scope ID - self.assertEqual(addr1[:-1], addr2[:-1]) - -@requireAttrs(socket.socket, "sendmsg") -@unittest.skipUnless(support.IPV6_ENABLED, 'IPv6 required for this test.') -@requireSocket("AF_INET6", "SOCK_DGRAM") -class SendmsgUDP6Test(SendmsgConnectionlessTests, SendrecvmsgUDP6TestBase): - pass - -@requireAttrs(socket.socket, "recvmsg") -@unittest.skipUnless(support.IPV6_ENABLED, 'IPv6 required for this test.') -@requireSocket("AF_INET6", "SOCK_DGRAM") -class RecvmsgUDP6Test(RecvmsgTests, SendrecvmsgUDP6TestBase): - pass - -@requireAttrs(socket.socket, "recvmsg_into") -@unittest.skipUnless(support.IPV6_ENABLED, 'IPv6 required for this test.') -@requireSocket("AF_INET6", "SOCK_DGRAM") -class RecvmsgIntoUDP6Test(RecvmsgIntoTests, SendrecvmsgUDP6TestBase): - pass - -@requireAttrs(socket.socket, "recvmsg") -@unittest.skipUnless(support.IPV6_ENABLED, 'IPv6 required for this test.') -@requireAttrs(socket, "IPPROTO_IPV6") -@requireSocket("AF_INET6", "SOCK_DGRAM") -class RecvmsgRFC3542AncillaryUDP6Test(RFC3542AncillaryTest, - SendrecvmsgUDP6TestBase): - pass - -@requireAttrs(socket.socket, "recvmsg_into") -@unittest.skipUnless(support.IPV6_ENABLED, 'IPv6 required for this test.') -@requireAttrs(socket, "IPPROTO_IPV6") -@requireSocket("AF_INET6", "SOCK_DGRAM") -class RecvmsgIntoRFC3542AncillaryUDP6Test(RecvmsgIntoMixin, - RFC3542AncillaryTest, - SendrecvmsgUDP6TestBase): - pass - - -@unittest.skipUnless(HAVE_SOCKET_UDPLITE, - 'UDPLITE sockets required for this test.') -class SendrecvmsgUDPLITETestBase(SendrecvmsgDgramFlagsBase, - SendrecvmsgConnectionlessBase, - ThreadedSocketTestMixin, UDPLITETestBase): - pass - -@unittest.skipUnless(HAVE_SOCKET_UDPLITE, - 'UDPLITE sockets required for this test.') -@requireAttrs(socket.socket, "sendmsg") -class SendmsgUDPLITETest(SendmsgConnectionlessTests, SendrecvmsgUDPLITETestBase): - pass - -@unittest.skipUnless(HAVE_SOCKET_UDPLITE, - 'UDPLITE sockets required for this test.') -@requireAttrs(socket.socket, "recvmsg") -class RecvmsgUDPLITETest(RecvmsgTests, SendrecvmsgUDPLITETestBase): - pass - -@unittest.skipUnless(HAVE_SOCKET_UDPLITE, - 'UDPLITE sockets required for this test.') -@requireAttrs(socket.socket, "recvmsg_into") -class RecvmsgIntoUDPLITETest(RecvmsgIntoTests, SendrecvmsgUDPLITETestBase): - pass - - -@unittest.skipUnless(HAVE_SOCKET_UDPLITE, - 'UDPLITE sockets required for this test.') -class SendrecvmsgUDPLITE6TestBase(SendrecvmsgDgramFlagsBase, - SendrecvmsgConnectionlessBase, - ThreadedSocketTestMixin, UDPLITE6TestBase): - - def checkRecvmsgAddress(self, addr1, addr2): - # Called to compare the received address with the address of - # the peer, ignoring scope ID - self.assertEqual(addr1[:-1], addr2[:-1]) - -@requireAttrs(socket.socket, "sendmsg") -@unittest.skipUnless(support.IPV6_ENABLED, 'IPv6 required for this test.') -@unittest.skipUnless(HAVE_SOCKET_UDPLITE, - 'UDPLITE sockets required for this test.') -@requireSocket("AF_INET6", "SOCK_DGRAM") -class SendmsgUDPLITE6Test(SendmsgConnectionlessTests, SendrecvmsgUDPLITE6TestBase): - pass - -@requireAttrs(socket.socket, "recvmsg") -@unittest.skipUnless(support.IPV6_ENABLED, 'IPv6 required for this test.') -@unittest.skipUnless(HAVE_SOCKET_UDPLITE, - 'UDPLITE sockets required for this test.') -@requireSocket("AF_INET6", "SOCK_DGRAM") -class RecvmsgUDPLITE6Test(RecvmsgTests, SendrecvmsgUDPLITE6TestBase): - pass - -@requireAttrs(socket.socket, "recvmsg_into") -@unittest.skipUnless(support.IPV6_ENABLED, 'IPv6 required for this test.') -@unittest.skipUnless(HAVE_SOCKET_UDPLITE, - 'UDPLITE sockets required for this test.') -@requireSocket("AF_INET6", "SOCK_DGRAM") -class RecvmsgIntoUDPLITE6Test(RecvmsgIntoTests, SendrecvmsgUDPLITE6TestBase): - pass - -@requireAttrs(socket.socket, "recvmsg") -@unittest.skipUnless(support.IPV6_ENABLED, 'IPv6 required for this test.') -@unittest.skipUnless(HAVE_SOCKET_UDPLITE, - 'UDPLITE sockets required for this test.') -@requireAttrs(socket, "IPPROTO_IPV6") -@requireSocket("AF_INET6", "SOCK_DGRAM") -class RecvmsgRFC3542AncillaryUDPLITE6Test(RFC3542AncillaryTest, - SendrecvmsgUDPLITE6TestBase): - pass - -@requireAttrs(socket.socket, "recvmsg_into") -@unittest.skipUnless(support.IPV6_ENABLED, 'IPv6 required for this test.') -@unittest.skipUnless(HAVE_SOCKET_UDPLITE, - 'UDPLITE sockets required for this test.') -@requireAttrs(socket, "IPPROTO_IPV6") -@requireSocket("AF_INET6", "SOCK_DGRAM") -class RecvmsgIntoRFC3542AncillaryUDPLITE6Test(RecvmsgIntoMixin, - RFC3542AncillaryTest, - SendrecvmsgUDPLITE6TestBase): - pass - - -class SendrecvmsgTCPTestBase(SendrecvmsgConnectedBase, - ConnectedStreamTestMixin, TCPTestBase): - pass - -@requireAttrs(socket.socket, "sendmsg") -class SendmsgTCPTest(SendmsgStreamTests, SendrecvmsgTCPTestBase): - pass - -@requireAttrs(socket.socket, "recvmsg") -class RecvmsgTCPTest(RecvmsgTests, RecvmsgGenericStreamTests, - SendrecvmsgTCPTestBase): - pass - -@requireAttrs(socket.socket, "recvmsg_into") -class RecvmsgIntoTCPTest(RecvmsgIntoTests, RecvmsgGenericStreamTests, - SendrecvmsgTCPTestBase): - pass - - -class SendrecvmsgSCTPStreamTestBase(SendrecvmsgSCTPFlagsBase, - SendrecvmsgConnectedBase, - ConnectedStreamTestMixin, SCTPStreamBase): - pass - -@requireAttrs(socket.socket, "sendmsg") -@unittest.skipIf(AIX, "IPPROTO_SCTP: [Errno 62] Protocol not supported on AIX") -@requireSocket("AF_INET", "SOCK_STREAM", "IPPROTO_SCTP") -class SendmsgSCTPStreamTest(SendmsgStreamTests, SendrecvmsgSCTPStreamTestBase): - pass - -@requireAttrs(socket.socket, "recvmsg") -@unittest.skipIf(AIX, "IPPROTO_SCTP: [Errno 62] Protocol not supported on AIX") -@requireSocket("AF_INET", "SOCK_STREAM", "IPPROTO_SCTP") -class RecvmsgSCTPStreamTest(RecvmsgTests, RecvmsgGenericStreamTests, - SendrecvmsgSCTPStreamTestBase): - - def testRecvmsgEOF(self): - try: - super(RecvmsgSCTPStreamTest, self).testRecvmsgEOF() - except OSError as e: - if e.errno != errno.ENOTCONN: - raise - self.skipTest("sporadic ENOTCONN (kernel issue?) - see issue #13876") - -@requireAttrs(socket.socket, "recvmsg_into") -@unittest.skipIf(AIX, "IPPROTO_SCTP: [Errno 62] Protocol not supported on AIX") -@requireSocket("AF_INET", "SOCK_STREAM", "IPPROTO_SCTP") -class RecvmsgIntoSCTPStreamTest(RecvmsgIntoTests, RecvmsgGenericStreamTests, - SendrecvmsgSCTPStreamTestBase): - - def testRecvmsgEOF(self): - try: - super(RecvmsgIntoSCTPStreamTest, self).testRecvmsgEOF() - except OSError as e: - if e.errno != errno.ENOTCONN: - raise - self.skipTest("sporadic ENOTCONN (kernel issue?) - see issue #13876") - - -class SendrecvmsgUnixStreamTestBase(SendrecvmsgConnectedBase, - ConnectedStreamTestMixin, UnixStreamBase): - pass - -@requireAttrs(socket.socket, "sendmsg") -@requireAttrs(socket, "AF_UNIX") -class SendmsgUnixStreamTest(SendmsgStreamTests, SendrecvmsgUnixStreamTestBase): - pass - -@requireAttrs(socket.socket, "recvmsg") -@requireAttrs(socket, "AF_UNIX") -class RecvmsgUnixStreamTest(RecvmsgTests, RecvmsgGenericStreamTests, - SendrecvmsgUnixStreamTestBase): - pass - -@requireAttrs(socket.socket, "recvmsg_into") -@requireAttrs(socket, "AF_UNIX") -class RecvmsgIntoUnixStreamTest(RecvmsgIntoTests, RecvmsgGenericStreamTests, - SendrecvmsgUnixStreamTestBase): - pass - -@requireAttrs(socket.socket, "sendmsg", "recvmsg") -@requireAttrs(socket, "AF_UNIX", "SOL_SOCKET", "SCM_RIGHTS") -class RecvmsgSCMRightsStreamTest(SCMRightsTest, SendrecvmsgUnixStreamTestBase): - pass - -@requireAttrs(socket.socket, "sendmsg", "recvmsg_into") -@requireAttrs(socket, "AF_UNIX", "SOL_SOCKET", "SCM_RIGHTS") -class RecvmsgIntoSCMRightsStreamTest(RecvmsgIntoMixin, SCMRightsTest, - SendrecvmsgUnixStreamTestBase): - pass - - -# Test interrupting the interruptible send/receive methods with a -# signal when a timeout is set. These tests avoid having multiple -# threads alive during the test so that the OS cannot deliver the -# signal to the wrong one. - -class InterruptedTimeoutBase(unittest.TestCase): - # Base class for interrupted send/receive tests. Installs an - # empty handler for SIGALRM and removes it on teardown, along with - # any scheduled alarms. - - def setUp(self): - super().setUp() - orig_alrm_handler = signal.signal(signal.SIGALRM, - lambda signum, frame: 1 / 0) - self.addCleanup(signal.signal, signal.SIGALRM, orig_alrm_handler) - - # Timeout for socket operations - timeout = support.LOOPBACK_TIMEOUT - - # Provide setAlarm() method to schedule delivery of SIGALRM after - # given number of seconds, or cancel it if zero, and an - # appropriate time value to use. Use setitimer() if available. - if hasattr(signal, "setitimer"): - alarm_time = 0.05 - - def setAlarm(self, seconds): - signal.setitimer(signal.ITIMER_REAL, seconds) - else: - # Old systems may deliver the alarm up to one second early - alarm_time = 2 - - def setAlarm(self, seconds): - signal.alarm(seconds) - - -# Require siginterrupt() in order to ensure that system calls are -# interrupted by default. -@requireAttrs(signal, "siginterrupt") -@unittest.skipUnless(hasattr(signal, "alarm") or hasattr(signal, "setitimer"), - "Don't have signal.alarm or signal.setitimer") -class InterruptedRecvTimeoutTest(InterruptedTimeoutBase, UDPTestBase): - # Test interrupting the recv*() methods with signals when a - # timeout is set. - - def setUp(self): - super().setUp() - self.serv.settimeout(self.timeout) - - def checkInterruptedRecv(self, func, *args, **kwargs): - # Check that func(*args, **kwargs) raises - # errno of EINTR when interrupted by a signal. - try: - self.setAlarm(self.alarm_time) - with self.assertRaises(ZeroDivisionError) as cm: - func(*args, **kwargs) - finally: - self.setAlarm(0) - - def testInterruptedRecvTimeout(self): - self.checkInterruptedRecv(self.serv.recv, 1024) - - def testInterruptedRecvIntoTimeout(self): - self.checkInterruptedRecv(self.serv.recv_into, bytearray(1024)) - - def testInterruptedRecvfromTimeout(self): - self.checkInterruptedRecv(self.serv.recvfrom, 1024) - - def testInterruptedRecvfromIntoTimeout(self): - self.checkInterruptedRecv(self.serv.recvfrom_into, bytearray(1024)) - - @requireAttrs(socket.socket, "recvmsg") - def testInterruptedRecvmsgTimeout(self): - self.checkInterruptedRecv(self.serv.recvmsg, 1024) - - @requireAttrs(socket.socket, "recvmsg_into") - def testInterruptedRecvmsgIntoTimeout(self): - self.checkInterruptedRecv(self.serv.recvmsg_into, [bytearray(1024)]) - - -# Require siginterrupt() in order to ensure that system calls are -# interrupted by default. -@requireAttrs(signal, "siginterrupt") -@unittest.skipUnless(hasattr(signal, "alarm") or hasattr(signal, "setitimer"), - "Don't have signal.alarm or signal.setitimer") -class InterruptedSendTimeoutTest(InterruptedTimeoutBase, - ThreadSafeCleanupTestCase, - SocketListeningTestMixin, TCPTestBase): - # Test interrupting the interruptible send*() methods with signals - # when a timeout is set. - - def setUp(self): - super().setUp() - self.serv_conn = self.newSocket() - self.addCleanup(self.serv_conn.close) - # Use a thread to complete the connection, but wait for it to - # terminate before running the test, so that there is only one - # thread to accept the signal. - cli_thread = threading.Thread(target=self.doConnect) - cli_thread.start() - self.cli_conn, addr = self.serv.accept() - self.addCleanup(self.cli_conn.close) - cli_thread.join() - self.serv_conn.settimeout(self.timeout) - - def doConnect(self): - self.serv_conn.connect(self.serv_addr) - - def checkInterruptedSend(self, func, *args, **kwargs): - # Check that func(*args, **kwargs), run in a loop, raises - # OSError with an errno of EINTR when interrupted by a - # signal. - try: - with self.assertRaises(ZeroDivisionError) as cm: - while True: - self.setAlarm(self.alarm_time) - func(*args, **kwargs) - finally: - self.setAlarm(0) - - # Issue #12958: The following tests have problems on OS X prior to 10.7 - @support.requires_mac_ver(10, 7) - def testInterruptedSendTimeout(self): - self.checkInterruptedSend(self.serv_conn.send, b"a"*512) - - @support.requires_mac_ver(10, 7) - def testInterruptedSendtoTimeout(self): - # Passing an actual address here as Python's wrapper for - # sendto() doesn't allow passing a zero-length one; POSIX - # requires that the address is ignored since the socket is - # connection-mode, however. - self.checkInterruptedSend(self.serv_conn.sendto, b"a"*512, - self.serv_addr) - - @support.requires_mac_ver(10, 7) - @requireAttrs(socket.socket, "sendmsg") - def testInterruptedSendmsgTimeout(self): - self.checkInterruptedSend(self.serv_conn.sendmsg, [b"a"*512]) - - +@unittest.skipUnless(thread, 'Threading required for this test.') class TCPCloserTest(ThreadedTCPSocketTest): def testClose(self): @@ -4450,37 +918,20 @@ class TCPCloserTest(ThreadedTCPSocketTest): sd = self.cli read, write, err = select.select([sd], [], [], 1.0) self.assertEqual(read, [sd]) - self.assertEqual(sd.recv(1), b'') - - # Calling close() many times should be safe. - conn.close() - conn.close() + self.assertEqual(sd.recv(1), '') def _testClose(self): self.cli.connect((HOST, self.port)) time.sleep(1.0) - +@unittest.skipUnless(hasattr(socket, 'socketpair'), + 'test needs socket.socketpair()') +@unittest.skipUnless(thread, 'Threading required for this test.') class BasicSocketPairTest(SocketPairTest): def __init__(self, methodName='runTest'): SocketPairTest.__init__(self, methodName=methodName) - def _check_defaults(self, sock): - self.assertIsInstance(sock, socket.socket) - if hasattr(socket, 'AF_UNIX'): - self.assertEqual(sock.family, socket.AF_UNIX) - else: - self.assertEqual(sock.family, socket.AF_INET) - self.assertEqual(sock.type, socket.SOCK_STREAM) - self.assertEqual(sock.proto, 0) - - def _testDefaults(self): - self._check_defaults(self.cli) - - def testDefaults(self): - self._check_defaults(self.serv) - def testRecv(self): msg = self.serv.recv(1024) self.assertEqual(msg, MSG) @@ -4495,121 +946,54 @@ class BasicSocketPairTest(SocketPairTest): msg = self.cli.recv(1024) self.assertEqual(msg, MSG) - +@unittest.skipUnless(thread, 'Threading required for this test.') class NonBlockingTCPTests(ThreadedTCPSocketTest): def __init__(self, methodName='runTest'): self.event = threading.Event() ThreadedTCPSocketTest.__init__(self, methodName=methodName) - def assert_sock_timeout(self, sock, timeout): - self.assertEqual(self.serv.gettimeout(), timeout) - - blocking = (timeout != 0.0) - self.assertEqual(sock.getblocking(), blocking) - - if fcntl is not None: - # When a Python socket has a non-zero timeout, it's switched - # internally to a non-blocking mode. Later, sock.sendall(), - # sock.recv(), and other socket operations use a select() call and - # handle EWOULDBLOCK/EGAIN on all socket operations. That's how - # timeouts are enforced. - fd_blocking = (timeout is None) - - flag = fcntl.fcntl(sock, fcntl.F_GETFL, os.O_NONBLOCK) - self.assertEqual(not bool(flag & os.O_NONBLOCK), fd_blocking) - def testSetBlocking(self): - # Test setblocking() and settimeout() methods + # Testing whether set blocking works self.serv.setblocking(True) - self.assert_sock_timeout(self.serv, None) - + self.assertIsNone(self.serv.gettimeout()) self.serv.setblocking(False) - self.assert_sock_timeout(self.serv, 0.0) - - self.serv.settimeout(None) - self.assert_sock_timeout(self.serv, None) - - self.serv.settimeout(0) - self.assert_sock_timeout(self.serv, 0) - - self.serv.settimeout(10) - self.assert_sock_timeout(self.serv, 10) - - self.serv.settimeout(0) - self.assert_sock_timeout(self.serv, 0) + self.assertEqual(self.serv.gettimeout(), 0.0) + start = time.time() + try: + self.serv.accept() + except socket.error: + pass + end = time.time() + self.assertTrue((end - start) < 1.0, "Error setting non-blocking mode.") def _testSetBlocking(self): pass - @support.cpython_only + @test_support.cpython_only def testSetBlocking_overflow(self): # Issue 15989 import _testcapi if _testcapi.UINT_MAX >= _testcapi.ULONG_MAX: self.skipTest('needs UINT_MAX < ULONG_MAX') - self.serv.setblocking(False) self.assertEqual(self.serv.gettimeout(), 0.0) - self.serv.setblocking(_testcapi.UINT_MAX + 1) self.assertIsNone(self.serv.gettimeout()) - _testSetBlocking_overflow = support.cpython_only(_testSetBlocking) - - @unittest.skipUnless(hasattr(socket, 'SOCK_NONBLOCK'), - 'test needs socket.SOCK_NONBLOCK') - @support.requires_linux_version(2, 6, 28) - def testInitNonBlocking(self): - # create a socket with SOCK_NONBLOCK - self.serv.close() - self.serv = socket.socket(socket.AF_INET, - socket.SOCK_STREAM | socket.SOCK_NONBLOCK) - self.assert_sock_timeout(self.serv, 0) - - def _testInitNonBlocking(self): - pass - - def testInheritFlagsBlocking(self): - # bpo-7995: accept() on a listening socket with a timeout and the - # default timeout is None, the resulting socket must be blocking. - with socket_setdefaulttimeout(None): - self.serv.settimeout(10) - conn, addr = self.serv.accept() - self.addCleanup(conn.close) - self.assertIsNone(conn.gettimeout()) - - def _testInheritFlagsBlocking(self): - self.cli.connect((HOST, self.port)) - - def testInheritFlagsTimeout(self): - # bpo-7995: accept() on a listening socket with a timeout and the - # default timeout is None, the resulting socket must inherit - # the default timeout. - default_timeout = 20.0 - with socket_setdefaulttimeout(default_timeout): - self.serv.settimeout(10) - conn, addr = self.serv.accept() - self.addCleanup(conn.close) - self.assertEqual(conn.gettimeout(), default_timeout) - - def _testInheritFlagsTimeout(self): - self.cli.connect((HOST, self.port)) + _testSetBlocking_overflow = test_support.cpython_only(_testSetBlocking) def testAccept(self): # Testing non-blocking accept - self.serv.setblocking(False) + self.serv.setblocking(0) # connect() didn't start: non-blocking accept() fails - start_time = time.monotonic() - with self.assertRaises(BlockingIOError): + with self.assertRaises(socket.error): conn, addr = self.serv.accept() - dt = time.monotonic() - start_time - self.assertLess(dt, 1.0) self.event.set() - read, write, err = select.select([self.serv], [], [], support.LONG_TIMEOUT) + read, write, err = select.select([self.serv], [], [], MAIN_TIMEOUT) if self.serv not in read: self.fail("Error trying to do accept after select.") @@ -4620,24 +1004,33 @@ class NonBlockingTCPTests(ThreadedTCPSocketTest): def _testAccept(self): # don't connect before event is set to check - # that non-blocking accept() raises BlockingIOError + # that non-blocking accept() raises socket.error self.event.wait() self.cli.connect((HOST, self.port)) + def testConnect(self): + # Testing non-blocking connect + conn, addr = self.serv.accept() + conn.close() + + def _testConnect(self): + self.cli.settimeout(10) + self.cli.connect((HOST, self.port)) + def testRecv(self): # Testing non-blocking recv conn, addr = self.serv.accept() self.addCleanup(conn.close) - conn.setblocking(False) + conn.setblocking(0) # the server didn't send data yet: non-blocking recv() fails - with self.assertRaises(BlockingIOError): + with self.assertRaises(socket.error): msg = conn.recv(len(MSG)) self.event.set() - read, write, err = select.select([conn], [], [], support.LONG_TIMEOUT) + read, write, err = select.select([conn], [], [], MAIN_TIMEOUT) if conn not in read: self.fail("Error during select call to non-blocking socket.") @@ -4649,174 +1042,185 @@ class NonBlockingTCPTests(ThreadedTCPSocketTest): self.cli.connect((HOST, self.port)) # don't send anything before event is set to check - # that non-blocking recv() raises BlockingIOError + # that non-blocking recv() raises socket.error self.event.wait() # send data: recv() will no longer block self.cli.sendall(MSG) - +@unittest.skipUnless(thread, 'Threading required for this test.') class FileObjectClassTestCase(SocketConnectedTest): - """Unit tests for the object returned by socket.makefile() - - self.read_file is the io object returned by makefile() on - the client connection. You can read from this file to - get output from the server. - - self.write_file is the io object returned by makefile() on the - server connection. You can write to this file to send output - to the client. - """ bufsize = -1 # Use default buffer size - encoding = 'utf-8' - errors = 'strict' - newline = None - - read_mode = 'rb' - read_msg = MSG - write_mode = 'wb' - write_msg = MSG def __init__(self, methodName='runTest'): SocketConnectedTest.__init__(self, methodName=methodName) def setUp(self): - self.evt1, self.evt2, self.serv_finished, self.cli_finished = [ - threading.Event() for i in range(4)] SocketConnectedTest.setUp(self) - self.read_file = self.cli_conn.makefile( - self.read_mode, self.bufsize, - encoding = self.encoding, - errors = self.errors, - newline = self.newline) + self.serv_file = self.cli_conn.makefile('rb', self.bufsize) def tearDown(self): - self.serv_finished.set() - self.read_file.close() - self.assertTrue(self.read_file.closed) - self.read_file = None + self.serv_file.close() + self.assertTrue(self.serv_file.closed) SocketConnectedTest.tearDown(self) + self.serv_file = None def clientSetUp(self): SocketConnectedTest.clientSetUp(self) - self.write_file = self.serv_conn.makefile( - self.write_mode, self.bufsize, - encoding = self.encoding, - errors = self.errors, - newline = self.newline) + self.cli_file = self.serv_conn.makefile('wb') def clientTearDown(self): - self.cli_finished.set() - self.write_file.close() - self.assertTrue(self.write_file.closed) - self.write_file = None + self.cli_file.close() + self.assertTrue(self.cli_file.closed) + self.cli_file = None SocketConnectedTest.clientTearDown(self) - def testReadAfterTimeout(self): - # Issue #7322: A file object must disallow further reads - # after a timeout has occurred. - self.cli_conn.settimeout(1) - self.read_file.read(3) - # First read raises a timeout - self.assertRaises(socket.timeout, self.read_file.read, 1) - # Second read is disallowed - with self.assertRaises(OSError) as ctx: - self.read_file.read(1) - self.assertIn("cannot read from timed out object", str(ctx.exception)) - - def _testReadAfterTimeout(self): - self.write_file.write(self.write_msg[0:3]) - self.write_file.flush() - self.serv_finished.wait() - def testSmallRead(self): # Performing small file read test - first_seg = self.read_file.read(len(self.read_msg)-3) - second_seg = self.read_file.read(3) + first_seg = self.serv_file.read(len(MSG)-3) + second_seg = self.serv_file.read(3) msg = first_seg + second_seg - self.assertEqual(msg, self.read_msg) + self.assertEqual(msg, MSG) def _testSmallRead(self): - self.write_file.write(self.write_msg) - self.write_file.flush() + self.cli_file.write(MSG) + self.cli_file.flush() def testFullRead(self): # read until EOF - msg = self.read_file.read() - self.assertEqual(msg, self.read_msg) + msg = self.serv_file.read() + self.assertEqual(msg, MSG) def _testFullRead(self): - self.write_file.write(self.write_msg) - self.write_file.close() + self.cli_file.write(MSG) + self.cli_file.close() def testUnbufferedRead(self): # Performing unbuffered file read test - buf = type(self.read_msg)() + buf = '' while 1: - char = self.read_file.read(1) + char = self.serv_file.read(1) if not char: break buf += char - self.assertEqual(buf, self.read_msg) + self.assertEqual(buf, MSG) def _testUnbufferedRead(self): - self.write_file.write(self.write_msg) - self.write_file.flush() + self.cli_file.write(MSG) + self.cli_file.flush() def testReadline(self): # Performing file readline test - line = self.read_file.readline() - self.assertEqual(line, self.read_msg) + line = self.serv_file.readline() + self.assertEqual(line, MSG) def _testReadline(self): - self.write_file.write(self.write_msg) - self.write_file.flush() - - def testCloseAfterMakefile(self): - # The file returned by makefile should keep the socket open. - self.cli_conn.close() - # read until EOF - msg = self.read_file.read() - self.assertEqual(msg, self.read_msg) - - def _testCloseAfterMakefile(self): - self.write_file.write(self.write_msg) - self.write_file.flush() - - def testMakefileAfterMakefileClose(self): - self.read_file.close() - msg = self.cli_conn.recv(len(MSG)) - if isinstance(self.read_msg, str): - msg = msg.decode() - self.assertEqual(msg, self.read_msg) - - def _testMakefileAfterMakefileClose(self): - self.write_file.write(self.write_msg) - self.write_file.flush() + self.cli_file.write(MSG) + self.cli_file.flush() + + def testReadlineAfterRead(self): + a_baloo_is = self.serv_file.read(len("A baloo is")) + self.assertEqual("A baloo is", a_baloo_is) + _a_bear = self.serv_file.read(len(" a bear")) + self.assertEqual(" a bear", _a_bear) + line = self.serv_file.readline() + self.assertEqual("\n", line) + line = self.serv_file.readline() + self.assertEqual("A BALOO IS A BEAR.\n", line) + line = self.serv_file.readline() + self.assertEqual(MSG, line) + + def _testReadlineAfterRead(self): + self.cli_file.write("A baloo is a bear\n") + self.cli_file.write("A BALOO IS A BEAR.\n") + self.cli_file.write(MSG) + self.cli_file.flush() + + def testReadlineAfterReadNoNewline(self): + end_of_ = self.serv_file.read(len("End Of ")) + self.assertEqual("End Of ", end_of_) + line = self.serv_file.readline() + self.assertEqual("Line", line) + + def _testReadlineAfterReadNoNewline(self): + self.cli_file.write("End Of Line") def testClosedAttr(self): - self.assertTrue(not self.read_file.closed) + self.assertTrue(not self.serv_file.closed) def _testClosedAttr(self): - self.assertTrue(not self.write_file.closed) - - def testAttributes(self): - self.assertEqual(self.read_file.mode, self.read_mode) - self.assertEqual(self.read_file.name, self.cli_conn.fileno()) - - def _testAttributes(self): - self.assertEqual(self.write_file.mode, self.write_mode) - self.assertEqual(self.write_file.name, self.serv_conn.fileno()) - - def testRealClose(self): - self.read_file.close() - self.assertRaises(ValueError, self.read_file.fileno) - self.cli_conn.close() - self.assertRaises(OSError, self.cli_conn.getsockname) - - def _testRealClose(self): - pass + self.assertTrue(not self.cli_file.closed) + + +class FileObjectInterruptedTestCase(unittest.TestCase): + """Test that the file object correctly handles EINTR internally.""" + + class MockSocket(object): + def __init__(self, recv_funcs=()): + # A generator that returns callables that we'll call for each + # call to recv(). + self._recv_step = iter(recv_funcs) + + def recv(self, size): + return self._recv_step.next()() + + @staticmethod + def _raise_eintr(): + raise socket.error(errno.EINTR) + + def _test_readline(self, size=-1, **kwargs): + mock_sock = self.MockSocket(recv_funcs=[ + lambda : "This is the first line\nAnd the sec", + self._raise_eintr, + lambda : "ond line is here\n", + lambda : "", + ]) + fo = socket._fileobject(mock_sock, **kwargs) + self.assertEqual(fo.readline(size), "This is the first line\n") + self.assertEqual(fo.readline(size), "And the second line is here\n") + + def _test_read(self, size=-1, **kwargs): + mock_sock = self.MockSocket(recv_funcs=[ + lambda : "This is the first line\nAnd the sec", + self._raise_eintr, + lambda : "ond line is here\n", + lambda : "", + ]) + fo = socket._fileobject(mock_sock, **kwargs) + self.assertEqual(fo.read(size), "This is the first line\n" + "And the second line is here\n") + + def test_default(self): + self._test_readline() + self._test_readline(size=100) + self._test_read() + self._test_read(size=100) + + def test_with_1k_buffer(self): + self._test_readline(bufsize=1024) + self._test_readline(size=100, bufsize=1024) + self._test_read(bufsize=1024) + self._test_read(size=100, bufsize=1024) + + def _test_readline_no_buffer(self, size=-1): + mock_sock = self.MockSocket(recv_funcs=[ + lambda : "aa", + lambda : "\n", + lambda : "BB", + self._raise_eintr, + lambda : "bb", + lambda : "", + ]) + fo = socket._fileobject(mock_sock, bufsize=0) + self.assertEqual(fo.readline(size), "aa\n") + self.assertEqual(fo.readline(size), "BBbb") + + def test_no_buffer(self): + self._test_readline_no_buffer() + self._test_readline_no_buffer(size=4) + self._test_read(bufsize=0) + self._test_read(size=100, bufsize=0) class UnbufferedFileObjectClassTestCase(FileObjectClassTestCase): @@ -4826,150 +1230,94 @@ class UnbufferedFileObjectClassTestCase(FileObjectClassTestCase): In this case (and in this case only), it should be possible to create a file object, read a line from it, create another file object, read another line from it, without loss of data in the - first file object's buffer. Note that http.client relies on this + first file object's buffer. Note that httplib relies on this when reading multiple requests from the same socket.""" bufsize = 0 # Use unbuffered mode def testUnbufferedReadline(self): # Read a line, create a new file object, read another line with it - line = self.read_file.readline() # first line - self.assertEqual(line, b"A. " + self.write_msg) # first line - self.read_file = self.cli_conn.makefile('rb', 0) - line = self.read_file.readline() # second line - self.assertEqual(line, b"B. " + self.write_msg) # second line + line = self.serv_file.readline() # first line + self.assertEqual(line, "A. " + MSG) # first line + self.serv_file = self.cli_conn.makefile('rb', 0) + line = self.serv_file.readline() # second line + self.assertEqual(line, "B. " + MSG) # second line def _testUnbufferedReadline(self): - self.write_file.write(b"A. " + self.write_msg) - self.write_file.write(b"B. " + self.write_msg) - self.write_file.flush() - - def testMakefileClose(self): - # The file returned by makefile should keep the socket open... - self.cli_conn.close() - msg = self.cli_conn.recv(1024) - self.assertEqual(msg, self.read_msg) - # ...until the file is itself closed - self.read_file.close() - self.assertRaises(OSError, self.cli_conn.recv, 1024) - - def _testMakefileClose(self): - self.write_file.write(self.write_msg) - self.write_file.flush() - - def testMakefileCloseSocketDestroy(self): - refcount_before = sys.getrefcount(self.cli_conn) - self.read_file.close() - refcount_after = sys.getrefcount(self.cli_conn) - self.assertEqual(refcount_before - 1, refcount_after) - - def _testMakefileCloseSocketDestroy(self): - pass - - # Non-blocking ops - # NOTE: to set `read_file` as non-blocking, we must call - # `cli_conn.setblocking` and vice-versa (see setUp / clientSetUp). - - def testSmallReadNonBlocking(self): - self.cli_conn.setblocking(False) - self.assertEqual(self.read_file.readinto(bytearray(10)), None) - self.assertEqual(self.read_file.read(len(self.read_msg) - 3), None) - self.evt1.set() - self.evt2.wait(1.0) - first_seg = self.read_file.read(len(self.read_msg) - 3) - if first_seg is None: - # Data not arrived (can happen under Windows), wait a bit - time.sleep(0.5) - first_seg = self.read_file.read(len(self.read_msg) - 3) - buf = bytearray(10) - n = self.read_file.readinto(buf) - self.assertEqual(n, 3) - msg = first_seg + buf[:n] - self.assertEqual(msg, self.read_msg) - self.assertEqual(self.read_file.readinto(bytearray(16)), None) - self.assertEqual(self.read_file.read(1), None) - - def _testSmallReadNonBlocking(self): - self.evt1.wait(1.0) - self.write_file.write(self.write_msg) - self.write_file.flush() - self.evt2.set() - # Avoid closing the socket before the server test has finished, - # otherwise system recv() will return 0 instead of EWOULDBLOCK. - self.serv_finished.wait(5.0) - - def testWriteNonBlocking(self): - self.cli_finished.wait(5.0) - # The client thread can't skip directly - the SkipTest exception - # would appear as a failure. - if self.serv_skipped: - self.skipTest(self.serv_skipped) - - def _testWriteNonBlocking(self): - self.serv_skipped = None - self.serv_conn.setblocking(False) - # Try to saturate the socket buffer pipe with repeated large writes. - BIG = b"x" * support.SOCK_MAX_SIZE - LIMIT = 10 - # The first write() succeeds since a chunk of data can be buffered - n = self.write_file.write(BIG) - self.assertGreater(n, 0) - for i in range(LIMIT): - n = self.write_file.write(BIG) - if n is None: - # Succeeded - break - self.assertGreater(n, 0) - else: - # Let us know that this test didn't manage to establish - # the expected conditions. This is not a failure in itself but, - # if it happens repeatedly, the test should be fixed. - self.serv_skipped = "failed to saturate the socket buffer" - + self.cli_file.write("A. " + MSG) + self.cli_file.write("B. " + MSG) + self.cli_file.flush() class LineBufferedFileObjectClassTestCase(FileObjectClassTestCase): bufsize = 1 # Default-buffered for reading; line-buffered for writing + class SocketMemo(object): + """A wrapper to keep track of sent data, needed to examine write behaviour""" + def __init__(self, sock): + self._sock = sock + self.sent = [] -class SmallBufferedFileObjectClassTestCase(FileObjectClassTestCase): - - bufsize = 2 # Exercise the buffering code - - -class UnicodeReadFileObjectClassTestCase(FileObjectClassTestCase): - """Tests for socket.makefile() in text mode (rather than binary)""" + def send(self, data, flags=0): + n = self._sock.send(data, flags) + self.sent.append(data[:n]) + return n - read_mode = 'r' - read_msg = MSG.decode('utf-8') - write_mode = 'wb' - write_msg = MSG - newline = '' + def sendall(self, data, flags=0): + self._sock.sendall(data, flags) + self.sent.append(data) + def __getattr__(self, attr): + return getattr(self._sock, attr) -class UnicodeWriteFileObjectClassTestCase(FileObjectClassTestCase): - """Tests for socket.makefile() in text mode (rather than binary)""" + def getsent(self): + return [e.tobytes() if isinstance(e, memoryview) else e for e in self.sent] - read_mode = 'rb' - read_msg = MSG - write_mode = 'w' - write_msg = MSG.decode('utf-8') - newline = '' + def setUp(self): + FileObjectClassTestCase.setUp(self) + self.serv_file._sock = self.SocketMemo(self.serv_file._sock) + + def testLinebufferedWrite(self): + # Write two lines, in small chunks + msg = MSG.strip() + print >> self.serv_file, msg, + print >> self.serv_file, msg + + # second line: + print >> self.serv_file, msg, + print >> self.serv_file, msg, + print >> self.serv_file, msg + + # third line + print >> self.serv_file, '' + + self.serv_file.flush() + + msg1 = "%s %s\n"%(msg, msg) + msg2 = "%s %s %s\n"%(msg, msg, msg) + msg3 = "\n" + self.assertEqual(self.serv_file._sock.getsent(), [msg1, msg2, msg3]) + + def _testLinebufferedWrite(self): + msg = MSG.strip() + msg1 = "%s %s\n"%(msg, msg) + msg2 = "%s %s %s\n"%(msg, msg, msg) + msg3 = "\n" + l1 = self.cli_file.readline() + self.assertEqual(l1, msg1) + l2 = self.cli_file.readline() + self.assertEqual(l2, msg2) + l3 = self.cli_file.readline() + self.assertEqual(l3, msg3) -class UnicodeReadWriteFileObjectClassTestCase(FileObjectClassTestCase): - """Tests for socket.makefile() in text mode (rather than binary)""" +class SmallBufferedFileObjectClassTestCase(FileObjectClassTestCase): - read_mode = 'r' - read_msg = MSG.decode('utf-8') - write_mode = 'w' - write_msg = MSG.decode('utf-8') - newline = '' + bufsize = 2 # Exercise the buffering code class NetworkConnectionTest(object): """Prove network connection.""" - def clientSetUp(self): # We're inherited below by BasicTCPTest2, which also inherits # BasicTCPTest, which defines self.port referenced below. @@ -4981,7 +1329,6 @@ class BasicTCPTest2(NetworkConnectionTest, BasicTCPTest): """ class NetworkConnectionNoServer(unittest.TestCase): - class MockSocket(socket.socket): def connect(self, *args): raise socket.timeout('timed out') @@ -4997,18 +1344,18 @@ class NetworkConnectionNoServer(unittest.TestCase): socket.socket = old_socket def test_connect(self): - port = support.find_unused_port() + port = test_support.find_unused_port() cli = socket.socket(socket.AF_INET, socket.SOCK_STREAM) self.addCleanup(cli.close) - with self.assertRaises(OSError) as cm: + with self.assertRaises(socket.error) as cm: cli.connect((HOST, port)) self.assertEqual(cm.exception.errno, errno.ECONNREFUSED) def test_create_connection(self): # Issue #9792: errors raised by create_connection() should have # a proper errno attribute. - port = support.find_unused_port() - with self.assertRaises(OSError) as cm: + port = test_support.find_unused_port() + with self.assertRaises(socket.error) as cm: socket.create_connection((HOST, port)) # Issue #16257: create_connection() calls getaddrinfo() against @@ -5025,24 +1372,25 @@ class NetworkConnectionNoServer(unittest.TestCase): # On Solaris, ENETUNREACH is returned in this circumstance instead # of ECONNREFUSED. So, if that errno exists, add it to our list of # expected errnos. - expected_errnos = support.get_socket_conn_refused_errs() + expected_errnos = [ errno.ECONNREFUSED, ] + if hasattr(errno, 'ENETUNREACH'): + expected_errnos.append(errno.ENETUNREACH) + if hasattr(errno, 'EADDRNOTAVAIL'): + # bpo-31910: socket.create_connection() fails randomly + # with EADDRNOTAVAIL on Travis CI + expected_errnos.append(errno.EADDRNOTAVAIL) + self.assertIn(cm.exception.errno, expected_errnos) def test_create_connection_timeout(self): # Issue #9792: create_connection() should not recast timeout errors # as generic socket errors. with self.mocked_socket_module(): - try: + with self.assertRaises(socket.timeout): socket.create_connection((HOST, 1234)) - except socket.timeout: - pass - except OSError as exc: - if support.IPV6_ENABLED or exc.errno != errno.EAFNOSUPPORT: - raise - else: - self.fail('socket.timeout not raised') +@unittest.skipUnless(thread, 'Threading required for this test.') class NetworkConnectionAttributesTest(SocketTCPTest, ThreadableTest): def __init__(self, methodName='runTest'): @@ -5050,7 +1398,7 @@ class NetworkConnectionAttributesTest(SocketTCPTest, ThreadableTest): ThreadableTest.__init__(self) def clientSetUp(self): - self.source_port = support.find_unused_port() + self.source_port = test_support.find_unused_port() def clientTearDown(self): self.cli.close() @@ -5111,7 +1459,7 @@ class NetworkConnectionAttributesTest(SocketTCPTest, ThreadableTest): self.addCleanup(self.cli.close) self.assertEqual(self.cli.gettimeout(), 30) - +@unittest.skipUnless(thread, 'Threading required for this test.') class NetworkConnectionBehaviourTest(SocketTCPTest, ThreadableTest): def __init__(self, methodName='runTest'): @@ -5130,19 +1478,43 @@ class NetworkConnectionBehaviourTest(SocketTCPTest, ThreadableTest): conn, addr = self.serv.accept() self.addCleanup(conn.close) time.sleep(3) - conn.send(b"done!") + conn.send("done!") testOutsideTimeout = testInsideTimeout def _testInsideTimeout(self): self.cli = sock = socket.create_connection((HOST, self.port)) data = sock.recv(5) - self.assertEqual(data, b"done!") + self.assertEqual(data, "done!") def _testOutsideTimeout(self): self.cli = sock = socket.create_connection((HOST, self.port), timeout=1) self.assertRaises(socket.timeout, lambda: sock.recv(5)) +class Urllib2FileobjectTest(unittest.TestCase): + + # urllib2.HTTPHandler has "borrowed" socket._fileobject, and requires that + # it close the socket if the close c'tor argument is true + + def testClose(self): + class MockSocket: + closed = False + def flush(self): pass + def close(self): self.closed = True + + # must not close unless we request it: the original use of _fileobject + # by module socket requires that the underlying socket not be closed until + # the _socketobject that created the _fileobject is closed + s = MockSocket() + f = socket._fileobject(s) + f.close() + self.assertTrue(not s.closed) + + s = MockSocket() + f = socket._fileobject(s, close=True) + f.close() + self.assertTrue(s.closed) + class TCPTimeoutTest(SocketTCPTest): def testTCPTimeout(self): @@ -5159,7 +1531,7 @@ class TCPTimeoutTest(SocketTCPTest): foo = self.serv.accept() except socket.timeout: self.fail("caught timeout instead of error (TCP)") - except OSError: + except socket.error: ok = True except: self.fail("caught unexpected exception (TCP)") @@ -5170,7 +1542,7 @@ class TCPTimeoutTest(SocketTCPTest): 'test needs signal.alarm()') def testInterruptedTimeout(self): # XXX I don't know how to do this test on MSWindows or any other - # platform that doesn't support signal.alarm() or os.kill(), though + # plaform that doesn't support signal.alarm() or os.kill(), though # the bug should have existed on all platforms. self.serv.settimeout(5.0) # must be longer than alarm class Alarm(Exception): @@ -5216,58 +1588,20 @@ class UDPTimeoutTest(SocketUDPTest): foo = self.serv.recv(1024) except socket.timeout: self.fail("caught timeout instead of error (UDP)") - except OSError: + except socket.error: ok = True except: self.fail("caught unexpected exception (UDP)") if not ok: self.fail("recv() returned success when we did not expect it") -@unittest.skipUnless(HAVE_SOCKET_UDPLITE, - 'UDPLITE sockets required for this test.') -class UDPLITETimeoutTest(SocketUDPLITETest): - - def testUDPLITETimeout(self): - def raise_timeout(*args, **kwargs): - self.serv.settimeout(1.0) - self.serv.recv(1024) - self.assertRaises(socket.timeout, raise_timeout, - "Error generating a timeout exception (UDPLITE)") - - def testTimeoutZero(self): - ok = False - try: - self.serv.settimeout(0.0) - foo = self.serv.recv(1024) - except socket.timeout: - self.fail("caught timeout instead of error (UDPLITE)") - except OSError: - ok = True - except: - self.fail("caught unexpected exception (UDPLITE)") - if not ok: - self.fail("recv() returned success when we did not expect it") - class TestExceptions(unittest.TestCase): def testExceptionTree(self): - self.assertTrue(issubclass(OSError, Exception)) - self.assertTrue(issubclass(socket.herror, OSError)) - self.assertTrue(issubclass(socket.gaierror, OSError)) - self.assertTrue(issubclass(socket.timeout, OSError)) - - def test_setblocking_invalidfd(self): - # Regression test for issue #28471 - - sock0 = socket.socket(socket.AF_INET, socket.SOCK_STREAM, 0) - sock = socket.socket( - socket.AF_INET, socket.SOCK_STREAM, 0, sock0.fileno()) - sock0.close() - self.addCleanup(sock.detach) - - with self.assertRaises(OSError): - sock.setblocking(False) - + self.assertTrue(issubclass(socket.error, Exception)) + self.assertTrue(issubclass(socket.herror, socket.error)) + self.assertTrue(issubclass(socket.gaierror, socket.error)) + self.assertTrue(issubclass(socket.timeout, socket.error)) @unittest.skipUnless(sys.platform == 'linux', 'Linux specific test') class TestLinuxAbstractNamespace(unittest.TestCase): @@ -5275,112 +1609,29 @@ class TestLinuxAbstractNamespace(unittest.TestCase): UNIX_PATH_MAX = 108 def testLinuxAbstractNamespace(self): - address = b"\x00python-test-hello\x00\xff" - with socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) as s1: - s1.bind(address) - s1.listen() - with socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) as s2: - s2.connect(s1.getsockname()) - with s1.accept()[0] as s3: - self.assertEqual(s1.getsockname(), address) - self.assertEqual(s2.getpeername(), address) + address = "\x00python-test-hello\x00\xff" + s1 = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) + s1.bind(address) + s1.listen(1) + s2 = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) + s2.connect(s1.getsockname()) + s1.accept() + self.assertEqual(s1.getsockname(), address) + self.assertEqual(s2.getpeername(), address) def testMaxName(self): - address = b"\x00" + b"h" * (self.UNIX_PATH_MAX - 1) - with socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) as s: - s.bind(address) - self.assertEqual(s.getsockname(), address) + address = "\x00" + "h" * (self.UNIX_PATH_MAX - 1) + s = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) + s.bind(address) + self.assertEqual(s.getsockname(), address) def testNameOverflow(self): address = "\x00" + "h" * self.UNIX_PATH_MAX - with socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) as s: - self.assertRaises(OSError, s.bind, address) - - def testStrName(self): - # Check that an abstract name can be passed as a string. s = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) - try: - s.bind("\x00python\x00test\x00") - self.assertEqual(s.getsockname(), b"\x00python\x00test\x00") - finally: - s.close() - - def testBytearrayName(self): - # Check that an abstract name can be passed as a bytearray. - with socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) as s: - s.bind(bytearray(b"\x00python\x00test\x00")) - self.assertEqual(s.getsockname(), b"\x00python\x00test\x00") - -@unittest.skipUnless(hasattr(socket, 'AF_UNIX'), 'test needs socket.AF_UNIX') -class TestUnixDomain(unittest.TestCase): - - def setUp(self): - self.sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) - - def tearDown(self): - self.sock.close() - - def encoded(self, path): - # Return the given path encoded in the file system encoding, - # or skip the test if this is not possible. - try: - return os.fsencode(path) - except UnicodeEncodeError: - self.skipTest( - "Pathname {0!a} cannot be represented in file " - "system encoding {1!r}".format( - path, sys.getfilesystemencoding())) - - def bind(self, sock, path): - # Bind the socket - try: - support.bind_unix_socket(sock, path) - except OSError as e: - if str(e) == "AF_UNIX path too long": - self.skipTest( - "Pathname {0!a} is too long to serve as an AF_UNIX path" - .format(path)) - else: - raise - - def testUnbound(self): - # Issue #30205 (note getsockname() can return None on OS X) - self.assertIn(self.sock.getsockname(), ('', None)) - - def testStrAddr(self): - # Test binding to and retrieving a normal string pathname. - path = os.path.abspath(support.TESTFN) - self.bind(self.sock, path) - self.addCleanup(support.unlink, path) - self.assertEqual(self.sock.getsockname(), path) - - def testBytesAddr(self): - # Test binding to a bytes pathname. - path = os.path.abspath(support.TESTFN) - self.bind(self.sock, self.encoded(path)) - self.addCleanup(support.unlink, path) - self.assertEqual(self.sock.getsockname(), path) - - def testSurrogateescapeBind(self): - # Test binding to a valid non-ASCII pathname, with the - # non-ASCII bytes supplied using surrogateescape encoding. - path = os.path.abspath(support.TESTFN_UNICODE) - b = self.encoded(path) - self.bind(self.sock, b.decode("ascii", "surrogateescape")) - self.addCleanup(support.unlink, path) - self.assertEqual(self.sock.getsockname(), path) - - def testUnencodableAddr(self): - # Test binding to a pathname that cannot be encoded in the - # file system encoding. - if support.TESTFN_UNENCODABLE is None: - self.skipTest("No unencodable filename available") - path = os.path.abspath(support.TESTFN_UNENCODABLE) - self.bind(self.sock, path) - self.addCleanup(support.unlink, path) - self.assertEqual(self.sock.getsockname(), path) + self.assertRaises(socket.error, s.bind, address) +@unittest.skipUnless(thread, 'Threading required for this test.') class BufferIOTest(SocketConnectedTest): """ Test the buffer versions of socket.recv() and socket.send(). @@ -5389,15 +1640,15 @@ class BufferIOTest(SocketConnectedTest): SocketConnectedTest.__init__(self, methodName=methodName) def testRecvIntoArray(self): - buf = array.array("B", [0] * len(MSG)) + buf = array.array('c', ' '*1024) nbytes = self.cli_conn.recv_into(buf) self.assertEqual(nbytes, len(MSG)) - buf = buf.tobytes() - msg = buf[:len(MSG)] + msg = buf.tostring()[:len(MSG)] self.assertEqual(msg, MSG) def _testRecvIntoArray(self): - buf = bytes(MSG) + with test_support.check_py3k_warnings(): + buf = buffer(MSG) self.serv_conn.send(buf) def testRecvIntoBytearray(self): @@ -5419,15 +1670,15 @@ class BufferIOTest(SocketConnectedTest): _testRecvIntoMemoryview = _testRecvIntoArray def testRecvFromIntoArray(self): - buf = array.array("B", [0] * len(MSG)) + buf = array.array('c', ' '*1024) nbytes, addr = self.cli_conn.recvfrom_into(buf) self.assertEqual(nbytes, len(MSG)) - buf = buf.tobytes() - msg = buf[:len(MSG)] + msg = buf.tostring()[:len(MSG)] self.assertEqual(msg, MSG) def _testRecvFromIntoArray(self): - buf = bytes(MSG) + with test_support.check_py3k_warnings(): + buf = buffer(MSG) self.serv_conn.send(buf) def testRecvFromIntoBytearray(self): @@ -5454,7 +1705,9 @@ class BufferIOTest(SocketConnectedTest): self.assertRaises(ValueError, self.cli_conn.recvfrom_into, buf, 1024) def _testRecvFromIntoSmallBuffer(self): - self.serv_conn.send(MSG) + with test_support.check_py3k_warnings(): + buf = buffer(MSG) + self.serv_conn.send(buf) def testRecvFromIntoEmptyBuffer(self): buf = bytearray() @@ -5478,10 +1731,14 @@ def isTipcAvailable(): return False try: f = open("/proc/modules") - except (FileNotFoundError, IsADirectoryError, PermissionError): + except IOError as e: # It's ok if the file does not exist, is a directory or if we - # have not the permission to read it. - return False + # have not the permission to read it. In any other case it's a + # real error, so raise it again. + if e.errno in (errno.ENOENT, errno.EISDIR, errno.EACCES): + return False + else: + raise with f: for line in f: if line.startswith("tipc "): @@ -5494,8 +1751,6 @@ class TIPCTest(unittest.TestCase): def testRDM(self): srv = socket.socket(socket.AF_TIPC, socket.SOCK_RDM) cli = socket.socket(socket.AF_TIPC, socket.SOCK_RDM) - self.addCleanup(srv.close) - self.addCleanup(cli.close) srv.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) srvaddr = (socket.TIPC_ADDR_NAMESEQ, TIPC_STYPE, @@ -5503,7 +1758,7 @@ class TIPCTest(unittest.TestCase): srv.bind(srvaddr) sendaddr = (socket.TIPC_ADDR_NAME, TIPC_STYPE, - TIPC_LOWER + int((TIPC_UPPER - TIPC_LOWER) / 2), 0) + TIPC_LOWER + (TIPC_UPPER - TIPC_LOWER) / 2, 0) cli.sendto(MSG, sendaddr) msg, recvaddr = srv.recvfrom(1024) @@ -5521,15 +1776,13 @@ class TIPCThreadableTest(unittest.TestCase, ThreadableTest): def setUp(self): self.srv = socket.socket(socket.AF_TIPC, socket.SOCK_STREAM) - self.addCleanup(self.srv.close) self.srv.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) srvaddr = (socket.TIPC_ADDR_NAMESEQ, TIPC_STYPE, TIPC_LOWER, TIPC_UPPER) self.srv.bind(srvaddr) - self.srv.listen() + self.srv.listen(5) self.serverExplicitReady() self.conn, self.connaddr = self.srv.accept() - self.addCleanup(self.conn.close) def clientSetUp(self): # There is a hittable race between serverExplicitReady() and the @@ -5537,9 +1790,8 @@ class TIPCThreadableTest(unittest.TestCase, ThreadableTest): # we could get an exception time.sleep(0.1) self.cli = socket.socket(socket.AF_TIPC, socket.SOCK_STREAM) - self.addCleanup(self.cli.close) addr = (socket.TIPC_ADDR_NAME, TIPC_STYPE, - TIPC_LOWER + int((TIPC_UPPER - TIPC_LOWER) / 2), 0) + TIPC_LOWER + (TIPC_UPPER - TIPC_LOWER) / 2, 0) self.cli.connect(addr) self.cliaddr = self.cli.getsockname() @@ -5553,1008 +1805,30 @@ class TIPCThreadableTest(unittest.TestCase, ThreadableTest): self.cli.close() -class ContextManagersTest(ThreadedTCPSocketTest): - - def _testSocketClass(self): - # base test - with socket.socket() as sock: - self.assertFalse(sock._closed) - self.assertTrue(sock._closed) - # close inside with block - with socket.socket() as sock: - sock.close() - self.assertTrue(sock._closed) - # exception inside with block - with socket.socket() as sock: - self.assertRaises(OSError, sock.sendall, b'foo') - self.assertTrue(sock._closed) - - def testCreateConnectionBase(self): - conn, addr = self.serv.accept() - self.addCleanup(conn.close) - data = conn.recv(1024) - conn.sendall(data) - - def _testCreateConnectionBase(self): - address = self.serv.getsockname() - with socket.create_connection(address) as sock: - self.assertFalse(sock._closed) - sock.sendall(b'foo') - self.assertEqual(sock.recv(1024), b'foo') - self.assertTrue(sock._closed) - - def testCreateConnectionClose(self): - conn, addr = self.serv.accept() - self.addCleanup(conn.close) - data = conn.recv(1024) - conn.sendall(data) - - def _testCreateConnectionClose(self): - address = self.serv.getsockname() - with socket.create_connection(address) as sock: - sock.close() - self.assertTrue(sock._closed) - self.assertRaises(OSError, sock.sendall, b'foo') - - -class InheritanceTest(unittest.TestCase): - @unittest.skipUnless(hasattr(socket, "SOCK_CLOEXEC"), - "SOCK_CLOEXEC not defined") - @support.requires_linux_version(2, 6, 28) - def test_SOCK_CLOEXEC(self): - with socket.socket(socket.AF_INET, - socket.SOCK_STREAM | socket.SOCK_CLOEXEC) as s: - self.assertEqual(s.type, socket.SOCK_STREAM) - self.assertFalse(s.get_inheritable()) - - def test_default_inheritable(self): - sock = socket.socket() - with sock: - self.assertEqual(sock.get_inheritable(), False) - - def test_dup(self): - sock = socket.socket() - with sock: - newsock = sock.dup() - sock.close() - with newsock: - self.assertEqual(newsock.get_inheritable(), False) - - def test_set_inheritable(self): - sock = socket.socket() - with sock: - sock.set_inheritable(True) - self.assertEqual(sock.get_inheritable(), True) - - sock.set_inheritable(False) - self.assertEqual(sock.get_inheritable(), False) - - @unittest.skipIf(fcntl is None, "need fcntl") - def test_get_inheritable_cloexec(self): - sock = socket.socket() - with sock: - fd = sock.fileno() - self.assertEqual(sock.get_inheritable(), False) - - # clear FD_CLOEXEC flag - flags = fcntl.fcntl(fd, fcntl.F_GETFD) - flags &= ~fcntl.FD_CLOEXEC - fcntl.fcntl(fd, fcntl.F_SETFD, flags) - - self.assertEqual(sock.get_inheritable(), True) - - @unittest.skipIf(fcntl is None, "need fcntl") - def test_set_inheritable_cloexec(self): - sock = socket.socket() - with sock: - fd = sock.fileno() - self.assertEqual(fcntl.fcntl(fd, fcntl.F_GETFD) & fcntl.FD_CLOEXEC, - fcntl.FD_CLOEXEC) - - sock.set_inheritable(True) - self.assertEqual(fcntl.fcntl(fd, fcntl.F_GETFD) & fcntl.FD_CLOEXEC, - 0) - - - def test_socketpair(self): - s1, s2 = socket.socketpair() - self.addCleanup(s1.close) - self.addCleanup(s2.close) - self.assertEqual(s1.get_inheritable(), False) - self.assertEqual(s2.get_inheritable(), False) - - -@unittest.skipUnless(hasattr(socket, "SOCK_NONBLOCK"), - "SOCK_NONBLOCK not defined") -class NonblockConstantTest(unittest.TestCase): - def checkNonblock(self, s, nonblock=True, timeout=0.0): - if nonblock: - self.assertEqual(s.type, socket.SOCK_STREAM) - self.assertEqual(s.gettimeout(), timeout) - self.assertTrue( - fcntl.fcntl(s, fcntl.F_GETFL, os.O_NONBLOCK) & os.O_NONBLOCK) - if timeout == 0: - # timeout == 0: means that getblocking() must be False. - self.assertFalse(s.getblocking()) - else: - # If timeout > 0, the socket will be in a "blocking" mode - # from the standpoint of the Python API. For Python socket - # object, "blocking" means that operations like 'sock.recv()' - # will block. Internally, file descriptors for - # "blocking" Python sockets *with timeouts* are in a - # *non-blocking* mode, and 'sock.recv()' uses 'select()' - # and handles EWOULDBLOCK/EAGAIN to enforce the timeout. - self.assertTrue(s.getblocking()) - else: - self.assertEqual(s.type, socket.SOCK_STREAM) - self.assertEqual(s.gettimeout(), None) - self.assertFalse( - fcntl.fcntl(s, fcntl.F_GETFL, os.O_NONBLOCK) & os.O_NONBLOCK) - self.assertTrue(s.getblocking()) - - @support.requires_linux_version(2, 6, 28) - def test_SOCK_NONBLOCK(self): - # a lot of it seems silly and redundant, but I wanted to test that - # changing back and forth worked ok - with socket.socket(socket.AF_INET, - socket.SOCK_STREAM | socket.SOCK_NONBLOCK) as s: - self.checkNonblock(s) - s.setblocking(True) - self.checkNonblock(s, nonblock=False) - s.setblocking(False) - self.checkNonblock(s) - s.settimeout(None) - self.checkNonblock(s, nonblock=False) - s.settimeout(2.0) - self.checkNonblock(s, timeout=2.0) - s.setblocking(True) - self.checkNonblock(s, nonblock=False) - # defaulttimeout - t = socket.getdefaulttimeout() - socket.setdefaulttimeout(0.0) - with socket.socket() as s: - self.checkNonblock(s) - socket.setdefaulttimeout(None) - with socket.socket() as s: - self.checkNonblock(s, False) - socket.setdefaulttimeout(2.0) - with socket.socket() as s: - self.checkNonblock(s, timeout=2.0) - socket.setdefaulttimeout(None) - with socket.socket() as s: - self.checkNonblock(s, False) - socket.setdefaulttimeout(t) - - -@unittest.skipUnless(os.name == "nt", "Windows specific") -@unittest.skipUnless(multiprocessing, "need multiprocessing") -class TestSocketSharing(SocketTCPTest): - # This must be classmethod and not staticmethod or multiprocessing - # won't be able to bootstrap it. - @classmethod - def remoteProcessServer(cls, q): - # Recreate socket from shared data - sdata = q.get() - message = q.get() - - s = socket.fromshare(sdata) - s2, c = s.accept() - - # Send the message - s2.sendall(message) - s2.close() - s.close() - - def testShare(self): - # Transfer the listening server socket to another process - # and service it from there. - - # Create process: - q = multiprocessing.Queue() - p = multiprocessing.Process(target=self.remoteProcessServer, args=(q,)) - p.start() - - # Get the shared socket data - data = self.serv.share(p.pid) - - # Pass the shared socket to the other process - addr = self.serv.getsockname() - self.serv.close() - q.put(data) - - # The data that the server will send us - message = b"slapmahfro" - q.put(message) - - # Connect - s = socket.create_connection(addr) - # listen for the data - m = [] - while True: - data = s.recv(100) - if not data: - break - m.append(data) - s.close() - received = b"".join(m) - self.assertEqual(received, message) - p.join() - - def testShareLength(self): - data = self.serv.share(os.getpid()) - self.assertRaises(ValueError, socket.fromshare, data[:-1]) - self.assertRaises(ValueError, socket.fromshare, data+b"foo") - - def compareSockets(self, org, other): - # socket sharing is expected to work only for blocking socket - # since the internal python timeout value isn't transferred. - self.assertEqual(org.gettimeout(), None) - self.assertEqual(org.gettimeout(), other.gettimeout()) - - self.assertEqual(org.family, other.family) - self.assertEqual(org.type, other.type) - # If the user specified "0" for proto, then - # internally windows will have picked the correct value. - # Python introspection on the socket however will still return - # 0. For the shared socket, the python value is recreated - # from the actual value, so it may not compare correctly. - if org.proto != 0: - self.assertEqual(org.proto, other.proto) - - def testShareLocal(self): - data = self.serv.share(os.getpid()) - s = socket.fromshare(data) - try: - self.compareSockets(self.serv, s) - finally: - s.close() - - def testTypes(self): - families = [socket.AF_INET, socket.AF_INET6] - types = [socket.SOCK_STREAM, socket.SOCK_DGRAM] - for f in families: - for t in types: - try: - source = socket.socket(f, t) - except OSError: - continue # This combination is not supported - try: - data = source.share(os.getpid()) - shared = socket.fromshare(data) - try: - self.compareSockets(source, shared) - finally: - shared.close() - finally: - source.close() - - -class SendfileUsingSendTest(ThreadedTCPSocketTest): - """ - Test the send() implementation of socket.sendfile(). - """ - - FILESIZE = (10 * 1024 * 1024) # 10 MiB - BUFSIZE = 8192 - FILEDATA = b"" - TIMEOUT = support.LOOPBACK_TIMEOUT - - @classmethod - def setUpClass(cls): - def chunks(total, step): - assert total >= step - while total > step: - yield step - total -= step - if total: - yield total - - chunk = b"".join([random.choice(string.ascii_letters).encode() - for i in range(cls.BUFSIZE)]) - with open(support.TESTFN, 'wb') as f: - for csize in chunks(cls.FILESIZE, cls.BUFSIZE): - f.write(chunk) - with open(support.TESTFN, 'rb') as f: - cls.FILEDATA = f.read() - assert len(cls.FILEDATA) == cls.FILESIZE - - @classmethod - def tearDownClass(cls): - support.unlink(support.TESTFN) - - def accept_conn(self): - self.serv.settimeout(support.LONG_TIMEOUT) - conn, addr = self.serv.accept() - conn.settimeout(self.TIMEOUT) - self.addCleanup(conn.close) - return conn - - def recv_data(self, conn): - received = [] - while True: - chunk = conn.recv(self.BUFSIZE) - if not chunk: - break - received.append(chunk) - return b''.join(received) - - def meth_from_sock(self, sock): - # Depending on the mixin class being run return either send() - # or sendfile() method implementation. - return getattr(sock, "_sendfile_use_send") - - # regular file - - def _testRegularFile(self): - address = self.serv.getsockname() - file = open(support.TESTFN, 'rb') - with socket.create_connection(address) as sock, file as file: - meth = self.meth_from_sock(sock) - sent = meth(file) - self.assertEqual(sent, self.FILESIZE) - self.assertEqual(file.tell(), self.FILESIZE) - - def testRegularFile(self): - conn = self.accept_conn() - data = self.recv_data(conn) - self.assertEqual(len(data), self.FILESIZE) - self.assertEqual(data, self.FILEDATA) - - # non regular file - - def _testNonRegularFile(self): - address = self.serv.getsockname() - file = io.BytesIO(self.FILEDATA) - with socket.create_connection(address) as sock, file as file: - sent = sock.sendfile(file) - self.assertEqual(sent, self.FILESIZE) - self.assertEqual(file.tell(), self.FILESIZE) - self.assertRaises(socket._GiveupOnSendfile, - sock._sendfile_use_sendfile, file) - - def testNonRegularFile(self): - conn = self.accept_conn() - data = self.recv_data(conn) - self.assertEqual(len(data), self.FILESIZE) - self.assertEqual(data, self.FILEDATA) - - # empty file - - def _testEmptyFileSend(self): - address = self.serv.getsockname() - filename = support.TESTFN + "2" - with open(filename, 'wb'): - self.addCleanup(support.unlink, filename) - file = open(filename, 'rb') - with socket.create_connection(address) as sock, file as file: - meth = self.meth_from_sock(sock) - sent = meth(file) - self.assertEqual(sent, 0) - self.assertEqual(file.tell(), 0) - - def testEmptyFileSend(self): - conn = self.accept_conn() - data = self.recv_data(conn) - self.assertEqual(data, b"") - - # offset - - def _testOffset(self): - address = self.serv.getsockname() - file = open(support.TESTFN, 'rb') - with socket.create_connection(address) as sock, file as file: - meth = self.meth_from_sock(sock) - sent = meth(file, offset=5000) - self.assertEqual(sent, self.FILESIZE - 5000) - self.assertEqual(file.tell(), self.FILESIZE) - - def testOffset(self): - conn = self.accept_conn() - data = self.recv_data(conn) - self.assertEqual(len(data), self.FILESIZE - 5000) - self.assertEqual(data, self.FILEDATA[5000:]) - - # count - - def _testCount(self): - address = self.serv.getsockname() - file = open(support.TESTFN, 'rb') - with socket.create_connection(address, timeout=2) as sock, file as file: - count = 5000007 - meth = self.meth_from_sock(sock) - sent = meth(file, count=count) - self.assertEqual(sent, count) - self.assertEqual(file.tell(), count) - - def testCount(self): - count = 5000007 - conn = self.accept_conn() - data = self.recv_data(conn) - self.assertEqual(len(data), count) - self.assertEqual(data, self.FILEDATA[:count]) - - # count small - - def _testCountSmall(self): - address = self.serv.getsockname() - file = open(support.TESTFN, 'rb') - with socket.create_connection(address, timeout=2) as sock, file as file: - count = 1 - meth = self.meth_from_sock(sock) - sent = meth(file, count=count) - self.assertEqual(sent, count) - self.assertEqual(file.tell(), count) - - def testCountSmall(self): - count = 1 - conn = self.accept_conn() - data = self.recv_data(conn) - self.assertEqual(len(data), count) - self.assertEqual(data, self.FILEDATA[:count]) - - # count + offset - - def _testCountWithOffset(self): - address = self.serv.getsockname() - file = open(support.TESTFN, 'rb') - with socket.create_connection(address, timeout=2) as sock, file as file: - count = 100007 - meth = self.meth_from_sock(sock) - sent = meth(file, offset=2007, count=count) - self.assertEqual(sent, count) - self.assertEqual(file.tell(), count + 2007) - - def testCountWithOffset(self): - count = 100007 - conn = self.accept_conn() - data = self.recv_data(conn) - self.assertEqual(len(data), count) - self.assertEqual(data, self.FILEDATA[2007:count+2007]) - - # non blocking sockets are not supposed to work - - def _testNonBlocking(self): - address = self.serv.getsockname() - file = open(support.TESTFN, 'rb') - with socket.create_connection(address) as sock, file as file: - sock.setblocking(False) - meth = self.meth_from_sock(sock) - self.assertRaises(ValueError, meth, file) - self.assertRaises(ValueError, sock.sendfile, file) - - def testNonBlocking(self): - conn = self.accept_conn() - if conn.recv(8192): - self.fail('was not supposed to receive any data') - - # timeout (non-triggered) - - def _testWithTimeout(self): - address = self.serv.getsockname() - file = open(support.TESTFN, 'rb') - with socket.create_connection(address, timeout=2) as sock, file as file: - meth = self.meth_from_sock(sock) - sent = meth(file) - self.assertEqual(sent, self.FILESIZE) - - def testWithTimeout(self): - conn = self.accept_conn() - data = self.recv_data(conn) - self.assertEqual(len(data), self.FILESIZE) - self.assertEqual(data, self.FILEDATA) - - # timeout (triggered) - - def _testWithTimeoutTriggeredSend(self): - address = self.serv.getsockname() - with open(support.TESTFN, 'rb') as file: - with socket.create_connection(address) as sock: - sock.settimeout(0.01) - meth = self.meth_from_sock(sock) - self.assertRaises(socket.timeout, meth, file) - - def testWithTimeoutTriggeredSend(self): - conn = self.accept_conn() - conn.recv(88192) - - # errors - - def _test_errors(self): - pass - - def test_errors(self): - with open(support.TESTFN, 'rb') as file: - with socket.socket(type=socket.SOCK_DGRAM) as s: - meth = self.meth_from_sock(s) - self.assertRaisesRegex( - ValueError, "SOCK_STREAM", meth, file) - with open(support.TESTFN, 'rt') as file: - with socket.socket() as s: - meth = self.meth_from_sock(s) - self.assertRaisesRegex( - ValueError, "binary mode", meth, file) - with open(support.TESTFN, 'rb') as file: - with socket.socket() as s: - meth = self.meth_from_sock(s) - self.assertRaisesRegex(TypeError, "positive integer", - meth, file, count='2') - self.assertRaisesRegex(TypeError, "positive integer", - meth, file, count=0.1) - self.assertRaisesRegex(ValueError, "positive integer", - meth, file, count=0) - self.assertRaisesRegex(ValueError, "positive integer", - meth, file, count=-1) - - -@unittest.skipUnless(hasattr(os, "sendfile"), - 'os.sendfile() required for this test.') -class SendfileUsingSendfileTest(SendfileUsingSendTest): - """ - Test the sendfile() implementation of socket.sendfile(). - """ - def meth_from_sock(self, sock): - return getattr(sock, "_sendfile_use_sendfile") - - -@unittest.skipUnless(HAVE_SOCKET_ALG, 'AF_ALG required') -class LinuxKernelCryptoAPI(unittest.TestCase): - # tests for AF_ALG - def create_alg(self, typ, name): - sock = socket.socket(socket.AF_ALG, socket.SOCK_SEQPACKET, 0) - try: - sock.bind((typ, name)) - except FileNotFoundError as e: - # type / algorithm is not available - sock.close() - raise unittest.SkipTest(str(e), typ, name) - else: - return sock - - # bpo-31705: On kernel older than 4.5, sendto() failed with ENOKEY, - # at least on ppc64le architecture - @support.requires_linux_version(4, 5) - def test_sha256(self): - expected = bytes.fromhex("ba7816bf8f01cfea414140de5dae2223b00361a396" - "177a9cb410ff61f20015ad") - with self.create_alg('hash', 'sha256') as algo: - op, _ = algo.accept() - with op: - op.sendall(b"abc") - self.assertEqual(op.recv(512), expected) - - op, _ = algo.accept() - with op: - op.send(b'a', socket.MSG_MORE) - op.send(b'b', socket.MSG_MORE) - op.send(b'c', socket.MSG_MORE) - op.send(b'') - self.assertEqual(op.recv(512), expected) - - def test_hmac_sha1(self): - expected = bytes.fromhex("effcdf6ae5eb2fa2d27416d5f184df9c259a7c79") - with self.create_alg('hash', 'hmac(sha1)') as algo: - algo.setsockopt(socket.SOL_ALG, socket.ALG_SET_KEY, b"Jefe") - op, _ = algo.accept() - with op: - op.sendall(b"what do ya want for nothing?") - self.assertEqual(op.recv(512), expected) - - # Although it should work with 3.19 and newer the test blocks on - # Ubuntu 15.10 with Kernel 4.2.0-19. - @support.requires_linux_version(4, 3) - def test_aes_cbc(self): - key = bytes.fromhex('06a9214036b8a15b512e03d534120006') - iv = bytes.fromhex('3dafba429d9eb430b422da802c9fac41') - msg = b"Single block msg" - ciphertext = bytes.fromhex('e353779c1079aeb82708942dbe77181a') - msglen = len(msg) - with self.create_alg('skcipher', 'cbc(aes)') as algo: - algo.setsockopt(socket.SOL_ALG, socket.ALG_SET_KEY, key) - op, _ = algo.accept() - with op: - op.sendmsg_afalg(op=socket.ALG_OP_ENCRYPT, iv=iv, - flags=socket.MSG_MORE) - op.sendall(msg) - self.assertEqual(op.recv(msglen), ciphertext) - - op, _ = algo.accept() - with op: - op.sendmsg_afalg([ciphertext], - op=socket.ALG_OP_DECRYPT, iv=iv) - self.assertEqual(op.recv(msglen), msg) - - # long message - multiplier = 1024 - longmsg = [msg] * multiplier - op, _ = algo.accept() - with op: - op.sendmsg_afalg(longmsg, - op=socket.ALG_OP_ENCRYPT, iv=iv) - enc = op.recv(msglen * multiplier) - self.assertEqual(len(enc), msglen * multiplier) - self.assertEqual(enc[:msglen], ciphertext) - - op, _ = algo.accept() - with op: - op.sendmsg_afalg([enc], - op=socket.ALG_OP_DECRYPT, iv=iv) - dec = op.recv(msglen * multiplier) - self.assertEqual(len(dec), msglen * multiplier) - self.assertEqual(dec, msg * multiplier) - - @support.requires_linux_version(4, 9) # see issue29324 - def test_aead_aes_gcm(self): - key = bytes.fromhex('c939cc13397c1d37de6ae0e1cb7c423c') - iv = bytes.fromhex('b3d8cc017cbb89b39e0f67e2') - plain = bytes.fromhex('c3b3c41f113a31b73d9a5cd432103069') - assoc = bytes.fromhex('24825602bd12a984e0092d3e448eda5f') - expected_ct = bytes.fromhex('93fe7d9e9bfd10348a5606e5cafa7354') - expected_tag = bytes.fromhex('0032a1dc85f1c9786925a2e71d8272dd') - - taglen = len(expected_tag) - assoclen = len(assoc) - - with self.create_alg('aead', 'gcm(aes)') as algo: - algo.setsockopt(socket.SOL_ALG, socket.ALG_SET_KEY, key) - algo.setsockopt(socket.SOL_ALG, socket.ALG_SET_AEAD_AUTHSIZE, - None, taglen) - - # send assoc, plain and tag buffer in separate steps - op, _ = algo.accept() - with op: - op.sendmsg_afalg(op=socket.ALG_OP_ENCRYPT, iv=iv, - assoclen=assoclen, flags=socket.MSG_MORE) - op.sendall(assoc, socket.MSG_MORE) - op.sendall(plain) - res = op.recv(assoclen + len(plain) + taglen) - self.assertEqual(expected_ct, res[assoclen:-taglen]) - self.assertEqual(expected_tag, res[-taglen:]) - - # now with msg - op, _ = algo.accept() - with op: - msg = assoc + plain - op.sendmsg_afalg([msg], op=socket.ALG_OP_ENCRYPT, iv=iv, - assoclen=assoclen) - res = op.recv(assoclen + len(plain) + taglen) - self.assertEqual(expected_ct, res[assoclen:-taglen]) - self.assertEqual(expected_tag, res[-taglen:]) - - # create anc data manually - pack_uint32 = struct.Struct('I').pack - op, _ = algo.accept() - with op: - msg = assoc + plain - op.sendmsg( - [msg], - ([socket.SOL_ALG, socket.ALG_SET_OP, pack_uint32(socket.ALG_OP_ENCRYPT)], - [socket.SOL_ALG, socket.ALG_SET_IV, pack_uint32(len(iv)) + iv], - [socket.SOL_ALG, socket.ALG_SET_AEAD_ASSOCLEN, pack_uint32(assoclen)], - ) - ) - res = op.recv(len(msg) + taglen) - self.assertEqual(expected_ct, res[assoclen:-taglen]) - self.assertEqual(expected_tag, res[-taglen:]) - - # decrypt and verify - op, _ = algo.accept() - with op: - msg = assoc + expected_ct + expected_tag - op.sendmsg_afalg([msg], op=socket.ALG_OP_DECRYPT, iv=iv, - assoclen=assoclen) - res = op.recv(len(msg) - taglen) - self.assertEqual(plain, res[assoclen:]) - - @support.requires_linux_version(4, 3) # see test_aes_cbc - def test_drbg_pr_sha256(self): - # deterministic random bit generator, prediction resistance, sha256 - with self.create_alg('rng', 'drbg_pr_sha256') as algo: - extra_seed = os.urandom(32) - algo.setsockopt(socket.SOL_ALG, socket.ALG_SET_KEY, extra_seed) - op, _ = algo.accept() - with op: - rn = op.recv(32) - self.assertEqual(len(rn), 32) - - def test_sendmsg_afalg_args(self): - sock = socket.socket(socket.AF_ALG, socket.SOCK_SEQPACKET, 0) - with sock: - with self.assertRaises(TypeError): - sock.sendmsg_afalg() - - with self.assertRaises(TypeError): - sock.sendmsg_afalg(op=None) - - with self.assertRaises(TypeError): - sock.sendmsg_afalg(1) - - with self.assertRaises(TypeError): - sock.sendmsg_afalg(op=socket.ALG_OP_ENCRYPT, assoclen=None) - - with self.assertRaises(TypeError): - sock.sendmsg_afalg(op=socket.ALG_OP_ENCRYPT, assoclen=-1) - - def test_length_restriction(self): - # bpo-35050, off-by-one error in length check - sock = socket.socket(socket.AF_ALG, socket.SOCK_SEQPACKET, 0) - self.addCleanup(sock.close) - - # salg_type[14] - with self.assertRaises(FileNotFoundError): - sock.bind(("t" * 13, "name")) - with self.assertRaisesRegex(ValueError, "type too long"): - sock.bind(("t" * 14, "name")) - - # salg_name[64] - with self.assertRaises(FileNotFoundError): - sock.bind(("type", "n" * 63)) - with self.assertRaisesRegex(ValueError, "name too long"): - sock.bind(("type", "n" * 64)) - - -@unittest.skipUnless(sys.platform.startswith("win"), "requires Windows") -class TestMSWindowsTCPFlags(unittest.TestCase): - knownTCPFlags = { - # available since long time ago - 'TCP_MAXSEG', - 'TCP_NODELAY', - # available starting with Windows 10 1607 - 'TCP_FASTOPEN', - # available starting with Windows 10 1703 - 'TCP_KEEPCNT', - # available starting with Windows 10 1709 - 'TCP_KEEPIDLE', - 'TCP_KEEPINTVL' - } - - def test_new_tcp_flags(self): - provided = [s for s in dir(socket) if s.startswith('TCP')] - unknown = [s for s in provided if s not in self.knownTCPFlags] - - self.assertEqual([], unknown, - "New TCP flags were discovered. See bpo-32394 for more information") - - -class CreateServerTest(unittest.TestCase): - - def test_address(self): - port = support.find_unused_port() - with socket.create_server(("127.0.0.1", port)) as sock: - self.assertEqual(sock.getsockname()[0], "127.0.0.1") - self.assertEqual(sock.getsockname()[1], port) - if support.IPV6_ENABLED: - with socket.create_server(("::1", port), - family=socket.AF_INET6) as sock: - self.assertEqual(sock.getsockname()[0], "::1") - self.assertEqual(sock.getsockname()[1], port) - - def test_family_and_type(self): - with socket.create_server(("127.0.0.1", 0)) as sock: - self.assertEqual(sock.family, socket.AF_INET) - self.assertEqual(sock.type, socket.SOCK_STREAM) - if support.IPV6_ENABLED: - with socket.create_server(("::1", 0), family=socket.AF_INET6) as s: - self.assertEqual(s.family, socket.AF_INET6) - self.assertEqual(sock.type, socket.SOCK_STREAM) - - def test_reuse_port(self): - if not hasattr(socket, "SO_REUSEPORT"): - with self.assertRaises(ValueError): - socket.create_server(("localhost", 0), reuse_port=True) - else: - with socket.create_server(("localhost", 0)) as sock: - opt = sock.getsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT) - self.assertEqual(opt, 0) - with socket.create_server(("localhost", 0), reuse_port=True) as sock: - opt = sock.getsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT) - self.assertNotEqual(opt, 0) - - @unittest.skipIf(not hasattr(_socket, 'IPPROTO_IPV6') or - not hasattr(_socket, 'IPV6_V6ONLY'), - "IPV6_V6ONLY option not supported") - @unittest.skipUnless(support.IPV6_ENABLED, 'IPv6 required for this test') - def test_ipv6_only_default(self): - with socket.create_server(("::1", 0), family=socket.AF_INET6) as sock: - assert sock.getsockopt(socket.IPPROTO_IPV6, socket.IPV6_V6ONLY) - - @unittest.skipIf(not socket.has_dualstack_ipv6(), - "dualstack_ipv6 not supported") - @unittest.skipUnless(support.IPV6_ENABLED, 'IPv6 required for this test') - def test_dualstack_ipv6_family(self): - with socket.create_server(("::1", 0), family=socket.AF_INET6, - dualstack_ipv6=True) as sock: - self.assertEqual(sock.family, socket.AF_INET6) - - -class CreateServerFunctionalTest(unittest.TestCase): - timeout = support.LOOPBACK_TIMEOUT - - def setUp(self): - self.thread = None - - def tearDown(self): - if self.thread is not None: - self.thread.join(self.timeout) - - def echo_server(self, sock): - def run(sock): - with sock: - conn, _ = sock.accept() - with conn: - event.wait(self.timeout) - msg = conn.recv(1024) - if not msg: - return - conn.sendall(msg) - - event = threading.Event() - sock.settimeout(self.timeout) - self.thread = threading.Thread(target=run, args=(sock, )) - self.thread.start() - event.set() - - def echo_client(self, addr, family): - with socket.socket(family=family) as sock: - sock.settimeout(self.timeout) - sock.connect(addr) - sock.sendall(b'foo') - self.assertEqual(sock.recv(1024), b'foo') - - def test_tcp4(self): - port = support.find_unused_port() - with socket.create_server(("", port)) as sock: - self.echo_server(sock) - self.echo_client(("127.0.0.1", port), socket.AF_INET) - - @unittest.skipUnless(support.IPV6_ENABLED, 'IPv6 required for this test') - def test_tcp6(self): - port = support.find_unused_port() - with socket.create_server(("", port), - family=socket.AF_INET6) as sock: - self.echo_server(sock) - self.echo_client(("::1", port), socket.AF_INET6) - - # --- dual stack tests - - @unittest.skipIf(not socket.has_dualstack_ipv6(), - "dualstack_ipv6 not supported") - @unittest.skipUnless(support.IPV6_ENABLED, 'IPv6 required for this test') - def test_dual_stack_client_v4(self): - port = support.find_unused_port() - with socket.create_server(("", port), family=socket.AF_INET6, - dualstack_ipv6=True) as sock: - self.echo_server(sock) - self.echo_client(("127.0.0.1", port), socket.AF_INET) - - @unittest.skipIf(not socket.has_dualstack_ipv6(), - "dualstack_ipv6 not supported") - @unittest.skipUnless(support.IPV6_ENABLED, 'IPv6 required for this test') - def test_dual_stack_client_v6(self): - port = support.find_unused_port() - with socket.create_server(("", port), family=socket.AF_INET6, - dualstack_ipv6=True) as sock: - self.echo_server(sock) - self.echo_client(("::1", port), socket.AF_INET6) - -@requireAttrs(socket, "send_fds") -@requireAttrs(socket, "recv_fds") -@requireAttrs(socket, "AF_UNIX") -class SendRecvFdsTests(unittest.TestCase): - def testSendAndRecvFds(self): - def close_pipes(pipes): - for fd1, fd2 in pipes: - os.close(fd1) - os.close(fd2) - - def close_fds(fds): - for fd in fds: - os.close(fd) - - # send 10 file descriptors - pipes = [os.pipe() for _ in range(10)] - self.addCleanup(close_pipes, pipes) - fds = [rfd for rfd, wfd in pipes] - - # use a UNIX socket pair to exchange file descriptors locally - sock1, sock2 = socket.socketpair(socket.AF_UNIX, socket.SOCK_STREAM) - with sock1, sock2: - socket.send_fds(sock1, [MSG], fds) - # request more data and file descriptors than expected - msg, fds2, flags, addr = socket.recv_fds(sock2, len(MSG) * 2, len(fds) * 2) - self.addCleanup(close_fds, fds2) - - self.assertEqual(msg, MSG) - self.assertEqual(len(fds2), len(fds)) - self.assertEqual(flags, 0) - # don't test addr - - # test that file descriptors are connected - for index, fds in enumerate(pipes): - rfd, wfd = fds - os.write(wfd, str(index).encode()) - - for index, rfd in enumerate(fds2): - data = os.read(rfd, 100) - self.assertEqual(data, str(index).encode()) - - def test_main(): tests = [GeneralModuleTests, BasicTCPTest, TCPCloserTest, TCPTimeoutTest, TestExceptions, BufferIOTest, BasicTCPTest2, BasicUDPTest, - UDPTimeoutTest, CreateServerTest, CreateServerFunctionalTest, - SendRecvFdsTests] + UDPTimeoutTest ] tests.extend([ NonBlockingTCPTests, FileObjectClassTestCase, + FileObjectInterruptedTestCase, UnbufferedFileObjectClassTestCase, LineBufferedFileObjectClassTestCase, SmallBufferedFileObjectClassTestCase, - UnicodeReadFileObjectClassTestCase, - UnicodeWriteFileObjectClassTestCase, - UnicodeReadWriteFileObjectClassTestCase, + Urllib2FileobjectTest, NetworkConnectionNoServer, NetworkConnectionAttributesTest, NetworkConnectionBehaviourTest, - ContextManagersTest, - InheritanceTest, - NonblockConstantTest ]) tests.append(BasicSocketPairTest) - tests.append(TestUnixDomain) tests.append(TestLinuxAbstractNamespace) tests.extend([TIPCTest, TIPCThreadableTest]) - tests.extend([BasicCANTest, CANTest]) - tests.extend([BasicRDSTest, RDSTest]) - tests.append(LinuxKernelCryptoAPI) - tests.append(BasicQIPCRTRTest) - tests.extend([ - BasicVSOCKTest, - ThreadedVSOCKSocketStreamTest, - ]) - tests.append(BasicBluetoothTest) - tests.extend([ - CmsgMacroTests, - SendmsgUDPTest, - RecvmsgUDPTest, - RecvmsgIntoUDPTest, - SendmsgUDP6Test, - RecvmsgUDP6Test, - RecvmsgRFC3542AncillaryUDP6Test, - RecvmsgIntoRFC3542AncillaryUDP6Test, - RecvmsgIntoUDP6Test, - SendmsgUDPLITETest, - RecvmsgUDPLITETest, - RecvmsgIntoUDPLITETest, - SendmsgUDPLITE6Test, - RecvmsgUDPLITE6Test, - RecvmsgRFC3542AncillaryUDPLITE6Test, - RecvmsgIntoRFC3542AncillaryUDPLITE6Test, - RecvmsgIntoUDPLITE6Test, - SendmsgTCPTest, - RecvmsgTCPTest, - RecvmsgIntoTCPTest, - SendmsgSCTPStreamTest, - RecvmsgSCTPStreamTest, - RecvmsgIntoSCTPStreamTest, - SendmsgUnixStreamTest, - RecvmsgUnixStreamTest, - RecvmsgIntoUnixStreamTest, - RecvmsgSCMRightsStreamTest, - RecvmsgIntoSCMRightsStreamTest, - # These are slow when setitimer() is not available - InterruptedRecvTimeoutTest, - InterruptedSendTimeoutTest, - TestSocketSharing, - SendfileUsingSendTest, - SendfileUsingSendfileTest, - ]) - tests.append(TestMSWindowsTCPFlags) - - thread_info = support.threading_setup() - support.run_unittest(*tests) - support.threading_cleanup(*thread_info) + thread_info = test_support.threading_setup() + test_support.run_unittest(*tests) + test_support.threading_cleanup(*thread_info) if __name__ == "__main__": test_main() |