diff options
Diffstat (limited to 'Lib/test/test_httplib.py')
| -rw-r--r-- | Lib/test/test_httplib.py | 411 | 
1 files changed, 349 insertions, 62 deletions
diff --git a/Lib/test/test_httplib.py b/Lib/test/test_httplib.py index 4c57867..9c1550e 100644 --- a/Lib/test/test_httplib.py +++ b/Lib/test/test_httplib.py @@ -18,6 +18,26 @@ CERT_fakehostname = os.path.join(here, 'keycert2.pem')  # Self-signed cert file for self-signed.pythontest.net  CERT_selfsigned_pythontestdotnet = os.path.join(here, 'selfsigned_pythontestdotnet.pem') +# constants for testing chunked encoding +chunked_start = ( +    'HTTP/1.1 200 OK\r\n' +    'Transfer-Encoding: chunked\r\n\r\n' +    'a\r\n' +    'hello worl\r\n' +    '3\r\n' +    'd! \r\n' +    '8\r\n' +    'and now \r\n' +    '22\r\n' +    'for something completely different\r\n' +) +chunked_expected = b'hello world! and now for something completely different' +chunk_extension = ";foo=bar" +last_chunk = "0\r\n" +last_chunk_extended = "0" + chunk_extension + "\r\n" +trailers = "X-Dummy: foo\r\nX-Dumm2: bar\r\n" +chunked_end = "\r\n" +  HOST = support.HOST  class FakeSocket: @@ -50,6 +70,9 @@ class FakeSocket:      def close(self):          pass +    def setsockopt(self, level, optname, value): +        pass +  class EPipeSocket(FakeSocket):      def __init__(self, text, pipe_trigger): @@ -452,20 +475,8 @@ class BasicTest(TestCase):              conn.request('POST', 'test', conn)      def test_chunked(self): -        chunked_start = ( -            'HTTP/1.1 200 OK\r\n' -            'Transfer-Encoding: chunked\r\n\r\n' -            'a\r\n' -            'hello worl\r\n' -            '3\r\n' -            'd! \r\n' -            '8\r\n' -            'and now \r\n' -            '22\r\n' -            'for something completely different\r\n' -        ) -        expected = b'hello world! and now for something completely different' -        sock = FakeSocket(chunked_start + '0\r\n') +        expected = chunked_expected +        sock = FakeSocket(chunked_start + last_chunk + chunked_end)          resp = client.HTTPResponse(sock, method="GET")          resp.begin()          self.assertEqual(resp.read(), expected) @@ -473,7 +484,7 @@ class BasicTest(TestCase):          # Various read sizes          for n in range(1, 12): -            sock = FakeSocket(chunked_start + '0\r\n') +            sock = FakeSocket(chunked_start + last_chunk + chunked_end)              resp = client.HTTPResponse(sock, method="GET")              resp.begin()              self.assertEqual(resp.read(n) + resp.read(n) + resp.read(), expected) @@ -496,23 +507,12 @@ class BasicTest(TestCase):                  resp.close()      def test_readinto_chunked(self): -        chunked_start = ( -            'HTTP/1.1 200 OK\r\n' -            'Transfer-Encoding: chunked\r\n\r\n' -            'a\r\n' -            'hello worl\r\n' -            '3\r\n' -            'd! \r\n' -            '8\r\n' -            'and now \r\n' -            '22\r\n' -            'for something completely different\r\n' -        ) -        expected = b'hello world! and now for something completely different' + +        expected = chunked_expected          nexpected = len(expected)          b = bytearray(128) -        sock = FakeSocket(chunked_start + '0\r\n') +        sock = FakeSocket(chunked_start + last_chunk + chunked_end)          resp = client.HTTPResponse(sock, method="GET")          resp.begin()          n = resp.readinto(b) @@ -522,7 +522,7 @@ class BasicTest(TestCase):          # Various read sizes          for n in range(1, 12): -            sock = FakeSocket(chunked_start + '0\r\n') +            sock = FakeSocket(chunked_start + last_chunk + chunked_end)              resp = client.HTTPResponse(sock, method="GET")              resp.begin()              m = memoryview(b) @@ -558,7 +558,7 @@ class BasicTest(TestCase):              '1\r\n'              'd\r\n'          ) -        sock = FakeSocket(chunked_start + '0\r\n') +        sock = FakeSocket(chunked_start + last_chunk + chunked_end)          resp = client.HTTPResponse(sock, method="HEAD")          resp.begin()          self.assertEqual(resp.read(), b'') @@ -578,7 +578,7 @@ class BasicTest(TestCase):              '1\r\n'              'd\r\n'          ) -        sock = FakeSocket(chunked_start + '0\r\n') +        sock = FakeSocket(chunked_start + last_chunk + chunked_end)          resp = client.HTTPResponse(sock, method="HEAD")          resp.begin()          b = bytearray(5) @@ -653,6 +653,7 @@ class BasicTest(TestCase):              + '0' * 65536 + 'a\r\n'              'hello world\r\n'              '0\r\n' +            '\r\n'          )          resp = client.HTTPResponse(FakeSocket(body))          resp.begin() @@ -670,28 +671,6 @@ class BasicTest(TestCase):          resp.close()          self.assertTrue(resp.closed) -    def test_delayed_ack_opt(self): -        # Test that Nagle/delayed_ack optimistaion works correctly. - -        # For small payloads, it should coalesce the body with -        # headers, resulting in a single sendall() call -        conn = client.HTTPConnection('example.com') -        sock = FakeSocket(None) -        conn.sock = sock -        body = b'x' * (conn.mss - 1) -        conn.request('POST', '/', body) -        self.assertEqual(sock.sendall_calls, 1) - -        # For large payloads, it should send the headers and -        # then the body, resulting in more than one sendall() -        # call -        conn = client.HTTPConnection('example.com') -        sock = FakeSocket(None) -        conn.sock = sock -        body = b'x' * conn.mss -        conn.request('POST', '/', body) -        self.assertGreater(sock.sendall_calls, 1) -      def test_error_leak(self):          # Test that the socket is not leaked if getresponse() fails          conn = client.HTTPConnection('example.com') @@ -708,6 +687,239 @@ class BasicTest(TestCase):          self.assertTrue(response.closed)          self.assertTrue(conn.sock.file_closed) +    def test_chunked_extension(self): +        extra = '3;foo=bar\r\n' + 'abc\r\n' +        expected = chunked_expected + b'abc' + +        sock = FakeSocket(chunked_start + extra + last_chunk_extended + chunked_end) +        resp = client.HTTPResponse(sock, method="GET") +        resp.begin() +        self.assertEqual(resp.read(), expected) +        resp.close() + +    def test_chunked_missing_end(self): +        """some servers may serve up a short chunked encoding stream""" +        expected = chunked_expected +        sock = FakeSocket(chunked_start + last_chunk)  #no terminating crlf +        resp = client.HTTPResponse(sock, method="GET") +        resp.begin() +        self.assertEqual(resp.read(), expected) +        resp.close() + +    def test_chunked_trailers(self): +        """See that trailers are read and ignored""" +        expected = chunked_expected +        sock = FakeSocket(chunked_start + last_chunk + trailers + chunked_end) +        resp = client.HTTPResponse(sock, method="GET") +        resp.begin() +        self.assertEqual(resp.read(), expected) +        # we should have reached the end of the file +        self.assertEqual(sock.file.read(100), b"") #we read to the end +        resp.close() + +    def test_chunked_sync(self): +        """Check that we don't read past the end of the chunked-encoding stream""" +        expected = chunked_expected +        extradata = "extradata" +        sock = FakeSocket(chunked_start + last_chunk + trailers + chunked_end + extradata) +        resp = client.HTTPResponse(sock, method="GET") +        resp.begin() +        self.assertEqual(resp.read(), expected) +        # the file should now have our extradata ready to be read +        self.assertEqual(sock.file.read(100), extradata.encode("ascii")) #we read to the end +        resp.close() + +    def test_content_length_sync(self): +        """Check that we don't read past the end of the Content-Length stream""" +        extradata = "extradata" +        expected = b"Hello123\r\n" +        sock = FakeSocket('HTTP/1.1 200 OK\r\nContent-Length: 10\r\n\r\nHello123\r\n' + extradata) +        resp = client.HTTPResponse(sock, method="GET") +        resp.begin() +        self.assertEqual(resp.read(), expected) +        # the file should now have our extradata ready to be read +        self.assertEqual(sock.file.read(100), extradata.encode("ascii")) #we read to the end +        resp.close() + +class ExtendedReadTest(TestCase): +    """ +    Test peek(), read1(), readline() +    """ +    lines = ( +        'HTTP/1.1 200 OK\r\n' +        '\r\n' +        'hello world!\n' +        'and now \n' +        'for something completely different\n' +        'foo' +        ) +    lines_expected = lines[lines.find('hello'):].encode("ascii") +    lines_chunked = ( +        'HTTP/1.1 200 OK\r\n' +        'Transfer-Encoding: chunked\r\n\r\n' +        'a\r\n' +        'hello worl\r\n' +        '3\r\n' +        'd!\n\r\n' +        '9\r\n' +        'and now \n\r\n' +        '23\r\n' +        'for something completely different\n\r\n' +        '3\r\n' +        'foo\r\n' +        '0\r\n' # terminating chunk +        '\r\n'  # end of trailers +    ) + +    def setUp(self): +        sock = FakeSocket(self.lines) +        resp = client.HTTPResponse(sock, method="GET") +        resp.begin() +        resp.fp = io.BufferedReader(resp.fp) +        self.resp = resp + + + +    def test_peek(self): +        resp = self.resp +        # patch up the buffered peek so that it returns not too much stuff +        oldpeek = resp.fp.peek +        def mypeek(n=-1): +            p = oldpeek(n) +            if n >= 0: +                return p[:n] +            return p[:10] +        resp.fp.peek = mypeek + +        all = [] +        while True: +            # try a short peek +            p = resp.peek(3) +            if p: +                self.assertGreater(len(p), 0) +                # then unbounded peek +                p2 = resp.peek() +                self.assertGreaterEqual(len(p2), len(p)) +                self.assertTrue(p2.startswith(p)) +                next = resp.read(len(p2)) +                self.assertEqual(next, p2) +            else: +                next = resp.read() +                self.assertFalse(next) +            all.append(next) +            if not next: +                break +        self.assertEqual(b"".join(all), self.lines_expected) + +    def test_readline(self): +        resp = self.resp +        self._verify_readline(self.resp.readline, self.lines_expected) + +    def _verify_readline(self, readline, expected): +        all = [] +        while True: +            # short readlines +            line = readline(5) +            if line and line != b"foo": +                if len(line) < 5: +                    self.assertTrue(line.endswith(b"\n")) +            all.append(line) +            if not line: +                break +        self.assertEqual(b"".join(all), expected) + +    def test_read1(self): +        resp = self.resp +        def r(): +            res = resp.read1(4) +            self.assertLessEqual(len(res), 4) +            return res +        readliner = Readliner(r) +        self._verify_readline(readliner.readline, self.lines_expected) + +    def test_read1_unbounded(self): +        resp = self.resp +        all = [] +        while True: +            data = resp.read1() +            if not data: +                break +            all.append(data) +        self.assertEqual(b"".join(all), self.lines_expected) + +    def test_read1_bounded(self): +        resp = self.resp +        all = [] +        while True: +            data = resp.read1(10) +            if not data: +                break +            self.assertLessEqual(len(data), 10) +            all.append(data) +        self.assertEqual(b"".join(all), self.lines_expected) + +    def test_read1_0(self): +        self.assertEqual(self.resp.read1(0), b"") + +    def test_peek_0(self): +        p = self.resp.peek(0) +        self.assertLessEqual(0, len(p)) + +class ExtendedReadTestChunked(ExtendedReadTest): +    """ +    Test peek(), read1(), readline() in chunked mode +    """ +    lines = ( +        'HTTP/1.1 200 OK\r\n' +        'Transfer-Encoding: chunked\r\n\r\n' +        'a\r\n' +        'hello worl\r\n' +        '3\r\n' +        'd!\n\r\n' +        '9\r\n' +        'and now \n\r\n' +        '23\r\n' +        'for something completely different\n\r\n' +        '3\r\n' +        'foo\r\n' +        '0\r\n' # terminating chunk +        '\r\n'  # end of trailers +    ) + + +class Readliner: +    """ +    a simple readline class that uses an arbitrary read function and buffering +    """ +    def __init__(self, readfunc): +        self.readfunc = readfunc +        self.remainder = b"" + +    def readline(self, limit): +        data = [] +        datalen = 0 +        read = self.remainder +        try: +            while True: +                idx = read.find(b'\n') +                if idx != -1: +                    break +                if datalen + len(read) >= limit: +                    idx = limit - datalen - 1 +                # read more data +                data.append(read) +                read = self.readfunc() +                if not read: +                    idx = 0 #eof condition +                    break +            idx += 1 +            data.append(read[:idx]) +            self.remainder = read[idx:] +            return b"".join(data) +        except: +            self.remainder = b"".join(data) +            raise +  class OfflineTest(TestCase):      def test_all(self): @@ -727,13 +939,74 @@ class OfflineTest(TestCase):      def test_responses(self):          self.assertEqual(client.responses[client.NOT_FOUND], "Not Found") +    def test_client_constants(self): +        # Make sure we don't break backward compatibility with 3.4 +        expected = [ +            'CONTINUE', +            'SWITCHING_PROTOCOLS', +            'PROCESSING', +            'OK', +            'CREATED', +            'ACCEPTED', +            'NON_AUTHORITATIVE_INFORMATION', +            'NO_CONTENT', +            'RESET_CONTENT', +            'PARTIAL_CONTENT', +            'MULTI_STATUS', +            'IM_USED', +            'MULTIPLE_CHOICES', +            'MOVED_PERMANENTLY', +            'FOUND', +            'SEE_OTHER', +            'NOT_MODIFIED', +            'USE_PROXY', +            'TEMPORARY_REDIRECT', +            'BAD_REQUEST', +            'UNAUTHORIZED', +            'PAYMENT_REQUIRED', +            'FORBIDDEN', +            'NOT_FOUND', +            'METHOD_NOT_ALLOWED', +            'NOT_ACCEPTABLE', +            'PROXY_AUTHENTICATION_REQUIRED', +            'REQUEST_TIMEOUT', +            'CONFLICT', +            'GONE', +            'LENGTH_REQUIRED', +            'PRECONDITION_FAILED', +            'REQUEST_ENTITY_TOO_LARGE', +            'REQUEST_URI_TOO_LONG', +            'UNSUPPORTED_MEDIA_TYPE', +            'REQUESTED_RANGE_NOT_SATISFIABLE', +            'EXPECTATION_FAILED', +            'UNPROCESSABLE_ENTITY', +            'LOCKED', +            'FAILED_DEPENDENCY', +            'UPGRADE_REQUIRED', +            'PRECONDITION_REQUIRED', +            'TOO_MANY_REQUESTS', +            'REQUEST_HEADER_FIELDS_TOO_LARGE', +            'INTERNAL_SERVER_ERROR', +            'NOT_IMPLEMENTED', +            'BAD_GATEWAY', +            'SERVICE_UNAVAILABLE', +            'GATEWAY_TIMEOUT', +            'HTTP_VERSION_NOT_SUPPORTED', +            'INSUFFICIENT_STORAGE', +            'NOT_EXTENDED', +            'NETWORK_AUTHENTICATION_REQUIRED', +        ] +        for const in expected: +            with self.subTest(constant=const): +                self.assertTrue(hasattr(client, const)) +  class SourceAddressTest(TestCase):      def setUp(self):          self.serv = socket.socket(socket.AF_INET, socket.SOCK_STREAM)          self.port = support.bind_port(self.serv)          self.source_port = support.find_unused_port() -        self.serv.listen(5) +        self.serv.listen()          self.conn = None      def tearDown(self): @@ -765,7 +1038,7 @@ class TimeoutTest(TestCase):      def setUp(self):          self.serv = socket.socket(socket.AF_INET, socket.SOCK_STREAM)          TimeoutTest.PORT = support.bind_port(self.serv) -        self.serv.listen(5) +        self.serv.listen()      def tearDown(self):          self.serv.close() @@ -1075,17 +1348,18 @@ class TunnelTests(TestCase):              'HTTP/1.1 200 OK\r\n' # Reply to HEAD              'Content-Length: 42\r\n\r\n'          ) - -        def create_connection(address, timeout=None, source_address=None): -            return FakeSocket(response_text, host=address[0], port=address[1]) -          self.host = 'proxy.com'          self.conn = client.HTTPConnection(self.host) -        self.conn._create_connection = create_connection +        self.conn._create_connection = self._create_connection(response_text)      def tearDown(self):          self.conn.close() +    def _create_connection(self, response_text): +        def create_connection(address, timeout=None, source_address=None): +            return FakeSocket(response_text, host=address[0], port=address[1]) +        return create_connection +      def test_set_tunnel_host_port_headers(self):          tunnel_host = 'destination.com'          tunnel_port = 8888 @@ -1126,13 +1400,26 @@ class TunnelTests(TestCase):          self.assertIn(b'CONNECT destination.com', self.conn.sock.data)          self.assertIn(b'Host: destination.com', self.conn.sock.data) +    def test_tunnel_debuglog(self): +        expected_header = 'X-Dummy: 1' +        response_text = 'HTTP/1.0 200 OK\r\n{}\r\n\r\n'.format(expected_header) + +        self.conn.set_debuglevel(1) +        self.conn._create_connection = self._create_connection(response_text) +        self.conn.set_tunnel('destination.com') + +        with support.captured_stdout() as output: +            self.conn.request('PUT', '/', '') +        lines = output.getvalue().splitlines() +        self.assertIn('header: {}'.format(expected_header), lines)  @support.reap_threads  def test_main(verbose=None):      support.run_unittest(HeaderTests, OfflineTest, BasicTest, TimeoutTest,                           HTTPSTest, RequestBodyTest, SourceAddressTest, -                         HTTPResponseTest, TunnelTests) +                         HTTPResponseTest, ExtendedReadTest, +                         ExtendedReadTestChunked, TunnelTests)  if __name__ == '__main__':      test_main()  | 
