diff options
author | Antoine Pitrou <solipsis@pitrou.net> | 2010-05-07 17:08:54 (GMT) |
---|---|---|
committer | Antoine Pitrou <solipsis@pitrou.net> | 2010-05-07 17:08:54 (GMT) |
commit | 4b3fe14d4bf56be8fb3e922052f0da65b6948a31 (patch) | |
tree | a5cded081d9f1a53ebf2536f8da74d371d52d3e9 /Lib/test | |
parent | c5852813b8cba1cdd86f6a9f8a25239180f8c3be (diff) | |
download | cpython-4b3fe14d4bf56be8fb3e922052f0da65b6948a31.zip cpython-4b3fe14d4bf56be8fb3e922052f0da65b6948a31.tar.gz cpython-4b3fe14d4bf56be8fb3e922052f0da65b6948a31.tar.bz2 |
Merged revisions 80928 via svnmerge from
svn+ssh://pythondev@svn.python.org/python/branches/py3k
................
r80928 | antoine.pitrou | 2010-05-07 19:04:02 +0200 (ven., 07 mai 2010) | 11 lines
Merged revisions 80926 via svnmerge from
svn+ssh://pythondev@svn.python.org/python/trunk
........
r80926 | antoine.pitrou | 2010-05-07 18:50:34 +0200 (ven., 07 mai 2010) | 5 lines
Issue #8571: Fix an internal error when compressing or decompressing a
chunk larger than 1GB with the zlib module's compressor and decompressor
objects.
........
................
Diffstat (limited to 'Lib/test')
-rw-r--r-- | Lib/test/test_zlib.py | 63 |
1 files changed, 60 insertions, 3 deletions
diff --git a/Lib/test/test_zlib.py b/Lib/test/test_zlib.py index c6bdda1..1b13274 100644 --- a/Lib/test/test_zlib.py +++ b/Lib/test/test_zlib.py @@ -2,6 +2,7 @@ import unittest from test import support import binascii import random +from test.support import precisionbigmemtest, _1G zlib = support.import_module('zlib') @@ -93,8 +94,39 @@ class ExceptionTestCase(unittest.TestCase): self.assertRaises(ValueError, zlib.decompressobj().flush, -1) - -class CompressTestCase(unittest.TestCase): +class BaseCompressTestCase(object): + def check_big_compress_buffer(self, size, compress_func): + _1M = 1024 * 1024 + fmt = "%%0%dx" % (2 * _1M) + # Generate 10MB worth of random, and expand it by repeating it. + # The assumption is that zlib's memory is not big enough to exploit + # such spread out redundancy. + data = b''.join([random.getrandbits(8 * _1M).to_bytes(_1M, 'little') + for i in range(10)]) + data = data * (size // len(data) + 1) + try: + compress_func(data) + finally: + # Release memory + data = None + + def check_big_decompress_buffer(self, size, decompress_func): + data = b'x' * size + try: + compressed = zlib.compress(data, 1) + finally: + # Release memory + data = None + data = decompress_func(compressed) + # Sanity check + try: + self.assertEqual(len(data), size) + self.assertEqual(len(data.strip(b'x')), 0) + finally: + data = None + + +class CompressTestCase(BaseCompressTestCase, unittest.TestCase): # Test compression in one go (whole message compression) def test_speech(self): x = zlib.compress(HAMLET_SCENE) @@ -108,9 +140,19 @@ class CompressTestCase(unittest.TestCase): for ob in x, bytearray(x): self.assertEqual(zlib.decompress(ob), data) + # Memory use of the following functions takes into account overallocation + + @precisionbigmemtest(size=_1G + 1024 * 1024, memuse=3) + def test_big_compress_buffer(self, size): + compress = lambda s: zlib.compress(s, 1) + self.check_big_compress_buffer(size, compress) + @precisionbigmemtest(size=_1G + 1024 * 1024, memuse=2) + def test_big_decompress_buffer(self, size): + self.check_big_decompress_buffer(size, zlib.decompress) -class CompressObjectTestCase(unittest.TestCase): + +class CompressObjectTestCase(BaseCompressTestCase, unittest.TestCase): # Test compression object def test_pair(self): # straightforward compress/decompress objects @@ -399,6 +441,21 @@ class CompressObjectTestCase(unittest.TestCase): d.flush() self.assertRaises(ValueError, d.copy) + # Memory use of the following functions takes into account overallocation + + @precisionbigmemtest(size=_1G + 1024 * 1024, memuse=3) + def test_big_compress_buffer(self, size): + c = zlib.compressobj(1) + compress = lambda s: c.compress(s) + c.flush() + self.check_big_compress_buffer(size, compress) + + @precisionbigmemtest(size=_1G + 1024 * 1024, memuse=2) + def test_big_decompress_buffer(self, size): + d = zlib.decompressobj() + decompress = lambda s: d.decompress(s) + d.flush() + self.check_big_decompress_buffer(size, decompress) + + def genblock(seed, length, step=1024, generator=random): """length-byte stream of random data from a seed (in step-byte blocks).""" if seed is not None: |