summaryrefslogtreecommitdiffstats
path: root/Lib
diff options
context:
space:
mode:
Diffstat (limited to 'Lib')
-rw-r--r--Lib/test/test_asyncore.py276
1 files changed, 275 insertions, 1 deletions
diff --git a/Lib/test/test_asyncore.py b/Lib/test/test_asyncore.py
index bc2dd72..2fb40b9 100644
--- a/Lib/test/test_asyncore.py
+++ b/Lib/test/test_asyncore.py
@@ -418,11 +418,285 @@ if hasattr(asyncore, 'file_wrapper'):
self.assertEqual(file(TESTFN).read(), self.d + d1 + d2)
+class BaseTestHandler(asyncore.dispatcher):
+
+ def __init__(self, sock=None):
+ asyncore.dispatcher.__init__(self, sock)
+ self.flag = False
+
+ def handle_accept(self):
+ raise Exception("handle_accept not supposed to be called")
+
+ def handle_connect(self):
+ raise Exception("handle_connect not supposed to be called")
+
+ def handle_expt(self):
+ raise Exception("handle_expt not supposed to be called")
+
+ def handle_close(self):
+ raise Exception("handle_close not supposed to be called")
+
+ def handle_error(self):
+ raise
+
+
+class TCPServer(asyncore.dispatcher):
+ """A server which listens on an address and dispatches the
+ connection to a handler.
+ """
+
+ def __init__(self, handler=BaseTestHandler, host=HOST, port=0):
+ asyncore.dispatcher.__init__(self)
+ self.create_socket(socket.AF_INET, socket.SOCK_STREAM)
+ self.set_reuse_addr()
+ self.bind((host, port))
+ self.listen(5)
+ self.handler = handler
+
+ @property
+ def address(self):
+ return self.socket.getsockname()[:2]
+
+ def handle_accept(self):
+ sock, addr = self.accept()
+ self.handler(sock)
+
+ def handle_error(self):
+ raise
+
+
+class BaseClient(BaseTestHandler):
+
+ def __init__(self, address):
+ BaseTestHandler.__init__(self)
+ self.create_socket(socket.AF_INET, socket.SOCK_STREAM)
+ self.connect(address)
+
+ def handle_connect(self):
+ pass
+
+
+class BaseTestAPI(unittest.TestCase):
+
+ def tearDown(self):
+ asyncore.close_all()
+
+ def loop_waiting_for_flag(self, instance, timeout=5):
+ timeout = float(timeout) / 100
+ count = 100
+ while asyncore.socket_map and count > 0:
+ asyncore.loop(timeout=0.01, count=1, use_poll=self.use_poll)
+ if instance.flag:
+ return
+ count -= 1
+ time.sleep(timeout)
+ self.fail("flag not set")
+
+ def test_handle_connect(self):
+ # make sure handle_connect is called on connect()
+
+ class TestClient(BaseClient):
+ def handle_connect(self):
+ self.flag = True
+
+ server = TCPServer()
+ client = TestClient(server.address)
+ self.loop_waiting_for_flag(client)
+
+ def test_handle_accept(self):
+ # make sure handle_accept() is called when a client connects
+
+ class TestListener(BaseTestHandler):
+
+ def __init__(self):
+ BaseTestHandler.__init__(self)
+ self.create_socket(socket.AF_INET, socket.SOCK_STREAM)
+ self.bind((HOST, 0))
+ self.listen(5)
+ self.address = self.socket.getsockname()[:2]
+
+ def handle_accept(self):
+ self.flag = True
+
+ server = TestListener()
+ client = BaseClient(server.address)
+ self.loop_waiting_for_flag(server)
+
+ def test_handle_read(self):
+ # make sure handle_read is called on data received
+
+ class TestClient(BaseClient):
+ def handle_read(self):
+ self.flag = True
+
+ class TestHandler(BaseTestHandler):
+ def __init__(self, conn):
+ BaseTestHandler.__init__(self, conn)
+ self.send('x' * 1024)
+
+ server = TCPServer(TestHandler)
+ client = TestClient(server.address)
+ self.loop_waiting_for_flag(client)
+
+ def test_handle_write(self):
+ # make sure handle_write is called
+
+ class TestClient(BaseClient):
+ def handle_write(self):
+ self.flag = True
+
+ server = TCPServer()
+ client = TestClient(server.address)
+ self.loop_waiting_for_flag(client)
+
+ def test_handle_close(self):
+ # make sure handle_close is called when the other end closes
+ # the connection
+
+ class TestClient(BaseClient):
+
+ def handle_read(self):
+ # in order to make handle_close be called we are supposed
+ # to make at least one recv() call
+ self.recv(1024)
+
+ def handle_close(self):
+ self.flag = True
+ self.close()
+
+ class TestHandler(BaseTestHandler):
+ def __init__(self, conn):
+ BaseTestHandler.__init__(self, conn)
+ self.close()
+
+ server = TCPServer(TestHandler)
+ client = TestClient(server.address)
+ self.loop_waiting_for_flag(client)
+
+ @unittest.skipIf(sys.platform.startswith("sunos"),
+ "OOB support is broken on Solaris")
+ def test_handle_expt(self):
+ # Make sure handle_expt is called on OOB data received.
+ # Note: this might fail on some platforms as OOB data is
+ # tenuously supported and rarely used.
+
+ class TestClient(BaseClient):
+ def handle_expt(self):
+ self.flag = True
+
+ class TestHandler(BaseTestHandler):
+ def __init__(self, conn):
+ BaseTestHandler.__init__(self, conn)
+ self.socket.send(chr(244), socket.MSG_OOB)
+
+ server = TCPServer(TestHandler)
+ client = TestClient(server.address)
+ self.loop_waiting_for_flag(client)
+
+ def test_handle_error(self):
+
+ class TestClient(BaseClient):
+ def handle_write(self):
+ 1.0 / 0
+ def handle_error(self):
+ self.flag = True
+ try:
+ raise
+ except ZeroDivisionError:
+ pass
+ else:
+ raise Exception("exception not raised")
+
+ server = TCPServer()
+ client = TestClient(server.address)
+ self.loop_waiting_for_flag(client)
+
+ def test_connection_attributes(self):
+ server = TCPServer()
+ client = BaseClient(server.address)
+
+ # we start disconnected
+ self.assertFalse(server.connected)
+ self.assertTrue(server.accepting)
+ # XXX - Solaris seems to connect() immediately even without
+ # starting the poller. This is something which should be
+ # fixed as handle_connect() gets called immediately even if
+ # no connection actually took place (see issue #8490).
+ if not sys.platform.startswith("sunos"):
+ self.assertFalse(client.connected)
+ self.assertFalse(client.accepting)
+
+ # execute some loops so that client connects to server
+ asyncore.loop(timeout=0.01, use_poll=self.use_poll, count=100)
+ self.assertFalse(server.connected)
+ self.assertTrue(server.accepting)
+ self.assertTrue(client.connected)
+ self.assertFalse(client.accepting)
+
+ # disconnect the client
+ client.close()
+ self.assertFalse(server.connected)
+ self.assertTrue(server.accepting)
+ self.assertFalse(client.connected)
+ self.assertFalse(client.accepting)
+
+ # stop serving
+ server.close()
+ self.assertFalse(server.connected)
+ self.assertFalse(server.accepting)
+
+ def test_create_socket(self):
+ s = asyncore.dispatcher()
+ s.create_socket(socket.AF_INET, socket.SOCK_STREAM)
+ self.assertEqual(s.socket.family, socket.AF_INET)
+ self.assertEqual(s.socket.type, socket.SOCK_STREAM)
+
+ def test_bind(self):
+ s1 = asyncore.dispatcher()
+ s1.create_socket(socket.AF_INET, socket.SOCK_STREAM)
+ s1.bind((HOST, 0))
+ s1.listen(5)
+ port = s1.socket.getsockname()[1]
+
+ s2 = asyncore.dispatcher()
+ s2.create_socket(socket.AF_INET, socket.SOCK_STREAM)
+ # EADDRINUSE indicates the socket was correctly bound
+ self.assertRaises(socket.error, s2.bind, (HOST, port))
+
+ def test_set_reuse_addr(self):
+ sock = socket.socket()
+ try:
+ sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
+ except socket.error:
+ unittest.skip("SO_REUSEADDR not supported on this platform")
+ else:
+ # if SO_REUSEADDR succeeded for sock we expect asyncore
+ # to do the same
+ s = asyncore.dispatcher(socket.socket())
+ self.assertFalse(s.socket.getsockopt(socket.SOL_SOCKET,
+ socket.SO_REUSEADDR))
+ s.create_socket(socket.AF_INET, socket.SOCK_STREAM)
+ s.set_reuse_addr()
+ self.assertTrue(s.socket.getsockopt(socket.SOL_SOCKET,
+ socket.SO_REUSEADDR))
+ finally:
+ sock.close()
+
+
+class TestAPI_UseSelect(BaseTestAPI):
+ use_poll = False
+
+class TestAPI_UsePoll(BaseTestAPI):
+ use_poll = True
+
+
def test_main():
tests = [HelperFunctionTests, DispatcherTests, DispatcherWithSendTests,
- DispatcherWithSendTests_UsePoll]
+ DispatcherWithSendTests_UsePoll, TestAPI_UseSelect]
if hasattr(asyncore, 'file_wrapper'):
tests.append(FileWrapperTest)
+ if hasattr(select, 'poll'):
+ tests.append(TestAPI_UsePoll)
run_unittest(*tests)