summaryrefslogtreecommitdiffstats
path: root/Lib/test/test_urllib.py
diff options
context:
space:
mode:
Diffstat (limited to 'Lib/test/test_urllib.py')
-rw-r--r--Lib/test/test_urllib.py217
1 files changed, 161 insertions, 56 deletions
diff --git a/Lib/test/test_urllib.py b/Lib/test/test_urllib.py
index 2dec4e9..16236ef 100644
--- a/Lib/test/test_urllib.py
+++ b/Lib/test/test_urllib.py
@@ -7,8 +7,10 @@ import http.client
import email.message
import io
import unittest
+from unittest.mock import patch
from test import support
import os
+import ssl
import sys
import tempfile
from nturl2path import url2pathname, pathname2url
@@ -16,6 +18,7 @@ from nturl2path import url2pathname, pathname2url
from base64 import b64encode
import collections
+
def hexescape(char):
"""Escape char as RFC 2396 specifies"""
hex_repr = hex(ord(char))[2:].upper()
@@ -46,48 +49,73 @@ def urlopen(url, data=None, proxies=None):
return opener.open(url, data)
-class FakeHTTPMixin(object):
- def fakehttp(self, fakedata):
- class FakeSocket(io.BytesIO):
- io_refs = 1
+def fakehttp(fakedata):
+ class FakeSocket(io.BytesIO):
+ io_refs = 1
- def sendall(self, data):
- FakeHTTPConnection.buf = data
+ def sendall(self, data):
+ FakeHTTPConnection.buf = data
- def makefile(self, *args, **kwds):
- self.io_refs += 1
- return self
+ def makefile(self, *args, **kwds):
+ self.io_refs += 1
+ return self
- def read(self, amt=None):
- if self.closed:
- return b""
- return io.BytesIO.read(self, amt)
+ def read(self, amt=None):
+ if self.closed:
+ return b""
+ return io.BytesIO.read(self, amt)
- def readline(self, length=None):
- if self.closed:
- return b""
- return io.BytesIO.readline(self, length)
+ def readline(self, length=None):
+ if self.closed:
+ return b""
+ return io.BytesIO.readline(self, length)
- def close(self):
- self.io_refs -= 1
- if self.io_refs == 0:
- io.BytesIO.close(self)
+ def close(self):
+ self.io_refs -= 1
+ if self.io_refs == 0:
+ io.BytesIO.close(self)
+
+ class FakeHTTPConnection(http.client.HTTPConnection):
- class FakeHTTPConnection(http.client.HTTPConnection):
+ # buffer to store data for verification in urlopen tests.
+ buf = None
+ fakesock = FakeSocket(fakedata)
- # buffer to store data for verification in urlopen tests.
- buf = None
+ def connect(self):
+ self.sock = self.fakesock
- def connect(self):
- self.sock = FakeSocket(fakedata)
+ return FakeHTTPConnection
+
+class FakeHTTPMixin(object):
+ def fakehttp(self, fakedata):
self._connection_class = http.client.HTTPConnection
- http.client.HTTPConnection = FakeHTTPConnection
+ http.client.HTTPConnection = fakehttp(fakedata)
def unfakehttp(self):
http.client.HTTPConnection = self._connection_class
+class FakeFTPMixin(object):
+ def fakeftp(self):
+ class FakeFtpWrapper(object):
+ def __init__(self, user, passwd, host, port, dirs, timeout=None,
+ persistent=True):
+ pass
+
+ def retrfile(self, file, type):
+ return io.BytesIO(), 0
+
+ def close(self):
+ pass
+
+ self._ftpwrapper_class = urllib.request.ftpwrapper
+ urllib.request.ftpwrapper = FakeFtpWrapper
+
+ def unfakeftp(self):
+ urllib.request.ftpwrapper = self._ftpwrapper_class
+
+
class urlopen_FileTests(unittest.TestCase):
"""Test urlopen() opening a temporary file.
@@ -194,7 +222,7 @@ class ProxyTests(unittest.TestCase):
self.env.set('NO_PROXY', 'localhost, anotherdomain.com, newdomain.com')
self.assertTrue(urllib.request.proxy_bypass_environment('anotherdomain.com'))
-class urlopen_HttpTests(unittest.TestCase, FakeHTTPMixin):
+class urlopen_HttpTests(unittest.TestCase, FakeHTTPMixin, FakeFTPMixin):
"""Test urlopen() opening a fake http connection."""
def check_read(self, ver):
@@ -238,7 +266,7 @@ class urlopen_HttpTests(unittest.TestCase, FakeHTTPMixin):
self.check_read(b"1.1")
def test_read_bogus(self):
- # urlopen() should raise IOError for many error codes.
+ # urlopen() should raise OSError for many error codes.
self.fakehttp(b'''HTTP/1.1 401 Authentication Required
Date: Wed, 02 Jan 2008 03:03:54 GMT
Server: Apache/1.3.33 (Debian GNU/Linux) mod_ssl/2.8.22 OpenSSL/0.9.7e
@@ -246,12 +274,12 @@ Connection: close
Content-Type: text/html; charset=iso-8859-1
''')
try:
- self.assertRaises(IOError, urlopen, "http://python.org/")
+ self.assertRaises(OSError, urlopen, "http://python.org/")
finally:
self.unfakehttp()
def test_invalid_redirect(self):
- # urlopen() should raise IOError for many error codes.
+ # urlopen() should raise OSError for many error codes.
self.fakehttp(b'''HTTP/1.1 302 Found
Date: Wed, 02 Jan 2008 03:03:54 GMT
Server: Apache/1.3.33 (Debian GNU/Linux) mod_ssl/2.8.22 OpenSSL/0.9.7e
@@ -266,19 +294,20 @@ Content-Type: text/html; charset=iso-8859-1
self.unfakehttp()
def test_empty_socket(self):
- # urlopen() raises IOError if the underlying socket does not send any
+ # urlopen() raises OSError if the underlying socket does not send any
# data. (#1680230)
self.fakehttp(b'')
try:
- self.assertRaises(IOError, urlopen, "http://something")
+ self.assertRaises(OSError, urlopen, "http://something")
finally:
self.unfakehttp()
def test_missing_localfile(self):
# Test for #10836
- # 3.3 - URLError is not captured, explicit IOError is raised.
- with self.assertRaises(IOError):
+ with self.assertRaises(urllib.error.URLError) as e:
urlopen('file://localhost/a/file/which/doesnot/exists.py')
+ self.assertTrue(e.exception.filename)
+ self.assertTrue(e.exception.reason)
def test_file_notexists(self):
fd, tmp_file = tempfile.mkstemp()
@@ -291,20 +320,30 @@ Content-Type: text/html; charset=iso-8859-1
os.close(fd)
os.unlink(tmp_file)
self.assertFalse(os.path.exists(tmp_file))
- # 3.3 - IOError instead of URLError
- with self.assertRaises(IOError):
+ with self.assertRaises(urllib.error.URLError):
urlopen(tmp_fileurl)
def test_ftp_nohost(self):
test_ftp_url = 'ftp:///path'
- # 3.3 - IOError instead of URLError
- with self.assertRaises(IOError):
+ with self.assertRaises(urllib.error.URLError) as e:
urlopen(test_ftp_url)
+ self.assertFalse(e.exception.filename)
+ self.assertTrue(e.exception.reason)
def test_ftp_nonexisting(self):
- # 3.3 - IOError instead of URLError
- with self.assertRaises(IOError):
+ with self.assertRaises(urllib.error.URLError) as e:
urlopen('ftp://localhost/a/file/which/doesnot/exists.py')
+ self.assertFalse(e.exception.filename)
+ self.assertTrue(e.exception.reason)
+
+ @patch.object(urllib.request, 'MAXFTPCACHE', 0)
+ def test_ftp_cache_pruning(self):
+ self.fakeftp()
+ try:
+ urllib.request.ftpcache['test'] = urllib.request.ftpwrapper('user', 'pass', 'localhost', 21, [])
+ urlopen('ftp://localhost')
+ finally:
+ self.unfakeftp()
def test_userpass_inurl(self):
@@ -341,6 +380,86 @@ Content-Type: text/html; charset=iso-8859-1
with support.check_warnings(('',DeprecationWarning)):
urllib.request.URLopener()
+ def test_cafile_and_context(self):
+ context = ssl.create_default_context()
+ with self.assertRaises(ValueError):
+ urllib.request.urlopen(
+ "https://localhost", cafile="/nonexistent/path", context=context
+ )
+
+class urlopen_DataTests(unittest.TestCase):
+ """Test urlopen() opening a data URL."""
+
+ def setUp(self):
+ # text containing URL special- and unicode-characters
+ self.text = "test data URLs :;,%=& \u00f6 \u00c4 "
+ # 2x1 pixel RGB PNG image with one black and one white pixel
+ self.image = (
+ b'\x89PNG\r\n\x1a\n\x00\x00\x00\rIHDR\x00\x00\x00\x02\x00\x00\x00'
+ b'\x01\x08\x02\x00\x00\x00{@\xe8\xdd\x00\x00\x00\x01sRGB\x00\xae'
+ b'\xce\x1c\xe9\x00\x00\x00\x0fIDAT\x08\xd7c```\xf8\xff\xff?\x00'
+ b'\x06\x01\x02\xfe\no/\x1e\x00\x00\x00\x00IEND\xaeB`\x82')
+
+ self.text_url = (
+ "data:text/plain;charset=UTF-8,test%20data%20URLs%20%3A%3B%2C%25%3"
+ "D%26%20%C3%B6%20%C3%84%20")
+ self.text_url_base64 = (
+ "data:text/plain;charset=ISO-8859-1;base64,dGVzdCBkYXRhIFVSTHMgOjs"
+ "sJT0mIPYgxCA%3D")
+ # base64 encoded data URL that contains ignorable spaces,
+ # such as "\n", " ", "%0A", and "%20".
+ self.image_url = (
+ "data:image/png;base64,iVBORw0KGgoAAAANSUhEUgAAAAIAAAABCAIAAAB7\n"
+ "QOjdAAAAAXNSR0IArs4c6QAAAA9JREFUCNdj%0AYGBg%2BP//PwAGAQL%2BCm8 "
+ "vHgAAAABJRU5ErkJggg%3D%3D%0A%20")
+
+ self.text_url_resp = urllib.request.urlopen(self.text_url)
+ self.text_url_base64_resp = urllib.request.urlopen(
+ self.text_url_base64)
+ self.image_url_resp = urllib.request.urlopen(self.image_url)
+
+ def test_interface(self):
+ # Make sure object returned by urlopen() has the specified methods
+ for attr in ("read", "readline", "readlines",
+ "close", "info", "geturl", "getcode", "__iter__"):
+ self.assertTrue(hasattr(self.text_url_resp, attr),
+ "object returned by urlopen() lacks %s attribute" %
+ attr)
+
+ def test_info(self):
+ self.assertIsInstance(self.text_url_resp.info(), email.message.Message)
+ self.assertEqual(self.text_url_base64_resp.info().get_params(),
+ [('text/plain', ''), ('charset', 'ISO-8859-1')])
+ self.assertEqual(self.image_url_resp.info()['content-length'],
+ str(len(self.image)))
+ self.assertEqual(urllib.request.urlopen("data:,").info().get_params(),
+ [('text/plain', ''), ('charset', 'US-ASCII')])
+
+ def test_geturl(self):
+ self.assertEqual(self.text_url_resp.geturl(), self.text_url)
+ self.assertEqual(self.text_url_base64_resp.geturl(),
+ self.text_url_base64)
+ self.assertEqual(self.image_url_resp.geturl(), self.image_url)
+
+ def test_read_text(self):
+ self.assertEqual(self.text_url_resp.read().decode(
+ dict(self.text_url_resp.info().get_params())['charset']), self.text)
+
+ def test_read_text_base64(self):
+ self.assertEqual(self.text_url_base64_resp.read().decode(
+ dict(self.text_url_base64_resp.info().get_params())['charset']),
+ self.text)
+
+ def test_read_image(self):
+ self.assertEqual(self.image_url_resp.read(), self.image)
+
+ def test_missing_comma(self):
+ self.assertRaises(ValueError,urllib.request.urlopen,'data:text/plain')
+
+ def test_invalid_base64_data(self):
+ # missing padding character
+ self.assertRaises(ValueError,urllib.request.urlopen,'data:;base64,Cg=')
+
class urlretrieve_FileTests(unittest.TestCase):
"""Test urllib.urlretrieve() on local files"""
@@ -1175,21 +1294,6 @@ class Pathname_Tests(unittest.TestCase):
class Utility_Tests(unittest.TestCase):
"""Testcase to test the various utility functions in the urllib."""
- def test_splitpasswd(self):
- """Some of password examples are not sensible, but it is added to
- confirming to RFC2617 and addressing issue4675.
- """
- self.assertEqual(('user', 'ab'),urllib.parse.splitpasswd('user:ab'))
- self.assertEqual(('user', 'a\nb'),urllib.parse.splitpasswd('user:a\nb'))
- self.assertEqual(('user', 'a\tb'),urllib.parse.splitpasswd('user:a\tb'))
- self.assertEqual(('user', 'a\rb'),urllib.parse.splitpasswd('user:a\rb'))
- self.assertEqual(('user', 'a\fb'),urllib.parse.splitpasswd('user:a\fb'))
- self.assertEqual(('user', 'a\vb'),urllib.parse.splitpasswd('user:a\vb'))
- self.assertEqual(('user', 'a:b'),urllib.parse.splitpasswd('user:a:b'))
- self.assertEqual(('user', 'a b'),urllib.parse.splitpasswd('user:a b'))
- self.assertEqual(('user 2', 'ab'),urllib.parse.splitpasswd('user 2:ab'))
- self.assertEqual(('user+1', 'a+b'),urllib.parse.splitpasswd('user+1:a+b'))
-
def test_thishost(self):
"""Test the urllib.request.thishost utility function returns a tuple"""
self.assertIsInstance(urllib.request.thishost(), tuple)
@@ -1291,6 +1395,7 @@ class URLopener_Tests(unittest.TestCase):
# self.assertEqual(ftp.ftp.sock.gettimeout(), 30)
# ftp.close()
+
class RequestTests(unittest.TestCase):
"""Unit tests for urllib.request.Request."""