summaryrefslogtreecommitdiffstats
path: root/Lib/test
diff options
context:
space:
mode:
authorGeorg Brandl <georg@python.org>2007-06-07 13:34:10 (GMT)
committerGeorg Brandl <georg@python.org>2007-06-07 13:34:10 (GMT)
commitff8712263deb720496b174ab95fbe81b162b8b0a (patch)
treea38aaa0bdc7b3cf9e51ae6277df403d13f24eeee /Lib/test
parent04233ee5e16478e5c092b2e275587605157607a6 (diff)
downloadcpython-ff8712263deb720496b174ab95fbe81b162b8b0a.zip
cpython-ff8712263deb720496b174ab95fbe81b162b8b0a.tar.gz
cpython-ff8712263deb720496b174ab95fbe81b162b8b0a.tar.bz2
Patch #1667860: Fix UnboundLocalError in urllib2.
Diffstat (limited to 'Lib/test')
-rw-r--r--Lib/test/test_urllib2_localnet.py301
1 files changed, 301 insertions, 0 deletions
diff --git a/Lib/test/test_urllib2_localnet.py b/Lib/test/test_urllib2_localnet.py
new file mode 100644
index 0000000..ff4349f
--- /dev/null
+++ b/Lib/test/test_urllib2_localnet.py
@@ -0,0 +1,301 @@
+#!/usr/bin/env python
+
+import sys
+import threading
+import urlparse
+import urllib2
+import BaseHTTPServer
+import unittest
+import hashlib
+from test import test_support
+
+# Loopback http server infrastructure
+
+class LoopbackHttpServer(BaseHTTPServer.HTTPServer):
+ """HTTP server w/ a few modifications that make it useful for
+ loopback testing purposes.
+ """
+
+ def __init__(self, server_address, RequestHandlerClass):
+ BaseHTTPServer.HTTPServer.__init__(self,
+ server_address,
+ RequestHandlerClass)
+
+ # Set the timeout of our listening socket really low so
+ # that we can stop the server easily.
+ self.socket.settimeout(1.0)
+
+ def get_request(self):
+ """BaseHTTPServer method, overridden."""
+
+ request, client_address = self.socket.accept()
+
+ # It's a loopback connection, so setting the timeout
+ # really low shouldn't affect anything, but should make
+ # deadlocks less likely to occur.
+ request.settimeout(10.0)
+
+ return (request, client_address)
+
+class LoopbackHttpServerThread(threading.Thread):
+ """Stoppable thread that runs a loopback http server."""
+
+ def __init__(self, port, RequestHandlerClass):
+ threading.Thread.__init__(self)
+ self._RequestHandlerClass = RequestHandlerClass
+ self._stop = False
+ self._port = port
+ self._server_address = ('127.0.0.1', self._port)
+ self.ready = threading.Event()
+
+ def stop(self):
+ """Stops the webserver if it's currently running."""
+
+ # Set the stop flag.
+ self._stop = True
+
+ self.join()
+
+ def run(self):
+ protocol = "HTTP/1.0"
+
+ self._RequestHandlerClass.protocol_version = protocol
+ httpd = LoopbackHttpServer(self._server_address,
+ self._RequestHandlerClass)
+
+ sa = httpd.socket.getsockname()
+ #print "Serving HTTP on", sa[0], "port", sa[1], "..."
+
+ self.ready.set()
+ while not self._stop:
+ httpd.handle_request()
+
+# Authentication infrastructure
+
+class DigestAuthHandler:
+ """Handler for performing digest authentication."""
+
+ def __init__(self):
+ self._request_num = 0
+ self._nonces = []
+ self._users = {}
+ self._realm_name = "Test Realm"
+ self._qop = "auth"
+
+ def set_qop(self, qop):
+ self._qop = qop
+
+ def set_users(self, users):
+ assert isinstance(users, dict)
+ self._users = users
+
+ def set_realm(self, realm):
+ self._realm_name = realm
+
+ def _generate_nonce(self):
+ self._request_num += 1
+ nonce = hashlib.md5(str(self._request_num)).hexdigest()
+ self._nonces.append(nonce)
+ return nonce
+
+ def _create_auth_dict(self, auth_str):
+ first_space_index = auth_str.find(" ")
+ auth_str = auth_str[first_space_index+1:]
+
+ parts = auth_str.split(",")
+
+ auth_dict = {}
+ for part in parts:
+ name, value = part.split("=")
+ name = name.strip()
+ if value[0] == '"' and value[-1] == '"':
+ value = value[1:-1]
+ else:
+ value = value.strip()
+ auth_dict[name] = value
+ return auth_dict
+
+ def _validate_auth(self, auth_dict, password, method, uri):
+ final_dict = {}
+ final_dict.update(auth_dict)
+ final_dict["password"] = password
+ final_dict["method"] = method
+ final_dict["uri"] = uri
+ HA1_str = "%(username)s:%(realm)s:%(password)s" % final_dict
+ HA1 = hashlib.md5(HA1_str).hexdigest()
+ HA2_str = "%(method)s:%(uri)s" % final_dict
+ HA2 = hashlib.md5(HA2_str).hexdigest()
+ final_dict["HA1"] = HA1
+ final_dict["HA2"] = HA2
+ response_str = "%(HA1)s:%(nonce)s:%(nc)s:" \
+ "%(cnonce)s:%(qop)s:%(HA2)s" % final_dict
+ response = hashlib.md5(response_str).hexdigest()
+
+ return response == auth_dict["response"]
+
+ def _return_auth_challenge(self, request_handler):
+ request_handler.send_response(407, "Proxy Authentication Required")
+ request_handler.send_header("Content-Type", "text/html")
+ request_handler.send_header(
+ 'Proxy-Authenticate', 'Digest realm="%s", '
+ 'qop="%s",'
+ 'nonce="%s", ' % \
+ (self._realm_name, self._qop, self._generate_nonce()))
+ # XXX: Not sure if we're supposed to add this next header or
+ # not.
+ #request_handler.send_header('Connection', 'close')
+ request_handler.end_headers()
+ request_handler.wfile.write("Proxy Authentication Required.")
+ return False
+
+ def handle_request(self, request_handler):
+ """Performs digest authentication on the given HTTP request
+ handler. Returns True if authentication was successful, False
+ otherwise.
+
+ If no users have been set, then digest auth is effectively
+ disabled and this method will always return True.
+ """
+
+ if len(self._users) == 0:
+ return True
+
+ if not request_handler.headers.has_key('Proxy-Authorization'):
+ return self._return_auth_challenge(request_handler)
+ else:
+ auth_dict = self._create_auth_dict(
+ request_handler.headers['Proxy-Authorization']
+ )
+ if self._users.has_key(auth_dict["username"]):
+ password = self._users[ auth_dict["username"] ]
+ else:
+ return self._return_auth_challenge(request_handler)
+ if not auth_dict.get("nonce") in self._nonces:
+ return self._return_auth_challenge(request_handler)
+ else:
+ self._nonces.remove(auth_dict["nonce"])
+
+ auth_validated = False
+
+ # MSIE uses short_path in its validation, but Python's
+ # urllib2 uses the full path, so we're going to see if
+ # either of them works here.
+
+ for path in [request_handler.path, request_handler.short_path]:
+ if self._validate_auth(auth_dict,
+ password,
+ request_handler.command,
+ path):
+ auth_validated = True
+
+ if not auth_validated:
+ return self._return_auth_challenge(request_handler)
+ return True
+
+# Proxy test infrastructure
+
+class FakeProxyHandler(BaseHTTPServer.BaseHTTPRequestHandler):
+ """This is a 'fake proxy' that makes it look like the entire
+ internet has gone down due to a sudden zombie invasion. It main
+ utility is in providing us with authentication support for
+ testing.
+ """
+
+ digest_auth_handler = DigestAuthHandler()
+
+ def log_message(self, format, *args):
+ # Uncomment the next line for debugging.
+ #sys.stderr.write(format % args)
+ pass
+
+ def do_GET(self):
+ (scm, netloc, path, params, query, fragment) = urlparse.urlparse(
+ self.path, 'http')
+ self.short_path = path
+ if self.digest_auth_handler.handle_request(self):
+ self.send_response(200, "OK")
+ self.send_header("Content-Type", "text/html")
+ self.end_headers()
+ self.wfile.write("You've reached %s!<BR>" % self.path)
+ self.wfile.write("Our apologies, but our server is down due to "
+ "a sudden zombie invasion.")
+
+# Test cases
+
+class ProxyAuthTests(unittest.TestCase):
+ URL = "http://www.foo.com"
+
+ PORT = 8080
+ USER = "tester"
+ PASSWD = "test123"
+ REALM = "TestRealm"
+
+ PROXY_URL = "http://127.0.0.1:%d" % PORT
+
+ def setUp(self):
+ FakeProxyHandler.digest_auth_handler.set_users({
+ self.USER : self.PASSWD
+ })
+ FakeProxyHandler.digest_auth_handler.set_realm(self.REALM)
+
+ self.server = LoopbackHttpServerThread(self.PORT, FakeProxyHandler)
+ self.server.start()
+ self.server.ready.wait()
+
+ handler = urllib2.ProxyHandler({"http" : self.PROXY_URL})
+ self._digest_auth_handler = urllib2.ProxyDigestAuthHandler()
+ self.opener = urllib2.build_opener(handler, self._digest_auth_handler)
+
+ def tearDown(self):
+ self.server.stop()
+
+ def test_proxy_with_bad_password_raises_httperror(self):
+ self._digest_auth_handler.add_password(self.REALM, self.URL,
+ self.USER, self.PASSWD+"bad")
+ FakeProxyHandler.digest_auth_handler.set_qop("auth")
+ self.assertRaises(urllib2.HTTPError,
+ self.opener.open,
+ self.URL)
+
+ def test_proxy_with_no_password_raises_httperror(self):
+ FakeProxyHandler.digest_auth_handler.set_qop("auth")
+ self.assertRaises(urllib2.HTTPError,
+ self.opener.open,
+ self.URL)
+
+ def test_proxy_qop_auth_works(self):
+ self._digest_auth_handler.add_password(self.REALM, self.URL,
+ self.USER, self.PASSWD)
+ FakeProxyHandler.digest_auth_handler.set_qop("auth")
+ result = self.opener.open(self.URL)
+ while result.read():
+ pass
+ result.close()
+
+ def test_proxy_qop_auth_int_works_or_throws_urlerror(self):
+ self._digest_auth_handler.add_password(self.REALM, self.URL,
+ self.USER, self.PASSWD)
+ FakeProxyHandler.digest_auth_handler.set_qop("auth-int")
+ try:
+ result = self.opener.open(self.URL)
+ except urllib2.URLError:
+ # It's okay if we don't support auth-int, but we certainly
+ # shouldn't receive any kind of exception here other than
+ # a URLError.
+ result = None
+ if result:
+ while result.read():
+ pass
+ result.close()
+
+def test_main():
+ # We will NOT depend on the network resource flag
+ # (Lib/test/regrtest.py -u network) since all tests here are only
+ # localhost. However, if this is a bad rationale, then uncomment
+ # the next line.
+ #test_support.requires("network")
+
+ test_support.run_unittest(ProxyAuthTests)
+
+if __name__ == "__main__":
+ test_main()