From 8fc1178cf81e155f1e50151b5820c24e23d43e37 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Giampaolo=20Rodol=C3=A0?= Date: Mon, 10 May 2010 15:40:49 +0000 Subject: Merged revisions 81043 via svnmerge from svn+ssh://pythondev@svn.python.org/python/trunk ........ r81043 | giampaolo.rodola | 2010-05-10 17:33:22 +0200 (lun, 10 mag 2010) | 1 line Issue #8490: adds a more solid test suite for asyncore ........ --- Lib/test/test_asyncore.py | 276 +++++++++++++++++++++++++++++++++++++++++++++- 1 file changed, 275 insertions(+), 1 deletion(-) diff --git a/Lib/test/test_asyncore.py b/Lib/test/test_asyncore.py index 4b388e7..e5b2f0c 100644 --- a/Lib/test/test_asyncore.py +++ b/Lib/test/test_asyncore.py @@ -419,11 +419,285 @@ if hasattr(asyncore, 'file_wrapper'): self.assertEqual(open(TESTFN, 'rb').read(), self.d + d1 + d2) +class BaseTestHandler(asyncore.dispatcher): + + def __init__(self, sock=None): + asyncore.dispatcher.__init__(self, sock) + self.flag = False + + def handle_accept(self): + raise Exception("handle_accept not supposed to be called") + + def handle_connect(self): + raise Exception("handle_connect not supposed to be called") + + def handle_expt(self): + raise Exception("handle_expt not supposed to be called") + + def handle_close(self): + raise Exception("handle_close not supposed to be called") + + def handle_error(self): + raise + + +class TCPServer(asyncore.dispatcher): + """A server which listens on an address and dispatches the + connection to a handler. + """ + + def __init__(self, handler=BaseTestHandler, host=HOST, port=0): + asyncore.dispatcher.__init__(self) + self.create_socket(socket.AF_INET, socket.SOCK_STREAM) + self.set_reuse_addr() + self.bind((host, port)) + self.listen(5) + self.handler = handler + + @property + def address(self): + return self.socket.getsockname()[:2] + + def handle_accept(self): + sock, addr = self.accept() + self.handler(sock) + + def handle_error(self): + raise + + +class BaseClient(BaseTestHandler): + + def __init__(self, address): + BaseTestHandler.__init__(self) + self.create_socket(socket.AF_INET, socket.SOCK_STREAM) + self.connect(address) + + def handle_connect(self): + pass + + +class BaseTestAPI(unittest.TestCase): + + def tearDown(self): + asyncore.close_all() + + def loop_waiting_for_flag(self, instance, timeout=5): + timeout = float(timeout) / 100 + count = 100 + while asyncore.socket_map and count > 0: + asyncore.loop(timeout=0.01, count=1, use_poll=self.use_poll) + if instance.flag: + return + count -= 1 + time.sleep(timeout) + self.fail("flag not set") + + def test_handle_connect(self): + # make sure handle_connect is called on connect() + + class TestClient(BaseClient): + def handle_connect(self): + self.flag = True + + server = TCPServer() + client = TestClient(server.address) + self.loop_waiting_for_flag(client) + + def test_handle_accept(self): + # make sure handle_accept() is called when a client connects + + class TestListener(BaseTestHandler): + + def __init__(self): + BaseTestHandler.__init__(self) + self.create_socket(socket.AF_INET, socket.SOCK_STREAM) + self.bind((HOST, 0)) + self.listen(5) + self.address = self.socket.getsockname()[:2] + + def handle_accept(self): + self.flag = True + + server = TestListener() + client = BaseClient(server.address) + self.loop_waiting_for_flag(server) + + def test_handle_read(self): + # make sure handle_read is called on data received + + class TestClient(BaseClient): + def handle_read(self): + self.flag = True + + class TestHandler(BaseTestHandler): + def __init__(self, conn): + BaseTestHandler.__init__(self, conn) + self.send(b'x' * 1024) + + server = TCPServer(TestHandler) + client = TestClient(server.address) + self.loop_waiting_for_flag(client) + + def test_handle_write(self): + # make sure handle_write is called + + class TestClient(BaseClient): + def handle_write(self): + self.flag = True + + server = TCPServer() + client = TestClient(server.address) + self.loop_waiting_for_flag(client) + + def test_handle_close(self): + # make sure handle_close is called when the other end closes + # the connection + + class TestClient(BaseClient): + + def handle_read(self): + # in order to make handle_close be called we are supposed + # to make at least one recv() call + self.recv(1024) + + def handle_close(self): + self.flag = True + self.close() + + class TestHandler(BaseTestHandler): + def __init__(self, conn): + BaseTestHandler.__init__(self, conn) + self.close() + + server = TCPServer(TestHandler) + client = TestClient(server.address) + self.loop_waiting_for_flag(client) + + @unittest.skipIf(sys.platform.startswith("sunos"), + "OOB support is broken on Solaris") + def test_handle_expt(self): + # Make sure handle_expt is called on OOB data received. + # Note: this might fail on some platforms as OOB data is + # tenuously supported and rarely used. + + class TestClient(BaseClient): + def handle_expt(self): + self.flag = True + + class TestHandler(BaseTestHandler): + def __init__(self, conn): + BaseTestHandler.__init__(self, conn) + self.socket.send(bytes(chr(244), 'latin-1'), socket.MSG_OOB) + + server = TCPServer(TestHandler) + client = TestClient(server.address) + self.loop_waiting_for_flag(client) + + def test_handle_error(self): + + class TestClient(BaseClient): + def handle_write(self): + 1.0 / 0 + def handle_error(self): + self.flag = True + try: + raise + except ZeroDivisionError: + pass + else: + raise Exception("exception not raised") + + server = TCPServer() + client = TestClient(server.address) + self.loop_waiting_for_flag(client) + + def test_connection_attributes(self): + server = TCPServer() + client = BaseClient(server.address) + + # we start disconnected + self.assertFalse(server.connected) + self.assertTrue(server.accepting) + # XXX - Solaris seems to connect() immediately even without + # starting the poller. This is something which should be + # fixed as handle_connect() gets called immediately even if + # no connection actually took place (see issue #8490). + if not sys.platform.startswith("sunos"): + self.assertFalse(client.connected) + self.assertFalse(client.accepting) + + # execute some loops so that client connects to server + asyncore.loop(timeout=0.01, use_poll=self.use_poll, count=100) + self.assertFalse(server.connected) + self.assertTrue(server.accepting) + self.assertTrue(client.connected) + self.assertFalse(client.accepting) + + # disconnect the client + client.close() + self.assertFalse(server.connected) + self.assertTrue(server.accepting) + self.assertFalse(client.connected) + self.assertFalse(client.accepting) + + # stop serving + server.close() + self.assertFalse(server.connected) + self.assertFalse(server.accepting) + + def test_create_socket(self): + s = asyncore.dispatcher() + s.create_socket(socket.AF_INET, socket.SOCK_STREAM) + self.assertEqual(s.socket.family, socket.AF_INET) + self.assertEqual(s.socket.type, socket.SOCK_STREAM) + + def test_bind(self): + s1 = asyncore.dispatcher() + s1.create_socket(socket.AF_INET, socket.SOCK_STREAM) + s1.bind((HOST, 0)) + s1.listen(5) + port = s1.socket.getsockname()[1] + + s2 = asyncore.dispatcher() + s2.create_socket(socket.AF_INET, socket.SOCK_STREAM) + # EADDRINUSE indicates the socket was correctly bound + self.assertRaises(socket.error, s2.bind, (HOST, port)) + + def test_set_reuse_addr(self): + sock = socket.socket() + try: + sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) + except socket.error: + unittest.skip("SO_REUSEADDR not supported on this platform") + else: + # if SO_REUSEADDR succeeded for sock we expect asyncore + # to do the same + s = asyncore.dispatcher(socket.socket()) + self.assertFalse(s.socket.getsockopt(socket.SOL_SOCKET, + socket.SO_REUSEADDR)) + s.create_socket(socket.AF_INET, socket.SOCK_STREAM) + s.set_reuse_addr() + self.assertTrue(s.socket.getsockopt(socket.SOL_SOCKET, + socket.SO_REUSEADDR)) + finally: + sock.close() + + +class TestAPI_UseSelect(BaseTestAPI): + use_poll = False + +class TestAPI_UsePoll(BaseTestAPI): + use_poll = True + + def test_main(): tests = [HelperFunctionTests, DispatcherTests, DispatcherWithSendTests, - DispatcherWithSendTests_UsePoll] + DispatcherWithSendTests_UsePoll, TestAPI_UseSelect] if hasattr(asyncore, 'file_wrapper'): tests.append(FileWrapperTest) + if hasattr(select, 'poll'): + tests.append(TestAPI_UsePoll) run_unittest(*tests) -- cgit v0.12