author | Guido van Rossum <guido@python.org> | 2003-02-03 20:45:52 (GMT) |
---|---|---|
committer | Guido van Rossum <guido@python.org> | 2003-02-03 20:45:52 (GMT) |
commit | 7d9ea5013f2f6122aa83a68429bf2dd5e5a00017 (patch) | |
tree | fd7852cc158ec0ffd49a320515b380b2dafe6798 /Lib/test/test_zlib.py | |
parent | 94c30c012431c8495c73850a4438b0b7a3a2b9d4 (diff) | |
download | cpython-7d9ea5013f2f6122aa83a68429bf2dd5e5a00017.zip cpython-7d9ea5013f2f6122aa83a68429bf2dd5e5a00017.tar.gz cpython-7d9ea5013f2f6122aa83a68429bf2dd5e5a00017.tar.bz2 |
- Thanks to Scott David Daniels, a subtle bug in how the zlib
extension implemented flush() was fixed. Scott also rewrote the
zlib test suite using the unittest module. (SF bug #640230 and
patch #678531.)
Backport candidate, I think.
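
The flush() fix matters most for the bounded-output decompression pattern the rewritten tests exercise: feed input with a max_length cap, loop over unconsumed_tail, then flush() whatever the decompressor still buffers. A minimal sketch of that pattern (the sample payload is made up; the calls are the zlib compression-object API used in the diff below):

```python
import zlib

data = "spam and eggs " * 1024            # arbitrary sample payload

co = zlib.compressobj(9)
compressed = co.compress(data) + co.flush()

dco = zlib.decompressobj()
chunks = []
remaining = compressed
while remaining:
    # cap each call's output at 64 bytes; unread input is kept
    # in dco.unconsumed_tail for the next call
    chunks.append(dco.decompress(remaining, 64))
    remaining = dco.unconsumed_tail

# The buggy flush() could mishandle data still buffered at this point;
# the fixed flush() returns it.  test_decompressmaxlenflushless drains
# the same data with dco.decompress('', 64) instead, which also worked
# on older releases.
chunks.append(dco.flush())
assert ''.join(chunks) == data
```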
Diffstat (limited to 'Lib/test/test_zlib.py')
-rw-r--r-- | Lib/test/test_zlib.py | 607 |
1 file changed, 444 insertions, 163 deletions
diff --git a/Lib/test/test_zlib.py b/Lib/test/test_zlib.py
index b7d99c3..dc41e63 100644
--- a/Lib/test/test_zlib.py
+++ b/Lib/test/test_zlib.py
@@ -1,169 +1,419 @@
+import unittest
+from test import test_support
 import zlib
-import sys
-import imp
-from test.test_support import TestFailed
-
-try:
-    t = imp.find_module('test_zlib')
-    file = t[0]
-except ImportError:
-    file = open(__file__)
-buf = file.read() * 8
-file.close()
-
-# test the checksums (hex so the test doesn't break on 64-bit machines)
-def fix(x):
-    return "0x%x" % (x & 0xffffffffL)
-print fix(zlib.crc32('penguin')), fix(zlib.crc32('penguin', 1))
-print fix(zlib.adler32('penguin')), fix(zlib.adler32('penguin', 1))
-
-# make sure we generate some expected errors
-try:
-    zlib.compress('ERROR', zlib.MAX_WBITS + 1)
-except zlib.error, msg:
-    print "expecting", msg
-try:
-    zlib.compressobj(1, 8, 0)
-except ValueError, msg:
-    print "expecting", msg
-try:
-    zlib.decompressobj(0)
-except ValueError, msg:
-    print "expecting", msg
-
-x = zlib.compress(buf)
-y = zlib.decompress(x)
-if buf != y:
-    print "normal compression/decompression failed"
-else:
-    print "normal compression/decompression succeeded"
-
-buf = buf * 16
-
-co = zlib.compressobj(8, 8, -15)
-x1 = co.compress(buf)
-x2 = co.flush()
-try:
-    co.flush()
-    print "Oops - second flush worked when it should not have!"
-except zlib.error:
-    pass
-
-x = x1 + x2
-
-dc = zlib.decompressobj(-15)
-y1 = dc.decompress(x)
-y2 = dc.flush()
-y = y1 + y2
-if buf != y:
-    print "compress/decompression obj failed"
-else:
-    print "compress/decompression obj succeeded"
-
-co = zlib.compressobj(2, 8, -12, 9, 1)
-bufs = []
-for i in range(0, len(buf), 256):
-    bufs.append(co.compress(buf[i:i+256]))
-bufs.append(co.flush())
-combuf = ''.join(bufs)
-
-decomp1 = zlib.decompress(combuf, -12, -5)
-if decomp1 != buf:
-    print "decompress with init options failed"
-else:
-    print "decompress with init options succeeded"
-
-deco = zlib.decompressobj(-12)
-bufs = []
-for i in range(0, len(combuf), 128):
-    bufs.append(deco.decompress(combuf[i:i+128]))
-bufs.append(deco.flush())
-decomp2 = ''.join(bufs)
-if decomp2 != buf:
-    print "decompressobj with init options failed"
-else:
-    print "decompressobj with init options succeeded"
-
-print "should be '':", `deco.unconsumed_tail`
-
-# Check a decompression object with max_length specified
-deco = zlib.decompressobj(-12)
-cb = combuf
-bufs = []
-while cb:
-    max_length = 1 + len(cb)/10
-    chunk = deco.decompress(cb, max_length)
-    if len(chunk) > max_length:
-        print 'chunk too big (%d>%d)' % (len(chunk),max_length)
-    bufs.append(chunk)
-    cb = deco.unconsumed_tail
-bufs.append(deco.flush())
-decomp2 = ''.join(buf)
-if decomp2 != buf:
-    print "max_length decompressobj failed"
-else:
-    print "max_length decompressobj succeeded"
-
-# Misc tests of max_length
-deco = zlib.decompressobj(-12)
-try:
-    deco.decompress("", -1)
-except ValueError:
-    pass
-else:
-    print "failed to raise value error on bad max_length"
-print "unconsumed_tail should be '':", `deco.unconsumed_tail`
-
-# Test flush() with the various options, using all the different levels
-# in order to provide more variations.
-sync_opt = ['Z_NO_FLUSH', 'Z_SYNC_FLUSH', 'Z_FULL_FLUSH']
-sync_opt = [getattr(zlib, opt) for opt in sync_opt if hasattr(zlib, opt)]
-
-for sync in sync_opt:
-    for level in range(10):
-        obj = zlib.compressobj( level )
-        d = obj.compress( buf[:3000] )
-        d = d + obj.flush( sync )
-        d = d + obj.compress( buf[3000:] )
-        d = d + obj.flush()
-        if zlib.decompress(d) != buf:
-            print "Decompress failed: flush mode=%i, level=%i" % (sync,level)
-        del obj
-
-# Test for the odd flushing bugs noted in 2.0, and hopefully fixed in 2.1
-
-import random
-random.seed(1)
-
-print 'Testing on 17K of random data'
-
-if hasattr(zlib, 'Z_SYNC_FLUSH'):
-
-    # Create compressor and decompressor objects
-    c=zlib.compressobj(9)
-    d=zlib.decompressobj()
-
-    # Try 17K of data
-    # generate random data stream
-    a=""
-    for i in range(17*1024):
-        a=a+chr(random.randint(0,255))
-
-    # compress, sync-flush, and decompress
-    t = d.decompress( c.compress(a)+c.flush(zlib.Z_SYNC_FLUSH) )
-
-    # if decompressed data is different from the input data, choke.
-    if len(t) != len(a):
-        print len(a),len(t),len(d.unused_data)
-        raise TestFailed, "output of 17K doesn't match"
-
-def ignore():
-    """An empty function with a big string.
-
-    Make the compression algorithm work a little harder.
-    """
-
-    """
+import random
+
+# print test_support.TESTFN
+
+def getbuf():
+    # This was in the original.  Avoid non-repeatable sources.
+    # Left here (unused) in case something wants to be done with it.
+    import imp
+    try:
+        t = imp.find_module('test_zlib')
+        file = t[0]
+    except ImportError:
+        file = open(__file__)
+    buf = file.read() * 8
+    file.close()
+    return buf
+
+
+
+class ChecksumTestCase(unittest.TestCase):
+    # checksum test cases
+    def test_crc32start(self):
+        self.assertEqual(zlib.crc32(""), zlib.crc32("", 0))
+
+    def test_crc32empty(self):
+        self.assertEqual(zlib.crc32("", 0), 0)
+        self.assertEqual(zlib.crc32("", 1), 1)
+        self.assertEqual(zlib.crc32("", 432), 432)
+
+    def test_adler32start(self):
+        self.assertEqual(zlib.adler32(""), zlib.adler32("", 1))
+
+    def test_adler32empty(self):
+        self.assertEqual(zlib.adler32("", 0), 0)
+        self.assertEqual(zlib.adler32("", 1), 1)
+        self.assertEqual(zlib.adler32("", 432), 432)
+
+    def assertEqual32(self, seen, expected):
+        # 32-bit values masked -- checksums on 32- vs 64- bit machines
+        # This is important if bit 31 (0x08000000L) is set.
+        self.assertEqual(seen & 0x0FFFFFFFFL, expected & 0x0FFFFFFFFL)
+
+    def test_penguins(self):
+        self.assertEqual32(zlib.crc32("penguin", 0), 0x0e5c1a120L)
+        self.assertEqual32(zlib.crc32("penguin", 1), 0x43b6aa94)
+        self.assertEqual32(zlib.adler32("penguin", 0), 0x0bcf02f6)
+        self.assertEqual32(zlib.adler32("penguin", 1), 0x0bd602f7)
+
+        self.assertEqual(zlib.crc32("penguin"), zlib.crc32("penguin", 0))
+        self.assertEqual(zlib.adler32("penguin"),zlib.adler32("penguin",1))
+
+
+
+class ExceptionTestCase(unittest.TestCase):
+    # make sure we generate some expected errors
+    def test_bigbits(self):
+        # specifying total bits too large causes an error
+        self.assertRaises(zlib.error,
+                          zlib.compress, 'ERROR', zlib.MAX_WBITS + 1)
+
+    def test_badcompressobj(self):
+        # verify failure on building compress object with bad params
+        self.assertRaises(ValueError, zlib.compressobj, 1, 8, 0)
+
+    def test_baddecompressobj(self):
+        # verify failure on building decompress object with bad params
+        self.assertRaises(ValueError, zlib.decompressobj, 0)
+
+
+
+class CompressTestCase(unittest.TestCase):
+    # Test compression in one go (whole message compression)
+    def test_speech(self):
+        # decompress(compress(data)) better be data
+        x = zlib.compress(hamlet_scene)
+        self.assertEqual(zlib.decompress(x), hamlet_scene)
+
+    def test_speech8(self):
+        # decompress(compress(data)) better be data -- more compression chances
+        data = hamlet_scene * 8
+        x = zlib.compress(data)
+        self.assertEqual(zlib.decompress(x), data)
+
+    def test_speech16(self):
+        # decompress(compress(data)) better be data -- more compression chances
+        data = hamlet_scene * 16
+        x = zlib.compress(data)
+        self.assertEqual(zlib.decompress(x), data)
+
+    def test_speech128(self):
+        # decompress(compress(data)) better be data -- more compression chances
+        data = hamlet_scene * 8 * 16
+        x = zlib.compress(data)
+        self.assertEqual(zlib.decompress(x), data)
+
+    def test_monotonic(self):
+        # higher compression levels should not expand compressed size
+        data = hamlet_scene * 8 * 16
+        last = length = len(zlib.compress(data, 0))
+        self.failUnless(last > len(data), "compress level 0 always expands")
+        for level in range(10):
+            length = len(zlib.compress(data, level))
+            self.failUnless(length <= last,
+                            'compress level %d more effective than %d!' % (
+                                        level-1, level))
+            last = length
+
+
+
+class CompressObjectTestCase(unittest.TestCase):
+    # Test compression object
+    def test_pairsmall(self):
+        # use compress object in straightforward manner, decompress w/ object
+        data = hamlet_scene
+        co = zlib.compressobj(8, 8, -15)
+        x1 = co.compress(data)
+        x2 = co.flush()
+        self.assertRaises(zlib.error, co.flush) # second flush should not work
+        dco = zlib.decompressobj(-15)
+        y1 = dco.decompress(x1 + x2)
+        y2 = dco.flush()
+        self.assertEqual(data, y1 + y2)
+
+    def test_pair(self):
+        # straightforward compress/decompress objects, more compression
+        data = hamlet_scene * 8 * 16
+        co = zlib.compressobj(8, 8, -15)
+        x1 = co.compress(data)
+        x2 = co.flush()
+        self.assertRaises(zlib.error, co.flush) # second flush should not work
+        dco = zlib.decompressobj(-15)
+        y1 = dco.decompress(x1 + x2)
+        y2 = dco.flush()
+        self.assertEqual(data, y1 + y2)
+
+    def test_compressincremental(self):
+        # compress object in steps, decompress object as one-shot
+        data = hamlet_scene * 8 * 16
+        co = zlib.compressobj(2, 8, -12, 9, 1)
+        bufs = []
+        for i in range(0, len(data), 256):
+            bufs.append(co.compress(data[i:i+256]))
+        bufs.append(co.flush())
+        combuf = ''.join(bufs)
+
+        dco = zlib.decompressobj(-15)
+        y1 = dco.decompress(''.join(bufs))
+        y2 = dco.flush()
+        self.assertEqual(data, y1 + y2)
+
+    def test_decompressincremental(self):
+        # compress object in steps, decompress object in steps
+        data = hamlet_scene * 8 * 16
+        co = zlib.compressobj(2, 8, -12, 9, 1)
+        bufs = []
+        for i in range(0, len(data), 256):
+            bufs.append(co.compress(data[i:i+256]))
+        bufs.append(co.flush())
+        combuf = ''.join(bufs)
+
+        self.assertEqual(data, zlib.decompress(combuf, -12, -5))
+
+        dco = zlib.decompressobj(-12)
+        bufs = []
+        for i in range(0, len(combuf), 128):
+            bufs.append(dco.decompress(combuf[i:i+128]))
+            self.assertEqual('', dco.unconsumed_tail, ########
+                             "(A) uct should be '': not %d long" %
+                             len(dco.unconsumed_tail))
+        bufs.append(dco.flush())
+        self.assertEqual('', dco.unconsumed_tail, ########
+                         "(B) uct should be '': not %d long" %
+                         len(dco.unconsumed_tail))
+        self.assertEqual(data, ''.join(bufs))
+        # Failure means: "decompressobj with init options failed"
+
+    def test_decompinc(self,sizes=[128],flush=True,source=None,cx=256,dcx=64):
+        # compress object in steps, decompress object in steps, loop sizes
+        source = source or hamlet_scene
+        for reps in sizes:
+            data = source * reps
+            co = zlib.compressobj(2, 8, -12, 9, 1)
+            bufs = []
+            for i in range(0, len(data), cx):
+                bufs.append(co.compress(data[i:i+cx]))
+            bufs.append(co.flush())
+            combuf = ''.join(bufs)
+
+            self.assertEqual(data, zlib.decompress(combuf, -12, -5))
+
+            dco = zlib.decompressobj(-12)
+            bufs = []
+            for i in range(0, len(combuf), dcx):
+                bufs.append(dco.decompress(combuf[i:i+dcx]))
+                self.assertEqual('', dco.unconsumed_tail, ########
+                                 "(A) uct should be '': not %d long" %
+                                 len(dco.unconsumed_tail))
+            if flush:
+                bufs.append(dco.flush())
+            else:
+                while True:
+                    chunk = dco.decompress('')
+                    if chunk:
+                        bufs.append(chunk)
+                    else:
+                        break
+            self.assertEqual('', dco.unconsumed_tail, ########
+                             "(B) uct should be '': not %d long" %
+                             len(dco.unconsumed_tail))
+            self.assertEqual(data, ''.join(bufs))
+        # Failure means: "decompressobj with init options failed"
+
+    def test_decompimax(self,sizes=[128],flush=True,source=None,cx=256,dcx=64):
+        # compress in steps, decompress in length-restricted steps, loop sizes
+        source = source or hamlet_scene
+        for reps in sizes:
+            # Check a decompression object with max_length specified
+            data = source * reps
+            co = zlib.compressobj(2, 8, -12, 9, 1)
+            bufs = []
+            for i in range(0, len(data), cx):
+                bufs.append(co.compress(data[i:i+cx]))
+            bufs.append(co.flush())
+            combuf = ''.join(bufs)
+            self.assertEqual(data, zlib.decompress(combuf, -12, -5),
+                             'compressed data failure')
+
+            dco = zlib.decompressobj(-12)
+            bufs = []
+            cb = combuf
+            while cb:
+                #max_length = 1 + len(cb)/10
+                chunk = dco.decompress(cb, dcx)
+                self.failIf(len(chunk) > dcx,
+                            'chunk too big (%d>%d)' % (len(chunk), dcx))
+                bufs.append(chunk)
+                cb = dco.unconsumed_tail
+            if flush:
+                bufs.append(dco.flush())
+            else:
+                while True:
+                    chunk = dco.decompress('', dcx)
+                    self.failIf(len(chunk) > dcx,
+                                'chunk too big in tail (%d>%d)' % (len(chunk), dcx))
+                    if chunk:
+                        bufs.append(chunk)
+                    else:
+                        break
+            self.assertEqual(len(data), len(''.join(bufs)))
+            self.assertEqual(data, ''.join(bufs), 'Wrong data retrieved')
+
+    def test_decompressmaxlen(self):
+        # Check a decompression object with max_length specified
+        data = hamlet_scene * 8 * 16
+        co = zlib.compressobj(2, 8, -12, 9, 1)
+        bufs = []
+        for i in range(0, len(data), 256):
+            bufs.append(co.compress(data[i:i+256]))
+        bufs.append(co.flush())
+        combuf = ''.join(bufs)
+        self.assertEqual(data, zlib.decompress(combuf, -12, -5),
+                         'compressed data failure')
+
+        dco = zlib.decompressobj(-12)
+        bufs = []
+        cb = combuf
+        while cb:
+            max_length = 1 + len(cb)/10
+            chunk = dco.decompress(cb, max_length)
+            self.failIf(len(chunk) > max_length,
+                        'chunk too big (%d>%d)' % (len(chunk),max_length))
+            bufs.append(chunk)
+            cb = dco.unconsumed_tail
+        bufs.append(dco.flush())
+        self.assertEqual(len(data), len(''.join(bufs)))
+        self.assertEqual(data, ''.join(bufs), 'Wrong data retrieved')
+
+    def test_decompressmaxlenflushless(self):
+        # identical to test_decompressmaxlen except flush is replaced
+        # with an equivalent.  This works and other fails on (eg) 2.2.2
+        data = hamlet_scene * 8 * 16
+        co = zlib.compressobj(2, 8, -12, 9, 1)
+        bufs = []
+        for i in range(0, len(data), 256):
+            bufs.append(co.compress(data[i:i+256]))
+        bufs.append(co.flush())
+        combuf = ''.join(bufs)
+        self.assertEqual(data, zlib.decompress(combuf, -12, -5),
+                         'compressed data mismatch')
+
+        dco = zlib.decompressobj(-12)
+        bufs = []
+        cb = combuf
+        while cb:
+            max_length = 1 + len(cb)/10
+            chunk = dco.decompress(cb, max_length)
+            self.failIf(len(chunk) > max_length,
+                        'chunk too big (%d>%d)' % (len(chunk),max_length))
+            bufs.append(chunk)
+            cb = dco.unconsumed_tail
+
+        #bufs.append(dco.flush())
+        while len(chunk):
+            chunk = dco.decompress('', max_length)
+            self.failIf(len(chunk) > max_length,
+                        'chunk too big (%d>%d)' % (len(chunk),max_length))
+            bufs.append(chunk)
+
+        self.assertEqual(data, ''.join(bufs), 'Wrong data retrieved')
+
+    def test_maxlenmisc(self):
+        # Misc tests of max_length
+        dco = zlib.decompressobj(-12)
+        self.assertRaises(ValueError, dco.decompress, "", -1)
+        self.assertEqual('', dco.unconsumed_tail)
+
+    def test_flushes(self):
+        # Test flush() with the various options, using all the
+        # different levels in order to provide more variations.
+        sync_opt = ['Z_NO_FLUSH', 'Z_SYNC_FLUSH', 'Z_FULL_FLUSH']
+        sync_opt = [getattr(zlib, opt) for opt in sync_opt
+                    if hasattr(zlib, opt)]
+        data = hamlet_scene * 8
+
+        for sync in sync_opt:
+            for level in range(10):
+                obj = zlib.compressobj( level )
+                a = obj.compress( data[:3000] )
+                b = obj.flush( sync )
+                c = obj.compress( data[3000:] )
+                d = obj.flush()
+                self.assertEqual(zlib.decompress(''.join([a,b,c,d])),
+                                 data, ("Decompress failed: flush "
+                                        "mode=%i, level=%i") % (sync, level))
+                del obj
+
+    def test_odd_flush(self):
+        # Test for odd flushing bugs noted in 2.0, and hopefully fixed in 2.1
+        import random
+
+        if hasattr(zlib, 'Z_SYNC_FLUSH'):
+            # Testing on 17K of "random" data
+
+            # Create compressor and decompressor objects
+            co = zlib.compressobj(9)
+            dco = zlib.decompressobj()
+
+            # Try 17K of data
+            # generate random data stream
+            try:
+                # In 2.3 and later, WichmannHill is the RNG of the bug report
+                gen = random.WichmannHill()
+            except AttributeError:
+                try:
+                    # 2.2 called it Random
+                    gen = random.Random()
+                except AttributeError:
+                    # others might simply have a single RNG
+                    gen = random
+            gen.seed(1)
+            data = genblock(1, 17 * 1024, generator=gen)
+
+            # compress, sync-flush, and decompress
+            first = co.compress(data)
+            second = co.flush(zlib.Z_SYNC_FLUSH)
+            expanded = dco.decompress(first + second)
+
+            # if decompressed data is different from the input data, choke.
+            self.assertEqual(expanded, data, "17K random source doesn't match")
+
+    def test_manydecompinc(self):
+        # Run incremental decompress test for a large range of sizes
+        self.test_decompinc(sizes=[1<<n for n in range(8)],
+                            flush=True, cx=32, dcx=4)
+
+    def test_manydecompimax(self):
+        # Run incremental decompress maxlen test for a large range of sizes
+        # avoid the flush bug
+        self.test_decompimax(sizes=[1<<n for n in range(8)],
+                             flush=False, cx=32, dcx=4)
+
+    def test_manydecompimaxflush(self):
+        # Run incremental decompress maxlen test for a large range of sizes
+        # avoid the flush bug
+        self.test_decompimax(sizes=[1<<n for n in range(8)],
+                             flush=True, cx=32, dcx=4)
+
+
+def genblock(seed, length, step=1024, generator=random):
+    """length-byte stream of random data from a seed (in step-byte blocks)."""
+    if seed is not None:
+        generator.seed(seed)
+    randint = generator.randint
+    if length < step or step < 2:
+        step = length
+    blocks = []
+    for i in range(0, length, step):
+        blocks.append(''.join([chr(randint(0,255))
+                               for x in range(step)]))
+    return ''.join(blocks)[:length]
+
+
+
+def choose_lines(source, number, seed=None, generator=random):
+    """Return a list of number lines randomly chosen from the source"""
+    if seed is not None:
+        generator.seed(seed)
+    sources = source.split('\n')
+    return [generator.choice(sources) for n in range(number)]
+
+
+
+hamlet_scene = """
 LAERTES
 
        O, fear me not.
@@ -226,3 +476,34 @@ LAERTES
 
 Farewell.
 """
+
+
+def test_main():
+    suite = unittest.TestSuite()
+    suite.addTest(unittest.makeSuite(ChecksumTestCase))
+    suite.addTest(unittest.makeSuite(ExceptionTestCase))
+    suite.addTest(unittest.makeSuite(CompressTestCase))
+    suite.addTest(unittest.makeSuite(CompressObjectTestCase))
+    test_support.run_suite(suite)
+
+if __name__ == "__main__":
+    test_main()
+
+def test(tests=''):
+    if not tests: tests = 'o'
+    suite = unittest.TestSuite()
+    if 'k' in tests: suite.addTest(unittest.makeSuite(ChecksumTestCase))
+    if 'x' in tests: suite.addTest(unittest.makeSuite(ExceptionTestCase))
+    if 'c' in tests: suite.addTest(unittest.makeSuite(CompressTestCase))
+    if 'o' in tests: suite.addTest(unittest.makeSuite(CompressObjectTestCase))
+    test_support.run_suite(suite)
+
+if False:
+    import sys
+    sys.path.insert(1, '/Py23Src/python/dist/src/Lib/test')
+    import test_zlib as tz
+    ts, ut = tz.test_support, tz.unittest
+    su = ut.TestSuite()
+    su.addTest(ut.makeSuite(tz.CompressTestCase))
+    ts.run_suite(su)