From 985fc6a3041e992448827fd2b496cc5b3f344b28 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Kristj=C3=A1n=20Valur=20J=C3=B3nsson?= Date: Wed, 1 Jul 2009 10:01:31 +0000 Subject: http://bugs.python.org/issue6267 porting revision 73638 to py3k --- Lib/http/server.py | 28 ++++-- Lib/socketserver.py | 12 ++- Lib/test/test_xmlrpc.py | 119 +++++++++++++++++++++- Lib/xmlrpc/client.py | 255 +++++++++++++++++++++++++++++++++++++++--------- Lib/xmlrpc/server.py | 62 +++++++++--- 5 files changed, 402 insertions(+), 74 deletions(-) diff --git a/Lib/http/server.py b/Lib/http/server.py index 2163e74..c9cfe9b 100644 --- a/Lib/http/server.py +++ b/Lib/http/server.py @@ -332,18 +332,26 @@ class BaseHTTPRequestHandler(socketserver.StreamRequestHandler): commands such as GET and POST. """ - self.raw_requestline = self.rfile.readline() - if not self.raw_requestline: + try: + self.raw_requestline = self.rfile.readline() + if not self.raw_requestline: + self.close_connection = 1 + return + if not self.parse_request(): + # An error code has been sent, just exit + return + mname = 'do_' + self.command + if not hasattr(self, mname): + self.send_error(501, "Unsupported method (%r)" % self.command) + return + method = getattr(self, mname) + method() + self.wfile.flush() #actually send the response if not already done. + except socket.timeout as e: + #a read or a write timed out. Discard this connection + self.log_error("Request timed out: %r", e) self.close_connection = 1 return - if not self.parse_request(): # An error code has been sent, just exit - return - mname = 'do_' + self.command - if not hasattr(self, mname): - self.send_error(501, "Unsupported method (%r)" % self.command) - return - method = getattr(self, mname) - method() def handle(self): """Handle multiple requests if necessary.""" diff --git a/Lib/socketserver.py b/Lib/socketserver.py index 2ed50b9..e5f5778 100644 --- a/Lib/socketserver.py +++ b/Lib/socketserver.py @@ -445,6 +445,7 @@ class TCPServer(BaseServer): def close_request(self, request): """Called to clean up an individual request.""" + request.shutdown(socket.SHUT_WR) request.close() @@ -611,8 +612,10 @@ class BaseRequestHandler: self.client_address = client_address self.server = server self.setup() - self.handle() - self.finish() + try: + self.handle() + finally: + self.finish() def setup(self): pass @@ -646,12 +649,17 @@ class StreamRequestHandler(BaseRequestHandler): rbufsize = -1 wbufsize = 0 + # A timeout to apply to the request socket, if not None. + timeout = None + # Disable nagle algoritm for this socket, if True. # Use only when wbufsize != 0, to avoid small packets. disable_nagle_algorithm = False def setup(self): self.connection = self.request + if self.timeout is not None: + self.connection.settimeout(self.timeout) if self.disable_nagle_algorithm: self.connection.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, True) diff --git a/Lib/test/test_xmlrpc.py b/Lib/test/test_xmlrpc.py index 7bcb6ba..100a93c 100644 --- a/Lib/test/test_xmlrpc.py +++ b/Lib/test/test_xmlrpc.py @@ -246,7 +246,7 @@ ADDR = PORT = URL = None # The evt is set twice. First when the server is ready to serve. # Second when the server has been shutdown. The user must clear # the event after it has been set the first time to catch the second set. -def http_server(evt, numrequests): +def http_server(evt, numrequests, requestHandler=None): class TestInstanceClass: def div(self, x, y): return x // y @@ -267,7 +267,9 @@ def http_server(evt, numrequests): s.setblocking(True) return s, port - serv = MyXMLRPCServer(("localhost", 0), + if not requestHandler: + requestHandler = xmlrpc.server.SimpleXMLRPCRequestHandler + serv = MyXMLRPCServer(("localhost", 0), requestHandler, logRequests=False, bind_and_activate=False) try: serv.server_bind() @@ -318,14 +320,15 @@ def is_unavailable_exception(e): if exc_mess and 'temporarily unavailable' in exc_mess.lower(): return True -class SimpleServerTestCase(unittest.TestCase): +class BaseServerTestCase(unittest.TestCase): + requestHandler = None def setUp(self): # enable traceback reporting xmlrpc.server.SimpleXMLRPCServer._send_traceback_header = True self.evt = threading.Event() # start server thread to handle requests - serv_args = (self.evt, 1) + serv_args = (self.evt, 1, self.requestHandler) threading.Thread(target=http_server, args=serv_args).start() # wait for the server to be ready @@ -343,6 +346,7 @@ class SimpleServerTestCase(unittest.TestCase): # disable traceback reporting xmlrpc.server.SimpleXMLRPCServer._send_traceback_header = False +class SimpleServerTestCase(BaseServerTestCase): def test_simple1(self): try: p = xmlrpclib.ServerProxy(URL) @@ -478,6 +482,110 @@ class SimpleServerTestCase(unittest.TestCase): # This avoids waiting for the socket timeout. self.test_simple1() +#A test case that verifies that a server using the HTTP/1.1 keep-alive mechanism +#does indeed serve subsequent requests on the same connection +class KeepaliveServerTestCase(BaseServerTestCase): + #a request handler that supports keep-alive and logs requests into a + #class variable + class RequestHandler(xmlrpc.server.SimpleXMLRPCRequestHandler): + parentClass = xmlrpc.server.SimpleXMLRPCRequestHandler + protocol_version = 'HTTP/1.1' + myRequests = [] + def handle(self): + self.myRequests.append([]) + return self.parentClass.handle(self) + def handle_one_request(self): + result = self.parentClass.handle_one_request(self) + self.myRequests[-1].append(self.raw_requestline) + return result + + requestHandler = RequestHandler + def setUp(self): + #clear request log + self.RequestHandler.myRequests = [] + return BaseServerTestCase.setUp(self) + + def test_two(self): + p = xmlrpclib.ServerProxy(URL) + self.assertEqual(p.pow(6,8), 6**8) + self.assertEqual(p.pow(6,8), 6**8) + self.assertEqual(len(self.RequestHandler.myRequests), 1) + #we may or may not catch the final "append" with the empty line + self.assertTrue(len(self.RequestHandler.myRequests[-1]) >= 2) + +#A test case that verifies that gzip encoding works in both directions +#(for a request and the response) +class GzipServerTestCase(BaseServerTestCase): + #a request handler that supports keep-alive and logs requests into a + #class variable + class RequestHandler(xmlrpc.server.SimpleXMLRPCRequestHandler): + parentClass = xmlrpc.server.SimpleXMLRPCRequestHandler + protocol_version = 'HTTP/1.1' + + def do_POST(self): + #store content of last request in class + self.__class__.content_length = int(self.headers["content-length"]) + return self.parentClass.do_POST(self) + requestHandler = RequestHandler + + class Transport(xmlrpclib.Transport): + #custom transport, stores the response length for our perusal + fake_gzip = False + def parse_response(self, response): + self.response_length=int(response.getheader("content-length", 0)) + return xmlrpclib.Transport.parse_response(self, response) + + def send_content(self, connection, body): + if self.fake_gzip: + #add a lone gzip header to induce decode error remotely + connection.putheader("Content-Encoding", "gzip") + return xmlrpclib.Transport.send_content(self, connection, body) + + def test_gzip_request(self): + t = self.Transport() + t.encode_threshold = None + p = xmlrpclib.ServerProxy(URL, transport=t) + self.assertEqual(p.pow(6,8), 6**8) + a = self.RequestHandler.content_length + t.encode_threshold = 0 #turn on request encoding + self.assertEqual(p.pow(6,8), 6**8) + b = self.RequestHandler.content_length + self.assertTrue(a>b) + + def test_bad_gzip_request(self): + t = self.Transport() + t.encode_threshold = None + t.fake_gzip = True + p = xmlrpclib.ServerProxy(URL, transport=t) + cm = self.assertRaisesRegexp(xmlrpclib.ProtocolError, + re.compile(r"\b400\b")) + with cm: + p.pow(6, 8) + + def test_gsip_response(self): + t = self.Transport() + p = xmlrpclib.ServerProxy(URL, transport=t) + old = self.requestHandler.encode_threshold + self.requestHandler.encode_threshold = None #no encoding + self.assertEqual(p.pow(6,8), 6**8) + a = t.response_length + self.requestHandler.encode_threshold = 0 #always encode + self.assertEqual(p.pow(6,8), 6**8) + b = t.response_length + self.requestHandler.encode_threshold = old + self.assertTrue(a>b) + +#Test special attributes of the ServerProxy object +class ServerProxyTestCase(unittest.TestCase): + def test_close(self): + p = xmlrpclib.ServerProxy(URL) + self.assertEqual(p('close')(), None) + + def test_transport(self): + t = xmlrpclib.Transport() + p = xmlrpclib.ServerProxy(URL, transport=t) + self.assertEqual(p('transport'), t) + # This is a contrived way to make a failure occur on the server side # in order to test the _send_traceback_header flag on the server class FailingMessageClass(http.client.HTTPMessage): @@ -647,6 +755,9 @@ def test_main(): xmlrpc_tests = [XMLRPCTestCase, HelperTestCase, DateTimeTestCase, BinaryTestCase, FaultTestCase] xmlrpc_tests.append(SimpleServerTestCase) + xmlrpc_tests.append(KeepaliveServerTestCase) + xmlrpc_tests.append(GzipServerTestCase) + xmlrpc_tests.append(ServerProxyTestCase) xmlrpc_tests.append(FailingServerTestCase) xmlrpc_tests.append(CGIHandlerTestCase) diff --git a/Lib/xmlrpc/client.py b/Lib/xmlrpc/client.py index c62dbc4..4b532ef 100644 --- a/Lib/xmlrpc/client.py +++ b/Lib/xmlrpc/client.py @@ -136,6 +136,10 @@ Exported functions: import re, time, operator import http.client from xml.parsers import expat +import gzip +import socket +import errno +from io import BytesIO # -------------------------------------------------------------------- # Internal stuff @@ -1013,6 +1017,72 @@ def loads(data, use_datetime=0): p.close() return u.close(), u.getmethodname() +## +# Encode a string using the gzip content encoding such as specified by the +# Content-Encoding: gzip +# in the HTTP header, as described in RFC 1952 +# +# @param data the unencoded data +# @return the encoded data + +def gzip_encode(data): + """data -> gzip encoded data + + Encode data using the gzip content encoding as described in RFC 1952 + """ + f = BytesIO() + gzf = gzip.GzipFile(mode="wb", fileobj=f, compresslevel=1) + gzf.write(data) + gzf.close() + encoded = f.getvalue() + f.close() + return encoded + +## +# Decode a string using the gzip content encoding such as specified by the +# Content-Encoding: gzip +# in the HTTP header, as described in RFC 1952 +# +# @param data The encoded data +# @return the unencoded data +# @raises ValueError if data is not correctly coded. + +def gzip_decode(data): + """gzip encoded data -> unencoded data + + Decode data using the gzip content encoding as described in RFC 1952 + """ + f = BytesIO(data) + gzf = gzip.GzipFile(mode="rb", fileobj=f) + try: + decoded = gzf.read() + except IOError: + raise ValueError("invalid data") + f.close() + gzf.close() + return decoded + +## +# Return a decoded file-like object for the gzip encoding +# as described in RFC 1952. +# +# @param response A stream supporting a read() method +# @return a file-like object that the decoded data can be read() from + +class GzipDecodedResponse(gzip.GzipFile): + """a file-like object to decode a response encoded with the gzip + method, as described in RFC 1952. + """ + def __init__(self, response): + #response doesn't support tell() and read(), required by + #GzipFile + self.io = BytesIO(response.read()) + gzip.GzipFile.__init__(self, mode="rb", fileobj=self.io) + + def close(self): + gzip.GzipFile.close(self) + self.io.close() + # -------------------------------------------------------------------- # request dispatcher @@ -1040,11 +1110,22 @@ class Transport: # client identifier (may be overridden) user_agent = "xmlrpclib.py/%s (by www.pythonware.com)" % __version__ + #if true, we'll request gzip encoding + accept_gzip_encoding = True + + # if positive, encode request using gzip if it exceeds this threshold + # note that many server will get confused, so only use it if you know + # that they can decode such a request + encode_threshold = None #None = don't encode + def __init__(self, use_datetime=0): self._use_datetime = use_datetime + self._connection = (None, None) + self._extra_headers = [] ## # Send a complete request, and parse the response. + # Retry request if a cached connection has disconnected. # # @param host Target host. # @param handler Target PRC handler. @@ -1053,21 +1134,44 @@ class Transport: # @return Parsed response. def request(self, host, handler, request_body, verbose=0): + #retry request once if cached connection has gone cold + for i in (0, 1): + try: + return self.single_request(host, handler, request_body, verbose) + except (socket.error, http.client.HTTPException) as e: + retry = (errno.ECONNRESET, + errno.ECONNABORTED, + http.client.BadStatusLine) #close after we sent request + if i or e[0] not in retry: + raise + + def single_request(self, host, handler, request_body, verbose=0): # issue XML-RPC request + try: + http_conn = self.send_request(host, handler, request_body, verbose) + resp = http_conn.getresponse() + if resp.status == 200: + self.verbose = verbose + return self.parse_response(resp) + + except Fault: + raise + except Exception: + #All unexpected errors leave connection in + # a strange state, so we clear it. + self.close() + raise + + #We got an error response. + #Discard any response data and raise exception + if resp.getheader("content-length", ""): + resp.read() + raise ProtocolError( + host + handler, + resp.status, resp.reason, + dict(resp.getheaders()) + ) - http_conn = self.send_request(host, handler, request_body, verbose) - resp = http_conn.getresponse() - - if resp.status != 200: - raise ProtocolError( - host + handler, - resp.status, resp.reason, - dict(resp.getheaders()) - ) - - self.verbose = verbose - - return self.parse_response(resp) ## # Create parser. @@ -1106,7 +1210,7 @@ class Transport: ("Authorization", "Basic " + auth) ] else: - extra_headers = None + extra_headers = [] return host, extra_headers, x509 @@ -1117,9 +1221,23 @@ class Transport: # @return An HTTPConnection object def make_connection(self, host): + #return an existing connection if possible. This allows + #HTTP/1.1 keep-alive. + if self._connection and host == self._connection[0]: + return self._connection[1] # create a HTTP connection object from a host descriptor - host, extra_headers, x509 = self.get_host_info(host) + chost, self._extra_headers, x509 = self.get_host_info(host) + self._connection = host, http.client.HTTPConnection(chost) + return self._connection[1] + ## + # Clear any cached connection object. + # Used in the event of socket errors. + # + def close(self): + if self._connection[1]: + self._connection[1].close() + self._connection = (None, None) ## # Send HTTP request. @@ -1131,39 +1249,74 @@ class Transport: # @return An HTTPConnection. def send_request(self, host, handler, request_body, debug): - host, extra_headers, x509 = self.get_host_info(host) - connection = http.client.HTTPConnection(host) + connection = self.make_connection(host) + headers = self._extra_headers[:] if debug: connection.set_debuglevel(1) - headers = {} - if extra_headers: - for key, val in extra_headers: - headers[key] = val - headers["Content-Type"] = "text/xml" - headers["User-Agent"] = self.user_agent - connection.request("POST", handler, request_body, headers) + if self.accept_gzip_encoding: + connection.putrequest("POST", handler, skip_accept_encoding=True) + headers.append(("Accept-Encoding", "gzip")) + else: + connection.putrequest("POST", handler) + headers.append(("Content-Type", "text/xml")) + headers.append(("User-Agent", self.user_agent)) + self.send_headers(connection, headers) + self.send_content(connection, request_body) return connection ## + # Send request headers. + # This function provides a useful hook for subclassing + # + # @param connection httpConnection. + # @param headers list of key,value pairs for HTTP headers + + def send_headers(self, connection, headers): + for key, val in headers: + connection.putheader(key, val) + + ## + # Send request body. + # This function provides a useful hook for subclassing + # + # @param connection httpConnection. + # @param request_body XML-RPC request body. + + def send_content(self, connection, request_body): + #optionally encode the request + if (self.encode_threshold is not None and + self.encode_threshold < len(request_body)): + connection.putheader("Content-Encoding", "gzip") + request_body = gzip_encode(request_body) + + connection.putheader("Content-Length", str(len(request_body))) + connection.endheaders(request_body) + + ## # Parse response. # # @param file Stream. # @return Response tuple and target method. - def parse_response(self, file): - # read response from input file/socket, and parse it + def parse_response(self, response): + # read response data from httpresponse, and parse it + if response.getheader("Content-Encoding", "") == "gzip": + stream = GzipDecodedResponse(response) + else: + stream = response p, u = self.getparser() while 1: - response = file.read(1024) - if not response: + data = stream.read(1024) + if not data: break if self.verbose: - print("body:", repr(response)) - p.feed(response) + print("body:", repr(data)) + p.feed(data) - file.close() + if stream is not response: + stream.close() p.close() return u.close() @@ -1176,24 +1329,19 @@ class SafeTransport(Transport): # FIXME: mostly untested - def send_request(self, host, handler, request_body, debug): - import socket + def make_connection(self, host): + if self._connection and host == self._connection[0]: + return self._connection[1] + if not hasattr(socket, "ssl"): raise NotImplementedError( - "your version of http.client doesn't support HTTPS") - - host, extra_headers, x509 = self.get_host_info(host) - connection = http.client.HTTPSConnection(host, None, **(x509 or {})) - if debug: - connection.set_debuglevel(1) - headers = {} - if extra_headers: - for key, val in extra_headers: - headers[key] = val - headers["Content-Type"] = "text/xml" - headers["User-Agent"] = self.user_agent - connection.request("POST", handler, request_body, headers) - return connection + "your version of http.client doesn't support HTTPS") + # create a HTTPS connection object from a host descriptor + # host may be a string, or a (host, x509-dict) tuple + chost, self._extra_headers, x509 = self.get_host_info(host) + self._connection = host, http.client.HTTPSConnection(chost, + None, **(x509 or {})) + return self._connection[1] ## # Standard server proxy. This class establishes a virtual connection @@ -1258,6 +1406,9 @@ class ServerProxy: self.__verbose = verbose self.__allow_none = allow_none + def __close(self): + self.__transport.close() + def __request(self, methodname, params): # call a method on the remote server @@ -1291,6 +1442,16 @@ class ServerProxy: # note: to call a remote object with an non-standard name, use # result getattr(server, "strange-python-name")(args) + def __call__(self, attr): + """A workaround to get special attributes on the ServerProxy + without interfering with the magic __getattr__ + """ + if attr == "close": + return self.__close + elif attr == "transport": + return self.__transport + raise AttributeError("Attribute %r not found" % (attr,)) + # compatibility Server = ServerProxy diff --git a/Lib/xmlrpc/server.py b/Lib/xmlrpc/server.py index 60f992b..273202f 100644 --- a/Lib/xmlrpc/server.py +++ b/Lib/xmlrpc/server.py @@ -104,7 +104,7 @@ server.handle_request() # Written by Brian Quinlan (brian@sweetapp.com). # Based on code written by Fredrik Lundh. -from xmlrpc.client import Fault, dumps, loads +from xmlrpc.client import Fault, dumps, loads, gzip_encode, gzip_decode from http.server import BaseHTTPRequestHandler import http.server import socketserver @@ -420,6 +420,31 @@ class SimpleXMLRPCRequestHandler(BaseHTTPRequestHandler): # paths not on this list will result in a 404 error. rpc_paths = ('/', '/RPC2') + #if not None, encode responses larger than this, if possible + encode_threshold = 1400 #a common MTU + + #Override form StreamRequestHandler: full buffering of output + #and no Nagle. + wbufsize = -1 + disable_nagle_algorithm = True + + # a re to match a gzip Accept-Encoding + aepattern = re.compile(r""" + \s* ([^\s;]+) \s* #content-coding + (;\s* q \s*=\s* ([0-9\.]+))? #q + """, re.VERBOSE | re.IGNORECASE) + + def accept_encodings(self): + r = {} + ae = self.headers.get("Accept-Encoding", "") + for e in ae.split(","): + match = self.aepattern.match(e) + if match: + v = match.group(3) + v = float(v) if v else 1.0 + r[match.group(1)] = v + return r + def is_rpc_path_valid(self): if self.rpc_paths: return self.path in self.rpc_paths @@ -453,6 +478,10 @@ class SimpleXMLRPCRequestHandler(BaseHTTPRequestHandler): size_remaining -= len(L[-1]) data = b''.join(L) + data = self.decode_request_content(data) + if data is None: + return #response has been sent + # In previous versions of SimpleXMLRPCServer, _dispatch # could be overridden in this class, instead of in # SimpleXMLRPCDispatcher. To maintain backwards compatibility, @@ -471,17 +500,35 @@ class SimpleXMLRPCRequestHandler(BaseHTTPRequestHandler): self.send_header("X-exception", str(e)) self.send_header("X-traceback", traceback.format_exc()) + self.send_header("Content-length", "0") self.end_headers() else: self.send_response(200) self.send_header("Content-type", "text/xml") + if self.encode_threshold is not None: + if len(response) > self.encode_threshold: + q = self.accept_encodings().get("gzip", 0) + if q: + response = gzip_encode(response) + self.send_header("Content-Encoding", "gzip") self.send_header("Content-length", str(len(response))) self.end_headers() self.wfile.write(response) - # shut down the connection - self.wfile.flush() - self.connection.shutdown(1) + def decode_request_content(self, data): + #support gzip encoding of request + encoding = self.headers.get("content-encoding", "identity").lower() + if encoding == "identity": + return data + if encoding == "gzip": + try: + return gzip_decode(data) + except ValueError: + self.send_response(400, "error decoding gzip content") + else: + self.send_response(501, "encoding %r not supported" % encoding) + self.send_header("Content-length", "0") + self.end_headers() def report_404 (self): # Report a 404 error @@ -491,9 +538,6 @@ class SimpleXMLRPCRequestHandler(BaseHTTPRequestHandler): self.send_header("Content-length", str(len(response))) self.end_headers() self.wfile.write(response) - # shut down the connection - self.wfile.flush() - self.connection.shutdown(1) def log_request(self, code='-', size='-'): """Selectively log an accepted request.""" @@ -824,10 +868,6 @@ class DocXMLRPCRequestHandler(SimpleXMLRPCRequestHandler): self.end_headers() self.wfile.write(response) - # shut down the connection - self.wfile.flush() - self.connection.shutdown(1) - class DocXMLRPCServer( SimpleXMLRPCServer, XMLRPCDocGenerator): """XML-RPC and HTML documentation server. -- cgit v0.12