diff options
Diffstat (limited to 'Lib/test')
-rw-r--r-- | Lib/test/test_httplib.py | 151 | ||||
-rw-r--r-- | Lib/test/test_urllib2.py | 103 |
2 files changed, 228 insertions, 26 deletions
diff --git a/Lib/test/test_httplib.py b/Lib/test/test_httplib.py index 1768a34..a179612 100644 --- a/Lib/test/test_httplib.py +++ b/Lib/test/test_httplib.py @@ -314,6 +314,124 @@ class HeaderTests(TestCase): conn.putheader(name, value) +class TransferEncodingTest(TestCase): + expected_body = b"It's just a flesh wound" + + def test_endheaders_chunked(self): + conn = client.HTTPConnection('example.com') + conn.sock = FakeSocket(b'') + conn.putrequest('POST', '/') + conn.endheaders(self._make_body(), encode_chunked=True) + + _, _, body = self._parse_request(conn.sock.data) + body = self._parse_chunked(body) + self.assertEqual(body, self.expected_body) + + def test_explicit_headers(self): + # explicit chunked + conn = client.HTTPConnection('example.com') + conn.sock = FakeSocket(b'') + # this shouldn't actually be automatically chunk-encoded because the + # calling code has explicitly stated that it's taking care of it + conn.request( + 'POST', '/', self._make_body(), {'Transfer-Encoding': 'chunked'}) + + _, headers, body = self._parse_request(conn.sock.data) + self.assertNotIn('content-length', [k.lower() for k in headers.keys()]) + self.assertEqual(headers['Transfer-Encoding'], 'chunked') + self.assertEqual(body, self.expected_body) + + # explicit chunked, string body + conn = client.HTTPConnection('example.com') + conn.sock = FakeSocket(b'') + conn.request( + 'POST', '/', self.expected_body.decode('latin-1'), + {'Transfer-Encoding': 'chunked'}) + + _, headers, body = self._parse_request(conn.sock.data) + self.assertNotIn('content-length', [k.lower() for k in headers.keys()]) + self.assertEqual(headers['Transfer-Encoding'], 'chunked') + self.assertEqual(body, self.expected_body) + + # User-specified TE, but request() does the chunk encoding + conn = client.HTTPConnection('example.com') + conn.sock = FakeSocket(b'') + conn.request('POST', '/', + headers={'Transfer-Encoding': 'gzip, chunked'}, + encode_chunked=True, + body=self._make_body()) + _, headers, body = self._parse_request(conn.sock.data) + self.assertNotIn('content-length', [k.lower() for k in headers]) + self.assertEqual(headers['Transfer-Encoding'], 'gzip, chunked') + self.assertEqual(self._parse_chunked(body), self.expected_body) + + def test_request(self): + for empty_lines in (False, True,): + conn = client.HTTPConnection('example.com') + conn.sock = FakeSocket(b'') + conn.request( + 'POST', '/', self._make_body(empty_lines=empty_lines)) + + _, headers, body = self._parse_request(conn.sock.data) + body = self._parse_chunked(body) + self.assertEqual(body, self.expected_body) + self.assertEqual(headers['Transfer-Encoding'], 'chunked') + + # Content-Length and Transfer-Encoding SHOULD not be sent in the + # same request + self.assertNotIn('content-length', [k.lower() for k in headers]) + + def _make_body(self, empty_lines=False): + lines = self.expected_body.split(b' ') + for idx, line in enumerate(lines): + # for testing handling empty lines + if empty_lines and idx % 2: + yield b'' + if idx < len(lines) - 1: + yield line + b' ' + else: + yield line + + def _parse_request(self, data): + lines = data.split(b'\r\n') + request = lines[0] + headers = {} + n = 1 + while n < len(lines) and len(lines[n]) > 0: + key, val = lines[n].split(b':') + key = key.decode('latin-1').strip() + headers[key] = val.decode('latin-1').strip() + n += 1 + + return request, headers, b'\r\n'.join(lines[n + 1:]) + + def _parse_chunked(self, data): + body = [] + trailers = {} + n = 0 + lines = data.split(b'\r\n') + # parse body + while True: + size, chunk = lines[n:n+2] + size = int(size, 16) + + if size == 0: + n += 1 + break + + self.assertEqual(size, len(chunk)) + body.append(chunk) + + n += 2 + # we /should/ hit the end chunk, but check against the size of + # lines so we're not stuck in an infinite loop should we get + # malformed data + if n > len(lines): + break + + return b''.join(body) + + class BasicTest(TestCase): def test_status_lines(self): # Test HTTP status lines @@ -564,11 +682,11 @@ class BasicTest(TestCase): yield None yield 'data_two' - class UpdatingFile(): + class UpdatingFile(io.TextIOBase): mode = 'r' d = data() def read(self, blocksize=-1): - return self.d.__next__() + return next(self.d) expected = b'data' @@ -1546,6 +1664,26 @@ class RequestBodyTest(TestCase): message = client.parse_headers(f) return message, f + def test_list_body(self): + # Note that no content-length is automatically calculated for + # an iterable. The request will fall back to send chunked + # transfer encoding. + cases = ( + ([b'foo', b'bar'], b'3\r\nfoo\r\n3\r\nbar\r\n0\r\n\r\n'), + ((b'foo', b'bar'), b'3\r\nfoo\r\n3\r\nbar\r\n0\r\n\r\n'), + ) + for body, expected in cases: + with self.subTest(body): + self.conn = client.HTTPConnection('example.com') + self.conn.sock = self.sock = FakeSocket('') + + self.conn.request('PUT', '/url', body) + msg, f = self.get_headers_and_fp() + self.assertNotIn('Content-Type', msg) + self.assertNotIn('Content-Length', msg) + self.assertEqual(msg.get('Transfer-Encoding'), 'chunked') + self.assertEqual(expected, f.read()) + def test_manual_content_length(self): # Set an incorrect content-length so that we can verify that # it will not be over-ridden by the library. @@ -1588,8 +1726,13 @@ class RequestBodyTest(TestCase): message, f = self.get_headers_and_fp() self.assertEqual("text/plain", message.get_content_type()) self.assertIsNone(message.get_charset()) - self.assertEqual("4", message.get("content-length")) - self.assertEqual(b'body', f.read()) + # Note that the length of text files is unpredictable + # because it depends on character encoding and line ending + # translation. No content-length will be set, the body + # will be sent using chunked transfer encoding. + self.assertIsNone(message.get("content-length")) + self.assertEqual("chunked", message.get("transfer-encoding")) + self.assertEqual(b'4\r\nbody\r\n0\r\n\r\n', f.read()) def test_binary_file_body(self): self.addCleanup(support.unlink, support.TESTFN) diff --git a/Lib/test/test_urllib2.py b/Lib/test/test_urllib2.py index eda7ccc..0eea0c7 100644 --- a/Lib/test/test_urllib2.py +++ b/Lib/test/test_urllib2.py @@ -7,6 +7,8 @@ import io import socket import array import sys +import tempfile +import subprocess import urllib.request # The proxy bypass method imported below has logic specific to the OSX @@ -335,7 +337,8 @@ class MockHTTPClass: else: self._tunnel_headers.clear() - def request(self, method, url, body=None, headers=None): + def request(self, method, url, body=None, headers=None, *, + encode_chunked=False): self.method = method self.selector = url if headers is not None: @@ -343,6 +346,7 @@ class MockHTTPClass: self.req_headers.sort() if body: self.data = body + self.encode_chunked = encode_chunked if self.raise_on_endheaders: raise OSError() @@ -908,41 +912,96 @@ class HandlerTests(unittest.TestCase): self.assertEqual(req.unredirected_hdrs["Host"], "baz") self.assertEqual(req.unredirected_hdrs["Spam"], "foo") - # Check iterable body support - def iterable_body(): - yield b"one" - yield b"two" - yield b"three" + def test_http_body_file(self): + # A regular file - Content Length is calculated unless already set. - for headers in {}, {"Content-Length": 11}: - req = Request("http://example.com/", iterable_body(), headers) - if not headers: - # Having an iterable body without a Content-Length should - # raise an exception - self.assertRaises(ValueError, h.do_request_, req) - else: + h = urllib.request.AbstractHTTPHandler() + o = h.parent = MockOpener() + + file_obj = tempfile.NamedTemporaryFile(mode='w+b', delete=False) + file_path = file_obj.name + file_obj.write(b"Something\nSomething\nSomething\n") + file_obj.close() + + for headers in {}, {"Content-Length": 30}: + with open(file_path, "rb") as f: + req = Request("http://example.com/", f, headers) newreq = h.do_request_(req) + self.assertEqual(int(newreq.get_header('Content-length')), 30) - # A file object. - # Test only Content-Length attribute of request. + os.unlink(file_path) + + def test_http_body_fileobj(self): + # A file object - Content Length is calculated unless already set. + # (Note that there are some subtle differences to a regular + # file, that is why we are testing both cases.) + + h = urllib.request.AbstractHTTPHandler() + o = h.parent = MockOpener() file_obj = io.BytesIO() file_obj.write(b"Something\nSomething\nSomething\n") for headers in {}, {"Content-Length": 30}: + file_obj.seek(0) req = Request("http://example.com/", file_obj, headers) - if not headers: - # Having an iterable body without a Content-Length should - # raise an exception - self.assertRaises(ValueError, h.do_request_, req) - else: - newreq = h.do_request_(req) - self.assertEqual(int(newreq.get_header('Content-length')), 30) + newreq = h.do_request_(req) + self.assertEqual(int(newreq.get_header('Content-length')), 30) file_obj.close() + def test_http_body_pipe(self): + # A file reading from a pipe. + # A pipe cannot be seek'ed. There is no way to determine the + # content length up front. Thus, do_request_() should fall + # back to Transfer-encoding chunked. + + h = urllib.request.AbstractHTTPHandler() + o = h.parent = MockOpener() + + cmd = [sys.executable, "-c", + r"import sys; " + r"sys.stdout.buffer.write(b'Something\nSomething\nSomething\n')"] + for headers in {}, {"Content-Length": 30}: + with subprocess.Popen(cmd, stdout=subprocess.PIPE) as proc: + req = Request("http://example.com/", proc.stdout, headers) + newreq = h.do_request_(req) + if not headers: + self.assertEqual(newreq.get_header('Content-length'), None) + self.assertEqual(newreq.get_header('Transfer-encoding'), + 'chunked') + else: + self.assertEqual(int(newreq.get_header('Content-length')), + 30) + + def test_http_body_iterable(self): + # Generic iterable. There is no way to determine the content + # length up front. Fall back to Transfer-encoding chunked. + + h = urllib.request.AbstractHTTPHandler() + o = h.parent = MockOpener() + + def iterable_body(): + yield b"one" + yield b"two" + yield b"three" + + for headers in {}, {"Content-Length": 11}: + req = Request("http://example.com/", iterable_body(), headers) + newreq = h.do_request_(req) + if not headers: + self.assertEqual(newreq.get_header('Content-length'), None) + self.assertEqual(newreq.get_header('Transfer-encoding'), + 'chunked') + else: + self.assertEqual(int(newreq.get_header('Content-length')), 11) + + def test_http_body_array(self): # array.array Iterable - Content Length is calculated + h = urllib.request.AbstractHTTPHandler() + o = h.parent = MockOpener() + iterable_array = array.array("I",[1,2,3,4]) for headers in {}, {"Content-Length": 16}: |