diff options
Diffstat (limited to 'Lib/test/test_asyncore.py')
-rw-r--r-- | Lib/test/test_asyncore.py | 197 |
1 files changed, 141 insertions, 56 deletions
diff --git a/Lib/test/test_asyncore.py b/Lib/test/test_asyncore.py index 5f55df8..42a2525 100644 --- a/Lib/test/test_asyncore.py +++ b/Lib/test/test_asyncore.py @@ -21,6 +21,8 @@ except ImportError: HOST = support.HOST +HAS_UNIX_SOCKETS = hasattr(socket, 'AF_UNIX') + class dummysocket: def __init__(self): self.closed = False @@ -72,15 +74,16 @@ def capture_server(evt, buf, serv): pass else: n = 200 - while n > 0: - r, w, e = select.select([conn], [], []) + start = time.time() + while n > 0 and time.time() - start < 3.0: + r, w, e = select.select([conn], [], [], 0.1) if r: + n -= 1 data = conn.recv(10) # keep everything except for the newline terminator buf.write(data.replace(b'\n', b'')) if b'\n' in data: break - n -= 1 time.sleep(0.01) conn.close() @@ -88,6 +91,13 @@ def capture_server(evt, buf, serv): serv.close() evt.set() +def bind_af_aware(sock, addr): + """Helper function to bind a socket according to its family.""" + if HAS_UNIX_SOCKETS and sock.family == socket.AF_UNIX: + # Make sure the path doesn't exist. + unlink(addr) + sock.bind(addr) + class HelperFunctionTests(unittest.TestCase): def test_readwriteexc(self): @@ -353,7 +363,7 @@ class DispatcherWithSendTests(unittest.TestCase): @support.reap_threads def test_send(self): evt = threading.Event() - sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + sock = socket.socket() sock.settimeout(3) port = support.bind_port(sock) @@ -368,7 +378,7 @@ class DispatcherWithSendTests(unittest.TestCase): data = b"Suppose there isn't a 16-ton weight?" d = dispatcherwithsend_noread() - d.create_socket(socket.AF_INET, socket.SOCK_STREAM) + d.create_socket() d.connect((HOST, port)) # give time for socket to connect @@ -468,22 +478,22 @@ class BaseTestHandler(asyncore.dispatcher): raise -class TCPServer(asyncore.dispatcher): +class BaseServer(asyncore.dispatcher): """A server which listens on an address and dispatches the connection to a handler. """ - def __init__(self, handler=BaseTestHandler, host=HOST, port=0): + def __init__(self, family, addr, handler=BaseTestHandler): asyncore.dispatcher.__init__(self) - self.create_socket(socket.AF_INET, socket.SOCK_STREAM) + self.create_socket(family) self.set_reuse_addr() - self.bind((host, port)) + bind_af_aware(self.socket, addr) self.listen(5) self.handler = handler @property def address(self): - return self.socket.getsockname()[:2] + return self.socket.getsockname() def handle_accepted(self, sock, addr): self.handler(sock) @@ -494,9 +504,9 @@ class TCPServer(asyncore.dispatcher): class BaseClient(BaseTestHandler): - def __init__(self, address): + def __init__(self, family, address): BaseTestHandler.__init__(self) - self.create_socket(socket.AF_INET, socket.SOCK_STREAM) + self.create_socket(family) self.connect(address) def handle_connect(self): @@ -526,8 +536,8 @@ class BaseTestAPI(unittest.TestCase): def handle_connect(self): self.flag = True - server = TCPServer() - client = TestClient(server.address) + server = BaseServer(self.family, self.addr) + client = TestClient(self.family, server.address) self.loop_waiting_for_flag(client) def test_handle_accept(self): @@ -535,18 +545,18 @@ class BaseTestAPI(unittest.TestCase): class TestListener(BaseTestHandler): - def __init__(self): + def __init__(self, family, addr): BaseTestHandler.__init__(self) - self.create_socket(socket.AF_INET, socket.SOCK_STREAM) - self.bind((HOST, 0)) + self.create_socket(family) + bind_af_aware(self.socket, addr) self.listen(5) - self.address = self.socket.getsockname()[:2] + self.address = self.socket.getsockname() def handle_accept(self): self.flag = True - server = TestListener() - client = BaseClient(server.address) + server = TestListener(self.family, self.addr) + client = BaseClient(self.family, server.address) self.loop_waiting_for_flag(server) def test_handle_accepted(self): @@ -554,12 +564,12 @@ class BaseTestAPI(unittest.TestCase): class TestListener(BaseTestHandler): - def __init__(self): + def __init__(self, family, addr): BaseTestHandler.__init__(self) - self.create_socket(socket.AF_INET, socket.SOCK_STREAM) - self.bind((HOST, 0)) + self.create_socket(family) + bind_af_aware(self.socket, addr) self.listen(5) - self.address = self.socket.getsockname()[:2] + self.address = self.socket.getsockname() def handle_accept(self): asyncore.dispatcher.handle_accept(self) @@ -568,8 +578,8 @@ class BaseTestAPI(unittest.TestCase): sock.close() self.flag = True - server = TestListener() - client = BaseClient(server.address) + server = TestListener(self.family, self.addr) + client = BaseClient(self.family, server.address) self.loop_waiting_for_flag(server) @@ -585,8 +595,8 @@ class BaseTestAPI(unittest.TestCase): BaseTestHandler.__init__(self, conn) self.send(b'x' * 1024) - server = TCPServer(TestHandler) - client = TestClient(server.address) + server = BaseServer(self.family, self.addr, TestHandler) + client = TestClient(self.family, server.address) self.loop_waiting_for_flag(client) def test_handle_write(self): @@ -596,8 +606,8 @@ class BaseTestAPI(unittest.TestCase): def handle_write(self): self.flag = True - server = TCPServer() - client = TestClient(server.address) + server = BaseServer(self.family, self.addr) + client = TestClient(self.family, server.address) self.loop_waiting_for_flag(client) def test_handle_close(self): @@ -620,8 +630,40 @@ class BaseTestAPI(unittest.TestCase): BaseTestHandler.__init__(self, conn) self.close() - server = TCPServer(TestHandler) - client = TestClient(server.address) + server = BaseServer(self.family, self.addr, TestHandler) + client = TestClient(self.family, server.address) + self.loop_waiting_for_flag(client) + + def test_handle_close_after_conn_broken(self): + # Check that ECONNRESET/EPIPE is correctly handled (issues #5661 and + # #11265). + + data = b'\0' * 128 + + class TestClient(BaseClient): + + def handle_write(self): + self.send(data) + + def handle_close(self): + self.flag = True + self.close() + + def handle_expt(self): + self.flag = True + self.close() + + class TestHandler(BaseTestHandler): + + def handle_read(self): + self.recv(len(data)) + self.close() + + def writable(self): + return False + + server = BaseServer(self.family, self.addr, TestHandler) + client = TestClient(self.family, server.address) self.loop_waiting_for_flag(client) @unittest.skipIf(sys.platform.startswith("sunos"), @@ -630,9 +672,12 @@ class BaseTestAPI(unittest.TestCase): # Make sure handle_expt is called on OOB data received. # Note: this might fail on some platforms as OOB data is # tenuously supported and rarely used. + if HAS_UNIX_SOCKETS and self.family == socket.AF_UNIX: + self.skipTest("Not applicable to AF_UNIX sockets.") class TestClient(BaseClient): def handle_expt(self): + self.socket.recv(1024, socket.MSG_OOB) self.flag = True class TestHandler(BaseTestHandler): @@ -640,8 +685,8 @@ class BaseTestAPI(unittest.TestCase): BaseTestHandler.__init__(self, conn) self.socket.send(bytes(chr(244), 'latin-1'), socket.MSG_OOB) - server = TCPServer(TestHandler) - client = TestClient(server.address) + server = BaseServer(self.family, self.addr, TestHandler) + client = TestClient(self.family, server.address) self.loop_waiting_for_flag(client) def test_handle_error(self): @@ -658,13 +703,13 @@ class BaseTestAPI(unittest.TestCase): else: raise Exception("exception not raised") - server = TCPServer() - client = TestClient(server.address) + server = BaseServer(self.family, self.addr) + client = TestClient(self.family, server.address) self.loop_waiting_for_flag(client) def test_connection_attributes(self): - server = TCPServer() - client = BaseClient(server.address) + server = BaseServer(self.family, self.addr) + client = BaseClient(self.family, server.address) # we start disconnected self.assertFalse(server.connected) @@ -694,25 +739,29 @@ class BaseTestAPI(unittest.TestCase): def test_create_socket(self): s = asyncore.dispatcher() - s.create_socket(socket.AF_INET, socket.SOCK_STREAM) - self.assertEqual(s.socket.family, socket.AF_INET) + s.create_socket(self.family) + self.assertEqual(s.socket.family, self.family) SOCK_NONBLOCK = getattr(socket, 'SOCK_NONBLOCK', 0) self.assertEqual(s.socket.type, socket.SOCK_STREAM | SOCK_NONBLOCK) def test_bind(self): + if HAS_UNIX_SOCKETS and self.family == socket.AF_UNIX: + self.skipTest("Not applicable to AF_UNIX sockets.") s1 = asyncore.dispatcher() - s1.create_socket(socket.AF_INET, socket.SOCK_STREAM) - s1.bind((HOST, 0)) + s1.create_socket(self.family) + s1.bind(self.addr) s1.listen(5) port = s1.socket.getsockname()[1] s2 = asyncore.dispatcher() - s2.create_socket(socket.AF_INET, socket.SOCK_STREAM) + s2.create_socket(self.family) # EADDRINUSE indicates the socket was correctly bound - self.assertRaises(socket.error, s2.bind, (HOST, port)) + self.assertRaises(socket.error, s2.bind, (self.addr[0], port)) def test_set_reuse_addr(self): - sock = socket.socket() + if HAS_UNIX_SOCKETS and self.family == socket.AF_UNIX: + self.skipTest("Not applicable to AF_UNIX sockets.") + sock = socket.socket(self.family) try: sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) except socket.error: @@ -720,11 +769,11 @@ class BaseTestAPI(unittest.TestCase): else: # if SO_REUSEADDR succeeded for sock we expect asyncore # to do the same - s = asyncore.dispatcher(socket.socket()) + s = asyncore.dispatcher(socket.socket(self.family)) self.assertFalse(s.socket.getsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR)) s.socket.close() - s.create_socket(socket.AF_INET, socket.SOCK_STREAM) + s.create_socket(self.family) s.set_reuse_addr() self.assertTrue(s.socket.getsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR)) @@ -735,12 +784,14 @@ class BaseTestAPI(unittest.TestCase): @support.reap_threads def test_quick_connect(self): # see: http://bugs.python.org/issue10340 - server = TCPServer() - t = threading.Thread(target=lambda: asyncore.loop(timeout=0.1, count=500)) - t.start() + if self.family in (socket.AF_INET, getattr(socket, "AF_INET6", object())): + server = BaseServer(self.family, self.addr) + t = threading.Thread(target=lambda: asyncore.loop(timeout=0.1, + count=500)) + t.start() + - for x in range(20): - s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + s = socket.socket(self.family, socket.SOCK_STREAM) s.settimeout(.2) s.setsockopt(socket.SOL_SOCKET, socket.SO_LINGER, struct.pack('ii', 1, 0)) @@ -751,18 +802,52 @@ class BaseTestAPI(unittest.TestCase): finally: s.close() -class TestAPI_UseSelect(BaseTestAPI): +class TestAPI_UseIPv4Sockets(BaseTestAPI): + family = socket.AF_INET + addr = (HOST, 0) + +@unittest.skipUnless(support.IPV6_ENABLED, 'IPv6 support required') +class TestAPI_UseIPv6Sockets(BaseTestAPI): + family = socket.AF_INET6 + addr = ('::1', 0) + +@unittest.skipUnless(HAS_UNIX_SOCKETS, 'Unix sockets required') +class TestAPI_UseUnixSockets(BaseTestAPI): + if HAS_UNIX_SOCKETS: + family = socket.AF_UNIX + addr = support.TESTFN + + def tearDown(self): + unlink(self.addr) + BaseTestAPI.tearDown(self) + +class TestAPI_UseIPv4Select(TestAPI_UseIPv4Sockets): use_poll = False @unittest.skipUnless(hasattr(select, 'poll'), 'select.poll required') -class TestAPI_UsePoll(BaseTestAPI): +class TestAPI_UseIPv4Poll(TestAPI_UseIPv4Sockets): use_poll = True +class TestAPI_UseIPv6Select(TestAPI_UseIPv6Sockets): + use_poll = False + +@unittest.skipUnless(hasattr(select, 'poll'), 'select.poll required') +class TestAPI_UseIPv6Poll(TestAPI_UseIPv6Sockets): + use_poll = True + +class TestAPI_UseUnixSocketsSelect(TestAPI_UseUnixSockets): + use_poll = False + +@unittest.skipUnless(hasattr(select, 'poll'), 'select.poll required') +class TestAPI_UseUnixSocketsPoll(TestAPI_UseUnixSockets): + use_poll = True def test_main(): tests = [HelperFunctionTests, DispatcherTests, DispatcherWithSendTests, - DispatcherWithSendTests_UsePoll, TestAPI_UseSelect, - TestAPI_UsePoll, FileWrapperTest] + DispatcherWithSendTests_UsePoll, FileWrapperTest, + TestAPI_UseIPv4Select, TestAPI_UseIPv4Poll, TestAPI_UseIPv6Select, + TestAPI_UseIPv6Poll, TestAPI_UseUnixSocketsSelect, + TestAPI_UseUnixSocketsPoll] run_unittest(*tests) if __name__ == "__main__": |