diff options
Diffstat (limited to 'Lib/test/test_urllib2_localnet.py')
| -rw-r--r-- | Lib/test/test_urllib2_localnet.py | 136 |
1 files changed, 124 insertions, 12 deletions
diff --git a/Lib/test/test_urllib2_localnet.py b/Lib/test/test_urllib2_localnet.py index 31c638f..0650aa2 100644 --- a/Lib/test/test_urllib2_localnet.py +++ b/Lib/test/test_urllib2_localnet.py @@ -1,3 +1,4 @@ +import base64 import os import email import urllib.parse @@ -5,9 +6,15 @@ import urllib.request import http.server import unittest import hashlib + from test import support + threading = support.import_module('threading') +try: + import ssl +except ImportError: + ssl = None here = os.path.dirname(__file__) # Self-signed cert file for 'localhost' @@ -15,6 +22,7 @@ CERT_localhost = os.path.join(here, 'keycert.pem') # Self-signed cert file for 'fakehostname' CERT_fakehostname = os.path.join(here, 'keycert2.pem') + # Loopback http server infrastructure class LoopbackHttpServer(http.server.HTTPServer): @@ -53,14 +61,11 @@ class LoopbackHttpServerThread(threading.Thread): request_handler.protocol_version = "HTTP/1.0" self.httpd = LoopbackHttpServer(("127.0.0.1", 0), request_handler) - #print "Serving HTTP on %s port %s" % (self.httpd.server_name, - # self.httpd.server_port) self.port = self.httpd.server_port def stop(self): """Stops the webserver if it's currently running.""" - # Set the stop flag. self._stop_server = True self.join() @@ -193,6 +198,49 @@ class DigestAuthHandler: return self._return_auth_challenge(request_handler) return True + +class BasicAuthHandler(http.server.BaseHTTPRequestHandler): + """Handler for performing basic authentication.""" + # Server side values + USER = 'testUser' + PASSWD = 'testPass' + REALM = 'Test' + USER_PASSWD = "%s:%s" % (USER, PASSWD) + ENCODED_AUTH = base64.b64encode(USER_PASSWD.encode('ascii')).decode('ascii') + + def __init__(self, *args, **kwargs): + http.server.BaseHTTPRequestHandler.__init__(self, *args, **kwargs) + + def log_message(self, format, *args): + # Suppress console log message + pass + + def do_HEAD(self): + self.send_response(200) + self.send_header("Content-type", "text/html") + self.end_headers() + + def do_AUTHHEAD(self): + self.send_response(401) + self.send_header("WWW-Authenticate", "Basic realm=\"%s\"" % self.REALM) + self.send_header("Content-type", "text/html") + self.end_headers() + + def do_GET(self): + if not self.headers.get("Authorization", ""): + self.do_AUTHHEAD() + self.wfile.write(b"No Auth header received") + elif self.headers.get( + "Authorization", "") == "Basic " + self.ENCODED_AUTH: + self.send_response(200) + self.end_headers() + self.wfile.write(b"It works") + else: + # Request Unauthorized + self.do_AUTHHEAD() + + + # Proxy test infrastructure class FakeProxyHandler(http.server.BaseHTTPRequestHandler): @@ -228,6 +276,44 @@ class FakeProxyHandler(http.server.BaseHTTPRequestHandler): # Test cases +@unittest.skipUnless(threading, "Threading required for this test.") +class BasicAuthTests(unittest.TestCase): + USER = "testUser" + PASSWD = "testPass" + INCORRECT_PASSWD = "Incorrect" + REALM = "Test" + + def setUp(self): + super(BasicAuthTests, self).setUp() + # With Basic Authentication + def http_server_with_basic_auth_handler(*args, **kwargs): + return BasicAuthHandler(*args, **kwargs) + self.server = LoopbackHttpServerThread(http_server_with_basic_auth_handler) + self.server_url = 'http://127.0.0.1:%s' % self.server.port + self.server.start() + self.server.ready.wait() + + def tearDown(self): + self.server.stop() + super(BasicAuthTests, self).tearDown() + + def test_basic_auth_success(self): + ah = urllib.request.HTTPBasicAuthHandler() + ah.add_password(self.REALM, self.server_url, self.USER, self.PASSWD) + urllib.request.install_opener(urllib.request.build_opener(ah)) + try: + self.assertTrue(urllib.request.urlopen(self.server_url)) + except urllib.error.HTTPError: + self.fail("Basic auth failed for the url: %s", self.server_url) + + def test_basic_auth_httperror(self): + ah = urllib.request.HTTPBasicAuthHandler() + ah.add_password(self.REALM, self.server_url, self.USER, self.INCORRECT_PASSWD) + urllib.request.install_opener(urllib.request.build_opener(ah)) + self.assertRaises(urllib.error.HTTPError, urllib.request.urlopen, self.server_url) + + +@unittest.skipUnless(threading, "Threading required for this test.") class ProxyAuthTests(unittest.TestCase): URL = "http://localhost" @@ -240,6 +326,7 @@ class ProxyAuthTests(unittest.TestCase): self.digest_auth_handler = DigestAuthHandler() self.digest_auth_handler.set_users({self.USER: self.PASSWD}) self.digest_auth_handler.set_realm(self.REALM) + # With Digest Authentication. def create_fake_proxy_handler(*args, **kwargs): return FakeProxyHandler(self.digest_auth_handler, *args, **kwargs) @@ -339,6 +426,7 @@ def GetRequestHandler(responses): return FakeHTTPRequestHandler +@unittest.skipUnless(threading, "Threading required for this test.") class TestUrlopen(unittest.TestCase): """Tests urllib.request.urlopen using the network. @@ -387,14 +475,14 @@ class TestUrlopen(unittest.TestCase): handler.port = port return handler - def start_https_server(self, responses=None, certfile=CERT_localhost): + def start_https_server(self, responses=None, **kwargs): if not hasattr(urllib.request, 'HTTPSHandler'): self.skipTest('ssl support required') from test.ssl_servers import make_https_server if responses is None: responses = [(200, [], b"we care a bit")] handler = GetRequestHandler(responses) - server = make_https_server(self, certfile=certfile, handler_class=handler) + server = make_https_server(self, handler_class=handler, **kwargs) handler.port = server.port return handler @@ -457,12 +545,12 @@ class TestUrlopen(unittest.TestCase): def test_https(self): handler = self.start_https_server() - data = self.urlopen("https://localhost:%s/bizarre" % handler.port) + context = ssl.create_default_context(cafile=CERT_localhost) + data = self.urlopen("https://localhost:%s/bizarre" % handler.port, context=context) self.assertEqual(data, b"we care a bit") def test_https_with_cafile(self): handler = self.start_https_server(certfile=CERT_localhost) - import ssl # Good cert data = self.urlopen("https://localhost:%s/bizarre" % handler.port, cafile=CERT_localhost) @@ -484,6 +572,22 @@ class TestUrlopen(unittest.TestCase): self.urlopen("https://localhost:%s/bizarre" % handler.port, cadefault=True) + def test_https_sni(self): + if ssl is None: + self.skipTest("ssl module required") + if not ssl.HAS_SNI: + self.skipTest("SNI support required in OpenSSL") + sni_name = None + def cb_sni(ssl_sock, server_name, initial_context): + nonlocal sni_name + sni_name = server_name + context = ssl.SSLContext(ssl.PROTOCOL_TLSv1) + context.set_servername_callback(cb_sni) + handler = self.start_https_server(context=context, certfile=CERT_localhost) + context = ssl.create_default_context(cafile=CERT_localhost) + self.urlopen("https://localhost:%s" % handler.port, context=context) + self.assertEqual(sni_name, "localhost") + def test_sending_headers(self): handler = self.start_server() req = urllib.request.Request("http://localhost:%s/" % handler.port, @@ -530,7 +634,7 @@ class TestUrlopen(unittest.TestCase): # so we run the test only when -unetwork/-uall is specified to # mitigate the problem a bit (see #17564) support.requires('network') - self.assertRaises(IOError, + self.assertRaises(OSError, # Given that both VeriSign and various ISPs have in # the past or are presently hijacking various invalid # domain name requests in an attempt to boost traffic @@ -571,9 +675,17 @@ class TestUrlopen(unittest.TestCase): self.assertEqual(index + 1, len(lines)) -@support.reap_threads -def test_main(): - support.run_unittest(ProxyAuthTests, TestUrlopen) +threads_key = None + +def setUpModule(): + # Store the threading_setup in a key and ensure that it is cleaned up + # in the tearDown + global threads_key + threads_key = support.threading_setup() + +def tearDownModule(): + if threads_key: + support.threading_cleanup(threads_key) if __name__ == "__main__": - test_main() + unittest.main() |
