summaryrefslogtreecommitdiffstats
path: root/Lib/test
diff options
context:
space:
mode:
Diffstat (limited to 'Lib/test')
-rw-r--r--Lib/test/test_httplib.py151
-rw-r--r--Lib/test/test_urllib2.py103
2 files changed, 228 insertions, 26 deletions
diff --git a/Lib/test/test_httplib.py b/Lib/test/test_httplib.py
index 1768a34..a179612 100644
--- a/Lib/test/test_httplib.py
+++ b/Lib/test/test_httplib.py
@@ -314,6 +314,124 @@ class HeaderTests(TestCase):
conn.putheader(name, value)
+class TransferEncodingTest(TestCase):
+ expected_body = b"It's just a flesh wound"
+
+ def test_endheaders_chunked(self):
+ conn = client.HTTPConnection('example.com')
+ conn.sock = FakeSocket(b'')
+ conn.putrequest('POST', '/')
+ conn.endheaders(self._make_body(), encode_chunked=True)
+
+ _, _, body = self._parse_request(conn.sock.data)
+ body = self._parse_chunked(body)
+ self.assertEqual(body, self.expected_body)
+
+ def test_explicit_headers(self):
+ # explicit chunked
+ conn = client.HTTPConnection('example.com')
+ conn.sock = FakeSocket(b'')
+ # this shouldn't actually be automatically chunk-encoded because the
+ # calling code has explicitly stated that it's taking care of it
+ conn.request(
+ 'POST', '/', self._make_body(), {'Transfer-Encoding': 'chunked'})
+
+ _, headers, body = self._parse_request(conn.sock.data)
+ self.assertNotIn('content-length', [k.lower() for k in headers.keys()])
+ self.assertEqual(headers['Transfer-Encoding'], 'chunked')
+ self.assertEqual(body, self.expected_body)
+
+ # explicit chunked, string body
+ conn = client.HTTPConnection('example.com')
+ conn.sock = FakeSocket(b'')
+ conn.request(
+ 'POST', '/', self.expected_body.decode('latin-1'),
+ {'Transfer-Encoding': 'chunked'})
+
+ _, headers, body = self._parse_request(conn.sock.data)
+ self.assertNotIn('content-length', [k.lower() for k in headers.keys()])
+ self.assertEqual(headers['Transfer-Encoding'], 'chunked')
+ self.assertEqual(body, self.expected_body)
+
+ # User-specified TE, but request() does the chunk encoding
+ conn = client.HTTPConnection('example.com')
+ conn.sock = FakeSocket(b'')
+ conn.request('POST', '/',
+ headers={'Transfer-Encoding': 'gzip, chunked'},
+ encode_chunked=True,
+ body=self._make_body())
+ _, headers, body = self._parse_request(conn.sock.data)
+ self.assertNotIn('content-length', [k.lower() for k in headers])
+ self.assertEqual(headers['Transfer-Encoding'], 'gzip, chunked')
+ self.assertEqual(self._parse_chunked(body), self.expected_body)
+
+ def test_request(self):
+ for empty_lines in (False, True,):
+ conn = client.HTTPConnection('example.com')
+ conn.sock = FakeSocket(b'')
+ conn.request(
+ 'POST', '/', self._make_body(empty_lines=empty_lines))
+
+ _, headers, body = self._parse_request(conn.sock.data)
+ body = self._parse_chunked(body)
+ self.assertEqual(body, self.expected_body)
+ self.assertEqual(headers['Transfer-Encoding'], 'chunked')
+
+ # Content-Length and Transfer-Encoding SHOULD not be sent in the
+ # same request
+ self.assertNotIn('content-length', [k.lower() for k in headers])
+
+ def _make_body(self, empty_lines=False):
+ lines = self.expected_body.split(b' ')
+ for idx, line in enumerate(lines):
+ # for testing handling empty lines
+ if empty_lines and idx % 2:
+ yield b''
+ if idx < len(lines) - 1:
+ yield line + b' '
+ else:
+ yield line
+
+ def _parse_request(self, data):
+ lines = data.split(b'\r\n')
+ request = lines[0]
+ headers = {}
+ n = 1
+ while n < len(lines) and len(lines[n]) > 0:
+ key, val = lines[n].split(b':')
+ key = key.decode('latin-1').strip()
+ headers[key] = val.decode('latin-1').strip()
+ n += 1
+
+ return request, headers, b'\r\n'.join(lines[n + 1:])
+
+ def _parse_chunked(self, data):
+ body = []
+ trailers = {}
+ n = 0
+ lines = data.split(b'\r\n')
+ # parse body
+ while True:
+ size, chunk = lines[n:n+2]
+ size = int(size, 16)
+
+ if size == 0:
+ n += 1
+ break
+
+ self.assertEqual(size, len(chunk))
+ body.append(chunk)
+
+ n += 2
+ # we /should/ hit the end chunk, but check against the size of
+ # lines so we're not stuck in an infinite loop should we get
+ # malformed data
+ if n > len(lines):
+ break
+
+ return b''.join(body)
+
+
class BasicTest(TestCase):
def test_status_lines(self):
# Test HTTP status lines
@@ -564,11 +682,11 @@ class BasicTest(TestCase):
yield None
yield 'data_two'
- class UpdatingFile():
+ class UpdatingFile(io.TextIOBase):
mode = 'r'
d = data()
def read(self, blocksize=-1):
- return self.d.__next__()
+ return next(self.d)
expected = b'data'
@@ -1546,6 +1664,26 @@ class RequestBodyTest(TestCase):
message = client.parse_headers(f)
return message, f
+ def test_list_body(self):
+ # Note that no content-length is automatically calculated for
+ # an iterable. The request will fall back to send chunked
+ # transfer encoding.
+ cases = (
+ ([b'foo', b'bar'], b'3\r\nfoo\r\n3\r\nbar\r\n0\r\n\r\n'),
+ ((b'foo', b'bar'), b'3\r\nfoo\r\n3\r\nbar\r\n0\r\n\r\n'),
+ )
+ for body, expected in cases:
+ with self.subTest(body):
+ self.conn = client.HTTPConnection('example.com')
+ self.conn.sock = self.sock = FakeSocket('')
+
+ self.conn.request('PUT', '/url', body)
+ msg, f = self.get_headers_and_fp()
+ self.assertNotIn('Content-Type', msg)
+ self.assertNotIn('Content-Length', msg)
+ self.assertEqual(msg.get('Transfer-Encoding'), 'chunked')
+ self.assertEqual(expected, f.read())
+
def test_manual_content_length(self):
# Set an incorrect content-length so that we can verify that
# it will not be over-ridden by the library.
@@ -1588,8 +1726,13 @@ class RequestBodyTest(TestCase):
message, f = self.get_headers_and_fp()
self.assertEqual("text/plain", message.get_content_type())
self.assertIsNone(message.get_charset())
- self.assertEqual("4", message.get("content-length"))
- self.assertEqual(b'body', f.read())
+ # Note that the length of text files is unpredictable
+ # because it depends on character encoding and line ending
+ # translation. No content-length will be set, the body
+ # will be sent using chunked transfer encoding.
+ self.assertIsNone(message.get("content-length"))
+ self.assertEqual("chunked", message.get("transfer-encoding"))
+ self.assertEqual(b'4\r\nbody\r\n0\r\n\r\n', f.read())
def test_binary_file_body(self):
self.addCleanup(support.unlink, support.TESTFN)
diff --git a/Lib/test/test_urllib2.py b/Lib/test/test_urllib2.py
index eda7ccc..0eea0c7 100644
--- a/Lib/test/test_urllib2.py
+++ b/Lib/test/test_urllib2.py
@@ -7,6 +7,8 @@ import io
import socket
import array
import sys
+import tempfile
+import subprocess
import urllib.request
# The proxy bypass method imported below has logic specific to the OSX
@@ -335,7 +337,8 @@ class MockHTTPClass:
else:
self._tunnel_headers.clear()
- def request(self, method, url, body=None, headers=None):
+ def request(self, method, url, body=None, headers=None, *,
+ encode_chunked=False):
self.method = method
self.selector = url
if headers is not None:
@@ -343,6 +346,7 @@ class MockHTTPClass:
self.req_headers.sort()
if body:
self.data = body
+ self.encode_chunked = encode_chunked
if self.raise_on_endheaders:
raise OSError()
@@ -908,41 +912,96 @@ class HandlerTests(unittest.TestCase):
self.assertEqual(req.unredirected_hdrs["Host"], "baz")
self.assertEqual(req.unredirected_hdrs["Spam"], "foo")
- # Check iterable body support
- def iterable_body():
- yield b"one"
- yield b"two"
- yield b"three"
+ def test_http_body_file(self):
+ # A regular file - Content Length is calculated unless already set.
- for headers in {}, {"Content-Length": 11}:
- req = Request("http://example.com/", iterable_body(), headers)
- if not headers:
- # Having an iterable body without a Content-Length should
- # raise an exception
- self.assertRaises(ValueError, h.do_request_, req)
- else:
+ h = urllib.request.AbstractHTTPHandler()
+ o = h.parent = MockOpener()
+
+ file_obj = tempfile.NamedTemporaryFile(mode='w+b', delete=False)
+ file_path = file_obj.name
+ file_obj.write(b"Something\nSomething\nSomething\n")
+ file_obj.close()
+
+ for headers in {}, {"Content-Length": 30}:
+ with open(file_path, "rb") as f:
+ req = Request("http://example.com/", f, headers)
newreq = h.do_request_(req)
+ self.assertEqual(int(newreq.get_header('Content-length')), 30)
- # A file object.
- # Test only Content-Length attribute of request.
+ os.unlink(file_path)
+
+ def test_http_body_fileobj(self):
+ # A file object - Content Length is calculated unless already set.
+ # (Note that there are some subtle differences to a regular
+ # file, that is why we are testing both cases.)
+
+ h = urllib.request.AbstractHTTPHandler()
+ o = h.parent = MockOpener()
file_obj = io.BytesIO()
file_obj.write(b"Something\nSomething\nSomething\n")
for headers in {}, {"Content-Length": 30}:
+ file_obj.seek(0)
req = Request("http://example.com/", file_obj, headers)
- if not headers:
- # Having an iterable body without a Content-Length should
- # raise an exception
- self.assertRaises(ValueError, h.do_request_, req)
- else:
- newreq = h.do_request_(req)
- self.assertEqual(int(newreq.get_header('Content-length')), 30)
+ newreq = h.do_request_(req)
+ self.assertEqual(int(newreq.get_header('Content-length')), 30)
file_obj.close()
+ def test_http_body_pipe(self):
+ # A file reading from a pipe.
+ # A pipe cannot be seek'ed. There is no way to determine the
+ # content length up front. Thus, do_request_() should fall
+ # back to Transfer-encoding chunked.
+
+ h = urllib.request.AbstractHTTPHandler()
+ o = h.parent = MockOpener()
+
+ cmd = [sys.executable, "-c",
+ r"import sys; "
+ r"sys.stdout.buffer.write(b'Something\nSomething\nSomething\n')"]
+ for headers in {}, {"Content-Length": 30}:
+ with subprocess.Popen(cmd, stdout=subprocess.PIPE) as proc:
+ req = Request("http://example.com/", proc.stdout, headers)
+ newreq = h.do_request_(req)
+ if not headers:
+ self.assertEqual(newreq.get_header('Content-length'), None)
+ self.assertEqual(newreq.get_header('Transfer-encoding'),
+ 'chunked')
+ else:
+ self.assertEqual(int(newreq.get_header('Content-length')),
+ 30)
+
+ def test_http_body_iterable(self):
+ # Generic iterable. There is no way to determine the content
+ # length up front. Fall back to Transfer-encoding chunked.
+
+ h = urllib.request.AbstractHTTPHandler()
+ o = h.parent = MockOpener()
+
+ def iterable_body():
+ yield b"one"
+ yield b"two"
+ yield b"three"
+
+ for headers in {}, {"Content-Length": 11}:
+ req = Request("http://example.com/", iterable_body(), headers)
+ newreq = h.do_request_(req)
+ if not headers:
+ self.assertEqual(newreq.get_header('Content-length'), None)
+ self.assertEqual(newreq.get_header('Transfer-encoding'),
+ 'chunked')
+ else:
+ self.assertEqual(int(newreq.get_header('Content-length')), 11)
+
+ def test_http_body_array(self):
# array.array Iterable - Content Length is calculated
+ h = urllib.request.AbstractHTTPHandler()
+ o = h.parent = MockOpener()
+
iterable_array = array.array("I",[1,2,3,4])
for headers in {}, {"Content-Length": 16}: