summaryrefslogtreecommitdiffstats
path: root/Lib/test/test_httplib.py
diff options
context:
space:
mode:
Diffstat (limited to 'Lib/test/test_httplib.py')
-rw-r--r--Lib/test/test_httplib.py151
1 files changed, 147 insertions, 4 deletions
diff --git a/Lib/test/test_httplib.py b/Lib/test/test_httplib.py
index 1768a34..a179612 100644
--- a/Lib/test/test_httplib.py
+++ b/Lib/test/test_httplib.py
@@ -314,6 +314,124 @@ class HeaderTests(TestCase):
conn.putheader(name, value)
+class TransferEncodingTest(TestCase):
+ expected_body = b"It's just a flesh wound"
+
+ def test_endheaders_chunked(self):
+ conn = client.HTTPConnection('example.com')
+ conn.sock = FakeSocket(b'')
+ conn.putrequest('POST', '/')
+ conn.endheaders(self._make_body(), encode_chunked=True)
+
+ _, _, body = self._parse_request(conn.sock.data)
+ body = self._parse_chunked(body)
+ self.assertEqual(body, self.expected_body)
+
+ def test_explicit_headers(self):
+ # explicit chunked
+ conn = client.HTTPConnection('example.com')
+ conn.sock = FakeSocket(b'')
+ # this shouldn't actually be automatically chunk-encoded because the
+ # calling code has explicitly stated that it's taking care of it
+ conn.request(
+ 'POST', '/', self._make_body(), {'Transfer-Encoding': 'chunked'})
+
+ _, headers, body = self._parse_request(conn.sock.data)
+ self.assertNotIn('content-length', [k.lower() for k in headers.keys()])
+ self.assertEqual(headers['Transfer-Encoding'], 'chunked')
+ self.assertEqual(body, self.expected_body)
+
+ # explicit chunked, string body
+ conn = client.HTTPConnection('example.com')
+ conn.sock = FakeSocket(b'')
+ conn.request(
+ 'POST', '/', self.expected_body.decode('latin-1'),
+ {'Transfer-Encoding': 'chunked'})
+
+ _, headers, body = self._parse_request(conn.sock.data)
+ self.assertNotIn('content-length', [k.lower() for k in headers.keys()])
+ self.assertEqual(headers['Transfer-Encoding'], 'chunked')
+ self.assertEqual(body, self.expected_body)
+
+ # User-specified TE, but request() does the chunk encoding
+ conn = client.HTTPConnection('example.com')
+ conn.sock = FakeSocket(b'')
+ conn.request('POST', '/',
+ headers={'Transfer-Encoding': 'gzip, chunked'},
+ encode_chunked=True,
+ body=self._make_body())
+ _, headers, body = self._parse_request(conn.sock.data)
+ self.assertNotIn('content-length', [k.lower() for k in headers])
+ self.assertEqual(headers['Transfer-Encoding'], 'gzip, chunked')
+ self.assertEqual(self._parse_chunked(body), self.expected_body)
+
+ def test_request(self):
+ for empty_lines in (False, True,):
+ conn = client.HTTPConnection('example.com')
+ conn.sock = FakeSocket(b'')
+ conn.request(
+ 'POST', '/', self._make_body(empty_lines=empty_lines))
+
+ _, headers, body = self._parse_request(conn.sock.data)
+ body = self._parse_chunked(body)
+ self.assertEqual(body, self.expected_body)
+ self.assertEqual(headers['Transfer-Encoding'], 'chunked')
+
+ # Content-Length and Transfer-Encoding SHOULD not be sent in the
+ # same request
+ self.assertNotIn('content-length', [k.lower() for k in headers])
+
+ def _make_body(self, empty_lines=False):
+ lines = self.expected_body.split(b' ')
+ for idx, line in enumerate(lines):
+ # for testing handling empty lines
+ if empty_lines and idx % 2:
+ yield b''
+ if idx < len(lines) - 1:
+ yield line + b' '
+ else:
+ yield line
+
+ def _parse_request(self, data):
+ lines = data.split(b'\r\n')
+ request = lines[0]
+ headers = {}
+ n = 1
+ while n < len(lines) and len(lines[n]) > 0:
+ key, val = lines[n].split(b':')
+ key = key.decode('latin-1').strip()
+ headers[key] = val.decode('latin-1').strip()
+ n += 1
+
+ return request, headers, b'\r\n'.join(lines[n + 1:])
+
+ def _parse_chunked(self, data):
+ body = []
+ trailers = {}
+ n = 0
+ lines = data.split(b'\r\n')
+ # parse body
+ while True:
+ size, chunk = lines[n:n+2]
+ size = int(size, 16)
+
+ if size == 0:
+ n += 1
+ break
+
+ self.assertEqual(size, len(chunk))
+ body.append(chunk)
+
+ n += 2
+ # we /should/ hit the end chunk, but check against the size of
+ # lines so we're not stuck in an infinite loop should we get
+ # malformed data
+ if n > len(lines):
+ break
+
+ return b''.join(body)
+
+
class BasicTest(TestCase):
def test_status_lines(self):
# Test HTTP status lines
@@ -564,11 +682,11 @@ class BasicTest(TestCase):
yield None
yield 'data_two'
- class UpdatingFile():
+ class UpdatingFile(io.TextIOBase):
mode = 'r'
d = data()
def read(self, blocksize=-1):
- return self.d.__next__()
+ return next(self.d)
expected = b'data'
@@ -1546,6 +1664,26 @@ class RequestBodyTest(TestCase):
message = client.parse_headers(f)
return message, f
+ def test_list_body(self):
+ # Note that no content-length is automatically calculated for
+ # an iterable. The request will fall back to send chunked
+ # transfer encoding.
+ cases = (
+ ([b'foo', b'bar'], b'3\r\nfoo\r\n3\r\nbar\r\n0\r\n\r\n'),
+ ((b'foo', b'bar'), b'3\r\nfoo\r\n3\r\nbar\r\n0\r\n\r\n'),
+ )
+ for body, expected in cases:
+ with self.subTest(body):
+ self.conn = client.HTTPConnection('example.com')
+ self.conn.sock = self.sock = FakeSocket('')
+
+ self.conn.request('PUT', '/url', body)
+ msg, f = self.get_headers_and_fp()
+ self.assertNotIn('Content-Type', msg)
+ self.assertNotIn('Content-Length', msg)
+ self.assertEqual(msg.get('Transfer-Encoding'), 'chunked')
+ self.assertEqual(expected, f.read())
+
def test_manual_content_length(self):
# Set an incorrect content-length so that we can verify that
# it will not be over-ridden by the library.
@@ -1588,8 +1726,13 @@ class RequestBodyTest(TestCase):
message, f = self.get_headers_and_fp()
self.assertEqual("text/plain", message.get_content_type())
self.assertIsNone(message.get_charset())
- self.assertEqual("4", message.get("content-length"))
- self.assertEqual(b'body', f.read())
+ # Note that the length of text files is unpredictable
+ # because it depends on character encoding and line ending
+ # translation. No content-length will be set, the body
+ # will be sent using chunked transfer encoding.
+ self.assertIsNone(message.get("content-length"))
+ self.assertEqual("chunked", message.get("transfer-encoding"))
+ self.assertEqual(b'4\r\nbody\r\n0\r\n\r\n', f.read())
def test_binary_file_body(self):
self.addCleanup(support.unlink, support.TESTFN)