diff options
Diffstat (limited to 'Lib/test/test_httplib.py')
| -rw-r--r-- | Lib/test/test_httplib.py | 303 | 
1 files changed, 269 insertions, 34 deletions
diff --git a/Lib/test/test_httplib.py b/Lib/test/test_httplib.py index 30b6c0c..69aa381 100644 --- a/Lib/test/test_httplib.py +++ b/Lib/test/test_httplib.py @@ -18,6 +18,26 @@ CERT_fakehostname = os.path.join(here, 'keycert2.pem')  # Root cert file (CA) for svn.python.org's cert  CACERT_svn_python_org = os.path.join(here, 'https_svn_python_org_root.pem') +# constants for testing chunked encoding +chunked_start = ( +    'HTTP/1.1 200 OK\r\n' +    'Transfer-Encoding: chunked\r\n\r\n' +    'a\r\n' +    'hello worl\r\n' +    '3\r\n' +    'd! \r\n' +    '8\r\n' +    'and now \r\n' +    '22\r\n' +    'for something completely different\r\n' +) +chunked_expected = b'hello world! and now for something completely different' +chunk_extension = ";foo=bar" +last_chunk = "0\r\n" +last_chunk_extended = "0" + chunk_extension + "\r\n" +trailers = "X-Dummy: foo\r\nX-Dumm2: bar\r\n" +chunked_end = "\r\n" +  HOST = support.HOST  class FakeSocket: @@ -36,7 +56,10 @@ class FakeSocket:      def makefile(self, mode, bufsize=None):          if mode != 'r' and mode != 'rb':              raise client.UnimplementedFileMode() -        return self.fileclass(self.text) +        # keep the file around so we can check how much was read from it +        self.file = self.fileclass(self.text) +        self.file.close = lambda:None #nerf close () +        return self.file  class EPipeSocket(FakeSocket): @@ -430,20 +453,8 @@ class BasicTest(TestCase):              conn.request('POST', 'test', conn)      def test_chunked(self): -        chunked_start = ( -            'HTTP/1.1 200 OK\r\n' -            'Transfer-Encoding: chunked\r\n\r\n' -            'a\r\n' -            'hello worl\r\n' -            '3\r\n' -            'd! \r\n' -            '8\r\n' -            'and now \r\n' -            '22\r\n' -            'for something completely different\r\n' -        ) -        expected = b'hello world! and now for something completely different' -        sock = FakeSocket(chunked_start + '0\r\n') +        expected = chunked_expected +        sock = FakeSocket(chunked_start + last_chunk + chunked_end)          resp = client.HTTPResponse(sock, method="GET")          resp.begin()          self.assertEqual(resp.read(), expected) @@ -451,7 +462,7 @@ class BasicTest(TestCase):          # Various read sizes          for n in range(1, 12): -            sock = FakeSocket(chunked_start + '0\r\n') +            sock = FakeSocket(chunked_start + last_chunk + chunked_end)              resp = client.HTTPResponse(sock, method="GET")              resp.begin()              self.assertEqual(resp.read(n) + resp.read(n) + resp.read(), expected) @@ -474,23 +485,12 @@ class BasicTest(TestCase):                  resp.close()      def test_readinto_chunked(self): -        chunked_start = ( -            'HTTP/1.1 200 OK\r\n' -            'Transfer-Encoding: chunked\r\n\r\n' -            'a\r\n' -            'hello worl\r\n' -            '3\r\n' -            'd! \r\n' -            '8\r\n' -            'and now \r\n' -            '22\r\n' -            'for something completely different\r\n' -        ) -        expected = b'hello world! and now for something completely different' + +        expected = chunked_expected          nexpected = len(expected)          b = bytearray(128) -        sock = FakeSocket(chunked_start + '0\r\n') +        sock = FakeSocket(chunked_start + last_chunk + chunked_end)          resp = client.HTTPResponse(sock, method="GET")          resp.begin()          n = resp.readinto(b) @@ -500,7 +500,7 @@ class BasicTest(TestCase):          # Various read sizes          for n in range(1, 12): -            sock = FakeSocket(chunked_start + '0\r\n') +            sock = FakeSocket(chunked_start + last_chunk + chunked_end)              resp = client.HTTPResponse(sock, method="GET")              resp.begin()              m = memoryview(b) @@ -536,7 +536,7 @@ class BasicTest(TestCase):              '1\r\n'              'd\r\n'          ) -        sock = FakeSocket(chunked_start + '0\r\n') +        sock = FakeSocket(chunked_start + last_chunk + chunked_end)          resp = client.HTTPResponse(sock, method="HEAD")          resp.begin()          self.assertEqual(resp.read(), b'') @@ -556,7 +556,7 @@ class BasicTest(TestCase):              '1\r\n'              'd\r\n'          ) -        sock = FakeSocket(chunked_start + '0\r\n') +        sock = FakeSocket(chunked_start + last_chunk + chunked_end)          resp = client.HTTPResponse(sock, method="HEAD")          resp.begin()          b = bytearray(5) @@ -631,6 +631,7 @@ class BasicTest(TestCase):              + '0' * 65536 + 'a\r\n'              'hello world\r\n'              '0\r\n' +            '\r\n'          )          resp = client.HTTPResponse(FakeSocket(body))          resp.begin() @@ -670,6 +671,239 @@ class BasicTest(TestCase):          conn.request('POST', '/', body)          self.assertGreater(sock.sendall_calls, 1) +    def test_chunked_extension(self): +        extra = '3;foo=bar\r\n' + 'abc\r\n' +        expected = chunked_expected + b'abc' + +        sock = FakeSocket(chunked_start + extra + last_chunk_extended + chunked_end) +        resp = client.HTTPResponse(sock, method="GET") +        resp.begin() +        self.assertEqual(resp.read(), expected) +        resp.close() + +    def test_chunked_missing_end(self): +        """some servers may serve up a short chunked encoding stream""" +        expected = chunked_expected +        sock = FakeSocket(chunked_start + last_chunk)  #no terminating crlf +        resp = client.HTTPResponse(sock, method="GET") +        resp.begin() +        self.assertEqual(resp.read(), expected) +        resp.close() + +    def test_chunked_trailers(self): +        """See that trailers are read and ignored""" +        expected = chunked_expected +        sock = FakeSocket(chunked_start + last_chunk + trailers + chunked_end) +        resp = client.HTTPResponse(sock, method="GET") +        resp.begin() +        self.assertEqual(resp.read(), expected) +        # we should have reached the end of the file +        self.assertEqual(sock.file.read(100), b"") #we read to the end +        resp.close() + +    def test_chunked_sync(self): +        """Check that we don't read past the end of the chunked-encoding stream""" +        expected = chunked_expected +        extradata = "extradata" +        sock = FakeSocket(chunked_start + last_chunk + trailers + chunked_end + extradata) +        resp = client.HTTPResponse(sock, method="GET") +        resp.begin() +        self.assertEqual(resp.read(), expected) +        # the file should now have our extradata ready to be read +        self.assertEqual(sock.file.read(100), extradata.encode("ascii")) #we read to the end +        resp.close() + +    def test_content_length_sync(self): +        """Check that we don't read past the end of the Content-Length stream""" +        extradata = "extradata" +        expected = b"Hello123\r\n" +        sock = FakeSocket('HTTP/1.1 200 OK\r\nContent-Length: 10\r\n\r\nHello123\r\n' + extradata) +        resp = client.HTTPResponse(sock, method="GET") +        resp.begin() +        self.assertEqual(resp.read(), expected) +        # the file should now have our extradata ready to be read +        self.assertEqual(sock.file.read(100), extradata.encode("ascii")) #we read to the end +        resp.close() + +class ExtendedReadTest(TestCase): +    """ +    Test peek(), read1(), readline() +    """ +    lines = ( +        'HTTP/1.1 200 OK\r\n' +        '\r\n' +        'hello world!\n' +        'and now \n' +        'for something completely different\n' +        'foo' +        ) +    lines_expected = lines[lines.find('hello'):].encode("ascii") +    lines_chunked = ( +        'HTTP/1.1 200 OK\r\n' +        'Transfer-Encoding: chunked\r\n\r\n' +        'a\r\n' +        'hello worl\r\n' +        '3\r\n' +        'd!\n\r\n' +        '9\r\n' +        'and now \n\r\n' +        '23\r\n' +        'for something completely different\n\r\n' +        '3\r\n' +        'foo\r\n' +        '0\r\n' # terminating chunk +        '\r\n'  # end of trailers +    ) + +    def setUp(self): +        sock = FakeSocket(self.lines) +        resp = client.HTTPResponse(sock, method="GET") +        resp.begin() +        resp.fp = io.BufferedReader(resp.fp) +        self.resp = resp + + + +    def test_peek(self): +        resp = self.resp +        # patch up the buffered peek so that it returns not too much stuff +        oldpeek = resp.fp.peek +        def mypeek(n=-1): +            p = oldpeek(n) +            if n >= 0: +                return p[:n] +            return p[:10] +        resp.fp.peek = mypeek + +        all = [] +        while True: +            # try a short peek +            p = resp.peek(3) +            if p: +                self.assertGreater(len(p), 0) +                # then unbounded peek +                p2 = resp.peek() +                self.assertGreaterEqual(len(p2), len(p)) +                self.assertTrue(p2.startswith(p)) +                next = resp.read(len(p2)) +                self.assertEqual(next, p2) +            else: +                next = resp.read() +                self.assertFalse(next) +            all.append(next) +            if not next: +                break +        self.assertEqual(b"".join(all), self.lines_expected) + +    def test_readline(self): +        resp = self.resp +        self._verify_readline(self.resp.readline, self.lines_expected) + +    def _verify_readline(self, readline, expected): +        all = [] +        while True: +            # short readlines +            line = readline(5) +            if line and line != b"foo": +                if len(line) < 5: +                    self.assertTrue(line.endswith(b"\n")) +            all.append(line) +            if not line: +                break +        self.assertEqual(b"".join(all), expected) + +    def test_read1(self): +        resp = self.resp +        def r(): +            res = resp.read1(4) +            self.assertLessEqual(len(res), 4) +            return res +        readliner = Readliner(r) +        self._verify_readline(readliner.readline, self.lines_expected) + +    def test_read1_unbounded(self): +        resp = self.resp +        all = [] +        while True: +            data = resp.read1() +            if not data: +                break +            all.append(data) +        self.assertEqual(b"".join(all), self.lines_expected) + +    def test_read1_bounded(self): +        resp = self.resp +        all = [] +        while True: +            data = resp.read1(10) +            if not data: +                break +            self.assertLessEqual(len(data), 10) +            all.append(data) +        self.assertEqual(b"".join(all), self.lines_expected) + +    def test_read1_0(self): +        self.assertEqual(self.resp.read1(0), b"") + +    def test_peek_0(self): +        p = self.resp.peek(0) +        self.assertLessEqual(0, len(p)) + +class ExtendedReadTestChunked(ExtendedReadTest): +    """ +    Test peek(), read1(), readline() in chunked mode +    """ +    lines = ( +        'HTTP/1.1 200 OK\r\n' +        'Transfer-Encoding: chunked\r\n\r\n' +        'a\r\n' +        'hello worl\r\n' +        '3\r\n' +        'd!\n\r\n' +        '9\r\n' +        'and now \n\r\n' +        '23\r\n' +        'for something completely different\n\r\n' +        '3\r\n' +        'foo\r\n' +        '0\r\n' # terminating chunk +        '\r\n'  # end of trailers +    ) + + +class Readliner: +    """ +    a simple readline class that uses an arbitrary read function and buffering +    """ +    def __init__(self, readfunc): +        self.readfunc = readfunc +        self.remainder = b"" + +    def readline(self, limit): +        data = [] +        datalen = 0 +        read = self.remainder +        try: +            while True: +                idx = read.find(b'\n') +                if idx != -1: +                    break +                if datalen + len(read) >= limit: +                    idx = limit - datalen - 1 +                # read more data +                data.append(read) +                read = self.readfunc() +                if not read: +                    idx = 0 #eof condition +                    break +            idx += 1 +            data.append(read[:idx]) +            self.remainder = read[idx:] +            return b"".join(data) +        except: +            self.remainder = b"".join(data) +            raise +  class OfflineTest(TestCase):      def test_responses(self):          self.assertEqual(client.responses[client.NOT_FOUND], "Not Found") @@ -973,7 +1207,8 @@ class HTTPResponseTest(TestCase):  def test_main(verbose=None):      support.run_unittest(HeaderTests, OfflineTest, BasicTest, TimeoutTest,                           HTTPSTest, RequestBodyTest, SourceAddressTest, -                         HTTPResponseTest) +                         HTTPResponseTest, ExtendedReadTest, +                         ExtendedReadTestChunked)  if __name__ == '__main__':      test_main()  | 
