diff options
Diffstat (limited to 'Lib/test/test_urllib2.py')
| -rw-r--r-- | Lib/test/test_urllib2.py | 308 |
1 files changed, 261 insertions, 47 deletions
diff --git a/Lib/test/test_urllib2.py b/Lib/test/test_urllib2.py index 7d41ea1..eda7ccc 100644 --- a/Lib/test/test_urllib2.py +++ b/Lib/test/test_urllib2.py @@ -11,7 +11,10 @@ import sys import urllib.request # The proxy bypass method imported below has logic specific to the OSX # proxy config data structure but is testable on all platforms. -from urllib.request import Request, OpenerDirector, _parse_proxy, _proxy_bypass_macosx_sysconf +from urllib.request import (Request, OpenerDirector, HTTPBasicAuthHandler, + HTTPPasswordMgrWithPriorAuth, _parse_proxy, + _proxy_bypass_macosx_sysconf, + AbstractDigestAuthHandler) from urllib.parse import urlparse import urllib.error import http.client @@ -21,6 +24,7 @@ import http.client # CacheFTPHandler (hard to write) # parse_keqv_list, parse_http_list, HTTPDigestAuthHandler + class TrivialTests(unittest.TestCase): def test___all__(self): @@ -71,6 +75,7 @@ class TrivialTests(unittest.TestCase): err = urllib.error.URLError('reason') self.assertIn(err.reason, str(err)) + class RequestHdrsTests(unittest.TestCase): def test_request_headers_dict(self): @@ -130,7 +135,6 @@ class RequestHdrsTests(unittest.TestCase): req.remove_header("Unredirected-spam") self.assertFalse(req.has_header("Unredirected-spam")) - def test_password_manager(self): mgr = urllib.request.HTTPPasswordMgr() add = mgr.add_password @@ -234,43 +238,60 @@ class RequestHdrsTests(unittest.TestCase): class MockOpener: addheaders = [] + def open(self, req, data=None, timeout=socket._GLOBAL_DEFAULT_TIMEOUT): self.req, self.data, self.timeout = req, data, timeout + def error(self, proto, *args): self.proto, self.args = proto, args + class MockFile: - def read(self, count=None): pass - def readline(self, count=None): pass - def close(self): pass + def read(self, count=None): + pass + + def readline(self, count=None): + pass + + def close(self): + pass + class MockHeaders(dict): def getheaders(self, name): return list(self.values()) + class MockResponse(io.StringIO): def __init__(self, code, msg, headers, data, url=None): io.StringIO.__init__(self, data) self.code, self.msg, self.headers, self.url = code, msg, headers, url + def info(self): return self.headers + def geturl(self): return self.url + class MockCookieJar: def add_cookie_header(self, request): self.ach_req = request + def extract_cookies(self, response, request): self.ec_req, self.ec_r = request, response + class FakeMethod: def __init__(self, meth_name, action, handle): self.meth_name = meth_name self.handle = handle self.action = action + def __call__(self, *args): return self.handle(self.meth_name, self.action, *args) + class MockHTTPResponse(io.IOBase): def __init__(self, fp, msg, status, reason): self.fp = fp @@ -324,24 +345,31 @@ class MockHTTPClass: self.data = body if self.raise_on_endheaders: raise OSError() + def getresponse(self): return MockHTTPResponse(MockFile(), {}, 200, "OK") def close(self): pass + class MockHandler: # useful for testing handler machinery # see add_ordered_mock_handlers() docstring handler_order = 500 + def __init__(self, methods): self._define_methods(methods) + def _define_methods(self, methods): for spec in methods: - if len(spec) == 2: name, action = spec - else: name, action = spec, None + if len(spec) == 2: + name, action = spec + else: + name, action = spec, None meth = FakeMethod(name, action, self.handle) setattr(self.__class__, name, meth) + def handle(self, fn_name, action, *args, **kwds): self.parent.calls.append((self, fn_name, args, kwds)) if action is None: @@ -364,16 +392,21 @@ class MockHandler: elif action == "raise": raise urllib.error.URLError("blah") assert False - def close(self): pass + + def close(self): + pass + def add_parent(self, parent): self.parent = parent self.parent.calls = [] + def __lt__(self, other): if not hasattr(other, "handler_order"): # No handler_order, leave in original order. Yuck. return True return self.handler_order < other.handler_order + def add_ordered_mock_handlers(opener, meth_spec): """Create MockHandlers and add them to an OpenerDirector. @@ -396,7 +429,9 @@ def add_ordered_mock_handlers(opener, meth_spec): handlers = [] count = 0 for meths in meth_spec: - class MockHandlerSubclass(MockHandler): pass + class MockHandlerSubclass(MockHandler): + pass + h = MockHandlerSubclass(meths) h.handler_order += count h.add_parent(opener) @@ -405,12 +440,14 @@ def add_ordered_mock_handlers(opener, meth_spec): opener.add_handler(h) return handlers + def build_test_opener(*handler_instances): opener = OpenerDirector() for h in handler_instances: opener.add_handler(h) return opener + class MockHTTPHandler(urllib.request.BaseHandler): # useful for testing redirections and auth # sends supplied headers and code as first response @@ -419,11 +456,13 @@ class MockHTTPHandler(urllib.request.BaseHandler): self.code = code self.headers = headers self.reset() + def reset(self): self._count = 0 self.requests = [] + def http_open(self, req): - import email, http.client, copy + import email, copy self.requests.append(copy.deepcopy(req)) if self._count == 0: self._count = self._count + 1 @@ -436,23 +475,45 @@ class MockHTTPHandler(urllib.request.BaseHandler): msg = email.message_from_string("\r\n\r\n") return MockResponse(200, "OK", msg, "", req.get_full_url()) + class MockHTTPSHandler(urllib.request.AbstractHTTPHandler): # Useful for testing the Proxy-Authorization request by verifying the # properties of httpcon - def __init__(self): - urllib.request.AbstractHTTPHandler.__init__(self) + def __init__(self, debuglevel=0): + urllib.request.AbstractHTTPHandler.__init__(self, debuglevel=debuglevel) self.httpconn = MockHTTPClass() def https_open(self, req): return self.do_open(self.httpconn, req) + +class MockHTTPHandlerCheckAuth(urllib.request.BaseHandler): + # useful for testing auth + # sends supplied code response + # checks if auth header is specified in request + def __init__(self, code): + self.code = code + self.has_auth_header = False + + def reset(self): + self.has_auth_header = False + + def http_open(self, req): + if req.has_header('Authorization'): + self.has_auth_header = True + name = http.client.responses[self.code] + return MockResponse(self.code, name, MockFile(), "", req.get_full_url()) + + + class MockPasswordManager: def add_password(self, realm, uri, user, password): self.realm = realm self.url = uri self.user = user self.password = password + def find_user_password(self, realm, authuri): self.target_realm = realm self.target_url = authuri @@ -517,11 +578,11 @@ class OpenerDirectorTests(unittest.TestCase): def test_handler_order(self): o = OpenerDirector() handlers = [] - for meths, handler_order in [ - ([("http_open", "return self")], 500), - (["http_open"], 0), - ]: - class MockHandlerSubclass(MockHandler): pass + for meths, handler_order in [([("http_open", "return self")], 500), + (["http_open"], 0)]: + class MockHandlerSubclass(MockHandler): + pass + h = MockHandlerSubclass(meths) h.handler_order = handler_order handlers.append(h) @@ -559,7 +620,8 @@ class OpenerDirectorTests(unittest.TestCase): handlers = add_ordered_mock_handlers(o, meth_spec) class Unknown: - def __eq__(self, other): return True + def __eq__(self, other): + return True req = Request("http://example.com/") o.open(req) @@ -572,7 +634,6 @@ class OpenerDirectorTests(unittest.TestCase): self.assertEqual((handler, method_name), got[:2]) self.assertEqual(args, got[2]) - def test_processors(self): # *_request / *_response methods get called appropriately o = OpenerDirector() @@ -608,6 +669,7 @@ class OpenerDirectorTests(unittest.TestCase): if args[1] is not None: self.assertIsInstance(args[1], MockResponse) + def sanepathname2url(path): try: path.encode("utf-8") @@ -619,18 +681,25 @@ def sanepathname2url(path): # XXX don't ask me about the mac... return urlpath + class HandlerTests(unittest.TestCase): def test_ftp(self): class MockFTPWrapper: - def __init__(self, data): self.data = data + def __init__(self, data): + self.data = data + def retrfile(self, filename, filetype): self.filename, self.filetype = filename, filetype return io.StringIO(self.data), len(self.data) - def close(self): pass + + def close(self): + pass class NullFTPHandler(urllib.request.FTPHandler): - def __init__(self, data): self.data = data + def __init__(self, data): + self.data = data + def connect_ftp(self, user, passwd, host, port, dirs, timeout=socket._GLOBAL_DEFAULT_TIMEOUT): self.user, self.passwd = user, passwd @@ -868,7 +937,7 @@ class HandlerTests(unittest.TestCase): self.assertRaises(ValueError, h.do_request_, req) else: newreq = h.do_request_(req) - self.assertEqual(int(newreq.get_header('Content-length')),30) + self.assertEqual(int(newreq.get_header('Content-length')), 30) file_obj.close() @@ -881,6 +950,13 @@ class HandlerTests(unittest.TestCase): newreq = h.do_request_(req) self.assertEqual(int(newreq.get_header('Content-length')),16) + def test_http_handler_debuglevel(self): + o = OpenerDirector() + h = MockHTTPSHandler(debuglevel=1) + o.add_handler(h) + o.open("https://www.example.com") + self.assertEqual(h._debuglevel, 1) + def test_http_doubleslash(self): # Checks the presence of any unnecessary double slash in url does not # break anything. Previously, a double slash directly after the host @@ -901,12 +977,12 @@ class HandlerTests(unittest.TestCase): # Check whether host is determined correctly if there is no proxy np_ds_req = h.do_request_(ds_req) - self.assertEqual(np_ds_req.unredirected_hdrs["Host"],"example.com") + self.assertEqual(np_ds_req.unredirected_hdrs["Host"], "example.com") # Check whether host is determined correctly if there is a proxy - ds_req.set_proxy("someproxy:3128",None) + ds_req.set_proxy("someproxy:3128", None) p_ds_req = h.do_request_(ds_req) - self.assertEqual(p_ds_req.unredirected_hdrs["Host"],"example.com") + self.assertEqual(p_ds_req.unredirected_hdrs["Host"], "example.com") def test_full_url_setter(self): # Checks to ensure that components are set correctly after setting the @@ -948,15 +1024,14 @@ class HandlerTests(unittest.TestCase): weird_url = 'http://www.python.org?getspam' req = Request(weird_url) newreq = h.do_request_(req) - self.assertEqual(newreq.host,'www.python.org') - self.assertEqual(newreq.selector,'/?getspam') + self.assertEqual(newreq.host, 'www.python.org') + self.assertEqual(newreq.selector, '/?getspam') url_without_path = 'http://www.python.org' req = Request(url_without_path) newreq = h.do_request_(req) - self.assertEqual(newreq.host,'www.python.org') - self.assertEqual(newreq.selector,'') - + self.assertEqual(newreq.host, 'www.python.org') + self.assertEqual(newreq.selector, '') def test_errors(self): h = urllib.request.HTTPErrorProcessor() @@ -1043,6 +1118,7 @@ class HandlerTests(unittest.TestCase): # loop detection req = Request(from_url) req.timeout = socket._GLOBAL_DEFAULT_TIMEOUT + def redirect(h, req, url=to_url): h.http_error_302(req, MockFile(), 302, "Blah", MockHeaders({"location": url})) @@ -1073,7 +1149,6 @@ class HandlerTests(unittest.TestCase): self.assertEqual(count, urllib.request.HTTPRedirectHandler.max_redirections) - def test_invalid_redirect(self): from_url = "http://example.com/a.html" valid_schemes = ['http','https','ftp'] @@ -1133,6 +1208,57 @@ class HandlerTests(unittest.TestCase): fp = o.open('http://www.example.com') self.assertEqual(fp.geturl(), redirected_url.strip()) + def test_redirect_no_path(self): + # Issue 14132: Relative redirect strips original path + real_class = http.client.HTTPConnection + response1 = b"HTTP/1.1 302 Found\r\nLocation: ?query\r\n\r\n" + http.client.HTTPConnection = test_urllib.fakehttp(response1) + self.addCleanup(setattr, http.client, "HTTPConnection", real_class) + urls = iter(("/path", "/path?query")) + def request(conn, method, url, *pos, **kw): + self.assertEqual(url, next(urls)) + real_class.request(conn, method, url, *pos, **kw) + # Change response for subsequent connection + conn.__class__.fakedata = b"HTTP/1.1 200 OK\r\n\r\nHello!" + http.client.HTTPConnection.request = request + fp = urllib.request.urlopen("http://python.org/path") + self.assertEqual(fp.geturl(), "http://python.org/path?query") + + def test_redirect_encoding(self): + # Some characters in the redirect target may need special handling, + # but most ASCII characters should be treated as already encoded + class Handler(urllib.request.HTTPHandler): + def http_open(self, req): + result = self.do_open(self.connection, req) + self.last_buf = self.connection.buf + # Set up a normal response for the next request + self.connection = test_urllib.fakehttp( + b'HTTP/1.1 200 OK\r\n' + b'Content-Length: 3\r\n' + b'\r\n' + b'123' + ) + return result + handler = Handler() + opener = urllib.request.build_opener(handler) + tests = ( + (b'/p\xC3\xA5-dansk/', b'/p%C3%A5-dansk/'), + (b'/spaced%20path/', b'/spaced%20path/'), + (b'/spaced path/', b'/spaced%20path/'), + (b'/?p\xC3\xA5-dansk', b'/?p%C3%A5-dansk'), + ) + for [location, result] in tests: + with self.subTest(repr(location)): + handler.connection = test_urllib.fakehttp( + b'HTTP/1.1 302 Redirect\r\n' + b'Location: ' + location + b'\r\n' + b'\r\n' + ) + response = opener.open('http://example.com/') + expected = b'GET ' + result + b' ' + request = handler.last_buf + self.assertTrue(request.startswith(expected), repr(request)) + def test_proxy(self): o = OpenerDirector() ph = urllib.request.ProxyHandler(dict(http="proxy.example.com:3128")) @@ -1176,7 +1302,6 @@ class HandlerTests(unittest.TestCase): self.assertEqual(req.host, "www.python.org") del os.environ['no_proxy'] - def test_proxy_https(self): o = OpenerDirector() ph = urllib.request.ProxyHandler(dict(https="proxy.example.com:3128")) @@ -1200,21 +1325,21 @@ class HandlerTests(unittest.TestCase): https_handler = MockHTTPSHandler() o.add_handler(https_handler) req = Request("https://www.example.com/") - req.add_header("Proxy-Authorization","FooBar") - req.add_header("User-Agent","Grail") + req.add_header("Proxy-Authorization", "FooBar") + req.add_header("User-Agent", "Grail") self.assertEqual(req.host, "www.example.com") self.assertIsNone(req._tunnel_host) o.open(req) # Verify Proxy-Authorization gets tunneled to request. # httpsconn req_headers do not have the Proxy-Authorization header but # the req will have. - self.assertNotIn(("Proxy-Authorization","FooBar"), + self.assertNotIn(("Proxy-Authorization", "FooBar"), https_handler.httpconn.req_headers) - self.assertIn(("User-Agent","Grail"), + self.assertIn(("User-Agent", "Grail"), https_handler.httpconn.req_headers) self.assertIsNotNone(req._tunnel_host) self.assertEqual(req.host, "proxy.example.com:3128") - self.assertEqual(req.get_header("Proxy-authorization"),"FooBar") + self.assertEqual(req.get_header("Proxy-authorization"), "FooBar") # TODO: This should be only for OSX @unittest.skipUnless(sys.platform == 'darwin', "only relevant for OSX") @@ -1246,7 +1371,7 @@ class HandlerTests(unittest.TestCase): realm = "ACME Widget Store" http_handler = MockHTTPHandler( 401, 'WWW-Authenticate: Basic realm=%s%s%s\r\n\r\n' % - (quote_char, realm, quote_char) ) + (quote_char, realm, quote_char)) opener.add_handler(auth_handler) opener.add_handler(http_handler) self._test_basic_auth(opener, auth_handler, "Authorization", @@ -1304,13 +1429,16 @@ class HandlerTests(unittest.TestCase): def __init__(self): OpenerDirector.__init__(self) self.recorded = [] + def record(self, info): self.recorded.append(info) + class TestDigestAuthHandler(urllib.request.HTTPDigestAuthHandler): def http_error_401(self, *args, **kwds): self.parent.record("digest") urllib.request.HTTPDigestAuthHandler.http_error_401(self, *args, **kwds) + class TestBasicAuthHandler(urllib.request.HTTPBasicAuthHandler): def http_error_401(self, *args, **kwds): self.parent.record("basic") @@ -1346,7 +1474,7 @@ class HandlerTests(unittest.TestCase): 401, 'WWW-Authenticate: Kerberos\r\n\r\n') opener.add_handler(digest_auth_handler) opener.add_handler(http_handler) - self.assertRaises(ValueError,opener.open,"http://www.example.com") + self.assertRaises(ValueError, opener.open, "http://www.example.com") def test_unsupported_auth_basic_handler(self): # While using BasicAuthHandler @@ -1356,7 +1484,7 @@ class HandlerTests(unittest.TestCase): 401, 'WWW-Authenticate: NTLM\r\n\r\n') opener.add_handler(basic_auth_handler) opener.add_handler(http_handler) - self.assertRaises(ValueError,opener.open,"http://www.example.com") + self.assertRaises(ValueError, opener.open, "http://www.example.com") def _test_basic_auth(self, opener, auth_handler, auth_header, realm, http_handler, password_manager, @@ -1395,6 +1523,72 @@ class HandlerTests(unittest.TestCase): self.assertEqual(len(http_handler.requests), 1) self.assertFalse(http_handler.requests[0].has_header(auth_header)) + def test_basic_prior_auth_auto_send(self): + # Assume already authenticated if is_authenticated=True + # for APIs like Github that don't return 401 + + user, password = "wile", "coyote" + request_url = "http://acme.example.com/protected" + + http_handler = MockHTTPHandlerCheckAuth(200) + + pwd_manager = HTTPPasswordMgrWithPriorAuth() + auth_prior_handler = HTTPBasicAuthHandler(pwd_manager) + auth_prior_handler.add_password( + None, request_url, user, password, is_authenticated=True) + + is_auth = pwd_manager.is_authenticated(request_url) + self.assertTrue(is_auth) + + opener = OpenerDirector() + opener.add_handler(auth_prior_handler) + opener.add_handler(http_handler) + + opener.open(request_url) + + # expect request to be sent with auth header + self.assertTrue(http_handler.has_auth_header) + + def test_basic_prior_auth_send_after_first_success(self): + # Auto send auth header after authentication is successful once + + user, password = 'wile', 'coyote' + request_url = 'http://acme.example.com/protected' + realm = 'ACME' + + pwd_manager = HTTPPasswordMgrWithPriorAuth() + auth_prior_handler = HTTPBasicAuthHandler(pwd_manager) + auth_prior_handler.add_password(realm, request_url, user, password) + + is_auth = pwd_manager.is_authenticated(request_url) + self.assertFalse(is_auth) + + opener = OpenerDirector() + opener.add_handler(auth_prior_handler) + + http_handler = MockHTTPHandler( + 401, 'WWW-Authenticate: Basic realm="%s"\r\n\r\n' % None) + opener.add_handler(http_handler) + + opener.open(request_url) + + is_auth = pwd_manager.is_authenticated(request_url) + self.assertTrue(is_auth) + + http_handler = MockHTTPHandlerCheckAuth(200) + self.assertFalse(http_handler.has_auth_header) + + opener = OpenerDirector() + opener.add_handler(auth_prior_handler) + opener.add_handler(http_handler) + + # After getting 200 from MockHTTPHandler + # Next request sends header in the first request + opener.open(request_url) + + # expect request to be sent with auth header + self.assertTrue(http_handler.has_auth_header) + def test_http_closed(self): """Test the connection is cleaned up when the response is closed""" for (transfer, data) in ( @@ -1423,6 +1617,7 @@ class HandlerTests(unittest.TestCase): self.assertTrue(conn.fakesock.closed, "Connection not closed") + class MiscTests(unittest.TestCase): def opener_has_handler(self, opener, handler_class): @@ -1430,11 +1625,16 @@ class MiscTests(unittest.TestCase): for h in opener.handlers)) def test_build_opener(self): - class MyHTTPHandler(urllib.request.HTTPHandler): pass + class MyHTTPHandler(urllib.request.HTTPHandler): + pass + class FooHandler(urllib.request.BaseHandler): - def foo_open(self): pass + def foo_open(self): + pass + class BarHandler(urllib.request.BaseHandler): - def bar_open(self): pass + def bar_open(self): + pass build_opener = urllib.request.build_opener @@ -1461,7 +1661,9 @@ class MiscTests(unittest.TestCase): self.opener_has_handler(o, urllib.request.HTTPHandler) # Issue2670: multiple handlers sharing the same base class - class MyOtherHTTPHandler(urllib.request.HTTPHandler): pass + class MyOtherHTTPHandler(urllib.request.HTTPHandler): + pass + o = build_opener(MyHTTPHandler, MyOtherHTTPHandler) self.opener_has_handler(o, MyHTTPHandler) self.opener_has_handler(o, MyOtherHTTPHandler) @@ -1497,6 +1699,8 @@ class MiscTests(unittest.TestCase): self.assertEqual(err.headers, 'Content-Length: 42') expected_errmsg = 'HTTP Error %s: %s' % (err.code, err.msg) self.assertEqual(str(err), expected_errmsg) + expected_errmsg = '<HTTPError %s: %r>' % (err.code, err.msg) + self.assertEqual(repr(err), expected_errmsg) def test_parse_proxy(self): parse_proxy_test_cases = [ @@ -1535,9 +1739,19 @@ class MiscTests(unittest.TestCase): self.assertRaises(ValueError, _parse_proxy, 'file:/ftp.example.com'), + def test_unsupported_algorithm(self): + handler = AbstractDigestAuthHandler() + with self.assertRaises(ValueError) as exc: + handler.get_algorithm_impls('invalid') + self.assertEqual( + str(exc.exception), + "Unsupported digest authentication algorithm 'invalid'" + ) + + class RequestTests(unittest.TestCase): class PutRequest(Request): - method='PUT' + method = 'PUT' def setUp(self): self.get = Request("http://www.python.org/~jeremy/") @@ -1626,7 +1840,7 @@ class RequestTests(unittest.TestCase): def test_url_fullurl_get_full_url(self): urls = ['http://docs.python.org', 'http://docs.python.org/library/urllib2.html#OK', - 'http://www.python.org/?qs=query#fragment=true' ] + 'http://www.python.org/?qs=query#fragment=true'] for url in urls: req = Request(url) self.assertEqual(req.get_full_url(), req.full_url) |
