summaryrefslogtreecommitdiffstats
path: root/Lib/test/test_urllib2.py
diff options
context:
space:
mode:
Diffstat (limited to 'Lib/test/test_urllib2.py')
-rw-r--r--Lib/test/test_urllib2.py234
1 files changed, 190 insertions, 44 deletions
diff --git a/Lib/test/test_urllib2.py b/Lib/test/test_urllib2.py
index 7d41ea1..a5281d8 100644
--- a/Lib/test/test_urllib2.py
+++ b/Lib/test/test_urllib2.py
@@ -11,7 +11,9 @@ import sys
import urllib.request
# The proxy bypass method imported below has logic specific to the OSX
# proxy config data structure but is testable on all platforms.
-from urllib.request import Request, OpenerDirector, _parse_proxy, _proxy_bypass_macosx_sysconf
+from urllib.request import (Request, OpenerDirector, HTTPBasicAuthHandler,
+ HTTPPasswordMgrWithPriorAuth, _parse_proxy,
+ _proxy_bypass_macosx_sysconf)
from urllib.parse import urlparse
import urllib.error
import http.client
@@ -21,6 +23,7 @@ import http.client
# CacheFTPHandler (hard to write)
# parse_keqv_list, parse_http_list, HTTPDigestAuthHandler
+
class TrivialTests(unittest.TestCase):
def test___all__(self):
@@ -71,6 +74,7 @@ class TrivialTests(unittest.TestCase):
err = urllib.error.URLError('reason')
self.assertIn(err.reason, str(err))
+
class RequestHdrsTests(unittest.TestCase):
def test_request_headers_dict(self):
@@ -130,7 +134,6 @@ class RequestHdrsTests(unittest.TestCase):
req.remove_header("Unredirected-spam")
self.assertFalse(req.has_header("Unredirected-spam"))
-
def test_password_manager(self):
mgr = urllib.request.HTTPPasswordMgr()
add = mgr.add_password
@@ -234,43 +237,60 @@ class RequestHdrsTests(unittest.TestCase):
class MockOpener:
addheaders = []
+
def open(self, req, data=None, timeout=socket._GLOBAL_DEFAULT_TIMEOUT):
self.req, self.data, self.timeout = req, data, timeout
+
def error(self, proto, *args):
self.proto, self.args = proto, args
+
class MockFile:
- def read(self, count=None): pass
- def readline(self, count=None): pass
- def close(self): pass
+ def read(self, count=None):
+ pass
+
+ def readline(self, count=None):
+ pass
+
+ def close(self):
+ pass
+
class MockHeaders(dict):
def getheaders(self, name):
return list(self.values())
+
class MockResponse(io.StringIO):
def __init__(self, code, msg, headers, data, url=None):
io.StringIO.__init__(self, data)
self.code, self.msg, self.headers, self.url = code, msg, headers, url
+
def info(self):
return self.headers
+
def geturl(self):
return self.url
+
class MockCookieJar:
def add_cookie_header(self, request):
self.ach_req = request
+
def extract_cookies(self, response, request):
self.ec_req, self.ec_r = request, response
+
class FakeMethod:
def __init__(self, meth_name, action, handle):
self.meth_name = meth_name
self.handle = handle
self.action = action
+
def __call__(self, *args):
return self.handle(self.meth_name, self.action, *args)
+
class MockHTTPResponse(io.IOBase):
def __init__(self, fp, msg, status, reason):
self.fp = fp
@@ -324,24 +344,31 @@ class MockHTTPClass:
self.data = body
if self.raise_on_endheaders:
raise OSError()
+
def getresponse(self):
return MockHTTPResponse(MockFile(), {}, 200, "OK")
def close(self):
pass
+
class MockHandler:
# useful for testing handler machinery
# see add_ordered_mock_handlers() docstring
handler_order = 500
+
def __init__(self, methods):
self._define_methods(methods)
+
def _define_methods(self, methods):
for spec in methods:
- if len(spec) == 2: name, action = spec
- else: name, action = spec, None
+ if len(spec) == 2:
+ name, action = spec
+ else:
+ name, action = spec, None
meth = FakeMethod(name, action, self.handle)
setattr(self.__class__, name, meth)
+
def handle(self, fn_name, action, *args, **kwds):
self.parent.calls.append((self, fn_name, args, kwds))
if action is None:
@@ -364,16 +391,21 @@ class MockHandler:
elif action == "raise":
raise urllib.error.URLError("blah")
assert False
- def close(self): pass
+
+ def close(self):
+ pass
+
def add_parent(self, parent):
self.parent = parent
self.parent.calls = []
+
def __lt__(self, other):
if not hasattr(other, "handler_order"):
# No handler_order, leave in original order. Yuck.
return True
return self.handler_order < other.handler_order
+
def add_ordered_mock_handlers(opener, meth_spec):
"""Create MockHandlers and add them to an OpenerDirector.
@@ -396,7 +428,9 @@ def add_ordered_mock_handlers(opener, meth_spec):
handlers = []
count = 0
for meths in meth_spec:
- class MockHandlerSubclass(MockHandler): pass
+ class MockHandlerSubclass(MockHandler):
+ pass
+
h = MockHandlerSubclass(meths)
h.handler_order += count
h.add_parent(opener)
@@ -405,12 +439,14 @@ def add_ordered_mock_handlers(opener, meth_spec):
opener.add_handler(h)
return handlers
+
def build_test_opener(*handler_instances):
opener = OpenerDirector()
for h in handler_instances:
opener.add_handler(h)
return opener
+
class MockHTTPHandler(urllib.request.BaseHandler):
# useful for testing redirections and auth
# sends supplied headers and code as first response
@@ -419,9 +455,11 @@ class MockHTTPHandler(urllib.request.BaseHandler):
self.code = code
self.headers = headers
self.reset()
+
def reset(self):
self._count = 0
self.requests = []
+
def http_open(self, req):
import email, http.client, copy
self.requests.append(copy.deepcopy(req))
@@ -436,6 +474,7 @@ class MockHTTPHandler(urllib.request.BaseHandler):
msg = email.message_from_string("\r\n\r\n")
return MockResponse(200, "OK", msg, "", req.get_full_url())
+
class MockHTTPSHandler(urllib.request.AbstractHTTPHandler):
# Useful for testing the Proxy-Authorization request by verifying the
# properties of httpcon
@@ -447,12 +486,33 @@ class MockHTTPSHandler(urllib.request.AbstractHTTPHandler):
def https_open(self, req):
return self.do_open(self.httpconn, req)
+
+class MockHTTPHandlerCheckAuth(urllib.request.BaseHandler):
+ # useful for testing auth
+ # sends supplied code response
+ # checks if auth header is specified in request
+ def __init__(self, code):
+ self.code = code
+ self.has_auth_header = False
+
+ def reset(self):
+ self.has_auth_header = False
+
+ def http_open(self, req):
+ if req.has_header('Authorization'):
+ self.has_auth_header = True
+ name = http.client.responses[self.code]
+ return MockResponse(self.code, name, MockFile(), "", req.get_full_url())
+
+
+
class MockPasswordManager:
def add_password(self, realm, uri, user, password):
self.realm = realm
self.url = uri
self.user = user
self.password = password
+
def find_user_password(self, realm, authuri):
self.target_realm = realm
self.target_url = authuri
@@ -517,11 +577,11 @@ class OpenerDirectorTests(unittest.TestCase):
def test_handler_order(self):
o = OpenerDirector()
handlers = []
- for meths, handler_order in [
- ([("http_open", "return self")], 500),
- (["http_open"], 0),
- ]:
- class MockHandlerSubclass(MockHandler): pass
+ for meths, handler_order in [([("http_open", "return self")], 500),
+ (["http_open"], 0)]:
+ class MockHandlerSubclass(MockHandler):
+ pass
+
h = MockHandlerSubclass(meths)
h.handler_order = handler_order
handlers.append(h)
@@ -559,7 +619,8 @@ class OpenerDirectorTests(unittest.TestCase):
handlers = add_ordered_mock_handlers(o, meth_spec)
class Unknown:
- def __eq__(self, other): return True
+ def __eq__(self, other):
+ return True
req = Request("http://example.com/")
o.open(req)
@@ -572,7 +633,6 @@ class OpenerDirectorTests(unittest.TestCase):
self.assertEqual((handler, method_name), got[:2])
self.assertEqual(args, got[2])
-
def test_processors(self):
# *_request / *_response methods get called appropriately
o = OpenerDirector()
@@ -608,6 +668,7 @@ class OpenerDirectorTests(unittest.TestCase):
if args[1] is not None:
self.assertIsInstance(args[1], MockResponse)
+
def sanepathname2url(path):
try:
path.encode("utf-8")
@@ -619,18 +680,25 @@ def sanepathname2url(path):
# XXX don't ask me about the mac...
return urlpath
+
class HandlerTests(unittest.TestCase):
def test_ftp(self):
class MockFTPWrapper:
- def __init__(self, data): self.data = data
+ def __init__(self, data):
+ self.data = data
+
def retrfile(self, filename, filetype):
self.filename, self.filetype = filename, filetype
return io.StringIO(self.data), len(self.data)
- def close(self): pass
+
+ def close(self):
+ pass
class NullFTPHandler(urllib.request.FTPHandler):
- def __init__(self, data): self.data = data
+ def __init__(self, data):
+ self.data = data
+
def connect_ftp(self, user, passwd, host, port, dirs,
timeout=socket._GLOBAL_DEFAULT_TIMEOUT):
self.user, self.passwd = user, passwd
@@ -868,7 +936,7 @@ class HandlerTests(unittest.TestCase):
self.assertRaises(ValueError, h.do_request_, req)
else:
newreq = h.do_request_(req)
- self.assertEqual(int(newreq.get_header('Content-length')),30)
+ self.assertEqual(int(newreq.get_header('Content-length')), 30)
file_obj.close()
@@ -901,12 +969,12 @@ class HandlerTests(unittest.TestCase):
# Check whether host is determined correctly if there is no proxy
np_ds_req = h.do_request_(ds_req)
- self.assertEqual(np_ds_req.unredirected_hdrs["Host"],"example.com")
+ self.assertEqual(np_ds_req.unredirected_hdrs["Host"], "example.com")
# Check whether host is determined correctly if there is a proxy
- ds_req.set_proxy("someproxy:3128",None)
+ ds_req.set_proxy("someproxy:3128", None)
p_ds_req = h.do_request_(ds_req)
- self.assertEqual(p_ds_req.unredirected_hdrs["Host"],"example.com")
+ self.assertEqual(p_ds_req.unredirected_hdrs["Host"], "example.com")
def test_full_url_setter(self):
# Checks to ensure that components are set correctly after setting the
@@ -948,15 +1016,14 @@ class HandlerTests(unittest.TestCase):
weird_url = 'http://www.python.org?getspam'
req = Request(weird_url)
newreq = h.do_request_(req)
- self.assertEqual(newreq.host,'www.python.org')
- self.assertEqual(newreq.selector,'/?getspam')
+ self.assertEqual(newreq.host, 'www.python.org')
+ self.assertEqual(newreq.selector, '/?getspam')
url_without_path = 'http://www.python.org'
req = Request(url_without_path)
newreq = h.do_request_(req)
- self.assertEqual(newreq.host,'www.python.org')
- self.assertEqual(newreq.selector,'')
-
+ self.assertEqual(newreq.host, 'www.python.org')
+ self.assertEqual(newreq.selector, '')
def test_errors(self):
h = urllib.request.HTTPErrorProcessor()
@@ -1043,6 +1110,7 @@ class HandlerTests(unittest.TestCase):
# loop detection
req = Request(from_url)
req.timeout = socket._GLOBAL_DEFAULT_TIMEOUT
+
def redirect(h, req, url=to_url):
h.http_error_302(req, MockFile(), 302, "Blah",
MockHeaders({"location": url}))
@@ -1073,7 +1141,6 @@ class HandlerTests(unittest.TestCase):
self.assertEqual(count,
urllib.request.HTTPRedirectHandler.max_redirections)
-
def test_invalid_redirect(self):
from_url = "http://example.com/a.html"
valid_schemes = ['http','https','ftp']
@@ -1176,7 +1243,6 @@ class HandlerTests(unittest.TestCase):
self.assertEqual(req.host, "www.python.org")
del os.environ['no_proxy']
-
def test_proxy_https(self):
o = OpenerDirector()
ph = urllib.request.ProxyHandler(dict(https="proxy.example.com:3128"))
@@ -1200,21 +1266,21 @@ class HandlerTests(unittest.TestCase):
https_handler = MockHTTPSHandler()
o.add_handler(https_handler)
req = Request("https://www.example.com/")
- req.add_header("Proxy-Authorization","FooBar")
- req.add_header("User-Agent","Grail")
+ req.add_header("Proxy-Authorization", "FooBar")
+ req.add_header("User-Agent", "Grail")
self.assertEqual(req.host, "www.example.com")
self.assertIsNone(req._tunnel_host)
o.open(req)
# Verify Proxy-Authorization gets tunneled to request.
# httpsconn req_headers do not have the Proxy-Authorization header but
# the req will have.
- self.assertNotIn(("Proxy-Authorization","FooBar"),
+ self.assertNotIn(("Proxy-Authorization", "FooBar"),
https_handler.httpconn.req_headers)
- self.assertIn(("User-Agent","Grail"),
+ self.assertIn(("User-Agent", "Grail"),
https_handler.httpconn.req_headers)
self.assertIsNotNone(req._tunnel_host)
self.assertEqual(req.host, "proxy.example.com:3128")
- self.assertEqual(req.get_header("Proxy-authorization"),"FooBar")
+ self.assertEqual(req.get_header("Proxy-authorization"), "FooBar")
# TODO: This should be only for OSX
@unittest.skipUnless(sys.platform == 'darwin', "only relevant for OSX")
@@ -1246,7 +1312,7 @@ class HandlerTests(unittest.TestCase):
realm = "ACME Widget Store"
http_handler = MockHTTPHandler(
401, 'WWW-Authenticate: Basic realm=%s%s%s\r\n\r\n' %
- (quote_char, realm, quote_char) )
+ (quote_char, realm, quote_char))
opener.add_handler(auth_handler)
opener.add_handler(http_handler)
self._test_basic_auth(opener, auth_handler, "Authorization",
@@ -1304,13 +1370,16 @@ class HandlerTests(unittest.TestCase):
def __init__(self):
OpenerDirector.__init__(self)
self.recorded = []
+
def record(self, info):
self.recorded.append(info)
+
class TestDigestAuthHandler(urllib.request.HTTPDigestAuthHandler):
def http_error_401(self, *args, **kwds):
self.parent.record("digest")
urllib.request.HTTPDigestAuthHandler.http_error_401(self,
*args, **kwds)
+
class TestBasicAuthHandler(urllib.request.HTTPBasicAuthHandler):
def http_error_401(self, *args, **kwds):
self.parent.record("basic")
@@ -1346,7 +1415,7 @@ class HandlerTests(unittest.TestCase):
401, 'WWW-Authenticate: Kerberos\r\n\r\n')
opener.add_handler(digest_auth_handler)
opener.add_handler(http_handler)
- self.assertRaises(ValueError,opener.open,"http://www.example.com")
+ self.assertRaises(ValueError, opener.open, "http://www.example.com")
def test_unsupported_auth_basic_handler(self):
# While using BasicAuthHandler
@@ -1356,7 +1425,7 @@ class HandlerTests(unittest.TestCase):
401, 'WWW-Authenticate: NTLM\r\n\r\n')
opener.add_handler(basic_auth_handler)
opener.add_handler(http_handler)
- self.assertRaises(ValueError,opener.open,"http://www.example.com")
+ self.assertRaises(ValueError, opener.open, "http://www.example.com")
def _test_basic_auth(self, opener, auth_handler, auth_header,
realm, http_handler, password_manager,
@@ -1395,6 +1464,72 @@ class HandlerTests(unittest.TestCase):
self.assertEqual(len(http_handler.requests), 1)
self.assertFalse(http_handler.requests[0].has_header(auth_header))
+ def test_basic_prior_auth_auto_send(self):
+ # Assume already authenticated if is_authenticated=True
+ # for APIs like Github that don't return 401
+
+ user, password = "wile", "coyote"
+ request_url = "http://acme.example.com/protected"
+
+ http_handler = MockHTTPHandlerCheckAuth(200)
+
+ pwd_manager = HTTPPasswordMgrWithPriorAuth()
+ auth_prior_handler = HTTPBasicAuthHandler(pwd_manager)
+ auth_prior_handler.add_password(
+ None, request_url, user, password, is_authenticated=True)
+
+ is_auth = pwd_manager.is_authenticated(request_url)
+ self.assertTrue(is_auth)
+
+ opener = OpenerDirector()
+ opener.add_handler(auth_prior_handler)
+ opener.add_handler(http_handler)
+
+ opener.open(request_url)
+
+ # expect request to be sent with auth header
+ self.assertTrue(http_handler.has_auth_header)
+
+ def test_basic_prior_auth_send_after_first_success(self):
+ # Auto send auth header after authentication is successful once
+
+ user, password = 'wile', 'coyote'
+ request_url = 'http://acme.example.com/protected'
+ realm = 'ACME'
+
+ pwd_manager = HTTPPasswordMgrWithPriorAuth()
+ auth_prior_handler = HTTPBasicAuthHandler(pwd_manager)
+ auth_prior_handler.add_password(realm, request_url, user, password)
+
+ is_auth = pwd_manager.is_authenticated(request_url)
+ self.assertFalse(is_auth)
+
+ opener = OpenerDirector()
+ opener.add_handler(auth_prior_handler)
+
+ http_handler = MockHTTPHandler(
+ 401, 'WWW-Authenticate: Basic realm="%s"\r\n\r\n' % None)
+ opener.add_handler(http_handler)
+
+ opener.open(request_url)
+
+ is_auth = pwd_manager.is_authenticated(request_url)
+ self.assertTrue(is_auth)
+
+ http_handler = MockHTTPHandlerCheckAuth(200)
+ self.assertFalse(http_handler.has_auth_header)
+
+ opener = OpenerDirector()
+ opener.add_handler(auth_prior_handler)
+ opener.add_handler(http_handler)
+
+ # After getting 200 from MockHTTPHandler
+ # Next request sends header in the first request
+ opener.open(request_url)
+
+ # expect request to be sent with auth header
+ self.assertTrue(http_handler.has_auth_header)
+
def test_http_closed(self):
"""Test the connection is cleaned up when the response is closed"""
for (transfer, data) in (
@@ -1423,6 +1558,7 @@ class HandlerTests(unittest.TestCase):
self.assertTrue(conn.fakesock.closed, "Connection not closed")
+
class MiscTests(unittest.TestCase):
def opener_has_handler(self, opener, handler_class):
@@ -1430,11 +1566,16 @@ class MiscTests(unittest.TestCase):
for h in opener.handlers))
def test_build_opener(self):
- class MyHTTPHandler(urllib.request.HTTPHandler): pass
+ class MyHTTPHandler(urllib.request.HTTPHandler):
+ pass
+
class FooHandler(urllib.request.BaseHandler):
- def foo_open(self): pass
+ def foo_open(self):
+ pass
+
class BarHandler(urllib.request.BaseHandler):
- def bar_open(self): pass
+ def bar_open(self):
+ pass
build_opener = urllib.request.build_opener
@@ -1461,7 +1602,9 @@ class MiscTests(unittest.TestCase):
self.opener_has_handler(o, urllib.request.HTTPHandler)
# Issue2670: multiple handlers sharing the same base class
- class MyOtherHTTPHandler(urllib.request.HTTPHandler): pass
+ class MyOtherHTTPHandler(urllib.request.HTTPHandler):
+ pass
+
o = build_opener(MyHTTPHandler, MyOtherHTTPHandler)
self.opener_has_handler(o, MyHTTPHandler)
self.opener_has_handler(o, MyOtherHTTPHandler)
@@ -1497,6 +1640,8 @@ class MiscTests(unittest.TestCase):
self.assertEqual(err.headers, 'Content-Length: 42')
expected_errmsg = 'HTTP Error %s: %s' % (err.code, err.msg)
self.assertEqual(str(err), expected_errmsg)
+ expected_errmsg = '<HTTPError %s: %r>' % (err.code, err.msg)
+ self.assertEqual(repr(err), expected_errmsg)
def test_parse_proxy(self):
parse_proxy_test_cases = [
@@ -1535,9 +1680,10 @@ class MiscTests(unittest.TestCase):
self.assertRaises(ValueError, _parse_proxy, 'file:/ftp.example.com'),
+
class RequestTests(unittest.TestCase):
class PutRequest(Request):
- method='PUT'
+ method = 'PUT'
def setUp(self):
self.get = Request("http://www.python.org/~jeremy/")
@@ -1626,7 +1772,7 @@ class RequestTests(unittest.TestCase):
def test_url_fullurl_get_full_url(self):
urls = ['http://docs.python.org',
'http://docs.python.org/library/urllib2.html#OK',
- 'http://www.python.org/?qs=query#fragment=true' ]
+ 'http://www.python.org/?qs=query#fragment=true']
for url in urls:
req = Request(url)
self.assertEqual(req.get_full_url(), req.full_url)