summaryrefslogtreecommitdiffstats
path: root/Lib/test/test_urllib2.py
diff options
context:
space:
mode:
Diffstat (limited to 'Lib/test/test_urllib2.py')
-rw-r--r--Lib/test/test_urllib2.py372
1 files changed, 204 insertions, 168 deletions
diff --git a/Lib/test/test_urllib2.py b/Lib/test/test_urllib2.py
index 64a2ee9..c8f19bc 100644
--- a/Lib/test/test_urllib2.py
+++ b/Lib/test/test_urllib2.py
@@ -10,10 +10,7 @@ from urllib2 import Request, OpenerDirector
# XXX
# Request
# CacheFTPHandler (hard to write)
-# parse_keqv_list, parse_http_list (I'm leaving this for Anthony Baxter
-# and Greg Stein, since they're doing Digest Authentication)
-# Authentication stuff (ditto)
-# CustomProxy, CustomProxyHandler
+# parse_keqv_list, parse_http_list, HTTPDigestAuthHandler
class TrivialTests(unittest.TestCase):
def test_trivial(self):
@@ -49,6 +46,70 @@ class TrivialTests(unittest.TestCase):
self.assertEquals(urllib2.parse_http_list(string), list)
+def test_password_manager(self):
+ """
+ >>> mgr = urllib2.HTTPPasswordMgr()
+ >>> add = mgr.add_password
+ >>> add("Some Realm", "http://example.com/", "joe", "password")
+ >>> add("Some Realm", "http://example.com/ni", "ni", "ni")
+ >>> add("c", "http://example.com/foo", "foo", "ni")
+ >>> add("c", "http://example.com/bar", "bar", "nini")
+ >>> add("b", "http://example.com/", "first", "blah")
+ >>> add("b", "http://example.com/", "second", "spam")
+ >>> add("a", "http://example.com", "1", "a")
+ >>> add("Some Realm", "http://c.example.com:3128", "3", "c")
+ >>> add("Some Realm", "d.example.com", "4", "d")
+ >>> add("Some Realm", "e.example.com:3128", "5", "e")
+
+ >>> mgr.find_user_password("Some Realm", "example.com")
+ ('joe', 'password')
+ >>> mgr.find_user_password("Some Realm", "http://example.com")
+ ('joe', 'password')
+ >>> mgr.find_user_password("Some Realm", "http://example.com/")
+ ('joe', 'password')
+ >>> mgr.find_user_password("Some Realm", "http://example.com/spam")
+ ('joe', 'password')
+ >>> mgr.find_user_password("Some Realm", "http://example.com/spam/spam")
+ ('joe', 'password')
+ >>> mgr.find_user_password("c", "http://example.com/foo")
+ ('foo', 'ni')
+ >>> mgr.find_user_password("c", "http://example.com/bar")
+ ('bar', 'nini')
+
+ Currently, we use the highest-level path where more than one match:
+
+ >>> mgr.find_user_password("Some Realm", "http://example.com/ni")
+ ('joe', 'password')
+
+ Use latest add_password() in case of conflict:
+
+ >>> mgr.find_user_password("b", "http://example.com/")
+ ('second', 'spam')
+
+ No special relationship between a.example.com and example.com:
+
+ >>> mgr.find_user_password("a", "http://example.com/")
+ ('1', 'a')
+ >>> mgr.find_user_password("a", "http://a.example.com/")
+ (None, None)
+
+ Ports:
+
+ >>> mgr.find_user_password("Some Realm", "c.example.com")
+ (None, None)
+ >>> mgr.find_user_password("Some Realm", "c.example.com:3128")
+ ('3', 'c')
+ >>> mgr.find_user_password("Some Realm", "http://c.example.com:3128")
+ ('3', 'c')
+ >>> mgr.find_user_password("Some Realm", "d.example.com")
+ ('4', 'd')
+ >>> mgr.find_user_password("Some Realm", "e.example.com:3128")
+ ('5', 'e')
+
+ """
+ pass
+
+
class MockOpener:
addheaders = []
def open(self, req, data=None):
@@ -89,6 +150,8 @@ class FakeMethod:
return self.handle(self.meth_name, self.action, *args)
class MockHandler:
+ # useful for testing handler machinery
+ # see add_ordered_mock_handlers() docstring
handler_order = 500
def __init__(self, methods):
self._define_methods(methods)
@@ -161,6 +224,50 @@ def add_ordered_mock_handlers(opener, meth_spec):
opener.add_handler(h)
return handlers
+def build_test_opener(*handler_instances):
+ opener = OpenerDirector()
+ for h in handler_instances:
+ opener.add_handler(h)
+ return opener
+
+class MockHTTPHandler(urllib2.BaseHandler):
+ # useful for testing redirections and auth
+ # sends supplied headers and code as first response
+ # sends 200 OK as second response
+ def __init__(self, code, headers):
+ self.code = code
+ self.headers = headers
+ self.reset()
+ def reset(self):
+ self._count = 0
+ self.requests = []
+ def http_open(self, req):
+ import mimetools, httplib, copy
+ from StringIO import StringIO
+ self.requests.append(copy.deepcopy(req))
+ if self._count == 0:
+ self._count = self._count + 1
+ name = httplib.responses[self.code]
+ msg = mimetools.Message(StringIO(self.headers))
+ return self.parent.error(
+ "http", req, MockFile(), self.code, name, msg)
+ else:
+ self.req = req
+ msg = mimetools.Message(StringIO("\r\n\r\n"))
+ return MockResponse(200, "OK", msg, "", req.get_full_url())
+
+class MockPasswordManager:
+ def add_password(self, realm, uri, user, password):
+ self.realm = realm
+ self.url = uri
+ self.user = user
+ self.password = password
+ def find_user_password(self, realm, authuri):
+ self.target_realm = realm
+ self.target_url = authuri
+ return self.user, self.password
+
+
class OpenerDirectorTests(unittest.TestCase):
def test_handled(self):
@@ -612,33 +719,18 @@ class HandlerTests(unittest.TestCase):
urllib2.HTTPRedirectHandler.max_redirections)
def test_cookie_redirect(self):
- class MockHTTPHandler(urllib2.HTTPHandler):
- def __init__(self): self._count = 0
- def http_open(self, req):
- import mimetools
- from StringIO import StringIO
- if self._count == 0:
- self._count = self._count + 1
- msg = mimetools.Message(
- StringIO("Location: http://www.cracker.com/\r\n\r\n"))
- return self.parent.error(
- "http", req, MockFile(), 302, "Found", msg)
- else:
- self.req = req
- msg = mimetools.Message(StringIO("\r\n\r\n"))
- return MockResponse(200, "OK", msg, "", req.get_full_url())
# cookies shouldn't leak into redirected requests
from cookielib import CookieJar
- from urllib2 import build_opener, HTTPHandler, HTTPError, \
- HTTPCookieProcessor
from test.test_cookielib import interact_netscape
cj = CookieJar()
interact_netscape(cj, "http://www.example.com/", "spam=eggs")
- hh = MockHTTPHandler()
- cp = HTTPCookieProcessor(cj)
- o = build_opener(hh, cp)
+ hh = MockHTTPHandler(302, "Location: http://www.cracker.com/\r\n\r\n")
+ hdeh = urllib2.HTTPDefaultErrorHandler()
+ hrh = urllib2.HTTPRedirectHandler()
+ cp = urllib2.HTTPCookieProcessor(cj)
+ o = build_test_opener(hh, hdeh, hrh, cp)
o.open("http://www.example.com/")
self.assert_(not hh.req.has_header("Cookie"))
@@ -659,6 +751,92 @@ class HandlerTests(unittest.TestCase):
self.assertEqual([(handlers[0], "http_open")],
[tup[0:2] for tup in o.calls])
+ def test_basic_auth(self):
+ opener = OpenerDirector()
+ password_manager = MockPasswordManager()
+ auth_handler = urllib2.HTTPBasicAuthHandler(password_manager)
+ realm = "ACME Widget Store"
+ http_handler = MockHTTPHandler(
+ 401, 'WWW-Authenticate: Basic realm="%s"\r\n\r\n' % realm)
+ self._test_basic_auth(opener, auth_handler, "Authorization",
+ realm, http_handler, password_manager,
+ "http://acme.example.com/protected",
+ "http://acme.example.com/protected",
+ )
+
+ def test_proxy_basic_auth(self):
+ opener = OpenerDirector()
+ ph = urllib2.ProxyHandler(dict(http="proxy.example.com:3128"))
+ opener.add_handler(ph)
+ password_manager = MockPasswordManager()
+ auth_handler = urllib2.ProxyBasicAuthHandler(password_manager)
+ realm = "ACME Networks"
+ http_handler = MockHTTPHandler(
+ 407, 'Proxy-Authenticate: Basic realm="%s"\r\n\r\n' % realm)
+ self._test_basic_auth(opener, auth_handler, "Proxy-authorization",
+ realm, http_handler, password_manager,
+ "http://acme.example.com:3128/protected",
+ "proxy.example.com:3128",
+ )
+
+ def test_basic_and_digest_auth_handlers(self):
+ # HTTPDigestAuthHandler threw an exception if it couldn't handle a 40*
+ # response (http://python.org/sf/1479302), where it should instead
+ # return None to allow another handler (especially
+ # HTTPBasicAuthHandler) to handle the response.
+ class TestDigestAuthHandler(urllib2.HTTPDigestAuthHandler):
+ handler_order = 400 # strictly before HTTPBasicAuthHandler
+ opener = OpenerDirector()
+ password_manager = MockPasswordManager()
+ digest_handler = TestDigestAuthHandler(password_manager)
+ basic_handler = urllib2.HTTPBasicAuthHandler(password_manager)
+ opener.add_handler(digest_handler)
+ realm = "ACME Networks"
+ http_handler = MockHTTPHandler(
+ 401, 'WWW-Authenticate: Basic realm="%s"\r\n\r\n' % realm)
+ self._test_basic_auth(opener, basic_handler, "Authorization",
+ realm, http_handler, password_manager,
+ "http://acme.example.com/protected",
+ "http://acme.example.com/protected",
+ )
+
+ def _test_basic_auth(self, opener, auth_handler, auth_header,
+ realm, http_handler, password_manager,
+ request_url, protected_url):
+ import base64, httplib
+ user, password = "wile", "coyote"
+ opener.add_handler(auth_handler)
+ opener.add_handler(http_handler)
+
+ # .add_password() fed through to password manager
+ auth_handler.add_password(realm, request_url, user, password)
+ self.assertEqual(realm, password_manager.realm)
+ self.assertEqual(request_url, password_manager.url)
+ self.assertEqual(user, password_manager.user)
+ self.assertEqual(password, password_manager.password)
+
+ r = opener.open(request_url)
+
+ # should have asked the password manager for the username/password
+ self.assertEqual(password_manager.target_realm, realm)
+ self.assertEqual(password_manager.target_url, protected_url)
+
+ # expect one request without authorization, then one with
+ self.assertEqual(len(http_handler.requests), 2)
+ self.assertFalse(http_handler.requests[0].has_header(auth_header))
+ userpass = '%s:%s' % (user, password)
+ auth_hdr_value = 'Basic '+base64.encodestring(userpass).strip()
+ self.assertEqual(http_handler.requests[1].get_header(auth_header),
+ auth_hdr_value)
+
+ # if the password manager can't find a password, the handler won't
+ # handle the HTTP auth error
+ password_manager.user = password_manager.password = None
+ http_handler.reset()
+ r = opener.open(request_url)
+ self.assertEqual(len(http_handler.requests), 1)
+ self.assertFalse(http_handler.requests[0].has_header(auth_header))
+
class MiscTests(unittest.TestCase):
@@ -700,157 +878,15 @@ class MiscTests(unittest.TestCase):
else:
self.assert_(False)
-class NetworkTests(unittest.TestCase):
- def setUp(self):
- if 0: # for debugging
- import logging
- logger = logging.getLogger("test_urllib2")
- logger.addHandler(logging.StreamHandler())
-
- def test_range (self):
- req = urllib2.Request("http://www.python.org",
- headers={'Range': 'bytes=20-39'})
- result = urllib2.urlopen(req)
- data = result.read()
- self.assertEqual(len(data), 20)
-
- # XXX The rest of these tests aren't very good -- they don't check much.
- # They do sometimes catch some major disasters, though.
-
- def test_ftp(self):
- urls = [
- 'ftp://www.python.org/pub/python/misc/sousa.au',
- 'ftp://www.python.org/pub/tmp/blat',
- 'ftp://gatekeeper.research.compaq.com/pub/DEC/SRC'
- '/research-reports/00README-Legal-Rules-Regs',
- ]
- self._test_urls(urls, self._extra_handlers())
-
- def test_gopher(self):
- import warnings
- warnings.filterwarnings("ignore",
- "the gopherlib module is deprecated",
- DeprecationWarning,
- "urllib2$")
- urls = [
- # Thanks to Fred for finding these!
- 'gopher://gopher.lib.ncsu.edu/11/library/stacks/Alex',
- 'gopher://gopher.vt.edu:10010/10/33',
- ]
- self._test_urls(urls, self._extra_handlers())
-
- def test_file(self):
- TESTFN = test_support.TESTFN
- f = open(TESTFN, 'w')
- try:
- f.write('hi there\n')
- f.close()
- urls = [
- 'file:'+sanepathname2url(os.path.abspath(TESTFN)),
-
- # XXX bug, should raise URLError
- #('file://nonsensename/etc/passwd', None, urllib2.URLError)
- ('file://nonsensename/etc/passwd', None, (OSError, socket.error))
- ]
- self._test_urls(urls, self._extra_handlers())
- finally:
- os.remove(TESTFN)
-
- def test_http(self):
- urls = [
- 'http://www.espn.com/', # redirect
- 'http://www.python.org/Spanish/Inquistion/',
- ('http://www.python.org/cgi-bin/faqw.py',
- 'query=pythonistas&querytype=simple&casefold=yes&req=search', None),
- 'http://www.python.org/',
- ]
- self._test_urls(urls, self._extra_handlers())
-
- # XXX Following test depends on machine configurations that are internal
- # to CNRI. Need to set up a public server with the right authentication
- # configuration for test purposes.
-
-## def test_cnri(self):
-## if socket.gethostname() == 'bitdiddle':
-## localhost = 'bitdiddle.cnri.reston.va.us'
-## elif socket.gethostname() == 'bitdiddle.concentric.net':
-## localhost = 'localhost'
-## else:
-## localhost = None
-## if localhost is not None:
-## urls = [
-## 'file://%s/etc/passwd' % localhost,
-## 'http://%s/simple/' % localhost,
-## 'http://%s/digest/' % localhost,
-## 'http://%s/not/found.h' % localhost,
-## ]
-
-## bauth = HTTPBasicAuthHandler()
-## bauth.add_password('basic_test_realm', localhost, 'jhylton',
-## 'password')
-## dauth = HTTPDigestAuthHandler()
-## dauth.add_password('digest_test_realm', localhost, 'jhylton',
-## 'password')
-
-## self._test_urls(urls, self._extra_handlers()+[bauth, dauth])
-
- def _test_urls(self, urls, handlers):
- import socket
- import time
- import logging
- debug = logging.getLogger("test_urllib2").debug
-
- urllib2.install_opener(urllib2.build_opener(*handlers))
-
- for url in urls:
- if isinstance(url, tuple):
- url, req, expected_err = url
- else:
- req = expected_err = None
- debug(url)
- try:
- f = urllib2.urlopen(url, req)
- except (IOError, socket.error, OSError), err:
- debug(err)
- if expected_err:
- self.assert_(isinstance(err, expected_err))
- else:
- buf = f.read()
- f.close()
- debug("read %d bytes" % len(buf))
- debug("******** next url coming up...")
- time.sleep(0.1)
-
- def _extra_handlers(self):
- handlers = []
-
- handlers.append(urllib2.GopherHandler)
-
- cfh = urllib2.CacheFTPHandler()
- cfh.setTimeout(1)
- handlers.append(cfh)
-
-## # XXX try out some custom proxy objects too!
-## def at_cnri(req):
-## host = req.get_host()
-## debug(host)
-## if host[-18:] == '.cnri.reston.va.us':
-## return True
-## p = CustomProxy('http', at_cnri, 'proxy.cnri.reston.va.us')
-## ph = CustomProxyHandler(p)
-## handlers.append(ph)
-
- return handlers
-
def test_main(verbose=None):
+ from test import test_urllib2
+ test_support.run_doctest(test_urllib2, verbose)
test_support.run_doctest(urllib2, verbose)
tests = (TrivialTests,
OpenerDirectorTests,
HandlerTests,
MiscTests)
- if test_support.is_resource_enabled('network'):
- tests += (NetworkTests,)
test_support.run_unittest(*tests)
if __name__ == "__main__":