diff options
author | Kristján Valur Jónsson <kristjan@ccpgames.com> | 2009-07-01 10:01:31 (GMT) |
---|---|---|
committer | Kristján Valur Jónsson <kristjan@ccpgames.com> | 2009-07-01 10:01:31 (GMT) |
commit | 985fc6a3041e992448827fd2b496cc5b3f344b28 (patch) | |
tree | 776a30cdbfe53180711389c95fa550d77c2e65a2 /Lib/test/test_xmlrpc.py | |
parent | fae45ed87441f54b10607081e0455acd9e5ad630 (diff) | |
download | cpython-985fc6a3041e992448827fd2b496cc5b3f344b28.zip cpython-985fc6a3041e992448827fd2b496cc5b3f344b28.tar.gz cpython-985fc6a3041e992448827fd2b496cc5b3f344b28.tar.bz2 |
http://bugs.python.org/issue6267
porting revision 73638 to py3k
Diffstat (limited to 'Lib/test/test_xmlrpc.py')
-rw-r--r-- | Lib/test/test_xmlrpc.py | 119 |
1 files changed, 115 insertions, 4 deletions
diff --git a/Lib/test/test_xmlrpc.py b/Lib/test/test_xmlrpc.py index 7bcb6ba..100a93c 100644 --- a/Lib/test/test_xmlrpc.py +++ b/Lib/test/test_xmlrpc.py @@ -246,7 +246,7 @@ ADDR = PORT = URL = None # The evt is set twice. First when the server is ready to serve. # Second when the server has been shutdown. The user must clear # the event after it has been set the first time to catch the second set. -def http_server(evt, numrequests): +def http_server(evt, numrequests, requestHandler=None): class TestInstanceClass: def div(self, x, y): return x // y @@ -267,7 +267,9 @@ def http_server(evt, numrequests): s.setblocking(True) return s, port - serv = MyXMLRPCServer(("localhost", 0), + if not requestHandler: + requestHandler = xmlrpc.server.SimpleXMLRPCRequestHandler + serv = MyXMLRPCServer(("localhost", 0), requestHandler, logRequests=False, bind_and_activate=False) try: serv.server_bind() @@ -318,14 +320,15 @@ def is_unavailable_exception(e): if exc_mess and 'temporarily unavailable' in exc_mess.lower(): return True -class SimpleServerTestCase(unittest.TestCase): +class BaseServerTestCase(unittest.TestCase): + requestHandler = None def setUp(self): # enable traceback reporting xmlrpc.server.SimpleXMLRPCServer._send_traceback_header = True self.evt = threading.Event() # start server thread to handle requests - serv_args = (self.evt, 1) + serv_args = (self.evt, 1, self.requestHandler) threading.Thread(target=http_server, args=serv_args).start() # wait for the server to be ready @@ -343,6 +346,7 @@ class SimpleServerTestCase(unittest.TestCase): # disable traceback reporting xmlrpc.server.SimpleXMLRPCServer._send_traceback_header = False +class SimpleServerTestCase(BaseServerTestCase): def test_simple1(self): try: p = xmlrpclib.ServerProxy(URL) @@ -478,6 +482,110 @@ class SimpleServerTestCase(unittest.TestCase): # This avoids waiting for the socket timeout. self.test_simple1() +#A test case that verifies that a server using the HTTP/1.1 keep-alive mechanism +#does indeed serve subsequent requests on the same connection +class KeepaliveServerTestCase(BaseServerTestCase): + #a request handler that supports keep-alive and logs requests into a + #class variable + class RequestHandler(xmlrpc.server.SimpleXMLRPCRequestHandler): + parentClass = xmlrpc.server.SimpleXMLRPCRequestHandler + protocol_version = 'HTTP/1.1' + myRequests = [] + def handle(self): + self.myRequests.append([]) + return self.parentClass.handle(self) + def handle_one_request(self): + result = self.parentClass.handle_one_request(self) + self.myRequests[-1].append(self.raw_requestline) + return result + + requestHandler = RequestHandler + def setUp(self): + #clear request log + self.RequestHandler.myRequests = [] + return BaseServerTestCase.setUp(self) + + def test_two(self): + p = xmlrpclib.ServerProxy(URL) + self.assertEqual(p.pow(6,8), 6**8) + self.assertEqual(p.pow(6,8), 6**8) + self.assertEqual(len(self.RequestHandler.myRequests), 1) + #we may or may not catch the final "append" with the empty line + self.assertTrue(len(self.RequestHandler.myRequests[-1]) >= 2) + +#A test case that verifies that gzip encoding works in both directions +#(for a request and the response) +class GzipServerTestCase(BaseServerTestCase): + #a request handler that supports keep-alive and logs requests into a + #class variable + class RequestHandler(xmlrpc.server.SimpleXMLRPCRequestHandler): + parentClass = xmlrpc.server.SimpleXMLRPCRequestHandler + protocol_version = 'HTTP/1.1' + + def do_POST(self): + #store content of last request in class + self.__class__.content_length = int(self.headers["content-length"]) + return self.parentClass.do_POST(self) + requestHandler = RequestHandler + + class Transport(xmlrpclib.Transport): + #custom transport, stores the response length for our perusal + fake_gzip = False + def parse_response(self, response): + self.response_length=int(response.getheader("content-length", 0)) + return xmlrpclib.Transport.parse_response(self, response) + + def send_content(self, connection, body): + if self.fake_gzip: + #add a lone gzip header to induce decode error remotely + connection.putheader("Content-Encoding", "gzip") + return xmlrpclib.Transport.send_content(self, connection, body) + + def test_gzip_request(self): + t = self.Transport() + t.encode_threshold = None + p = xmlrpclib.ServerProxy(URL, transport=t) + self.assertEqual(p.pow(6,8), 6**8) + a = self.RequestHandler.content_length + t.encode_threshold = 0 #turn on request encoding + self.assertEqual(p.pow(6,8), 6**8) + b = self.RequestHandler.content_length + self.assertTrue(a>b) + + def test_bad_gzip_request(self): + t = self.Transport() + t.encode_threshold = None + t.fake_gzip = True + p = xmlrpclib.ServerProxy(URL, transport=t) + cm = self.assertRaisesRegexp(xmlrpclib.ProtocolError, + re.compile(r"\b400\b")) + with cm: + p.pow(6, 8) + + def test_gsip_response(self): + t = self.Transport() + p = xmlrpclib.ServerProxy(URL, transport=t) + old = self.requestHandler.encode_threshold + self.requestHandler.encode_threshold = None #no encoding + self.assertEqual(p.pow(6,8), 6**8) + a = t.response_length + self.requestHandler.encode_threshold = 0 #always encode + self.assertEqual(p.pow(6,8), 6**8) + b = t.response_length + self.requestHandler.encode_threshold = old + self.assertTrue(a>b) + +#Test special attributes of the ServerProxy object +class ServerProxyTestCase(unittest.TestCase): + def test_close(self): + p = xmlrpclib.ServerProxy(URL) + self.assertEqual(p('close')(), None) + + def test_transport(self): + t = xmlrpclib.Transport() + p = xmlrpclib.ServerProxy(URL, transport=t) + self.assertEqual(p('transport'), t) + # This is a contrived way to make a failure occur on the server side # in order to test the _send_traceback_header flag on the server class FailingMessageClass(http.client.HTTPMessage): @@ -647,6 +755,9 @@ def test_main(): xmlrpc_tests = [XMLRPCTestCase, HelperTestCase, DateTimeTestCase, BinaryTestCase, FaultTestCase] xmlrpc_tests.append(SimpleServerTestCase) + xmlrpc_tests.append(KeepaliveServerTestCase) + xmlrpc_tests.append(GzipServerTestCase) + xmlrpc_tests.append(ServerProxyTestCase) xmlrpc_tests.append(FailingServerTestCase) xmlrpc_tests.append(CGIHandlerTestCase) |