diff options
Diffstat (limited to 'Lib/test/test_zlib.py')
| -rw-r--r-- | Lib/test/test_zlib.py | 199 |
1 files changed, 165 insertions, 34 deletions
diff --git a/Lib/test/test_zlib.py b/Lib/test/test_zlib.py index e1575c4..6fea893 100644 --- a/Lib/test/test_zlib.py +++ b/Lib/test/test_zlib.py @@ -47,16 +47,11 @@ class ChecksumTestCase(unittest.TestCase): self.assertEqual(zlib.adler32(b"", 1), 1) self.assertEqual(zlib.adler32(b"", 432), 432) - def assertEqual32(self, seen, expected): - # 32-bit values masked -- checksums on 32- vs 64- bit machines - # This is important if bit 31 (0x08000000L) is set. - self.assertEqual(seen & 0x0FFFFFFFF, expected & 0x0FFFFFFFF) - def test_penguins(self): - self.assertEqual32(zlib.crc32(b"penguin", 0), 0x0e5c1a120) - self.assertEqual32(zlib.crc32(b"penguin", 1), 0x43b6aa94) - self.assertEqual32(zlib.adler32(b"penguin", 0), 0x0bcf02f6) - self.assertEqual32(zlib.adler32(b"penguin", 1), 0x0bd602f7) + self.assertEqual(zlib.crc32(b"penguin", 0), 0x0e5c1a120) + self.assertEqual(zlib.crc32(b"penguin", 1), 0x43b6aa94) + self.assertEqual(zlib.adler32(b"penguin", 0), 0x0bcf02f6) + self.assertEqual(zlib.adler32(b"penguin", 1), 0x0bd602f7) self.assertEqual(zlib.crc32(b"penguin"), zlib.crc32(b"penguin", 0)) self.assertEqual(zlib.adler32(b"penguin"),zlib.adler32(b"penguin",1)) @@ -122,11 +117,19 @@ class ExceptionTestCase(unittest.TestCase): self.assertRaises(ValueError, zlib.decompressobj().flush, 0) self.assertRaises(ValueError, zlib.decompressobj().flush, -1) + @support.cpython_only + def test_overflow(self): + with self.assertRaisesRegex(OverflowError, 'int too large'): + zlib.decompress(b'', 15, sys.maxsize + 1) + with self.assertRaisesRegex(OverflowError, 'int too large'): + zlib.decompressobj().decompress(b'', sys.maxsize + 1) + with self.assertRaisesRegex(OverflowError, 'int too large'): + zlib.decompressobj().flush(sys.maxsize + 1) + class BaseCompressTestCase(object): def check_big_compress_buffer(self, size, compress_func): _1M = 1024 * 1024 - fmt = "%%0%dx" % (2 * _1M) # Generate 10MB worth of random, and expand it by repeating it. # The assumption is that zlib's memory is not big enough to exploit # such spread out redundancy. @@ -170,7 +173,7 @@ class CompressTestCase(BaseCompressTestCase, unittest.TestCase): self.assertEqual(zlib.decompress(ob), data) def test_incomplete_stream(self): - # An useful error message is given + # A useful error message is given x = zlib.compress(HAMLET_SCENE) self.assertRaisesRegex(zlib.error, "Error -5 while decompressing data: incomplete or truncated stream", @@ -187,14 +190,27 @@ class CompressTestCase(BaseCompressTestCase, unittest.TestCase): def test_big_decompress_buffer(self, size): self.check_big_decompress_buffer(size, zlib.decompress) - @bigmemtest(size=_4G + 100, memuse=1, dry_run=False) - def test_length_overflow(self, size): + @bigmemtest(size=_4G, memuse=1) + def test_large_bufsize(self, size): + # Test decompress(bufsize) parameter greater than the internal limit + data = HAMLET_SCENE * 10 + compressed = zlib.compress(data, 1) + self.assertEqual(zlib.decompress(compressed, 15, size), data) + + def test_custom_bufsize(self): + data = HAMLET_SCENE * 10 + compressed = zlib.compress(data, 1) + self.assertEqual(zlib.decompress(compressed, 15, CustomInt()), data) + + @unittest.skipUnless(sys.maxsize > 2**32, 'requires 64bit platform') + @bigmemtest(size=_4G + 100, memuse=4) + def test_64bit_compress(self, size): data = b'x' * size try: - self.assertRaises(OverflowError, zlib.compress, data, 1) - self.assertRaises(OverflowError, zlib.decompress, data) + comp = zlib.compress(data, 0) + self.assertEqual(zlib.decompress(comp), data) finally: - data = None + comp = data = None class CompressObjectTestCase(BaseCompressTestCase, unittest.TestCase): @@ -364,6 +380,21 @@ class CompressObjectTestCase(BaseCompressTestCase, unittest.TestCase): self.assertRaises(ValueError, dco.decompress, b"", -1) self.assertEqual(b'', dco.unconsumed_tail) + def test_maxlen_large(self): + # Sizes up to sys.maxsize should be accepted, although zlib is + # internally limited to expressing sizes with unsigned int + data = HAMLET_SCENE * 10 + self.assertGreater(len(data), zlib.DEF_BUF_SIZE) + compressed = zlib.compress(data, 1) + dco = zlib.decompressobj() + self.assertEqual(dco.decompress(compressed, sys.maxsize), data) + + def test_maxlen_custom(self): + data = HAMLET_SCENE * 10 + compressed = zlib.compress(data, 1) + dco = zlib.decompressobj() + self.assertEqual(dco.decompress(compressed, CustomInt()), data[:100]) + def test_clear_unconsumed_tail(self): # Issue #12050: calling decompress() without providing max_length # should clear the unconsumed_tail attribute. @@ -525,6 +556,15 @@ class CompressObjectTestCase(BaseCompressTestCase, unittest.TestCase): self.assertEqual(dco.unconsumed_tail, b'') self.assertEqual(dco.unused_data, remainder) + # issue27164 + def test_decompress_raw_with_dictionary(self): + zdict = b'abcdefghijklmnopqrstuvwxyz' + co = zlib.compressobj(wbits=-zlib.MAX_WBITS, zdict=zdict) + comp = co.compress(zdict) + co.flush() + dco = zlib.decompressobj(wbits=-zlib.MAX_WBITS, zdict=zdict) + uncomp = dco.decompress(comp) + dco.flush() + self.assertEqual(zdict, uncomp) + def test_flush_with_freed_input(self): # Issue #16411: decompressor accesses input to last decompress() call # in flush(), even if this object has been freed in the meanwhile. @@ -537,6 +577,22 @@ class CompressObjectTestCase(BaseCompressTestCase, unittest.TestCase): data = zlib.compress(input2) self.assertEqual(dco.flush(), input1[1:]) + @bigmemtest(size=_4G, memuse=1) + def test_flush_large_length(self, size): + # Test flush(length) parameter greater than internal limit UINT_MAX + input = HAMLET_SCENE * 10 + data = zlib.compress(input, 1) + dco = zlib.decompressobj() + dco.decompress(data, 1) + self.assertEqual(dco.flush(size), input[1:]) + + def test_flush_custom_length(self): + input = HAMLET_SCENE * 10 + data = zlib.compress(input, 1) + dco = zlib.decompressobj() + dco.decompress(data, 1) + self.assertEqual(dco.flush(CustomInt()), input[1:]) + @requires_Compress_copy def test_compresscopy(self): # Test copying a compression object @@ -625,16 +681,97 @@ class CompressObjectTestCase(BaseCompressTestCase, unittest.TestCase): decompress = lambda s: d.decompress(s) + d.flush() self.check_big_decompress_buffer(size, decompress) - @bigmemtest(size=_4G + 100, memuse=1, dry_run=False) - def test_length_overflow(self, size): + @unittest.skipUnless(sys.maxsize > 2**32, 'requires 64bit platform') + @bigmemtest(size=_4G + 100, memuse=4) + def test_64bit_compress(self, size): data = b'x' * size - c = zlib.compressobj(1) - d = zlib.decompressobj() + co = zlib.compressobj(0) + do = zlib.decompressobj() try: - self.assertRaises(OverflowError, c.compress, data) - self.assertRaises(OverflowError, d.decompress, data) + comp = co.compress(data) + co.flush() + uncomp = do.decompress(comp) + do.flush() + self.assertEqual(uncomp, data) finally: - data = None + comp = uncomp = data = None + + @unittest.skipUnless(sys.maxsize > 2**32, 'requires 64bit platform') + @bigmemtest(size=_4G + 100, memuse=3) + def test_large_unused_data(self, size): + data = b'abcdefghijklmnop' + unused = b'x' * size + comp = zlib.compress(data) + unused + do = zlib.decompressobj() + try: + uncomp = do.decompress(comp) + do.flush() + self.assertEqual(unused, do.unused_data) + self.assertEqual(uncomp, data) + finally: + unused = comp = do = None + + @unittest.skipUnless(sys.maxsize > 2**32, 'requires 64bit platform') + @bigmemtest(size=_4G + 100, memuse=5) + def test_large_unconsumed_tail(self, size): + data = b'x' * size + do = zlib.decompressobj() + try: + comp = zlib.compress(data, 0) + uncomp = do.decompress(comp, 1) + do.flush() + self.assertEqual(uncomp, data) + self.assertEqual(do.unconsumed_tail, b'') + finally: + comp = uncomp = data = None + + def test_wbits(self): + # wbits=0 only supported since zlib v1.2.3.5 + # Register "1.2.3" as "1.2.3.0" + v = (zlib.ZLIB_RUNTIME_VERSION + ".0").split(".", 4) + supports_wbits_0 = int(v[0]) > 1 or int(v[0]) == 1 \ + and (int(v[1]) > 2 or int(v[1]) == 2 + and (int(v[2]) > 3 or int(v[2]) == 3 and int(v[3]) >= 5)) + + co = zlib.compressobj(level=1, wbits=15) + zlib15 = co.compress(HAMLET_SCENE) + co.flush() + self.assertEqual(zlib.decompress(zlib15, 15), HAMLET_SCENE) + if supports_wbits_0: + self.assertEqual(zlib.decompress(zlib15, 0), HAMLET_SCENE) + self.assertEqual(zlib.decompress(zlib15, 32 + 15), HAMLET_SCENE) + with self.assertRaisesRegex(zlib.error, 'invalid window size'): + zlib.decompress(zlib15, 14) + dco = zlib.decompressobj(wbits=32 + 15) + self.assertEqual(dco.decompress(zlib15), HAMLET_SCENE) + dco = zlib.decompressobj(wbits=14) + with self.assertRaisesRegex(zlib.error, 'invalid window size'): + dco.decompress(zlib15) + + co = zlib.compressobj(level=1, wbits=9) + zlib9 = co.compress(HAMLET_SCENE) + co.flush() + self.assertEqual(zlib.decompress(zlib9, 9), HAMLET_SCENE) + self.assertEqual(zlib.decompress(zlib9, 15), HAMLET_SCENE) + if supports_wbits_0: + self.assertEqual(zlib.decompress(zlib9, 0), HAMLET_SCENE) + self.assertEqual(zlib.decompress(zlib9, 32 + 9), HAMLET_SCENE) + dco = zlib.decompressobj(wbits=32 + 9) + self.assertEqual(dco.decompress(zlib9), HAMLET_SCENE) + + co = zlib.compressobj(level=1, wbits=-15) + deflate15 = co.compress(HAMLET_SCENE) + co.flush() + self.assertEqual(zlib.decompress(deflate15, -15), HAMLET_SCENE) + dco = zlib.decompressobj(wbits=-15) + self.assertEqual(dco.decompress(deflate15), HAMLET_SCENE) + + co = zlib.compressobj(level=1, wbits=-9) + deflate9 = co.compress(HAMLET_SCENE) + co.flush() + self.assertEqual(zlib.decompress(deflate9, -9), HAMLET_SCENE) + self.assertEqual(zlib.decompress(deflate9, -15), HAMLET_SCENE) + dco = zlib.decompressobj(wbits=-9) + self.assertEqual(dco.decompress(deflate9), HAMLET_SCENE) + + co = zlib.compressobj(level=1, wbits=16 + 15) + gzip = co.compress(HAMLET_SCENE) + co.flush() + self.assertEqual(zlib.decompress(gzip, 16 + 15), HAMLET_SCENE) + self.assertEqual(zlib.decompress(gzip, 32 + 15), HAMLET_SCENE) + dco = zlib.decompressobj(32 + 15) + self.assertEqual(dco.decompress(gzip), HAMLET_SCENE) def genblock(seed, length, step=1024, generator=random): @@ -725,16 +862,10 @@ LAERTES """ -def test_main(): - support.run_unittest( - VersionTestCase, - ChecksumTestCase, - ChecksumBigBufferTestCase, - ExceptionTestCase, - CompressTestCase, - CompressObjectTestCase - ) +class CustomInt: + def __int__(self): + return 100 + if __name__ == "__main__": - unittest.main() # XXX - ###test_main() + unittest.main() |
