diff options
Diffstat (limited to 'Lib/test/test_urllib2_localnet.py')
-rw-r--r-- | Lib/test/test_urllib2_localnet.py | 310 |
1 files changed, 310 insertions, 0 deletions
diff --git a/Lib/test/test_urllib2_localnet.py b/Lib/test/test_urllib2_localnet.py new file mode 100644 index 0000000..737ecbd --- /dev/null +++ b/Lib/test/test_urllib2_localnet.py @@ -0,0 +1,310 @@ +#!/usr/bin/env python + +import sys +import threading +import urlparse +import urllib2 +import BaseHTTPServer +import unittest +import hashlib +from test import test_support + +# Loopback http server infrastructure + +class LoopbackHttpServer(BaseHTTPServer.HTTPServer): + """HTTP server w/ a few modifications that make it useful for + loopback testing purposes. + """ + + def __init__(self, server_address, RequestHandlerClass): + BaseHTTPServer.HTTPServer.__init__(self, + server_address, + RequestHandlerClass) + + # Set the timeout of our listening socket really low so + # that we can stop the server easily. + self.socket.settimeout(1.0) + + def get_request(self): + """BaseHTTPServer method, overridden.""" + + request, client_address = self.socket.accept() + + # It's a loopback connection, so setting the timeout + # really low shouldn't affect anything, but should make + # deadlocks less likely to occur. + request.settimeout(10.0) + + return (request, client_address) + +class LoopbackHttpServerThread(threading.Thread): + """Stoppable thread that runs a loopback http server.""" + + def __init__(self, port, RequestHandlerClass): + threading.Thread.__init__(self) + self._RequestHandlerClass = RequestHandlerClass + self._stop = False + self._port = port + self._server_address = ('127.0.0.1', self._port) + self.ready = threading.Event() + self.error = None + + def stop(self): + """Stops the webserver if it's currently running.""" + + # Set the stop flag. + self._stop = True + + self.join() + + def run(self): + protocol = "HTTP/1.0" + + try: + self._RequestHandlerClass.protocol_version = protocol + httpd = LoopbackHttpServer(self._server_address, + self._RequestHandlerClass) + + sa = httpd.socket.getsockname() + #print "Serving HTTP on", sa[0], "port", sa[1], "..." + except: + # Fail "gracefully" if we are unable to start. + self.ready.set() + self.error = sys.exc_info()[1] + raise + + self.ready.set() + while not self._stop: + httpd.handle_request() + +# Authentication infrastructure + +class DigestAuthHandler: + """Handler for performing digest authentication.""" + + def __init__(self): + self._request_num = 0 + self._nonces = [] + self._users = {} + self._realm_name = "Test Realm" + self._qop = "auth" + + def set_qop(self, qop): + self._qop = qop + + def set_users(self, users): + assert isinstance(users, dict) + self._users = users + + def set_realm(self, realm): + self._realm_name = realm + + def _generate_nonce(self): + self._request_num += 1 + nonce = hashlib.md5(str(self._request_num)).hexdigest() + self._nonces.append(nonce) + return nonce + + def _create_auth_dict(self, auth_str): + first_space_index = auth_str.find(" ") + auth_str = auth_str[first_space_index+1:] + + parts = auth_str.split(",") + + auth_dict = {} + for part in parts: + name, value = part.split("=") + name = name.strip() + if value[0] == '"' and value[-1] == '"': + value = value[1:-1] + else: + value = value.strip() + auth_dict[name] = value + return auth_dict + + def _validate_auth(self, auth_dict, password, method, uri): + final_dict = {} + final_dict.update(auth_dict) + final_dict["password"] = password + final_dict["method"] = method + final_dict["uri"] = uri + HA1_str = "%(username)s:%(realm)s:%(password)s" % final_dict + HA1 = hashlib.md5(HA1_str).hexdigest() + HA2_str = "%(method)s:%(uri)s" % final_dict + HA2 = hashlib.md5(HA2_str).hexdigest() + final_dict["HA1"] = HA1 + final_dict["HA2"] = HA2 + response_str = "%(HA1)s:%(nonce)s:%(nc)s:" \ + "%(cnonce)s:%(qop)s:%(HA2)s" % final_dict + response = hashlib.md5(response_str).hexdigest() + + return response == auth_dict["response"] + + def _return_auth_challenge(self, request_handler): + request_handler.send_response(407, "Proxy Authentication Required") + request_handler.send_header("Content-Type", "text/html") + request_handler.send_header( + 'Proxy-Authenticate', 'Digest realm="%s", ' + 'qop="%s",' + 'nonce="%s", ' % \ + (self._realm_name, self._qop, self._generate_nonce())) + # XXX: Not sure if we're supposed to add this next header or + # not. + #request_handler.send_header('Connection', 'close') + request_handler.end_headers() + request_handler.wfile.write("Proxy Authentication Required.") + return False + + def handle_request(self, request_handler): + """Performs digest authentication on the given HTTP request + handler. Returns True if authentication was successful, False + otherwise. + + If no users have been set, then digest auth is effectively + disabled and this method will always return True. + """ + + if len(self._users) == 0: + return True + + if 'Proxy-Authorization' not in request_handler.headers: + return self._return_auth_challenge(request_handler) + else: + auth_dict = self._create_auth_dict( + request_handler.headers['Proxy-Authorization'] + ) + if auth_dict["username"] in self._users: + password = self._users[ auth_dict["username"] ] + else: + return self._return_auth_challenge(request_handler) + if not auth_dict.get("nonce") in self._nonces: + return self._return_auth_challenge(request_handler) + else: + self._nonces.remove(auth_dict["nonce"]) + + auth_validated = False + + # MSIE uses short_path in its validation, but Python's + # urllib2 uses the full path, so we're going to see if + # either of them works here. + + for path in [request_handler.path, request_handler.short_path]: + if self._validate_auth(auth_dict, + password, + request_handler.command, + path): + auth_validated = True + + if not auth_validated: + return self._return_auth_challenge(request_handler) + return True + +# Proxy test infrastructure + +class FakeProxyHandler(BaseHTTPServer.BaseHTTPRequestHandler): + """This is a 'fake proxy' that makes it look like the entire + internet has gone down due to a sudden zombie invasion. It main + utility is in providing us with authentication support for + testing. + """ + + digest_auth_handler = DigestAuthHandler() + + def log_message(self, format, *args): + # Uncomment the next line for debugging. + #sys.stderr.write(format % args) + pass + + def do_GET(self): + (scm, netloc, path, params, query, fragment) = urlparse.urlparse( + self.path, 'http') + self.short_path = path + if self.digest_auth_handler.handle_request(self): + self.send_response(200, "OK") + self.send_header("Content-Type", "text/html") + self.end_headers() + self.wfile.write("You've reached %s!<BR>" % self.path) + self.wfile.write("Our apologies, but our server is down due to " + "a sudden zombie invasion.") + +# Test cases + +class ProxyAuthTests(unittest.TestCase): + URL = "http://www.foo.com" + + PORT = 8080 + USER = "tester" + PASSWD = "test123" + REALM = "TestRealm" + + PROXY_URL = "http://127.0.0.1:%d" % PORT + + def setUp(self): + FakeProxyHandler.digest_auth_handler.set_users({ + self.USER : self.PASSWD + }) + FakeProxyHandler.digest_auth_handler.set_realm(self.REALM) + + self.server = LoopbackHttpServerThread(self.PORT, FakeProxyHandler) + self.server.start() + self.server.ready.wait() + if self.server.error: + raise self.server.error + + handler = urllib2.ProxyHandler({"http" : self.PROXY_URL}) + self._digest_auth_handler = urllib2.ProxyDigestAuthHandler() + self.opener = urllib2.build_opener(handler, self._digest_auth_handler) + + def tearDown(self): + self.server.stop() + + def test_proxy_with_bad_password_raises_httperror(self): + self._digest_auth_handler.add_password(self.REALM, self.URL, + self.USER, self.PASSWD+"bad") + FakeProxyHandler.digest_auth_handler.set_qop("auth") + self.assertRaises(urllib2.HTTPError, + self.opener.open, + self.URL) + + def test_proxy_with_no_password_raises_httperror(self): + FakeProxyHandler.digest_auth_handler.set_qop("auth") + self.assertRaises(urllib2.HTTPError, + self.opener.open, + self.URL) + + def test_proxy_qop_auth_works(self): + self._digest_auth_handler.add_password(self.REALM, self.URL, + self.USER, self.PASSWD) + FakeProxyHandler.digest_auth_handler.set_qop("auth") + result = self.opener.open(self.URL) + while result.read(): + pass + result.close() + + def test_proxy_qop_auth_int_works_or_throws_urlerror(self): + self._digest_auth_handler.add_password(self.REALM, self.URL, + self.USER, self.PASSWD) + FakeProxyHandler.digest_auth_handler.set_qop("auth-int") + try: + result = self.opener.open(self.URL) + except urllib2.URLError: + # It's okay if we don't support auth-int, but we certainly + # shouldn't receive any kind of exception here other than + # a URLError. + result = None + if result: + while result.read(): + pass + result.close() + +def test_main(): + # We will NOT depend on the network resource flag + # (Lib/test/regrtest.py -u network) since all tests here are only + # localhost. However, if this is a bad rationale, then uncomment + # the next line. + #test_support.requires("network") + + test_support.run_unittest(ProxyAuthTests) + +if __name__ == "__main__": + test_main() |