diff options
Diffstat (limited to 'Lib/test/test_urllib.py')
-rw-r--r-- | Lib/test/test_urllib.py | 207 |
1 files changed, 164 insertions, 43 deletions
diff --git a/Lib/test/test_urllib.py b/Lib/test/test_urllib.py index 482acc1..c6f6f61 100644 --- a/Lib/test/test_urllib.py +++ b/Lib/test/test_urllib.py @@ -11,7 +11,8 @@ from test import support import os import sys import tempfile -import warnings + +from base64 import b64encode import collections def hexescape(char): @@ -38,6 +39,49 @@ def urlopen(url, data=None, proxies=None): else: return opener.open(url, data) + +class FakeHTTPMixin(object): + def fakehttp(self, fakedata): + class FakeSocket(io.BytesIO): + io_refs = 1 + + def sendall(self, data): + FakeHTTPConnection.buf = data + + def makefile(self, *args, **kwds): + self.io_refs += 1 + return self + + def read(self, amt=None): + if self.closed: + return b"" + return io.BytesIO.read(self, amt) + + def readline(self, length=None): + if self.closed: + return b"" + return io.BytesIO.readline(self, length) + + def close(self): + self.io_refs -= 1 + if self.io_refs == 0: + io.BytesIO.close(self) + + class FakeHTTPConnection(http.client.HTTPConnection): + + # buffer to store data for verification in urlopen tests. + buf = None + + def connect(self): + self.sock = FakeSocket(fakedata) + + self._connection_class = http.client.HTTPConnection + http.client.HTTPConnection = FakeHTTPConnection + + def unfakehttp(self): + http.client.HTTPConnection = self._connection_class + + class urlopen_FileTests(unittest.TestCase): """Test urlopen() opening a temporary file. @@ -89,19 +133,18 @@ class urlopen_FileTests(unittest.TestCase): def test_fileno(self): file_num = self.returned_obj.fileno() - self.assertTrue(isinstance(file_num, int), - "fileno() did not return an int") + self.assertIsInstance(file_num, int, "fileno() did not return an int") self.assertEqual(os.read(file_num, len(self.text)), self.text, "Reading on the file descriptor returned by fileno() " "did not return the expected text") def test_close(self): - # Test close() by calling it hear and then having it be called again + # Test close() by calling it here and then having it be called again # by the tearDown() method for the test self.returned_obj.close() def test_info(self): - self.assertTrue(isinstance(self.returned_obj.info(), email.message.Message)) + self.assertIsInstance(self.returned_obj.info(), email.message.Message) def test_geturl(self): self.assertEqual(self.returned_obj.geturl(), self.pathname) @@ -113,10 +156,14 @@ class urlopen_FileTests(unittest.TestCase): # Test iterator # Don't need to count number of iterations since test would fail the # instant it returned anything beyond the first line from the - # comparison - for line in self.returned_obj.__iter__(): + # comparison. + # Use the iterator in the usual implicit way to test for ticket #4608. + for line in self.returned_obj: self.assertEqual(line, self.text) + def test_relativelocalfile(self): + self.assertRaises(ValueError,urllib.request.urlopen,'./' + self.pathname) + class ProxyTests(unittest.TestCase): def setUp(self): @@ -137,33 +184,15 @@ class ProxyTests(unittest.TestCase): proxies = urllib.request.getproxies_environment() # getproxies_environment use lowered case truncated (no '_proxy') keys self.assertEqual('localhost', proxies['no']) + # List of no_proxies with space. + self.env.set('NO_PROXY', 'localhost, anotherdomain.com, newdomain.com') + self.assertTrue(urllib.request.proxy_bypass_environment('anotherdomain.com')) - -class urlopen_HttpTests(unittest.TestCase): +class urlopen_HttpTests(unittest.TestCase, FakeHTTPMixin): """Test urlopen() opening a fake http connection.""" - def fakehttp(self, fakedata): - class FakeSocket(io.BytesIO): - def sendall(self, str): pass - def makefile(self, *args, **kwds): - return self - def read(self, amt=None): - if self.closed: return b"" - return io.BytesIO.read(self, amt) - def readline(self, length=None): - if self.closed: return b"" - return io.BytesIO.readline(self, length) - class FakeHTTPConnection(http.client.HTTPConnection): - def connect(self): - self.sock = FakeSocket(fakedata) - self._connection_class = http.client.HTTPConnection - http.client.HTTPConnection = FakeHTTPConnection - - def unfakehttp(self): - http.client.HTTPConnection = self._connection_class - - def test_read(self): - self.fakehttp(b"Hello!") + def check_read(self, ver): + self.fakehttp(b"HTTP/" + ver + b" 200 OK\r\n\r\nHello!") try: fp = urlopen("http://python.org/") self.assertEqual(fp.readline(), b"Hello!") @@ -176,13 +205,32 @@ class urlopen_HttpTests(unittest.TestCase): def test_url_fragment(self): # Issue #11703: geturl() omits fragments in the original URL. url = 'http://docs.python.org/library/urllib.html#OK' - self.fakehttp(b'Hello!') + self.fakehttp(b"HTTP/1.1 200 OK\r\n\r\nHello!") try: fp = urllib.request.urlopen(url) self.assertEqual(fp.geturl(), url) finally: self.unfakehttp() + def test_willclose(self): + self.fakehttp(b"HTTP/1.1 200 OK\r\n\r\nHello!") + try: + resp = urlopen("http://www.python.org") + self.assertTrue(resp.fp.will_close) + finally: + self.unfakehttp() + + def test_read_0_9(self): + # "0.9" response accepted (but not "simple responses" without + # a status line) + self.check_read(b"0.9") + + def test_read_1_0(self): + self.check_read(b"1.0") + + def test_read_1_1(self): + self.check_read(b"1.1") + def test_read_bogus(self): # urlopen() should raise IOError for many error codes. self.fakehttp(b'''HTTP/1.1 401 Authentication Required @@ -221,7 +269,7 @@ Content-Type: text/html; charset=iso-8859-1 self.unfakehttp() def test_userpass_inurl(self): - self.fakehttp(b"Hello!") + self.fakehttp(b"HTTP/1.0 200 OK\r\n\r\nHello!") try: fp = urlopen("http://user:pass@python.org/") self.assertEqual(fp.readline(), b"Hello!") @@ -231,6 +279,25 @@ Content-Type: text/html; charset=iso-8859-1 finally: self.unfakehttp() + def test_userpass_inurl_w_spaces(self): + self.fakehttp(b"HTTP/1.0 200 OK\r\n\r\nHello!") + try: + userpass = "a b:c d" + url = "http://{}@python.org/".format(userpass) + fakehttp_wrapper = http.client.HTTPConnection + authorization = ("Authorization: Basic %s\r\n" % + b64encode(userpass.encode("ASCII")).decode("ASCII")) + fp = urlopen(url) + # The authorization header must be in place + self.assertIn(authorization, fakehttp_wrapper.buf.decode("UTF-8")) + self.assertEqual(fp.readline(), b"Hello!") + self.assertEqual(fp.readline(), b"") + # the spaces are quoted in URL so no match + self.assertNotEqual(fp.geturl(), url) + self.assertEqual(fp.getcode(), 200) + finally: + self.unfakehttp() + class urlretrieve_FileTests(unittest.TestCase): """Test urllib.urlretrieve() on local files""" @@ -262,8 +329,12 @@ class urlretrieve_FileTests(unittest.TestCase): except: pass def constructLocalFileUrl(self, filePath): - return "file://%s" % urllib.request.pathname2url( - os.path.abspath(filePath)) + filePath = os.path.abspath(filePath) + try: + filePath.encode("utf8") + except UnicodeEncodeError: + raise unittest.SkipTest("filePath is not encodable to utf8") + return "file://%s" % urllib.request.pathname2url(filePath) def createNewTempFile(self, data=b""): """Creates a new temporary file containing the specified data, @@ -289,9 +360,9 @@ class urlretrieve_FileTests(unittest.TestCase): # a headers value is returned. result = urllib.request.urlretrieve("file:%s" % support.TESTFN) self.assertEqual(result[0], support.TESTFN) - self.assertTrue(isinstance(result[1], email.message.Message), - "did not get a email.message.Message instance " - "as second returned value") + self.assertIsInstance(result[1], email.message.Message, + "did not get a email.message.Message instance " + "as second returned value") def test_copy(self): # Test that setting the filename argument works. @@ -314,9 +385,9 @@ class urlretrieve_FileTests(unittest.TestCase): def test_reporthook(self): # Make sure that the reporthook works. def hooktester(count, block_size, total_size, count_holder=[0]): - self.assertTrue(isinstance(count, int)) - self.assertTrue(isinstance(block_size, int)) - self.assertTrue(isinstance(total_size, int)) + self.assertIsInstance(count, int) + self.assertIsInstance(block_size, int) + self.assertIsInstance(total_size, int) self.assertEqual(count, count_holder[0]) count_holder[0] = count_holder[0] + 1 second_temp = "%s.2" % support.TESTFN @@ -365,6 +436,48 @@ class urlretrieve_FileTests(unittest.TestCase): self.assertEqual(report[0][1], 8192) self.assertEqual(report[0][2], 8193) + +class urlretrieve_HttpTests(unittest.TestCase, FakeHTTPMixin): + """Test urllib.urlretrieve() using fake http connections""" + + def test_short_content_raises_ContentTooShortError(self): + self.fakehttp(b'''HTTP/1.1 200 OK +Date: Wed, 02 Jan 2008 03:03:54 GMT +Server: Apache/1.3.33 (Debian GNU/Linux) mod_ssl/2.8.22 OpenSSL/0.9.7e +Connection: close +Content-Length: 100 +Content-Type: text/html; charset=iso-8859-1 + +FF +''') + + def _reporthook(par1, par2, par3): + pass + + with self.assertRaises(urllib.error.ContentTooShortError): + try: + urllib.request.urlretrieve('http://example.com/', + reporthook=_reporthook) + finally: + self.unfakehttp() + + def test_short_content_raises_ContentTooShortError_without_reporthook(self): + self.fakehttp(b'''HTTP/1.1 200 OK +Date: Wed, 02 Jan 2008 03:03:54 GMT +Server: Apache/1.3.33 (Debian GNU/Linux) mod_ssl/2.8.22 OpenSSL/0.9.7e +Connection: close +Content-Length: 100 +Content-Type: text/html; charset=iso-8859-1 + +FF +''') + with self.assertRaises(urllib.error.ContentTooShortError): + try: + urllib.request.urlretrieve('http://example.com/') + finally: + self.unfakehttp() + + class QuotingTests(unittest.TestCase): """Tests for urllib.quote() and urllib.quote_plus() @@ -461,6 +574,7 @@ class QuotingTests(unittest.TestCase): result = urllib.parse.quote(partial_quote) self.assertEqual(expected, result, "using quote(): %r != %r" % (expected, result)) + result = urllib.parse.quote_plus(partial_quote) self.assertEqual(expected, result, "using quote_plus(): %r != %r" % (expected, result)) @@ -598,8 +712,7 @@ class UnquotingTests(unittest.TestCase): "%s" % result) self.assertRaises((TypeError, AttributeError), urllib.parse.unquote, None) self.assertRaises((TypeError, AttributeError), urllib.parse.unquote, ()) - with warnings.catch_warnings(): - warnings.simplefilter('ignore', BytesWarning) + with support.check_warnings(('', BytesWarning), quiet=True): self.assertRaises((TypeError, AttributeError), urllib.parse.unquote, b'') def test_unquoting_badpercent(self): @@ -1029,6 +1142,13 @@ class Utility_Tests(unittest.TestCase): self.assertEqual(('user', 'a\fb'),urllib.parse.splitpasswd('user:a\fb')) self.assertEqual(('user', 'a\vb'),urllib.parse.splitpasswd('user:a\vb')) self.assertEqual(('user', 'a:b'),urllib.parse.splitpasswd('user:a:b')) + self.assertEqual(('user', 'a b'),urllib.parse.splitpasswd('user:a b')) + self.assertEqual(('user 2', 'ab'),urllib.parse.splitpasswd('user 2:ab')) + self.assertEqual(('user+1', 'a+b'),urllib.parse.splitpasswd('user+1:a+b')) + + def test_thishost(self): + """Test the urllib.request.thishost utility function returns a tuple""" + self.assertIsInstance(urllib.request.thishost(), tuple) class URLopener_Tests(unittest.TestCase): @@ -1132,6 +1252,7 @@ def test_main(): urlopen_FileTests, urlopen_HttpTests, urlretrieve_FileTests, + urlretrieve_HttpTests, ProxyTests, QuotingTests, UnquotingTests, |