summaryrefslogtreecommitdiffstats
path: root/Lib/test/test_xmlrpc.py
diff options
context:
space:
mode:
Diffstat (limited to 'Lib/test/test_xmlrpc.py')
-rw-r--r--Lib/test/test_xmlrpc.py454
1 files changed, 394 insertions, 60 deletions
diff --git a/Lib/test/test_xmlrpc.py b/Lib/test/test_xmlrpc.py
index 9833f26..3814191 100644
--- a/Lib/test/test_xmlrpc.py
+++ b/Lib/test/test_xmlrpc.py
@@ -5,13 +5,19 @@ import time
import unittest
import xmlrpc.client as xmlrpclib
import xmlrpc.server
-import threading
import http.client
import socket
import os
import re
+import io
+import contextlib
from test import support
+try:
+ import threading
+except ImportError:
+ threading = None
+
alist = [{'astring': 'foo@bar.baz.spam',
'afloat': 7283.43,
'anint': 2**20,
@@ -60,24 +66,15 @@ class XMLRPCTestCase(unittest.TestCase):
(newdt,), m = xmlrpclib.loads(s, use_datetime=0)
self.assertEqual(newdt, xmlrpclib.DateTime('00010210T11:41:23'))
- def test_cmp_datetime_DateTime(self):
- now = datetime.datetime.now()
- dt = xmlrpclib.DateTime(now.timetuple())
- self.assertTrue(dt == now)
- self.assertTrue(now == dt)
- then = now + datetime.timedelta(seconds=4)
- self.assertTrue(then >= dt)
- self.assertTrue(dt < then)
-
def test_bug_1164912 (self):
d = xmlrpclib.DateTime()
((new_d,), dummy) = xmlrpclib.loads(xmlrpclib.dumps((d,),
methodresponse=True))
- self.assertTrue(isinstance(new_d.value, str))
+ self.assertIsInstance(new_d.value, str)
# Check that the output of dumps() is still an 8-bit string
s = xmlrpclib.dumps((new_d,), methodresponse=True)
- self.assertTrue(isinstance(s, str))
+ self.assertIsInstance(s, str)
def test_newstyle_class(self):
class T(object):
@@ -143,6 +140,9 @@ class XMLRPCTestCase(unittest.TestCase):
('host.tld',
[('Authorization', 'Basic dXNlcg==')], {}))
+ def test_dump_bytes(self):
+ self.assertRaises(TypeError, xmlrpclib.dumps, (b"my dog has fleas",))
+
def test_ssl_presence(self):
try:
import ssl
@@ -180,7 +180,7 @@ class FaultTestCase(unittest.TestCase):
self.assertRaises(xmlrpclib.Fault, xmlrpclib.loads, s)
def test_dotted_attribute(self):
- # this will raise AttirebuteError because code don't want us to use
+ # this will raise AttributeError because code don't want us to use
# private methods
self.assertRaises(AttributeError,
xmlrpc.server.resolve_dotted_attribute, str, '__add')
@@ -227,6 +227,45 @@ class DateTimeTestCase(unittest.TestCase):
t2 = xmlrpclib._datetime(d)
self.assertEqual(t1, tref)
+ def test_comparison(self):
+ now = datetime.datetime.now()
+ dtime = xmlrpclib.DateTime(now.timetuple())
+
+ # datetime vs. DateTime
+ self.assertTrue(dtime == now)
+ self.assertTrue(now == dtime)
+ then = now + datetime.timedelta(seconds=4)
+ self.assertTrue(then >= dtime)
+ self.assertTrue(dtime < then)
+
+ # str vs. DateTime
+ dstr = now.strftime("%Y%m%dT%H:%M:%S")
+ self.assertTrue(dtime == dstr)
+ self.assertTrue(dstr == dtime)
+ dtime_then = xmlrpclib.DateTime(then.timetuple())
+ self.assertTrue(dtime_then >= dstr)
+ self.assertTrue(dstr < dtime_then)
+
+ # some other types
+ dbytes = dstr.encode('ascii')
+ dtuple = now.timetuple()
+ with self.assertRaises(TypeError):
+ dtime == 1970
+ with self.assertRaises(TypeError):
+ dtime != dbytes
+ with self.assertRaises(TypeError):
+ dtime == bytearray(dbytes)
+ with self.assertRaises(TypeError):
+ dtime != dtuple
+ with self.assertRaises(TypeError):
+ dtime < float(1970)
+ with self.assertRaises(TypeError):
+ dtime > dbytes
+ with self.assertRaises(TypeError):
+ dtime <= bytearray(dbytes)
+ with self.assertRaises(TypeError):
+ dtime >= dtuple
+
class BinaryTestCase(unittest.TestCase):
# XXX What should str(Binary(b"\xff")) return? I'm chosing "\xff"
@@ -259,7 +298,7 @@ ADDR = PORT = URL = None
# The evt is set twice. First when the server is ready to serve.
# Second when the server has been shutdown. The user must clear
# the event after it has been set the first time to catch the second set.
-def http_server(evt, numrequests):
+def http_server(evt, numrequests, requestHandler=None):
class TestInstanceClass:
def div(self, x, y):
return x // y
@@ -280,14 +319,16 @@ def http_server(evt, numrequests):
s.setblocking(True)
return s, port
- serv = MyXMLRPCServer(("localhost", 0),
+ if not requestHandler:
+ requestHandler = xmlrpc.server.SimpleXMLRPCRequestHandler
+ serv = MyXMLRPCServer(("localhost", 0), requestHandler,
logRequests=False, bind_and_activate=False)
try:
serv.server_bind()
global ADDR, PORT, URL
ADDR, PORT = serv.socket.getsockname()
#connect to IP address directly. This avoids socket.create_connection()
- #trying to connect to to "localhost" using all address families, which
+ #trying to connect to "localhost" using all address families, which
#causes slowdown e.g. on vista which supports AF_INET6. The server listens
#on AF_INET only.
URL = "http://%s:%d"%(ADDR, PORT)
@@ -312,6 +353,71 @@ def http_server(evt, numrequests):
PORT = None
evt.set()
+def http_multi_server(evt, numrequests, requestHandler=None):
+ class TestInstanceClass:
+ def div(self, x, y):
+ return x // y
+
+ def _methodHelp(self, name):
+ if name == 'div':
+ return 'This is the div function'
+
+ def my_function():
+ '''This is my function'''
+ return True
+
+ class MyXMLRPCServer(xmlrpc.server.MultiPathXMLRPCServer):
+ def get_request(self):
+ # Ensure the socket is always non-blocking. On Linux, socket
+ # attributes are not inherited like they are on *BSD and Windows.
+ s, port = self.socket.accept()
+ s.setblocking(True)
+ return s, port
+
+ if not requestHandler:
+ requestHandler = xmlrpc.server.SimpleXMLRPCRequestHandler
+ class MyRequestHandler(requestHandler):
+ rpc_paths = []
+
+ class BrokenDispatcher:
+ def _marshaled_dispatch(self, data, dispatch_method=None, path=None):
+ raise RuntimeError("broken dispatcher")
+
+ serv = MyXMLRPCServer(("localhost", 0), MyRequestHandler,
+ logRequests=False, bind_and_activate=False)
+ serv.socket.settimeout(3)
+ serv.server_bind()
+ try:
+ global ADDR, PORT, URL
+ ADDR, PORT = serv.socket.getsockname()
+ #connect to IP address directly. This avoids socket.create_connection()
+ #trying to connect to "localhost" using all address families, which
+ #causes slowdown e.g. on vista which supports AF_INET6. The server listens
+ #on AF_INET only.
+ URL = "http://%s:%d"%(ADDR, PORT)
+ serv.server_activate()
+ paths = ["/foo", "/foo/bar"]
+ for path in paths:
+ d = serv.add_dispatcher(path, xmlrpc.server.SimpleXMLRPCDispatcher())
+ d.register_introspection_functions()
+ d.register_multicall_functions()
+ serv.get_dispatcher(paths[0]).register_function(pow)
+ serv.get_dispatcher(paths[1]).register_function(lambda x,y: x+y, 'add')
+ serv.add_dispatcher("/is/broken", BrokenDispatcher())
+ evt.set()
+
+ # handle up to 'numrequests' requests
+ while numrequests > 0:
+ serv.handle_request()
+ numrequests -= 1
+
+ except socket.timeout:
+ pass
+ finally:
+ serv.socket.close()
+ PORT = None
+ evt.set()
+
# This function prevents errors like:
# <ProtocolError for localhost:57527/RPC2: 500 Internal Server Error>
def is_unavailable_exception(e):
@@ -331,15 +437,36 @@ def is_unavailable_exception(e):
if exc_mess and 'temporarily unavailable' in exc_mess.lower():
return True
-class SimpleServerTestCase(unittest.TestCase):
+def make_request_and_skipIf(condition, reason):
+ # If we skip the test, we have to make a request because the
+ # the server created in setUp blocks expecting one to come in.
+ if not condition:
+ return lambda func: func
+ def decorator(func):
+ def make_request_and_skip(self):
+ try:
+ xmlrpclib.ServerProxy(URL).my_function()
+ except (xmlrpclib.ProtocolError, socket.error) as e:
+ if not is_unavailable_exception(e):
+ raise
+ raise unittest.SkipTest(reason)
+ return make_request_and_skip
+ return decorator
+
+@unittest.skipUnless(threading, 'Threading required for this test.')
+class BaseServerTestCase(unittest.TestCase):
+ requestHandler = None
+ request_count = 1
+ threadFunc = staticmethod(http_server)
+
def setUp(self):
# enable traceback reporting
xmlrpc.server.SimpleXMLRPCServer._send_traceback_header = True
self.evt = threading.Event()
# start server thread to handle requests
- serv_args = (self.evt, 1)
- threading.Thread(target=http_server, args=serv_args).start()
+ serv_args = (self.evt, self.request_count, self.requestHandler)
+ threading.Thread(target=self.threadFunc, args=serv_args).start()
# wait for the server to be ready
self.evt.wait()
@@ -347,15 +474,12 @@ class SimpleServerTestCase(unittest.TestCase):
def tearDown(self):
# wait on the server thread to terminate
- self.evt.wait(4.0)
- if not self.evt.is_set():
- self.evt.set()
- stop_serving()
- raise RuntimeError("timeout reached, test has failed")
+ self.evt.wait()
# disable traceback reporting
xmlrpc.server.SimpleXMLRPCServer._send_traceback_header = False
+class SimpleServerTestCase(BaseServerTestCase):
def test_simple1(self):
try:
p = xmlrpclib.ServerProxy(URL)
@@ -418,6 +542,8 @@ class SimpleServerTestCase(unittest.TestCase):
# protocol error; provide additional information in test output
self.fail("%s\n%s" % (e, getattr(e, "headers", "")))
+ @make_request_and_skipIf(sys.flags.optimize >= 2,
+ "Docstrings are omitted with -O2 and above")
def test_introspection3(self):
try:
# test native doc
@@ -491,6 +617,200 @@ class SimpleServerTestCase(unittest.TestCase):
# This avoids waiting for the socket timeout.
self.test_simple1()
+ def test_unicode_host(self):
+ server = xmlrpclib.ServerProxy("http://%s:%d/RPC2" % (ADDR, PORT))
+ self.assertEqual(server.add("a", "\xe9"), "a\xe9")
+
+ def test_partial_post(self):
+ # Check that a partial POST doesn't make the server loop: issue #14001.
+ conn = http.client.HTTPConnection(ADDR, PORT)
+ conn.request('POST', '/RPC2 HTTP/1.0\r\nContent-Length: 100\r\n\r\nbye')
+ conn.close()
+
+
+class MultiPathServerTestCase(BaseServerTestCase):
+ threadFunc = staticmethod(http_multi_server)
+ request_count = 2
+ def test_path1(self):
+ p = xmlrpclib.ServerProxy(URL+"/foo")
+ self.assertEqual(p.pow(6,8), 6**8)
+ self.assertRaises(xmlrpclib.Fault, p.add, 6, 8)
+
+ def test_path2(self):
+ p = xmlrpclib.ServerProxy(URL+"/foo/bar")
+ self.assertEqual(p.add(6,8), 6+8)
+ self.assertRaises(xmlrpclib.Fault, p.pow, 6, 8)
+
+ def test_path3(self):
+ p = xmlrpclib.ServerProxy(URL+"/is/broken")
+ self.assertRaises(xmlrpclib.Fault, p.add, 6, 8)
+
+#A test case that verifies that a server using the HTTP/1.1 keep-alive mechanism
+#does indeed serve subsequent requests on the same connection
+class BaseKeepaliveServerTestCase(BaseServerTestCase):
+ #a request handler that supports keep-alive and logs requests into a
+ #class variable
+ class RequestHandler(xmlrpc.server.SimpleXMLRPCRequestHandler):
+ parentClass = xmlrpc.server.SimpleXMLRPCRequestHandler
+ protocol_version = 'HTTP/1.1'
+ myRequests = []
+ def handle(self):
+ self.myRequests.append([])
+ self.reqidx = len(self.myRequests)-1
+ return self.parentClass.handle(self)
+ def handle_one_request(self):
+ result = self.parentClass.handle_one_request(self)
+ self.myRequests[self.reqidx].append(self.raw_requestline)
+ return result
+
+ requestHandler = RequestHandler
+ def setUp(self):
+ #clear request log
+ self.RequestHandler.myRequests = []
+ return BaseServerTestCase.setUp(self)
+
+#A test case that verifies that a server using the HTTP/1.1 keep-alive mechanism
+#does indeed serve subsequent requests on the same connection
+class KeepaliveServerTestCase1(BaseKeepaliveServerTestCase):
+ def test_two(self):
+ p = xmlrpclib.ServerProxy(URL)
+ #do three requests.
+ self.assertEqual(p.pow(6,8), 6**8)
+ self.assertEqual(p.pow(6,8), 6**8)
+ self.assertEqual(p.pow(6,8), 6**8)
+ p("close")()
+
+ #they should have all been handled by a single request handler
+ self.assertEqual(len(self.RequestHandler.myRequests), 1)
+
+ #check that we did at least two (the third may be pending append
+ #due to thread scheduling)
+ self.assertGreaterEqual(len(self.RequestHandler.myRequests[-1]), 2)
+
+
+#test special attribute access on the serverproxy, through the __call__
+#function.
+class KeepaliveServerTestCase2(BaseKeepaliveServerTestCase):
+ #ask for two keepalive requests to be handled.
+ request_count=2
+
+ def test_close(self):
+ p = xmlrpclib.ServerProxy(URL)
+ #do some requests with close.
+ self.assertEqual(p.pow(6,8), 6**8)
+ self.assertEqual(p.pow(6,8), 6**8)
+ self.assertEqual(p.pow(6,8), 6**8)
+ p("close")() #this should trigger a new keep-alive request
+ self.assertEqual(p.pow(6,8), 6**8)
+ self.assertEqual(p.pow(6,8), 6**8)
+ self.assertEqual(p.pow(6,8), 6**8)
+ p("close")()
+
+ #they should have all been two request handlers, each having logged at least
+ #two complete requests
+ self.assertEqual(len(self.RequestHandler.myRequests), 2)
+ self.assertGreaterEqual(len(self.RequestHandler.myRequests[-1]), 2)
+ self.assertGreaterEqual(len(self.RequestHandler.myRequests[-2]), 2)
+
+
+ def test_transport(self):
+ p = xmlrpclib.ServerProxy(URL)
+ #do some requests with close.
+ self.assertEqual(p.pow(6,8), 6**8)
+ p("transport").close() #same as above, really.
+ self.assertEqual(p.pow(6,8), 6**8)
+ p("close")()
+ self.assertEqual(len(self.RequestHandler.myRequests), 2)
+
+#A test case that verifies that gzip encoding works in both directions
+#(for a request and the response)
+class GzipServerTestCase(BaseServerTestCase):
+ #a request handler that supports keep-alive and logs requests into a
+ #class variable
+ class RequestHandler(xmlrpc.server.SimpleXMLRPCRequestHandler):
+ parentClass = xmlrpc.server.SimpleXMLRPCRequestHandler
+ protocol_version = 'HTTP/1.1'
+
+ def do_POST(self):
+ #store content of last request in class
+ self.__class__.content_length = int(self.headers["content-length"])
+ return self.parentClass.do_POST(self)
+ requestHandler = RequestHandler
+
+ class Transport(xmlrpclib.Transport):
+ #custom transport, stores the response length for our perusal
+ fake_gzip = False
+ def parse_response(self, response):
+ self.response_length=int(response.getheader("content-length", 0))
+ return xmlrpclib.Transport.parse_response(self, response)
+
+ def send_content(self, connection, body):
+ if self.fake_gzip:
+ #add a lone gzip header to induce decode error remotely
+ connection.putheader("Content-Encoding", "gzip")
+ return xmlrpclib.Transport.send_content(self, connection, body)
+
+ def setUp(self):
+ BaseServerTestCase.setUp(self)
+
+ def test_gzip_request(self):
+ t = self.Transport()
+ t.encode_threshold = None
+ p = xmlrpclib.ServerProxy(URL, transport=t)
+ self.assertEqual(p.pow(6,8), 6**8)
+ a = self.RequestHandler.content_length
+ t.encode_threshold = 0 #turn on request encoding
+ self.assertEqual(p.pow(6,8), 6**8)
+ b = self.RequestHandler.content_length
+ self.assertTrue(a>b)
+ p("close")()
+
+ def test_bad_gzip_request(self):
+ t = self.Transport()
+ t.encode_threshold = None
+ t.fake_gzip = True
+ p = xmlrpclib.ServerProxy(URL, transport=t)
+ cm = self.assertRaisesRegex(xmlrpclib.ProtocolError,
+ re.compile(r"\b400\b"))
+ with cm:
+ p.pow(6, 8)
+ p("close")()
+
+ def test_gsip_response(self):
+ t = self.Transport()
+ p = xmlrpclib.ServerProxy(URL, transport=t)
+ old = self.requestHandler.encode_threshold
+ self.requestHandler.encode_threshold = None #no encoding
+ self.assertEqual(p.pow(6,8), 6**8)
+ a = t.response_length
+ self.requestHandler.encode_threshold = 0 #always encode
+ self.assertEqual(p.pow(6,8), 6**8)
+ p("close")()
+ b = t.response_length
+ self.requestHandler.encode_threshold = old
+ self.assertTrue(a>b)
+
+#Test special attributes of the ServerProxy object
+class ServerProxyTestCase(unittest.TestCase):
+ def setUp(self):
+ unittest.TestCase.setUp(self)
+ if threading:
+ self.url = URL
+ else:
+ # Without threading, http_server() and http_multi_server() will not
+ # be executed and URL is still equal to None. 'http://' is a just
+ # enough to choose the scheme (HTTP)
+ self.url = 'http://'
+
+ def test_close(self):
+ p = xmlrpclib.ServerProxy(self.url)
+ self.assertEqual(p('close')(), None)
+
+ def test_transport(self):
+ t = xmlrpclib.Transport()
+ p = xmlrpclib.ServerProxy(self.url, transport=t)
+ self.assertEqual(p('transport'), t)
+
# This is a contrived way to make a failure occur on the server side
# in order to test the _send_traceback_header flag on the server
class FailingMessageClass(http.client.HTTPMessage):
@@ -501,6 +821,7 @@ class FailingMessageClass(http.client.HTTPMessage):
return super().get(key, failobj)
+@unittest.skipUnless(threading, 'Threading required for this test.')
class FailingServerTestCase(unittest.TestCase):
def setUp(self):
self.evt = threading.Event()
@@ -576,6 +897,21 @@ class FailingServerTestCase(unittest.TestCase):
else:
self.fail('ProtocolError not raised')
+
+@contextlib.contextmanager
+def captured_stdout(encoding='utf-8'):
+ """A variation on support.captured_stdout() which gives a text stream
+ having a `buffer` attribute.
+ """
+ import io
+ orig_stdout = sys.stdout
+ sys.stdout = io.TextIOWrapper(io.BytesIO(), encoding=encoding)
+ try:
+ yield sys.stdout
+ finally:
+ sys.stdout = orig_stdout
+
+
class CGIHandlerTestCase(unittest.TestCase):
def setUp(self):
self.cgi = xmlrpc.server.CGIXMLRPCRequestHandler()
@@ -588,54 +924,45 @@ class CGIHandlerTestCase(unittest.TestCase):
env['REQUEST_METHOD'] = 'GET'
# if the method is GET and no request_text is given, it runs handle_get
# get sysout output
- tmp = sys.stdout
- sys.stdout = open(support.TESTFN, "w")
- self.cgi.handle_request()
- sys.stdout.close()
- sys.stdout = tmp
+ with captured_stdout(encoding=self.cgi.encoding) as data_out:
+ self.cgi.handle_request()
# parse Status header
- handle = open(support.TESTFN, "r").read()
+ data_out.seek(0)
+ handle = data_out.read()
status = handle.split()[1]
message = ' '.join(handle.split()[2:4])
self.assertEqual(status, '400')
self.assertEqual(message, 'Bad Request')
- os.remove(support.TESTFN)
def test_cgi_xmlrpc_response(self):
data = """<?xml version='1.0'?>
-<methodCall>
- <methodName>test_method</methodName>
- <params>
- <param>
- <value><string>foo</string></value>
- </param>
- <param>
- <value><string>bar</string></value>
- </param>
- </params>
-</methodCall>
-"""
- open("xmldata.txt", "w").write(data)
- tmp1 = sys.stdin
- tmp2 = sys.stdout
-
- sys.stdin = open("xmldata.txt", "r")
- sys.stdout = open(support.TESTFN, "w")
-
- with support.EnvironmentVarGuard() as env:
+ <methodCall>
+ <methodName>test_method</methodName>
+ <params>
+ <param>
+ <value><string>foo</string></value>
+ </param>
+ <param>
+ <value><string>bar</string></value>
+ </param>
+ </params>
+ </methodCall>
+ """
+
+ with support.EnvironmentVarGuard() as env, \
+ captured_stdout(encoding=self.cgi.encoding) as data_out, \
+ support.captured_stdin() as data_in:
+ data_in.write(data)
+ data_in.seek(0)
env['CONTENT_LENGTH'] = str(len(data))
self.cgi.handle_request()
-
- sys.stdin.close()
- sys.stdout.close()
- sys.stdin = tmp1
- sys.stdout = tmp2
+ data_out.seek(0)
# will respond exception, if so, our goal is achieved ;)
- handle = open(support.TESTFN, "r").read()
+ handle = data_out.read()
# start with 44th char so as not to get http header, we just
# need only xml
@@ -652,14 +979,21 @@ class CGIHandlerTestCase(unittest.TestCase):
int(re.search('Content-Length: (\d+)', handle).group(1)),
len(content))
- os.remove("xmldata.txt")
- os.remove(support.TESTFN)
-
+@support.reap_threads
def test_main():
xmlrpc_tests = [XMLRPCTestCase, HelperTestCase, DateTimeTestCase,
BinaryTestCase, FaultTestCase]
xmlrpc_tests.append(SimpleServerTestCase)
+ xmlrpc_tests.append(KeepaliveServerTestCase1)
+ xmlrpc_tests.append(KeepaliveServerTestCase2)
+ try:
+ import gzip
+ xmlrpc_tests.append(GzipServerTestCase)
+ except ImportError:
+ pass #gzip not supported in this build
+ xmlrpc_tests.append(MultiPathServerTestCase)
+ xmlrpc_tests.append(ServerProxyTestCase)
xmlrpc_tests.append(FailingServerTestCase)
xmlrpc_tests.append(CGIHandlerTestCase)