diff options
Diffstat (limited to 'Lib/test/test_httplib.py')
-rw-r--r-- | Lib/test/test_httplib.py | 151 |
1 files changed, 147 insertions, 4 deletions
diff --git a/Lib/test/test_httplib.py b/Lib/test/test_httplib.py index 1768a34..a179612 100644 --- a/Lib/test/test_httplib.py +++ b/Lib/test/test_httplib.py @@ -314,6 +314,124 @@ class HeaderTests(TestCase): conn.putheader(name, value) +class TransferEncodingTest(TestCase): + expected_body = b"It's just a flesh wound" + + def test_endheaders_chunked(self): + conn = client.HTTPConnection('example.com') + conn.sock = FakeSocket(b'') + conn.putrequest('POST', '/') + conn.endheaders(self._make_body(), encode_chunked=True) + + _, _, body = self._parse_request(conn.sock.data) + body = self._parse_chunked(body) + self.assertEqual(body, self.expected_body) + + def test_explicit_headers(self): + # explicit chunked + conn = client.HTTPConnection('example.com') + conn.sock = FakeSocket(b'') + # this shouldn't actually be automatically chunk-encoded because the + # calling code has explicitly stated that it's taking care of it + conn.request( + 'POST', '/', self._make_body(), {'Transfer-Encoding': 'chunked'}) + + _, headers, body = self._parse_request(conn.sock.data) + self.assertNotIn('content-length', [k.lower() for k in headers.keys()]) + self.assertEqual(headers['Transfer-Encoding'], 'chunked') + self.assertEqual(body, self.expected_body) + + # explicit chunked, string body + conn = client.HTTPConnection('example.com') + conn.sock = FakeSocket(b'') + conn.request( + 'POST', '/', self.expected_body.decode('latin-1'), + {'Transfer-Encoding': 'chunked'}) + + _, headers, body = self._parse_request(conn.sock.data) + self.assertNotIn('content-length', [k.lower() for k in headers.keys()]) + self.assertEqual(headers['Transfer-Encoding'], 'chunked') + self.assertEqual(body, self.expected_body) + + # User-specified TE, but request() does the chunk encoding + conn = client.HTTPConnection('example.com') + conn.sock = FakeSocket(b'') + conn.request('POST', '/', + headers={'Transfer-Encoding': 'gzip, chunked'}, + encode_chunked=True, + body=self._make_body()) + _, headers, body = self._parse_request(conn.sock.data) + self.assertNotIn('content-length', [k.lower() for k in headers]) + self.assertEqual(headers['Transfer-Encoding'], 'gzip, chunked') + self.assertEqual(self._parse_chunked(body), self.expected_body) + + def test_request(self): + for empty_lines in (False, True,): + conn = client.HTTPConnection('example.com') + conn.sock = FakeSocket(b'') + conn.request( + 'POST', '/', self._make_body(empty_lines=empty_lines)) + + _, headers, body = self._parse_request(conn.sock.data) + body = self._parse_chunked(body) + self.assertEqual(body, self.expected_body) + self.assertEqual(headers['Transfer-Encoding'], 'chunked') + + # Content-Length and Transfer-Encoding SHOULD not be sent in the + # same request + self.assertNotIn('content-length', [k.lower() for k in headers]) + + def _make_body(self, empty_lines=False): + lines = self.expected_body.split(b' ') + for idx, line in enumerate(lines): + # for testing handling empty lines + if empty_lines and idx % 2: + yield b'' + if idx < len(lines) - 1: + yield line + b' ' + else: + yield line + + def _parse_request(self, data): + lines = data.split(b'\r\n') + request = lines[0] + headers = {} + n = 1 + while n < len(lines) and len(lines[n]) > 0: + key, val = lines[n].split(b':') + key = key.decode('latin-1').strip() + headers[key] = val.decode('latin-1').strip() + n += 1 + + return request, headers, b'\r\n'.join(lines[n + 1:]) + + def _parse_chunked(self, data): + body = [] + trailers = {} + n = 0 + lines = data.split(b'\r\n') + # parse body + while True: + size, chunk = lines[n:n+2] + size = int(size, 16) + + if size == 0: + n += 1 + break + + self.assertEqual(size, len(chunk)) + body.append(chunk) + + n += 2 + # we /should/ hit the end chunk, but check against the size of + # lines so we're not stuck in an infinite loop should we get + # malformed data + if n > len(lines): + break + + return b''.join(body) + + class BasicTest(TestCase): def test_status_lines(self): # Test HTTP status lines @@ -564,11 +682,11 @@ class BasicTest(TestCase): yield None yield 'data_two' - class UpdatingFile(): + class UpdatingFile(io.TextIOBase): mode = 'r' d = data() def read(self, blocksize=-1): - return self.d.__next__() + return next(self.d) expected = b'data' @@ -1546,6 +1664,26 @@ class RequestBodyTest(TestCase): message = client.parse_headers(f) return message, f + def test_list_body(self): + # Note that no content-length is automatically calculated for + # an iterable. The request will fall back to send chunked + # transfer encoding. + cases = ( + ([b'foo', b'bar'], b'3\r\nfoo\r\n3\r\nbar\r\n0\r\n\r\n'), + ((b'foo', b'bar'), b'3\r\nfoo\r\n3\r\nbar\r\n0\r\n\r\n'), + ) + for body, expected in cases: + with self.subTest(body): + self.conn = client.HTTPConnection('example.com') + self.conn.sock = self.sock = FakeSocket('') + + self.conn.request('PUT', '/url', body) + msg, f = self.get_headers_and_fp() + self.assertNotIn('Content-Type', msg) + self.assertNotIn('Content-Length', msg) + self.assertEqual(msg.get('Transfer-Encoding'), 'chunked') + self.assertEqual(expected, f.read()) + def test_manual_content_length(self): # Set an incorrect content-length so that we can verify that # it will not be over-ridden by the library. @@ -1588,8 +1726,13 @@ class RequestBodyTest(TestCase): message, f = self.get_headers_and_fp() self.assertEqual("text/plain", message.get_content_type()) self.assertIsNone(message.get_charset()) - self.assertEqual("4", message.get("content-length")) - self.assertEqual(b'body', f.read()) + # Note that the length of text files is unpredictable + # because it depends on character encoding and line ending + # translation. No content-length will be set, the body + # will be sent using chunked transfer encoding. + self.assertIsNone(message.get("content-length")) + self.assertEqual("chunked", message.get("transfer-encoding")) + self.assertEqual(b'4\r\nbody\r\n0\r\n\r\n', f.read()) def test_binary_file_body(self): self.addCleanup(support.unlink, support.TESTFN) |