summaryrefslogtreecommitdiffstats
path: root/Lib/test/test_urllib2_localnet.py
diff options
context:
space:
mode:
Diffstat (limited to 'Lib/test/test_urllib2_localnet.py')
-rw-r--r--Lib/test/test_urllib2_localnet.py136
1 files changed, 124 insertions, 12 deletions
diff --git a/Lib/test/test_urllib2_localnet.py b/Lib/test/test_urllib2_localnet.py
index 31c638f..0650aa2 100644
--- a/Lib/test/test_urllib2_localnet.py
+++ b/Lib/test/test_urllib2_localnet.py
@@ -1,3 +1,4 @@
+import base64
import os
import email
import urllib.parse
@@ -5,9 +6,15 @@ import urllib.request
import http.server
import unittest
import hashlib
+
from test import support
+
threading = support.import_module('threading')
+try:
+ import ssl
+except ImportError:
+ ssl = None
here = os.path.dirname(__file__)
# Self-signed cert file for 'localhost'
@@ -15,6 +22,7 @@ CERT_localhost = os.path.join(here, 'keycert.pem')
# Self-signed cert file for 'fakehostname'
CERT_fakehostname = os.path.join(here, 'keycert2.pem')
+
# Loopback http server infrastructure
class LoopbackHttpServer(http.server.HTTPServer):
@@ -53,14 +61,11 @@ class LoopbackHttpServerThread(threading.Thread):
request_handler.protocol_version = "HTTP/1.0"
self.httpd = LoopbackHttpServer(("127.0.0.1", 0),
request_handler)
- #print "Serving HTTP on %s port %s" % (self.httpd.server_name,
- # self.httpd.server_port)
self.port = self.httpd.server_port
def stop(self):
"""Stops the webserver if it's currently running."""
- # Set the stop flag.
self._stop_server = True
self.join()
@@ -193,6 +198,49 @@ class DigestAuthHandler:
return self._return_auth_challenge(request_handler)
return True
+
+class BasicAuthHandler(http.server.BaseHTTPRequestHandler):
+ """Handler for performing basic authentication."""
+ # Server side values
+ USER = 'testUser'
+ PASSWD = 'testPass'
+ REALM = 'Test'
+ USER_PASSWD = "%s:%s" % (USER, PASSWD)
+ ENCODED_AUTH = base64.b64encode(USER_PASSWD.encode('ascii')).decode('ascii')
+
+ def __init__(self, *args, **kwargs):
+ http.server.BaseHTTPRequestHandler.__init__(self, *args, **kwargs)
+
+ def log_message(self, format, *args):
+ # Suppress console log message
+ pass
+
+ def do_HEAD(self):
+ self.send_response(200)
+ self.send_header("Content-type", "text/html")
+ self.end_headers()
+
+ def do_AUTHHEAD(self):
+ self.send_response(401)
+ self.send_header("WWW-Authenticate", "Basic realm=\"%s\"" % self.REALM)
+ self.send_header("Content-type", "text/html")
+ self.end_headers()
+
+ def do_GET(self):
+ if not self.headers.get("Authorization", ""):
+ self.do_AUTHHEAD()
+ self.wfile.write(b"No Auth header received")
+ elif self.headers.get(
+ "Authorization", "") == "Basic " + self.ENCODED_AUTH:
+ self.send_response(200)
+ self.end_headers()
+ self.wfile.write(b"It works")
+ else:
+ # Request Unauthorized
+ self.do_AUTHHEAD()
+
+
+
# Proxy test infrastructure
class FakeProxyHandler(http.server.BaseHTTPRequestHandler):
@@ -228,6 +276,44 @@ class FakeProxyHandler(http.server.BaseHTTPRequestHandler):
# Test cases
+@unittest.skipUnless(threading, "Threading required for this test.")
+class BasicAuthTests(unittest.TestCase):
+ USER = "testUser"
+ PASSWD = "testPass"
+ INCORRECT_PASSWD = "Incorrect"
+ REALM = "Test"
+
+ def setUp(self):
+ super(BasicAuthTests, self).setUp()
+ # With Basic Authentication
+ def http_server_with_basic_auth_handler(*args, **kwargs):
+ return BasicAuthHandler(*args, **kwargs)
+ self.server = LoopbackHttpServerThread(http_server_with_basic_auth_handler)
+ self.server_url = 'http://127.0.0.1:%s' % self.server.port
+ self.server.start()
+ self.server.ready.wait()
+
+ def tearDown(self):
+ self.server.stop()
+ super(BasicAuthTests, self).tearDown()
+
+ def test_basic_auth_success(self):
+ ah = urllib.request.HTTPBasicAuthHandler()
+ ah.add_password(self.REALM, self.server_url, self.USER, self.PASSWD)
+ urllib.request.install_opener(urllib.request.build_opener(ah))
+ try:
+ self.assertTrue(urllib.request.urlopen(self.server_url))
+ except urllib.error.HTTPError:
+ self.fail("Basic auth failed for the url: %s", self.server_url)
+
+ def test_basic_auth_httperror(self):
+ ah = urllib.request.HTTPBasicAuthHandler()
+ ah.add_password(self.REALM, self.server_url, self.USER, self.INCORRECT_PASSWD)
+ urllib.request.install_opener(urllib.request.build_opener(ah))
+ self.assertRaises(urllib.error.HTTPError, urllib.request.urlopen, self.server_url)
+
+
+@unittest.skipUnless(threading, "Threading required for this test.")
class ProxyAuthTests(unittest.TestCase):
URL = "http://localhost"
@@ -240,6 +326,7 @@ class ProxyAuthTests(unittest.TestCase):
self.digest_auth_handler = DigestAuthHandler()
self.digest_auth_handler.set_users({self.USER: self.PASSWD})
self.digest_auth_handler.set_realm(self.REALM)
+ # With Digest Authentication.
def create_fake_proxy_handler(*args, **kwargs):
return FakeProxyHandler(self.digest_auth_handler, *args, **kwargs)
@@ -339,6 +426,7 @@ def GetRequestHandler(responses):
return FakeHTTPRequestHandler
+@unittest.skipUnless(threading, "Threading required for this test.")
class TestUrlopen(unittest.TestCase):
"""Tests urllib.request.urlopen using the network.
@@ -387,14 +475,14 @@ class TestUrlopen(unittest.TestCase):
handler.port = port
return handler
- def start_https_server(self, responses=None, certfile=CERT_localhost):
+ def start_https_server(self, responses=None, **kwargs):
if not hasattr(urllib.request, 'HTTPSHandler'):
self.skipTest('ssl support required')
from test.ssl_servers import make_https_server
if responses is None:
responses = [(200, [], b"we care a bit")]
handler = GetRequestHandler(responses)
- server = make_https_server(self, certfile=certfile, handler_class=handler)
+ server = make_https_server(self, handler_class=handler, **kwargs)
handler.port = server.port
return handler
@@ -457,12 +545,12 @@ class TestUrlopen(unittest.TestCase):
def test_https(self):
handler = self.start_https_server()
- data = self.urlopen("https://localhost:%s/bizarre" % handler.port)
+ context = ssl.create_default_context(cafile=CERT_localhost)
+ data = self.urlopen("https://localhost:%s/bizarre" % handler.port, context=context)
self.assertEqual(data, b"we care a bit")
def test_https_with_cafile(self):
handler = self.start_https_server(certfile=CERT_localhost)
- import ssl
# Good cert
data = self.urlopen("https://localhost:%s/bizarre" % handler.port,
cafile=CERT_localhost)
@@ -484,6 +572,22 @@ class TestUrlopen(unittest.TestCase):
self.urlopen("https://localhost:%s/bizarre" % handler.port,
cadefault=True)
+ def test_https_sni(self):
+ if ssl is None:
+ self.skipTest("ssl module required")
+ if not ssl.HAS_SNI:
+ self.skipTest("SNI support required in OpenSSL")
+ sni_name = None
+ def cb_sni(ssl_sock, server_name, initial_context):
+ nonlocal sni_name
+ sni_name = server_name
+ context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
+ context.set_servername_callback(cb_sni)
+ handler = self.start_https_server(context=context, certfile=CERT_localhost)
+ context = ssl.create_default_context(cafile=CERT_localhost)
+ self.urlopen("https://localhost:%s" % handler.port, context=context)
+ self.assertEqual(sni_name, "localhost")
+
def test_sending_headers(self):
handler = self.start_server()
req = urllib.request.Request("http://localhost:%s/" % handler.port,
@@ -530,7 +634,7 @@ class TestUrlopen(unittest.TestCase):
# so we run the test only when -unetwork/-uall is specified to
# mitigate the problem a bit (see #17564)
support.requires('network')
- self.assertRaises(IOError,
+ self.assertRaises(OSError,
# Given that both VeriSign and various ISPs have in
# the past or are presently hijacking various invalid
# domain name requests in an attempt to boost traffic
@@ -571,9 +675,17 @@ class TestUrlopen(unittest.TestCase):
self.assertEqual(index + 1, len(lines))
-@support.reap_threads
-def test_main():
- support.run_unittest(ProxyAuthTests, TestUrlopen)
+threads_key = None
+
+def setUpModule():
+ # Store the threading_setup in a key and ensure that it is cleaned up
+ # in the tearDown
+ global threads_key
+ threads_key = support.threading_setup()
+
+def tearDownModule():
+ if threads_key:
+ support.threading_cleanup(threads_key)
if __name__ == "__main__":
- test_main()
+ unittest.main()