diff options
Diffstat (limited to 'Lib/test/test_xmlrpc.py')
-rw-r--r-- | Lib/test/test_xmlrpc.py | 454 |
1 files changed, 394 insertions, 60 deletions
diff --git a/Lib/test/test_xmlrpc.py b/Lib/test/test_xmlrpc.py index 9833f26..3814191 100644 --- a/Lib/test/test_xmlrpc.py +++ b/Lib/test/test_xmlrpc.py @@ -5,13 +5,19 @@ import time import unittest import xmlrpc.client as xmlrpclib import xmlrpc.server -import threading import http.client import socket import os import re +import io +import contextlib from test import support +try: + import threading +except ImportError: + threading = None + alist = [{'astring': 'foo@bar.baz.spam', 'afloat': 7283.43, 'anint': 2**20, @@ -60,24 +66,15 @@ class XMLRPCTestCase(unittest.TestCase): (newdt,), m = xmlrpclib.loads(s, use_datetime=0) self.assertEqual(newdt, xmlrpclib.DateTime('00010210T11:41:23')) - def test_cmp_datetime_DateTime(self): - now = datetime.datetime.now() - dt = xmlrpclib.DateTime(now.timetuple()) - self.assertTrue(dt == now) - self.assertTrue(now == dt) - then = now + datetime.timedelta(seconds=4) - self.assertTrue(then >= dt) - self.assertTrue(dt < then) - def test_bug_1164912 (self): d = xmlrpclib.DateTime() ((new_d,), dummy) = xmlrpclib.loads(xmlrpclib.dumps((d,), methodresponse=True)) - self.assertTrue(isinstance(new_d.value, str)) + self.assertIsInstance(new_d.value, str) # Check that the output of dumps() is still an 8-bit string s = xmlrpclib.dumps((new_d,), methodresponse=True) - self.assertTrue(isinstance(s, str)) + self.assertIsInstance(s, str) def test_newstyle_class(self): class T(object): @@ -143,6 +140,9 @@ class XMLRPCTestCase(unittest.TestCase): ('host.tld', [('Authorization', 'Basic dXNlcg==')], {})) + def test_dump_bytes(self): + self.assertRaises(TypeError, xmlrpclib.dumps, (b"my dog has fleas",)) + def test_ssl_presence(self): try: import ssl @@ -180,7 +180,7 @@ class FaultTestCase(unittest.TestCase): self.assertRaises(xmlrpclib.Fault, xmlrpclib.loads, s) def test_dotted_attribute(self): - # this will raise AttirebuteError because code don't want us to use + # this will raise AttributeError because code don't want us to use # private methods self.assertRaises(AttributeError, xmlrpc.server.resolve_dotted_attribute, str, '__add') @@ -227,6 +227,45 @@ class DateTimeTestCase(unittest.TestCase): t2 = xmlrpclib._datetime(d) self.assertEqual(t1, tref) + def test_comparison(self): + now = datetime.datetime.now() + dtime = xmlrpclib.DateTime(now.timetuple()) + + # datetime vs. DateTime + self.assertTrue(dtime == now) + self.assertTrue(now == dtime) + then = now + datetime.timedelta(seconds=4) + self.assertTrue(then >= dtime) + self.assertTrue(dtime < then) + + # str vs. DateTime + dstr = now.strftime("%Y%m%dT%H:%M:%S") + self.assertTrue(dtime == dstr) + self.assertTrue(dstr == dtime) + dtime_then = xmlrpclib.DateTime(then.timetuple()) + self.assertTrue(dtime_then >= dstr) + self.assertTrue(dstr < dtime_then) + + # some other types + dbytes = dstr.encode('ascii') + dtuple = now.timetuple() + with self.assertRaises(TypeError): + dtime == 1970 + with self.assertRaises(TypeError): + dtime != dbytes + with self.assertRaises(TypeError): + dtime == bytearray(dbytes) + with self.assertRaises(TypeError): + dtime != dtuple + with self.assertRaises(TypeError): + dtime < float(1970) + with self.assertRaises(TypeError): + dtime > dbytes + with self.assertRaises(TypeError): + dtime <= bytearray(dbytes) + with self.assertRaises(TypeError): + dtime >= dtuple + class BinaryTestCase(unittest.TestCase): # XXX What should str(Binary(b"\xff")) return? I'm chosing "\xff" @@ -259,7 +298,7 @@ ADDR = PORT = URL = None # The evt is set twice. First when the server is ready to serve. # Second when the server has been shutdown. The user must clear # the event after it has been set the first time to catch the second set. -def http_server(evt, numrequests): +def http_server(evt, numrequests, requestHandler=None): class TestInstanceClass: def div(self, x, y): return x // y @@ -280,14 +319,16 @@ def http_server(evt, numrequests): s.setblocking(True) return s, port - serv = MyXMLRPCServer(("localhost", 0), + if not requestHandler: + requestHandler = xmlrpc.server.SimpleXMLRPCRequestHandler + serv = MyXMLRPCServer(("localhost", 0), requestHandler, logRequests=False, bind_and_activate=False) try: serv.server_bind() global ADDR, PORT, URL ADDR, PORT = serv.socket.getsockname() #connect to IP address directly. This avoids socket.create_connection() - #trying to connect to to "localhost" using all address families, which + #trying to connect to "localhost" using all address families, which #causes slowdown e.g. on vista which supports AF_INET6. The server listens #on AF_INET only. URL = "http://%s:%d"%(ADDR, PORT) @@ -312,6 +353,71 @@ def http_server(evt, numrequests): PORT = None evt.set() +def http_multi_server(evt, numrequests, requestHandler=None): + class TestInstanceClass: + def div(self, x, y): + return x // y + + def _methodHelp(self, name): + if name == 'div': + return 'This is the div function' + + def my_function(): + '''This is my function''' + return True + + class MyXMLRPCServer(xmlrpc.server.MultiPathXMLRPCServer): + def get_request(self): + # Ensure the socket is always non-blocking. On Linux, socket + # attributes are not inherited like they are on *BSD and Windows. + s, port = self.socket.accept() + s.setblocking(True) + return s, port + + if not requestHandler: + requestHandler = xmlrpc.server.SimpleXMLRPCRequestHandler + class MyRequestHandler(requestHandler): + rpc_paths = [] + + class BrokenDispatcher: + def _marshaled_dispatch(self, data, dispatch_method=None, path=None): + raise RuntimeError("broken dispatcher") + + serv = MyXMLRPCServer(("localhost", 0), MyRequestHandler, + logRequests=False, bind_and_activate=False) + serv.socket.settimeout(3) + serv.server_bind() + try: + global ADDR, PORT, URL + ADDR, PORT = serv.socket.getsockname() + #connect to IP address directly. This avoids socket.create_connection() + #trying to connect to "localhost" using all address families, which + #causes slowdown e.g. on vista which supports AF_INET6. The server listens + #on AF_INET only. + URL = "http://%s:%d"%(ADDR, PORT) + serv.server_activate() + paths = ["/foo", "/foo/bar"] + for path in paths: + d = serv.add_dispatcher(path, xmlrpc.server.SimpleXMLRPCDispatcher()) + d.register_introspection_functions() + d.register_multicall_functions() + serv.get_dispatcher(paths[0]).register_function(pow) + serv.get_dispatcher(paths[1]).register_function(lambda x,y: x+y, 'add') + serv.add_dispatcher("/is/broken", BrokenDispatcher()) + evt.set() + + # handle up to 'numrequests' requests + while numrequests > 0: + serv.handle_request() + numrequests -= 1 + + except socket.timeout: + pass + finally: + serv.socket.close() + PORT = None + evt.set() + # This function prevents errors like: # <ProtocolError for localhost:57527/RPC2: 500 Internal Server Error> def is_unavailable_exception(e): @@ -331,15 +437,36 @@ def is_unavailable_exception(e): if exc_mess and 'temporarily unavailable' in exc_mess.lower(): return True -class SimpleServerTestCase(unittest.TestCase): +def make_request_and_skipIf(condition, reason): + # If we skip the test, we have to make a request because the + # the server created in setUp blocks expecting one to come in. + if not condition: + return lambda func: func + def decorator(func): + def make_request_and_skip(self): + try: + xmlrpclib.ServerProxy(URL).my_function() + except (xmlrpclib.ProtocolError, socket.error) as e: + if not is_unavailable_exception(e): + raise + raise unittest.SkipTest(reason) + return make_request_and_skip + return decorator + +@unittest.skipUnless(threading, 'Threading required for this test.') +class BaseServerTestCase(unittest.TestCase): + requestHandler = None + request_count = 1 + threadFunc = staticmethod(http_server) + def setUp(self): # enable traceback reporting xmlrpc.server.SimpleXMLRPCServer._send_traceback_header = True self.evt = threading.Event() # start server thread to handle requests - serv_args = (self.evt, 1) - threading.Thread(target=http_server, args=serv_args).start() + serv_args = (self.evt, self.request_count, self.requestHandler) + threading.Thread(target=self.threadFunc, args=serv_args).start() # wait for the server to be ready self.evt.wait() @@ -347,15 +474,12 @@ class SimpleServerTestCase(unittest.TestCase): def tearDown(self): # wait on the server thread to terminate - self.evt.wait(4.0) - if not self.evt.is_set(): - self.evt.set() - stop_serving() - raise RuntimeError("timeout reached, test has failed") + self.evt.wait() # disable traceback reporting xmlrpc.server.SimpleXMLRPCServer._send_traceback_header = False +class SimpleServerTestCase(BaseServerTestCase): def test_simple1(self): try: p = xmlrpclib.ServerProxy(URL) @@ -418,6 +542,8 @@ class SimpleServerTestCase(unittest.TestCase): # protocol error; provide additional information in test output self.fail("%s\n%s" % (e, getattr(e, "headers", ""))) + @make_request_and_skipIf(sys.flags.optimize >= 2, + "Docstrings are omitted with -O2 and above") def test_introspection3(self): try: # test native doc @@ -491,6 +617,200 @@ class SimpleServerTestCase(unittest.TestCase): # This avoids waiting for the socket timeout. self.test_simple1() + def test_unicode_host(self): + server = xmlrpclib.ServerProxy("http://%s:%d/RPC2" % (ADDR, PORT)) + self.assertEqual(server.add("a", "\xe9"), "a\xe9") + + def test_partial_post(self): + # Check that a partial POST doesn't make the server loop: issue #14001. + conn = http.client.HTTPConnection(ADDR, PORT) + conn.request('POST', '/RPC2 HTTP/1.0\r\nContent-Length: 100\r\n\r\nbye') + conn.close() + + +class MultiPathServerTestCase(BaseServerTestCase): + threadFunc = staticmethod(http_multi_server) + request_count = 2 + def test_path1(self): + p = xmlrpclib.ServerProxy(URL+"/foo") + self.assertEqual(p.pow(6,8), 6**8) + self.assertRaises(xmlrpclib.Fault, p.add, 6, 8) + + def test_path2(self): + p = xmlrpclib.ServerProxy(URL+"/foo/bar") + self.assertEqual(p.add(6,8), 6+8) + self.assertRaises(xmlrpclib.Fault, p.pow, 6, 8) + + def test_path3(self): + p = xmlrpclib.ServerProxy(URL+"/is/broken") + self.assertRaises(xmlrpclib.Fault, p.add, 6, 8) + +#A test case that verifies that a server using the HTTP/1.1 keep-alive mechanism +#does indeed serve subsequent requests on the same connection +class BaseKeepaliveServerTestCase(BaseServerTestCase): + #a request handler that supports keep-alive and logs requests into a + #class variable + class RequestHandler(xmlrpc.server.SimpleXMLRPCRequestHandler): + parentClass = xmlrpc.server.SimpleXMLRPCRequestHandler + protocol_version = 'HTTP/1.1' + myRequests = [] + def handle(self): + self.myRequests.append([]) + self.reqidx = len(self.myRequests)-1 + return self.parentClass.handle(self) + def handle_one_request(self): + result = self.parentClass.handle_one_request(self) + self.myRequests[self.reqidx].append(self.raw_requestline) + return result + + requestHandler = RequestHandler + def setUp(self): + #clear request log + self.RequestHandler.myRequests = [] + return BaseServerTestCase.setUp(self) + +#A test case that verifies that a server using the HTTP/1.1 keep-alive mechanism +#does indeed serve subsequent requests on the same connection +class KeepaliveServerTestCase1(BaseKeepaliveServerTestCase): + def test_two(self): + p = xmlrpclib.ServerProxy(URL) + #do three requests. + self.assertEqual(p.pow(6,8), 6**8) + self.assertEqual(p.pow(6,8), 6**8) + self.assertEqual(p.pow(6,8), 6**8) + p("close")() + + #they should have all been handled by a single request handler + self.assertEqual(len(self.RequestHandler.myRequests), 1) + + #check that we did at least two (the third may be pending append + #due to thread scheduling) + self.assertGreaterEqual(len(self.RequestHandler.myRequests[-1]), 2) + + +#test special attribute access on the serverproxy, through the __call__ +#function. +class KeepaliveServerTestCase2(BaseKeepaliveServerTestCase): + #ask for two keepalive requests to be handled. + request_count=2 + + def test_close(self): + p = xmlrpclib.ServerProxy(URL) + #do some requests with close. + self.assertEqual(p.pow(6,8), 6**8) + self.assertEqual(p.pow(6,8), 6**8) + self.assertEqual(p.pow(6,8), 6**8) + p("close")() #this should trigger a new keep-alive request + self.assertEqual(p.pow(6,8), 6**8) + self.assertEqual(p.pow(6,8), 6**8) + self.assertEqual(p.pow(6,8), 6**8) + p("close")() + + #they should have all been two request handlers, each having logged at least + #two complete requests + self.assertEqual(len(self.RequestHandler.myRequests), 2) + self.assertGreaterEqual(len(self.RequestHandler.myRequests[-1]), 2) + self.assertGreaterEqual(len(self.RequestHandler.myRequests[-2]), 2) + + + def test_transport(self): + p = xmlrpclib.ServerProxy(URL) + #do some requests with close. + self.assertEqual(p.pow(6,8), 6**8) + p("transport").close() #same as above, really. + self.assertEqual(p.pow(6,8), 6**8) + p("close")() + self.assertEqual(len(self.RequestHandler.myRequests), 2) + +#A test case that verifies that gzip encoding works in both directions +#(for a request and the response) +class GzipServerTestCase(BaseServerTestCase): + #a request handler that supports keep-alive and logs requests into a + #class variable + class RequestHandler(xmlrpc.server.SimpleXMLRPCRequestHandler): + parentClass = xmlrpc.server.SimpleXMLRPCRequestHandler + protocol_version = 'HTTP/1.1' + + def do_POST(self): + #store content of last request in class + self.__class__.content_length = int(self.headers["content-length"]) + return self.parentClass.do_POST(self) + requestHandler = RequestHandler + + class Transport(xmlrpclib.Transport): + #custom transport, stores the response length for our perusal + fake_gzip = False + def parse_response(self, response): + self.response_length=int(response.getheader("content-length", 0)) + return xmlrpclib.Transport.parse_response(self, response) + + def send_content(self, connection, body): + if self.fake_gzip: + #add a lone gzip header to induce decode error remotely + connection.putheader("Content-Encoding", "gzip") + return xmlrpclib.Transport.send_content(self, connection, body) + + def setUp(self): + BaseServerTestCase.setUp(self) + + def test_gzip_request(self): + t = self.Transport() + t.encode_threshold = None + p = xmlrpclib.ServerProxy(URL, transport=t) + self.assertEqual(p.pow(6,8), 6**8) + a = self.RequestHandler.content_length + t.encode_threshold = 0 #turn on request encoding + self.assertEqual(p.pow(6,8), 6**8) + b = self.RequestHandler.content_length + self.assertTrue(a>b) + p("close")() + + def test_bad_gzip_request(self): + t = self.Transport() + t.encode_threshold = None + t.fake_gzip = True + p = xmlrpclib.ServerProxy(URL, transport=t) + cm = self.assertRaisesRegex(xmlrpclib.ProtocolError, + re.compile(r"\b400\b")) + with cm: + p.pow(6, 8) + p("close")() + + def test_gsip_response(self): + t = self.Transport() + p = xmlrpclib.ServerProxy(URL, transport=t) + old = self.requestHandler.encode_threshold + self.requestHandler.encode_threshold = None #no encoding + self.assertEqual(p.pow(6,8), 6**8) + a = t.response_length + self.requestHandler.encode_threshold = 0 #always encode + self.assertEqual(p.pow(6,8), 6**8) + p("close")() + b = t.response_length + self.requestHandler.encode_threshold = old + self.assertTrue(a>b) + +#Test special attributes of the ServerProxy object +class ServerProxyTestCase(unittest.TestCase): + def setUp(self): + unittest.TestCase.setUp(self) + if threading: + self.url = URL + else: + # Without threading, http_server() and http_multi_server() will not + # be executed and URL is still equal to None. 'http://' is a just + # enough to choose the scheme (HTTP) + self.url = 'http://' + + def test_close(self): + p = xmlrpclib.ServerProxy(self.url) + self.assertEqual(p('close')(), None) + + def test_transport(self): + t = xmlrpclib.Transport() + p = xmlrpclib.ServerProxy(self.url, transport=t) + self.assertEqual(p('transport'), t) + # This is a contrived way to make a failure occur on the server side # in order to test the _send_traceback_header flag on the server class FailingMessageClass(http.client.HTTPMessage): @@ -501,6 +821,7 @@ class FailingMessageClass(http.client.HTTPMessage): return super().get(key, failobj) +@unittest.skipUnless(threading, 'Threading required for this test.') class FailingServerTestCase(unittest.TestCase): def setUp(self): self.evt = threading.Event() @@ -576,6 +897,21 @@ class FailingServerTestCase(unittest.TestCase): else: self.fail('ProtocolError not raised') + +@contextlib.contextmanager +def captured_stdout(encoding='utf-8'): + """A variation on support.captured_stdout() which gives a text stream + having a `buffer` attribute. + """ + import io + orig_stdout = sys.stdout + sys.stdout = io.TextIOWrapper(io.BytesIO(), encoding=encoding) + try: + yield sys.stdout + finally: + sys.stdout = orig_stdout + + class CGIHandlerTestCase(unittest.TestCase): def setUp(self): self.cgi = xmlrpc.server.CGIXMLRPCRequestHandler() @@ -588,54 +924,45 @@ class CGIHandlerTestCase(unittest.TestCase): env['REQUEST_METHOD'] = 'GET' # if the method is GET and no request_text is given, it runs handle_get # get sysout output - tmp = sys.stdout - sys.stdout = open(support.TESTFN, "w") - self.cgi.handle_request() - sys.stdout.close() - sys.stdout = tmp + with captured_stdout(encoding=self.cgi.encoding) as data_out: + self.cgi.handle_request() # parse Status header - handle = open(support.TESTFN, "r").read() + data_out.seek(0) + handle = data_out.read() status = handle.split()[1] message = ' '.join(handle.split()[2:4]) self.assertEqual(status, '400') self.assertEqual(message, 'Bad Request') - os.remove(support.TESTFN) def test_cgi_xmlrpc_response(self): data = """<?xml version='1.0'?> -<methodCall> - <methodName>test_method</methodName> - <params> - <param> - <value><string>foo</string></value> - </param> - <param> - <value><string>bar</string></value> - </param> - </params> -</methodCall> -""" - open("xmldata.txt", "w").write(data) - tmp1 = sys.stdin - tmp2 = sys.stdout - - sys.stdin = open("xmldata.txt", "r") - sys.stdout = open(support.TESTFN, "w") - - with support.EnvironmentVarGuard() as env: + <methodCall> + <methodName>test_method</methodName> + <params> + <param> + <value><string>foo</string></value> + </param> + <param> + <value><string>bar</string></value> + </param> + </params> + </methodCall> + """ + + with support.EnvironmentVarGuard() as env, \ + captured_stdout(encoding=self.cgi.encoding) as data_out, \ + support.captured_stdin() as data_in: + data_in.write(data) + data_in.seek(0) env['CONTENT_LENGTH'] = str(len(data)) self.cgi.handle_request() - - sys.stdin.close() - sys.stdout.close() - sys.stdin = tmp1 - sys.stdout = tmp2 + data_out.seek(0) # will respond exception, if so, our goal is achieved ;) - handle = open(support.TESTFN, "r").read() + handle = data_out.read() # start with 44th char so as not to get http header, we just # need only xml @@ -652,14 +979,21 @@ class CGIHandlerTestCase(unittest.TestCase): int(re.search('Content-Length: (\d+)', handle).group(1)), len(content)) - os.remove("xmldata.txt") - os.remove(support.TESTFN) - +@support.reap_threads def test_main(): xmlrpc_tests = [XMLRPCTestCase, HelperTestCase, DateTimeTestCase, BinaryTestCase, FaultTestCase] xmlrpc_tests.append(SimpleServerTestCase) + xmlrpc_tests.append(KeepaliveServerTestCase1) + xmlrpc_tests.append(KeepaliveServerTestCase2) + try: + import gzip + xmlrpc_tests.append(GzipServerTestCase) + except ImportError: + pass #gzip not supported in this build + xmlrpc_tests.append(MultiPathServerTestCase) + xmlrpc_tests.append(ServerProxyTestCase) xmlrpc_tests.append(FailingServerTestCase) xmlrpc_tests.append(CGIHandlerTestCase) |