summaryrefslogtreecommitdiffstats
path: root/Lib/test
diff options
context:
space:
mode:
authorKristján Valur Jónsson <kristjan@ccpgames.com>2009-07-01 10:01:31 (GMT)
committerKristján Valur Jónsson <kristjan@ccpgames.com>2009-07-01 10:01:31 (GMT)
commit985fc6a3041e992448827fd2b496cc5b3f344b28 (patch)
tree776a30cdbfe53180711389c95fa550d77c2e65a2 /Lib/test
parentfae45ed87441f54b10607081e0455acd9e5ad630 (diff)
downloadcpython-985fc6a3041e992448827fd2b496cc5b3f344b28.zip
cpython-985fc6a3041e992448827fd2b496cc5b3f344b28.tar.gz
cpython-985fc6a3041e992448827fd2b496cc5b3f344b28.tar.bz2
http://bugs.python.org/issue6267
porting revision 73638 to py3k
Diffstat (limited to 'Lib/test')
-rw-r--r--Lib/test/test_xmlrpc.py119
1 files changed, 115 insertions, 4 deletions
diff --git a/Lib/test/test_xmlrpc.py b/Lib/test/test_xmlrpc.py
index 7bcb6ba..100a93c 100644
--- a/Lib/test/test_xmlrpc.py
+++ b/Lib/test/test_xmlrpc.py
@@ -246,7 +246,7 @@ ADDR = PORT = URL = None
# The evt is set twice. First when the server is ready to serve.
# Second when the server has been shutdown. The user must clear
# the event after it has been set the first time to catch the second set.
-def http_server(evt, numrequests):
+def http_server(evt, numrequests, requestHandler=None):
class TestInstanceClass:
def div(self, x, y):
return x // y
@@ -267,7 +267,9 @@ def http_server(evt, numrequests):
s.setblocking(True)
return s, port
- serv = MyXMLRPCServer(("localhost", 0),
+ if not requestHandler:
+ requestHandler = xmlrpc.server.SimpleXMLRPCRequestHandler
+ serv = MyXMLRPCServer(("localhost", 0), requestHandler,
logRequests=False, bind_and_activate=False)
try:
serv.server_bind()
@@ -318,14 +320,15 @@ def is_unavailable_exception(e):
if exc_mess and 'temporarily unavailable' in exc_mess.lower():
return True
-class SimpleServerTestCase(unittest.TestCase):
+class BaseServerTestCase(unittest.TestCase):
+ requestHandler = None
def setUp(self):
# enable traceback reporting
xmlrpc.server.SimpleXMLRPCServer._send_traceback_header = True
self.evt = threading.Event()
# start server thread to handle requests
- serv_args = (self.evt, 1)
+ serv_args = (self.evt, 1, self.requestHandler)
threading.Thread(target=http_server, args=serv_args).start()
# wait for the server to be ready
@@ -343,6 +346,7 @@ class SimpleServerTestCase(unittest.TestCase):
# disable traceback reporting
xmlrpc.server.SimpleXMLRPCServer._send_traceback_header = False
+class SimpleServerTestCase(BaseServerTestCase):
def test_simple1(self):
try:
p = xmlrpclib.ServerProxy(URL)
@@ -478,6 +482,110 @@ class SimpleServerTestCase(unittest.TestCase):
# This avoids waiting for the socket timeout.
self.test_simple1()
+#A test case that verifies that a server using the HTTP/1.1 keep-alive mechanism
+#does indeed serve subsequent requests on the same connection
+class KeepaliveServerTestCase(BaseServerTestCase):
+ #a request handler that supports keep-alive and logs requests into a
+ #class variable
+ class RequestHandler(xmlrpc.server.SimpleXMLRPCRequestHandler):
+ parentClass = xmlrpc.server.SimpleXMLRPCRequestHandler
+ protocol_version = 'HTTP/1.1'
+ myRequests = []
+ def handle(self):
+ self.myRequests.append([])
+ return self.parentClass.handle(self)
+ def handle_one_request(self):
+ result = self.parentClass.handle_one_request(self)
+ self.myRequests[-1].append(self.raw_requestline)
+ return result
+
+ requestHandler = RequestHandler
+ def setUp(self):
+ #clear request log
+ self.RequestHandler.myRequests = []
+ return BaseServerTestCase.setUp(self)
+
+ def test_two(self):
+ p = xmlrpclib.ServerProxy(URL)
+ self.assertEqual(p.pow(6,8), 6**8)
+ self.assertEqual(p.pow(6,8), 6**8)
+ self.assertEqual(len(self.RequestHandler.myRequests), 1)
+ #we may or may not catch the final "append" with the empty line
+ self.assertTrue(len(self.RequestHandler.myRequests[-1]) >= 2)
+
+#A test case that verifies that gzip encoding works in both directions
+#(for a request and the response)
+class GzipServerTestCase(BaseServerTestCase):
+ #a request handler that supports keep-alive and logs requests into a
+ #class variable
+ class RequestHandler(xmlrpc.server.SimpleXMLRPCRequestHandler):
+ parentClass = xmlrpc.server.SimpleXMLRPCRequestHandler
+ protocol_version = 'HTTP/1.1'
+
+ def do_POST(self):
+ #store content of last request in class
+ self.__class__.content_length = int(self.headers["content-length"])
+ return self.parentClass.do_POST(self)
+ requestHandler = RequestHandler
+
+ class Transport(xmlrpclib.Transport):
+ #custom transport, stores the response length for our perusal
+ fake_gzip = False
+ def parse_response(self, response):
+ self.response_length=int(response.getheader("content-length", 0))
+ return xmlrpclib.Transport.parse_response(self, response)
+
+ def send_content(self, connection, body):
+ if self.fake_gzip:
+ #add a lone gzip header to induce decode error remotely
+ connection.putheader("Content-Encoding", "gzip")
+ return xmlrpclib.Transport.send_content(self, connection, body)
+
+ def test_gzip_request(self):
+ t = self.Transport()
+ t.encode_threshold = None
+ p = xmlrpclib.ServerProxy(URL, transport=t)
+ self.assertEqual(p.pow(6,8), 6**8)
+ a = self.RequestHandler.content_length
+ t.encode_threshold = 0 #turn on request encoding
+ self.assertEqual(p.pow(6,8), 6**8)
+ b = self.RequestHandler.content_length
+ self.assertTrue(a>b)
+
+ def test_bad_gzip_request(self):
+ t = self.Transport()
+ t.encode_threshold = None
+ t.fake_gzip = True
+ p = xmlrpclib.ServerProxy(URL, transport=t)
+ cm = self.assertRaisesRegexp(xmlrpclib.ProtocolError,
+ re.compile(r"\b400\b"))
+ with cm:
+ p.pow(6, 8)
+
+ def test_gsip_response(self):
+ t = self.Transport()
+ p = xmlrpclib.ServerProxy(URL, transport=t)
+ old = self.requestHandler.encode_threshold
+ self.requestHandler.encode_threshold = None #no encoding
+ self.assertEqual(p.pow(6,8), 6**8)
+ a = t.response_length
+ self.requestHandler.encode_threshold = 0 #always encode
+ self.assertEqual(p.pow(6,8), 6**8)
+ b = t.response_length
+ self.requestHandler.encode_threshold = old
+ self.assertTrue(a>b)
+
+#Test special attributes of the ServerProxy object
+class ServerProxyTestCase(unittest.TestCase):
+ def test_close(self):
+ p = xmlrpclib.ServerProxy(URL)
+ self.assertEqual(p('close')(), None)
+
+ def test_transport(self):
+ t = xmlrpclib.Transport()
+ p = xmlrpclib.ServerProxy(URL, transport=t)
+ self.assertEqual(p('transport'), t)
+
# This is a contrived way to make a failure occur on the server side
# in order to test the _send_traceback_header flag on the server
class FailingMessageClass(http.client.HTTPMessage):
@@ -647,6 +755,9 @@ def test_main():
xmlrpc_tests = [XMLRPCTestCase, HelperTestCase, DateTimeTestCase,
BinaryTestCase, FaultTestCase]
xmlrpc_tests.append(SimpleServerTestCase)
+ xmlrpc_tests.append(KeepaliveServerTestCase)
+ xmlrpc_tests.append(GzipServerTestCase)
+ xmlrpc_tests.append(ServerProxyTestCase)
xmlrpc_tests.append(FailingServerTestCase)
xmlrpc_tests.append(CGIHandlerTestCase)