diff options
Diffstat (limited to 'Lib/test/test_urllib2.py')
-rw-r--r-- | Lib/test/test_urllib2.py | 234 |
1 files changed, 190 insertions, 44 deletions
diff --git a/Lib/test/test_urllib2.py b/Lib/test/test_urllib2.py index 7d41ea1..a5281d8 100644 --- a/Lib/test/test_urllib2.py +++ b/Lib/test/test_urllib2.py @@ -11,7 +11,9 @@ import sys import urllib.request # The proxy bypass method imported below has logic specific to the OSX # proxy config data structure but is testable on all platforms. -from urllib.request import Request, OpenerDirector, _parse_proxy, _proxy_bypass_macosx_sysconf +from urllib.request import (Request, OpenerDirector, HTTPBasicAuthHandler, + HTTPPasswordMgrWithPriorAuth, _parse_proxy, + _proxy_bypass_macosx_sysconf) from urllib.parse import urlparse import urllib.error import http.client @@ -21,6 +23,7 @@ import http.client # CacheFTPHandler (hard to write) # parse_keqv_list, parse_http_list, HTTPDigestAuthHandler + class TrivialTests(unittest.TestCase): def test___all__(self): @@ -71,6 +74,7 @@ class TrivialTests(unittest.TestCase): err = urllib.error.URLError('reason') self.assertIn(err.reason, str(err)) + class RequestHdrsTests(unittest.TestCase): def test_request_headers_dict(self): @@ -130,7 +134,6 @@ class RequestHdrsTests(unittest.TestCase): req.remove_header("Unredirected-spam") self.assertFalse(req.has_header("Unredirected-spam")) - def test_password_manager(self): mgr = urllib.request.HTTPPasswordMgr() add = mgr.add_password @@ -234,43 +237,60 @@ class RequestHdrsTests(unittest.TestCase): class MockOpener: addheaders = [] + def open(self, req, data=None, timeout=socket._GLOBAL_DEFAULT_TIMEOUT): self.req, self.data, self.timeout = req, data, timeout + def error(self, proto, *args): self.proto, self.args = proto, args + class MockFile: - def read(self, count=None): pass - def readline(self, count=None): pass - def close(self): pass + def read(self, count=None): + pass + + def readline(self, count=None): + pass + + def close(self): + pass + class MockHeaders(dict): def getheaders(self, name): return list(self.values()) + class MockResponse(io.StringIO): def __init__(self, code, msg, headers, data, url=None): io.StringIO.__init__(self, data) self.code, self.msg, self.headers, self.url = code, msg, headers, url + def info(self): return self.headers + def geturl(self): return self.url + class MockCookieJar: def add_cookie_header(self, request): self.ach_req = request + def extract_cookies(self, response, request): self.ec_req, self.ec_r = request, response + class FakeMethod: def __init__(self, meth_name, action, handle): self.meth_name = meth_name self.handle = handle self.action = action + def __call__(self, *args): return self.handle(self.meth_name, self.action, *args) + class MockHTTPResponse(io.IOBase): def __init__(self, fp, msg, status, reason): self.fp = fp @@ -324,24 +344,31 @@ class MockHTTPClass: self.data = body if self.raise_on_endheaders: raise OSError() + def getresponse(self): return MockHTTPResponse(MockFile(), {}, 200, "OK") def close(self): pass + class MockHandler: # useful for testing handler machinery # see add_ordered_mock_handlers() docstring handler_order = 500 + def __init__(self, methods): self._define_methods(methods) + def _define_methods(self, methods): for spec in methods: - if len(spec) == 2: name, action = spec - else: name, action = spec, None + if len(spec) == 2: + name, action = spec + else: + name, action = spec, None meth = FakeMethod(name, action, self.handle) setattr(self.__class__, name, meth) + def handle(self, fn_name, action, *args, **kwds): self.parent.calls.append((self, fn_name, args, kwds)) if action is None: @@ -364,16 +391,21 @@ class MockHandler: elif action == "raise": raise urllib.error.URLError("blah") assert False - def close(self): pass + + def close(self): + pass + def add_parent(self, parent): self.parent = parent self.parent.calls = [] + def __lt__(self, other): if not hasattr(other, "handler_order"): # No handler_order, leave in original order. Yuck. return True return self.handler_order < other.handler_order + def add_ordered_mock_handlers(opener, meth_spec): """Create MockHandlers and add them to an OpenerDirector. @@ -396,7 +428,9 @@ def add_ordered_mock_handlers(opener, meth_spec): handlers = [] count = 0 for meths in meth_spec: - class MockHandlerSubclass(MockHandler): pass + class MockHandlerSubclass(MockHandler): + pass + h = MockHandlerSubclass(meths) h.handler_order += count h.add_parent(opener) @@ -405,12 +439,14 @@ def add_ordered_mock_handlers(opener, meth_spec): opener.add_handler(h) return handlers + def build_test_opener(*handler_instances): opener = OpenerDirector() for h in handler_instances: opener.add_handler(h) return opener + class MockHTTPHandler(urllib.request.BaseHandler): # useful for testing redirections and auth # sends supplied headers and code as first response @@ -419,9 +455,11 @@ class MockHTTPHandler(urllib.request.BaseHandler): self.code = code self.headers = headers self.reset() + def reset(self): self._count = 0 self.requests = [] + def http_open(self, req): import email, http.client, copy self.requests.append(copy.deepcopy(req)) @@ -436,6 +474,7 @@ class MockHTTPHandler(urllib.request.BaseHandler): msg = email.message_from_string("\r\n\r\n") return MockResponse(200, "OK", msg, "", req.get_full_url()) + class MockHTTPSHandler(urllib.request.AbstractHTTPHandler): # Useful for testing the Proxy-Authorization request by verifying the # properties of httpcon @@ -447,12 +486,33 @@ class MockHTTPSHandler(urllib.request.AbstractHTTPHandler): def https_open(self, req): return self.do_open(self.httpconn, req) + +class MockHTTPHandlerCheckAuth(urllib.request.BaseHandler): + # useful for testing auth + # sends supplied code response + # checks if auth header is specified in request + def __init__(self, code): + self.code = code + self.has_auth_header = False + + def reset(self): + self.has_auth_header = False + + def http_open(self, req): + if req.has_header('Authorization'): + self.has_auth_header = True + name = http.client.responses[self.code] + return MockResponse(self.code, name, MockFile(), "", req.get_full_url()) + + + class MockPasswordManager: def add_password(self, realm, uri, user, password): self.realm = realm self.url = uri self.user = user self.password = password + def find_user_password(self, realm, authuri): self.target_realm = realm self.target_url = authuri @@ -517,11 +577,11 @@ class OpenerDirectorTests(unittest.TestCase): def test_handler_order(self): o = OpenerDirector() handlers = [] - for meths, handler_order in [ - ([("http_open", "return self")], 500), - (["http_open"], 0), - ]: - class MockHandlerSubclass(MockHandler): pass + for meths, handler_order in [([("http_open", "return self")], 500), + (["http_open"], 0)]: + class MockHandlerSubclass(MockHandler): + pass + h = MockHandlerSubclass(meths) h.handler_order = handler_order handlers.append(h) @@ -559,7 +619,8 @@ class OpenerDirectorTests(unittest.TestCase): handlers = add_ordered_mock_handlers(o, meth_spec) class Unknown: - def __eq__(self, other): return True + def __eq__(self, other): + return True req = Request("http://example.com/") o.open(req) @@ -572,7 +633,6 @@ class OpenerDirectorTests(unittest.TestCase): self.assertEqual((handler, method_name), got[:2]) self.assertEqual(args, got[2]) - def test_processors(self): # *_request / *_response methods get called appropriately o = OpenerDirector() @@ -608,6 +668,7 @@ class OpenerDirectorTests(unittest.TestCase): if args[1] is not None: self.assertIsInstance(args[1], MockResponse) + def sanepathname2url(path): try: path.encode("utf-8") @@ -619,18 +680,25 @@ def sanepathname2url(path): # XXX don't ask me about the mac... return urlpath + class HandlerTests(unittest.TestCase): def test_ftp(self): class MockFTPWrapper: - def __init__(self, data): self.data = data + def __init__(self, data): + self.data = data + def retrfile(self, filename, filetype): self.filename, self.filetype = filename, filetype return io.StringIO(self.data), len(self.data) - def close(self): pass + + def close(self): + pass class NullFTPHandler(urllib.request.FTPHandler): - def __init__(self, data): self.data = data + def __init__(self, data): + self.data = data + def connect_ftp(self, user, passwd, host, port, dirs, timeout=socket._GLOBAL_DEFAULT_TIMEOUT): self.user, self.passwd = user, passwd @@ -868,7 +936,7 @@ class HandlerTests(unittest.TestCase): self.assertRaises(ValueError, h.do_request_, req) else: newreq = h.do_request_(req) - self.assertEqual(int(newreq.get_header('Content-length')),30) + self.assertEqual(int(newreq.get_header('Content-length')), 30) file_obj.close() @@ -901,12 +969,12 @@ class HandlerTests(unittest.TestCase): # Check whether host is determined correctly if there is no proxy np_ds_req = h.do_request_(ds_req) - self.assertEqual(np_ds_req.unredirected_hdrs["Host"],"example.com") + self.assertEqual(np_ds_req.unredirected_hdrs["Host"], "example.com") # Check whether host is determined correctly if there is a proxy - ds_req.set_proxy("someproxy:3128",None) + ds_req.set_proxy("someproxy:3128", None) p_ds_req = h.do_request_(ds_req) - self.assertEqual(p_ds_req.unredirected_hdrs["Host"],"example.com") + self.assertEqual(p_ds_req.unredirected_hdrs["Host"], "example.com") def test_full_url_setter(self): # Checks to ensure that components are set correctly after setting the @@ -948,15 +1016,14 @@ class HandlerTests(unittest.TestCase): weird_url = 'http://www.python.org?getspam' req = Request(weird_url) newreq = h.do_request_(req) - self.assertEqual(newreq.host,'www.python.org') - self.assertEqual(newreq.selector,'/?getspam') + self.assertEqual(newreq.host, 'www.python.org') + self.assertEqual(newreq.selector, '/?getspam') url_without_path = 'http://www.python.org' req = Request(url_without_path) newreq = h.do_request_(req) - self.assertEqual(newreq.host,'www.python.org') - self.assertEqual(newreq.selector,'') - + self.assertEqual(newreq.host, 'www.python.org') + self.assertEqual(newreq.selector, '') def test_errors(self): h = urllib.request.HTTPErrorProcessor() @@ -1043,6 +1110,7 @@ class HandlerTests(unittest.TestCase): # loop detection req = Request(from_url) req.timeout = socket._GLOBAL_DEFAULT_TIMEOUT + def redirect(h, req, url=to_url): h.http_error_302(req, MockFile(), 302, "Blah", MockHeaders({"location": url})) @@ -1073,7 +1141,6 @@ class HandlerTests(unittest.TestCase): self.assertEqual(count, urllib.request.HTTPRedirectHandler.max_redirections) - def test_invalid_redirect(self): from_url = "http://example.com/a.html" valid_schemes = ['http','https','ftp'] @@ -1176,7 +1243,6 @@ class HandlerTests(unittest.TestCase): self.assertEqual(req.host, "www.python.org") del os.environ['no_proxy'] - def test_proxy_https(self): o = OpenerDirector() ph = urllib.request.ProxyHandler(dict(https="proxy.example.com:3128")) @@ -1200,21 +1266,21 @@ class HandlerTests(unittest.TestCase): https_handler = MockHTTPSHandler() o.add_handler(https_handler) req = Request("https://www.example.com/") - req.add_header("Proxy-Authorization","FooBar") - req.add_header("User-Agent","Grail") + req.add_header("Proxy-Authorization", "FooBar") + req.add_header("User-Agent", "Grail") self.assertEqual(req.host, "www.example.com") self.assertIsNone(req._tunnel_host) o.open(req) # Verify Proxy-Authorization gets tunneled to request. # httpsconn req_headers do not have the Proxy-Authorization header but # the req will have. - self.assertNotIn(("Proxy-Authorization","FooBar"), + self.assertNotIn(("Proxy-Authorization", "FooBar"), https_handler.httpconn.req_headers) - self.assertIn(("User-Agent","Grail"), + self.assertIn(("User-Agent", "Grail"), https_handler.httpconn.req_headers) self.assertIsNotNone(req._tunnel_host) self.assertEqual(req.host, "proxy.example.com:3128") - self.assertEqual(req.get_header("Proxy-authorization"),"FooBar") + self.assertEqual(req.get_header("Proxy-authorization"), "FooBar") # TODO: This should be only for OSX @unittest.skipUnless(sys.platform == 'darwin', "only relevant for OSX") @@ -1246,7 +1312,7 @@ class HandlerTests(unittest.TestCase): realm = "ACME Widget Store" http_handler = MockHTTPHandler( 401, 'WWW-Authenticate: Basic realm=%s%s%s\r\n\r\n' % - (quote_char, realm, quote_char) ) + (quote_char, realm, quote_char)) opener.add_handler(auth_handler) opener.add_handler(http_handler) self._test_basic_auth(opener, auth_handler, "Authorization", @@ -1304,13 +1370,16 @@ class HandlerTests(unittest.TestCase): def __init__(self): OpenerDirector.__init__(self) self.recorded = [] + def record(self, info): self.recorded.append(info) + class TestDigestAuthHandler(urllib.request.HTTPDigestAuthHandler): def http_error_401(self, *args, **kwds): self.parent.record("digest") urllib.request.HTTPDigestAuthHandler.http_error_401(self, *args, **kwds) + class TestBasicAuthHandler(urllib.request.HTTPBasicAuthHandler): def http_error_401(self, *args, **kwds): self.parent.record("basic") @@ -1346,7 +1415,7 @@ class HandlerTests(unittest.TestCase): 401, 'WWW-Authenticate: Kerberos\r\n\r\n') opener.add_handler(digest_auth_handler) opener.add_handler(http_handler) - self.assertRaises(ValueError,opener.open,"http://www.example.com") + self.assertRaises(ValueError, opener.open, "http://www.example.com") def test_unsupported_auth_basic_handler(self): # While using BasicAuthHandler @@ -1356,7 +1425,7 @@ class HandlerTests(unittest.TestCase): 401, 'WWW-Authenticate: NTLM\r\n\r\n') opener.add_handler(basic_auth_handler) opener.add_handler(http_handler) - self.assertRaises(ValueError,opener.open,"http://www.example.com") + self.assertRaises(ValueError, opener.open, "http://www.example.com") def _test_basic_auth(self, opener, auth_handler, auth_header, realm, http_handler, password_manager, @@ -1395,6 +1464,72 @@ class HandlerTests(unittest.TestCase): self.assertEqual(len(http_handler.requests), 1) self.assertFalse(http_handler.requests[0].has_header(auth_header)) + def test_basic_prior_auth_auto_send(self): + # Assume already authenticated if is_authenticated=True + # for APIs like Github that don't return 401 + + user, password = "wile", "coyote" + request_url = "http://acme.example.com/protected" + + http_handler = MockHTTPHandlerCheckAuth(200) + + pwd_manager = HTTPPasswordMgrWithPriorAuth() + auth_prior_handler = HTTPBasicAuthHandler(pwd_manager) + auth_prior_handler.add_password( + None, request_url, user, password, is_authenticated=True) + + is_auth = pwd_manager.is_authenticated(request_url) + self.assertTrue(is_auth) + + opener = OpenerDirector() + opener.add_handler(auth_prior_handler) + opener.add_handler(http_handler) + + opener.open(request_url) + + # expect request to be sent with auth header + self.assertTrue(http_handler.has_auth_header) + + def test_basic_prior_auth_send_after_first_success(self): + # Auto send auth header after authentication is successful once + + user, password = 'wile', 'coyote' + request_url = 'http://acme.example.com/protected' + realm = 'ACME' + + pwd_manager = HTTPPasswordMgrWithPriorAuth() + auth_prior_handler = HTTPBasicAuthHandler(pwd_manager) + auth_prior_handler.add_password(realm, request_url, user, password) + + is_auth = pwd_manager.is_authenticated(request_url) + self.assertFalse(is_auth) + + opener = OpenerDirector() + opener.add_handler(auth_prior_handler) + + http_handler = MockHTTPHandler( + 401, 'WWW-Authenticate: Basic realm="%s"\r\n\r\n' % None) + opener.add_handler(http_handler) + + opener.open(request_url) + + is_auth = pwd_manager.is_authenticated(request_url) + self.assertTrue(is_auth) + + http_handler = MockHTTPHandlerCheckAuth(200) + self.assertFalse(http_handler.has_auth_header) + + opener = OpenerDirector() + opener.add_handler(auth_prior_handler) + opener.add_handler(http_handler) + + # After getting 200 from MockHTTPHandler + # Next request sends header in the first request + opener.open(request_url) + + # expect request to be sent with auth header + self.assertTrue(http_handler.has_auth_header) + def test_http_closed(self): """Test the connection is cleaned up when the response is closed""" for (transfer, data) in ( @@ -1423,6 +1558,7 @@ class HandlerTests(unittest.TestCase): self.assertTrue(conn.fakesock.closed, "Connection not closed") + class MiscTests(unittest.TestCase): def opener_has_handler(self, opener, handler_class): @@ -1430,11 +1566,16 @@ class MiscTests(unittest.TestCase): for h in opener.handlers)) def test_build_opener(self): - class MyHTTPHandler(urllib.request.HTTPHandler): pass + class MyHTTPHandler(urllib.request.HTTPHandler): + pass + class FooHandler(urllib.request.BaseHandler): - def foo_open(self): pass + def foo_open(self): + pass + class BarHandler(urllib.request.BaseHandler): - def bar_open(self): pass + def bar_open(self): + pass build_opener = urllib.request.build_opener @@ -1461,7 +1602,9 @@ class MiscTests(unittest.TestCase): self.opener_has_handler(o, urllib.request.HTTPHandler) # Issue2670: multiple handlers sharing the same base class - class MyOtherHTTPHandler(urllib.request.HTTPHandler): pass + class MyOtherHTTPHandler(urllib.request.HTTPHandler): + pass + o = build_opener(MyHTTPHandler, MyOtherHTTPHandler) self.opener_has_handler(o, MyHTTPHandler) self.opener_has_handler(o, MyOtherHTTPHandler) @@ -1497,6 +1640,8 @@ class MiscTests(unittest.TestCase): self.assertEqual(err.headers, 'Content-Length: 42') expected_errmsg = 'HTTP Error %s: %s' % (err.code, err.msg) self.assertEqual(str(err), expected_errmsg) + expected_errmsg = '<HTTPError %s: %r>' % (err.code, err.msg) + self.assertEqual(repr(err), expected_errmsg) def test_parse_proxy(self): parse_proxy_test_cases = [ @@ -1535,9 +1680,10 @@ class MiscTests(unittest.TestCase): self.assertRaises(ValueError, _parse_proxy, 'file:/ftp.example.com'), + class RequestTests(unittest.TestCase): class PutRequest(Request): - method='PUT' + method = 'PUT' def setUp(self): self.get = Request("http://www.python.org/~jeremy/") @@ -1626,7 +1772,7 @@ class RequestTests(unittest.TestCase): def test_url_fullurl_get_full_url(self): urls = ['http://docs.python.org', 'http://docs.python.org/library/urllib2.html#OK', - 'http://www.python.org/?qs=query#fragment=true' ] + 'http://www.python.org/?qs=query#fragment=true'] for url in urls: req = Request(url) self.assertEqual(req.get_full_url(), req.full_url) |