From 7d9ea5013f2f6122aa83a68429bf2dd5e5a00017 Mon Sep 17 00:00:00 2001 From: Guido van Rossum Date: Mon, 3 Feb 2003 20:45:52 +0000 Subject: - Thanks to Scott David Daniels, a subtle bug in how the zlib extension implemented flush() was fixed. Scott also rewrite the zlib test suite using the unittest module. (SF bug #640230 and patch #678531.) Backport candidate I think. --- Lib/test/output/test_zlib | 14 -- Lib/test/test_zlib.py | 607 +++++++++++++++++++++++++++++++++------------- Misc/ACKS | 1 + Misc/NEWS | 5 + Modules/zlibmodule.c | 57 ++++- 5 files changed, 495 insertions(+), 189 deletions(-) delete mode 100644 Lib/test/output/test_zlib diff --git a/Lib/test/output/test_zlib b/Lib/test/output/test_zlib deleted file mode 100644 index 1c2e2e9..0000000 --- a/Lib/test/output/test_zlib +++ /dev/null @@ -1,14 +0,0 @@ -test_zlib -0xe5c1a120 0x43b6aa94 -0xbd602f7 0xbd602f7 -expecting Bad compression level -expecting Invalid initialization option -expecting Invalid initialization option -normal compression/decompression succeeded -compress/decompression obj succeeded -decompress with init options succeeded -decompressobj with init options succeeded -should be '': '' -max_length decompressobj succeeded -unconsumed_tail should be '': '' -Testing on 17K of random data diff --git a/Lib/test/test_zlib.py b/Lib/test/test_zlib.py index b7d99c3..dc41e63 100644 --- a/Lib/test/test_zlib.py +++ b/Lib/test/test_zlib.py @@ -1,169 +1,419 @@ +import unittest +from test import test_support import zlib -import sys -import imp -from test.test_support import TestFailed - -try: - t = imp.find_module('test_zlib') - file = t[0] -except ImportError: - file = open(__file__) -buf = file.read() * 8 -file.close() - -# test the checksums (hex so the test doesn't break on 64-bit machines) -def fix(x): - return "0x%x" % (x & 0xffffffffL) -print fix(zlib.crc32('penguin')), fix(zlib.crc32('penguin', 1)) -print fix(zlib.adler32('penguin')), fix(zlib.adler32('penguin', 1)) - -# make sure we generate some expected errors -try: - zlib.compress('ERROR', zlib.MAX_WBITS + 1) -except zlib.error, msg: - print "expecting", msg -try: - zlib.compressobj(1, 8, 0) -except ValueError, msg: - print "expecting", msg -try: - zlib.decompressobj(0) -except ValueError, msg: - print "expecting", msg - -x = zlib.compress(buf) -y = zlib.decompress(x) -if buf != y: - print "normal compression/decompression failed" -else: - print "normal compression/decompression succeeded" - -buf = buf * 16 - -co = zlib.compressobj(8, 8, -15) -x1 = co.compress(buf) -x2 = co.flush() -try: - co.flush() - print "Oops - second flush worked when it should not have!" -except zlib.error: - pass - -x = x1 + x2 - -dc = zlib.decompressobj(-15) -y1 = dc.decompress(x) -y2 = dc.flush() -y = y1 + y2 -if buf != y: - print "compress/decompression obj failed" -else: - print "compress/decompression obj succeeded" - -co = zlib.compressobj(2, 8, -12, 9, 1) -bufs = [] -for i in range(0, len(buf), 256): - bufs.append(co.compress(buf[i:i+256])) -bufs.append(co.flush()) -combuf = ''.join(bufs) - -decomp1 = zlib.decompress(combuf, -12, -5) -if decomp1 != buf: - print "decompress with init options failed" -else: - print "decompress with init options succeeded" - -deco = zlib.decompressobj(-12) -bufs = [] -for i in range(0, len(combuf), 128): - bufs.append(deco.decompress(combuf[i:i+128])) -bufs.append(deco.flush()) -decomp2 = ''.join(bufs) -if decomp2 != buf: - print "decompressobj with init options failed" -else: - print "decompressobj with init options succeeded" - -print "should be '':", `deco.unconsumed_tail` - -# Check a decompression object with max_length specified -deco = zlib.decompressobj(-12) -cb = combuf -bufs = [] -while cb: - max_length = 1 + len(cb)/10 - chunk = deco.decompress(cb, max_length) - if len(chunk) > max_length: - print 'chunk too big (%d>%d)' % (len(chunk),max_length) - bufs.append(chunk) - cb = deco.unconsumed_tail -bufs.append(deco.flush()) -decomp2 = ''.join(buf) -if decomp2 != buf: - print "max_length decompressobj failed" -else: - print "max_length decompressobj succeeded" - -# Misc tests of max_length -deco = zlib.decompressobj(-12) -try: - deco.decompress("", -1) -except ValueError: - pass -else: - print "failed to raise value error on bad max_length" -print "unconsumed_tail should be '':", `deco.unconsumed_tail` - -# Test flush() with the various options, using all the different levels -# in order to provide more variations. -sync_opt = ['Z_NO_FLUSH', 'Z_SYNC_FLUSH', 'Z_FULL_FLUSH'] -sync_opt = [getattr(zlib, opt) for opt in sync_opt if hasattr(zlib, opt)] - -for sync in sync_opt: - for level in range(10): - obj = zlib.compressobj( level ) - d = obj.compress( buf[:3000] ) - d = d + obj.flush( sync ) - d = d + obj.compress( buf[3000:] ) - d = d + obj.flush() - if zlib.decompress(d) != buf: - print "Decompress failed: flush mode=%i, level=%i" % (sync,level) - del obj - -# Test for the odd flushing bugs noted in 2.0, and hopefully fixed in 2.1 - import random -random.seed(1) - -print 'Testing on 17K of random data' - -if hasattr(zlib, 'Z_SYNC_FLUSH'): - - # Create compressor and decompressor objects - c=zlib.compressobj(9) - d=zlib.decompressobj() - - # Try 17K of data - # generate random data stream - a="" - for i in range(17*1024): - a=a+chr(random.randint(0,255)) - - # compress, sync-flush, and decompress - t = d.decompress( c.compress(a)+c.flush(zlib.Z_SYNC_FLUSH) ) - # if decompressed data is different from the input data, choke. - if len(t) != len(a): - print len(a),len(t),len(d.unused_data) - raise TestFailed, "output of 17K doesn't match" - -def ignore(): - """An empty function with a big string. - - Make the compression algorithm work a little harder. - """ - - """ +# print test_support.TESTFN + +def getbuf(): + # This was in the original. Avoid non-repeatable sources. + # Left here (unused) in case something wants to be done with it. + import imp + try: + t = imp.find_module('test_zlib') + file = t[0] + except ImportError: + file = open(__file__) + buf = file.read() * 8 + file.close() + return buf + + + +class ChecksumTestCase(unittest.TestCase): + # checksum test cases + def test_crc32start(self): + self.assertEqual(zlib.crc32(""), zlib.crc32("", 0)) + + def test_crc32empty(self): + self.assertEqual(zlib.crc32("", 0), 0) + self.assertEqual(zlib.crc32("", 1), 1) + self.assertEqual(zlib.crc32("", 432), 432) + + def test_adler32start(self): + self.assertEqual(zlib.adler32(""), zlib.adler32("", 1)) + + def test_adler32empty(self): + self.assertEqual(zlib.adler32("", 0), 0) + self.assertEqual(zlib.adler32("", 1), 1) + self.assertEqual(zlib.adler32("", 432), 432) + + def assertEqual32(self, seen, expected): + # 32-bit values masked -- checksums on 32- vs 64- bit machines + # This is important if bit 31 (0x08000000L) is set. + self.assertEqual(seen & 0x0FFFFFFFFL, expected & 0x0FFFFFFFFL) + + def test_penguins(self): + self.assertEqual32(zlib.crc32("penguin", 0), 0x0e5c1a120L) + self.assertEqual32(zlib.crc32("penguin", 1), 0x43b6aa94) + self.assertEqual32(zlib.adler32("penguin", 0), 0x0bcf02f6) + self.assertEqual32(zlib.adler32("penguin", 1), 0x0bd602f7) + + self.assertEqual(zlib.crc32("penguin"), zlib.crc32("penguin", 0)) + self.assertEqual(zlib.adler32("penguin"),zlib.adler32("penguin",1)) + + + +class ExceptionTestCase(unittest.TestCase): + # make sure we generate some expected errors + def test_bigbits(self): + # specifying total bits too large causes an error + self.assertRaises(zlib.error, + zlib.compress, 'ERROR', zlib.MAX_WBITS + 1) + + def test_badcompressobj(self): + # verify failure on building compress object with bad params + self.assertRaises(ValueError, zlib.compressobj, 1, 8, 0) + + def test_baddecompressobj(self): + # verify failure on building decompress object with bad params + self.assertRaises(ValueError, zlib.decompressobj, 0) + + + +class CompressTestCase(unittest.TestCase): + # Test compression in one go (whole message compression) + def test_speech(self): + # decompress(compress(data)) better be data + x = zlib.compress(hamlet_scene) + self.assertEqual(zlib.decompress(x), hamlet_scene) + + def test_speech8(self): + # decompress(compress(data)) better be data -- more compression chances + data = hamlet_scene * 8 + x = zlib.compress(data) + self.assertEqual(zlib.decompress(x), data) + + def test_speech16(self): + # decompress(compress(data)) better be data -- more compression chances + data = hamlet_scene * 16 + x = zlib.compress(data) + self.assertEqual(zlib.decompress(x), data) + + def test_speech128(self): + # decompress(compress(data)) better be data -- more compression chances + data = hamlet_scene * 8 * 16 + x = zlib.compress(data) + self.assertEqual(zlib.decompress(x), data) + + def test_monotonic(self): + # higher compression levels should not expand compressed size + data = hamlet_scene * 8 * 16 + last = length = len(zlib.compress(data, 0)) + self.failUnless(last > len(data), "compress level 0 always expands") + for level in range(10): + length = len(zlib.compress(data, level)) + self.failUnless(length <= last, + 'compress level %d more effective than %d!' % ( + level-1, level)) + last = length + + + +class CompressObjectTestCase(unittest.TestCase): + # Test compression object + def test_pairsmall(self): + # use compress object in straightforward manner, decompress w/ object + data = hamlet_scene + co = zlib.compressobj(8, 8, -15) + x1 = co.compress(data) + x2 = co.flush() + self.assertRaises(zlib.error, co.flush) # second flush should not work + dco = zlib.decompressobj(-15) + y1 = dco.decompress(x1 + x2) + y2 = dco.flush() + self.assertEqual(data, y1 + y2) + + def test_pair(self): + # straightforward compress/decompress objects, more compression + data = hamlet_scene * 8 * 16 + co = zlib.compressobj(8, 8, -15) + x1 = co.compress(data) + x2 = co.flush() + self.assertRaises(zlib.error, co.flush) # second flush should not work + dco = zlib.decompressobj(-15) + y1 = dco.decompress(x1 + x2) + y2 = dco.flush() + self.assertEqual(data, y1 + y2) + + def test_compressincremental(self): + # compress object in steps, decompress object as one-shot + data = hamlet_scene * 8 * 16 + co = zlib.compressobj(2, 8, -12, 9, 1) + bufs = [] + for i in range(0, len(data), 256): + bufs.append(co.compress(data[i:i+256])) + bufs.append(co.flush()) + combuf = ''.join(bufs) + + dco = zlib.decompressobj(-15) + y1 = dco.decompress(''.join(bufs)) + y2 = dco.flush() + self.assertEqual(data, y1 + y2) + + def test_decompressincremental(self): + # compress object in steps, decompress object in steps + data = hamlet_scene * 8 * 16 + co = zlib.compressobj(2, 8, -12, 9, 1) + bufs = [] + for i in range(0, len(data), 256): + bufs.append(co.compress(data[i:i+256])) + bufs.append(co.flush()) + combuf = ''.join(bufs) + + self.assertEqual(data, zlib.decompress(combuf, -12, -5)) + + dco = zlib.decompressobj(-12) + bufs = [] + for i in range(0, len(combuf), 128): + bufs.append(dco.decompress(combuf[i:i+128])) + self.assertEqual('', dco.unconsumed_tail, ######## + "(A) uct should be '': not %d long" % + len(dco.unconsumed_tail)) + bufs.append(dco.flush()) + self.assertEqual('', dco.unconsumed_tail, ######## + "(B) uct should be '': not %d long" % + len(dco.unconsumed_tail)) + self.assertEqual(data, ''.join(bufs)) + # Failure means: "decompressobj with init options failed" + + def test_decompinc(self,sizes=[128],flush=True,source=None,cx=256,dcx=64): + # compress object in steps, decompress object in steps, loop sizes + source = source or hamlet_scene + for reps in sizes: + data = source * reps + co = zlib.compressobj(2, 8, -12, 9, 1) + bufs = [] + for i in range(0, len(data), cx): + bufs.append(co.compress(data[i:i+cx])) + bufs.append(co.flush()) + combuf = ''.join(bufs) + + self.assertEqual(data, zlib.decompress(combuf, -12, -5)) + + dco = zlib.decompressobj(-12) + bufs = [] + for i in range(0, len(combuf), dcx): + bufs.append(dco.decompress(combuf[i:i+dcx])) + self.assertEqual('', dco.unconsumed_tail, ######## + "(A) uct should be '': not %d long" % + len(dco.unconsumed_tail)) + if flush: + bufs.append(dco.flush()) + else: + while True: + chunk = dco.decompress('') + if chunk: + bufs.append(chunk) + else: + break + self.assertEqual('', dco.unconsumed_tail, ######## + "(B) uct should be '': not %d long" % + len(dco.unconsumed_tail)) + self.assertEqual(data, ''.join(bufs)) + # Failure means: "decompressobj with init options failed" + + def test_decompimax(self,sizes=[128],flush=True,source=None,cx=256,dcx=64): + # compress in steps, decompress in length-restricted steps, loop sizes + source = source or hamlet_scene + for reps in sizes: + # Check a decompression object with max_length specified + data = source * reps + co = zlib.compressobj(2, 8, -12, 9, 1) + bufs = [] + for i in range(0, len(data), cx): + bufs.append(co.compress(data[i:i+cx])) + bufs.append(co.flush()) + combuf = ''.join(bufs) + self.assertEqual(data, zlib.decompress(combuf, -12, -5), + 'compressed data failure') + + dco = zlib.decompressobj(-12) + bufs = [] + cb = combuf + while cb: + #max_length = 1 + len(cb)/10 + chunk = dco.decompress(cb, dcx) + self.failIf(len(chunk) > dcx, + 'chunk too big (%d>%d)' % (len(chunk), dcx)) + bufs.append(chunk) + cb = dco.unconsumed_tail + if flush: + bufs.append(dco.flush()) + else: + while True: + chunk = dco.decompress('', dcx) + self.failIf(len(chunk) > dcx, + 'chunk too big in tail (%d>%d)' % (len(chunk), dcx)) + if chunk: + bufs.append(chunk) + else: + break + self.assertEqual(len(data), len(''.join(bufs))) + self.assertEqual(data, ''.join(bufs), 'Wrong data retrieved') + + def test_decompressmaxlen(self): + # Check a decompression object with max_length specified + data = hamlet_scene * 8 * 16 + co = zlib.compressobj(2, 8, -12, 9, 1) + bufs = [] + for i in range(0, len(data), 256): + bufs.append(co.compress(data[i:i+256])) + bufs.append(co.flush()) + combuf = ''.join(bufs) + self.assertEqual(data, zlib.decompress(combuf, -12, -5), + 'compressed data failure') + + dco = zlib.decompressobj(-12) + bufs = [] + cb = combuf + while cb: + max_length = 1 + len(cb)/10 + chunk = dco.decompress(cb, max_length) + self.failIf(len(chunk) > max_length, + 'chunk too big (%d>%d)' % (len(chunk),max_length)) + bufs.append(chunk) + cb = dco.unconsumed_tail + bufs.append(dco.flush()) + self.assertEqual(len(data), len(''.join(bufs))) + self.assertEqual(data, ''.join(bufs), 'Wrong data retrieved') + + def test_decompressmaxlenflushless(self): + # identical to test_decompressmaxlen except flush is replaced + # with an equivalent. This works and other fails on (eg) 2.2.2 + data = hamlet_scene * 8 * 16 + co = zlib.compressobj(2, 8, -12, 9, 1) + bufs = [] + for i in range(0, len(data), 256): + bufs.append(co.compress(data[i:i+256])) + bufs.append(co.flush()) + combuf = ''.join(bufs) + self.assertEqual(data, zlib.decompress(combuf, -12, -5), + 'compressed data mismatch') + + dco = zlib.decompressobj(-12) + bufs = [] + cb = combuf + while cb: + max_length = 1 + len(cb)/10 + chunk = dco.decompress(cb, max_length) + self.failIf(len(chunk) > max_length, + 'chunk too big (%d>%d)' % (len(chunk),max_length)) + bufs.append(chunk) + cb = dco.unconsumed_tail + + #bufs.append(dco.flush()) + while len(chunk): + chunk = dco.decompress('', max_length) + self.failIf(len(chunk) > max_length, + 'chunk too big (%d>%d)' % (len(chunk),max_length)) + bufs.append(chunk) + + self.assertEqual(data, ''.join(bufs), 'Wrong data retrieved') + + def test_maxlenmisc(self): + # Misc tests of max_length + dco = zlib.decompressobj(-12) + self.assertRaises(ValueError, dco.decompress, "", -1) + self.assertEqual('', dco.unconsumed_tail) + + def test_flushes(self): + # Test flush() with the various options, using all the + # different levels in order to provide more variations. + sync_opt = ['Z_NO_FLUSH', 'Z_SYNC_FLUSH', 'Z_FULL_FLUSH'] + sync_opt = [getattr(zlib, opt) for opt in sync_opt + if hasattr(zlib, opt)] + data = hamlet_scene * 8 + + for sync in sync_opt: + for level in range(10): + obj = zlib.compressobj( level ) + a = obj.compress( data[:3000] ) + b = obj.flush( sync ) + c = obj.compress( data[3000:] ) + d = obj.flush() + self.assertEqual(zlib.decompress(''.join([a,b,c,d])), + data, ("Decompress failed: flush " + "mode=%i, level=%i") % (sync, level)) + del obj + + def test_odd_flush(self): + # Test for odd flushing bugs noted in 2.0, and hopefully fixed in 2.1 + import random + + if hasattr(zlib, 'Z_SYNC_FLUSH'): + # Testing on 17K of "random" data + + # Create compressor and decompressor objects + co = zlib.compressobj(9) + dco = zlib.decompressobj() + + # Try 17K of data + # generate random data stream + try: + # In 2.3 and later, WichmannHill is the RNG of the bug report + gen = random.WichmannHill() + except AttributeError: + try: + # 2.2 called it Random + gen = random.Random() + except AttributeError: + # others might simply have a single RNG + gen = random + gen.seed(1) + data = genblock(1, 17 * 1024, generator=gen) + + # compress, sync-flush, and decompress + first = co.compress(data) + second = co.flush(zlib.Z_SYNC_FLUSH) + expanded = dco.decompress(first + second) + + # if decompressed data is different from the input data, choke. + self.assertEqual(expanded, data, "17K random source doesn't match") + + def test_manydecompinc(self): + # Run incremental decompress test for a large range of sizes + self.test_decompinc(sizes=[1<zst)); - if (err != Z_OK) - zlib_error(self->zst, err, "from inflateEnd()"); - else { - self->is_initialised = 0; - retval = PyString_FromStringAndSize(NULL, 0); + start_total_out = self->zst.total_out; + self->zst.avail_out = length; + self->zst.next_out = (Byte *)PyString_AS_STRING(retval); + + Py_BEGIN_ALLOW_THREADS + err = inflate(&(self->zst), Z_FINISH); + Py_END_ALLOW_THREADS + + /* while Z_OK and the output buffer is full, there might be more output, + so extend the output buffer and try again */ + while ((err == Z_OK || err == Z_BUF_ERROR) && self->zst.avail_out == 0) { + if (_PyString_Resize(&retval, length << 1) < 0) + goto error; + self->zst.next_out = (Byte *)PyString_AS_STRING(retval) + length; + self->zst.avail_out = length; + length = length << 1; + + Py_BEGIN_ALLOW_THREADS + err = inflate(&(self->zst), Z_FINISH); + Py_END_ALLOW_THREADS + } + + /* If flushmode is Z_FINISH, we also have to call deflateEnd() to free + various data structures. Note we should only get Z_STREAM_END when + flushmode is Z_FINISH */ + if (err == Z_STREAM_END) { + err = inflateEnd(&(self->zst)); + self->is_initialised = 0; + if (err != Z_OK) { + zlib_error(self->zst, err, "from inflateEnd()"); + Py_DECREF(retval); + retval = NULL; + goto error; + } } + _PyString_Resize(&retval, self->zst.total_out - start_total_out); + +error: LEAVE_ZLIB @@ -868,6 +899,8 @@ PyInit_zlib(void) if (ver != NULL) PyModule_AddObject(m, "ZLIB_VERSION", ver); + PyModule_AddStringConstant(m, "__version__", "1.0"); + #ifdef WITH_THREAD zlib_lock = PyThread_allocate_lock(); #endif /* WITH_THREAD */ -- cgit v0.12