diff options
Diffstat (limited to 'Lib/test/test_zlib.py')
| -rw-r--r-- | Lib/test/test_zlib.py | 256 |
1 files changed, 159 insertions, 97 deletions
diff --git a/Lib/test/test_zlib.py b/Lib/test/test_zlib.py index 4661c1d..1daa8f8 100644 --- a/Lib/test/test_zlib.py +++ b/Lib/test/test_zlib.py @@ -7,10 +7,23 @@ from test.support import bigmemtest, _1G, _4G zlib = support.import_module('zlib') -try: - import mmap -except ImportError: - mmap = None +requires_Compress_copy = unittest.skipUnless( + hasattr(zlib.compressobj(), "copy"), + 'requires Compress.copy()') +requires_Decompress_copy = unittest.skipUnless( + hasattr(zlib.decompressobj(), "copy"), + 'requires Decompress.copy()') + + +class VersionTestCase(unittest.TestCase): + + def test_library_version(self): + # Test that the major version of the actual library in use matches the + # major version that we were compiled against. We can't guarantee that + # the minor versions will match (even on the machine on which the module + # was compiled), and the API is stable between minor versions, so + # testing only the major versions avoids spurious failures. + self.assertEqual(zlib.ZLIB_RUNTIME_VERSION[0], zlib.ZLIB_VERSION[0]) class ChecksumTestCase(unittest.TestCase): @@ -173,10 +186,8 @@ class CompressTestCase(BaseCompressTestCase, unittest.TestCase): def test_big_decompress_buffer(self, size): self.check_big_decompress_buffer(size, zlib.decompress) - @bigmemtest(size=_4G + 100, memuse=1) + @bigmemtest(size=_4G + 100, memuse=1, dry_run=False) def test_length_overflow(self, size): - if size < _4G + 100: - self.skipTest("not enough free memory, need at least 4 GB") data = b'x' * size try: self.assertRaises(OverflowError, zlib.compress, data, 1) @@ -377,39 +388,39 @@ class CompressObjectTestCase(BaseCompressTestCase, unittest.TestCase): "mode=%i, level=%i") % (sync, level)) del obj + @unittest.skipUnless(hasattr(zlib, 'Z_SYNC_FLUSH'), + 'requires zlib.Z_SYNC_FLUSH') def test_odd_flush(self): # Test for odd flushing bugs noted in 2.0, and hopefully fixed in 2.1 import random + # Testing on 17K of "random" data - if hasattr(zlib, 'Z_SYNC_FLUSH'): - # Testing on 17K of "random" data - - # Create compressor and decompressor objects - co = zlib.compressobj(zlib.Z_BEST_COMPRESSION) - dco = zlib.decompressobj() + # Create compressor and decompressor objects + co = zlib.compressobj(zlib.Z_BEST_COMPRESSION) + dco = zlib.decompressobj() - # Try 17K of data - # generate random data stream + # Try 17K of data + # generate random data stream + try: + # In 2.3 and later, WichmannHill is the RNG of the bug report + gen = random.WichmannHill() + except AttributeError: try: - # In 2.3 and later, WichmannHill is the RNG of the bug report - gen = random.WichmannHill() + # 2.2 called it Random + gen = random.Random() except AttributeError: - try: - # 2.2 called it Random - gen = random.Random() - except AttributeError: - # others might simply have a single RNG - gen = random - gen.seed(1) - data = genblock(1, 17 * 1024, generator=gen) - - # compress, sync-flush, and decompress - first = co.compress(data) - second = co.flush(zlib.Z_SYNC_FLUSH) - expanded = dco.decompress(first + second) - - # if decompressed data is different from the input data, choke. - self.assertEqual(expanded, data, "17K random source doesn't match") + # others might simply have a single RNG + gen = random + gen.seed(1) + data = genblock(1, 17 * 1024, generator=gen) + + # compress, sync-flush, and decompress + first = co.compress(data) + second = co.flush(zlib.Z_SYNC_FLUSH) + expanded = dco.decompress(first + second) + + # if decompressed data is different from the input data, choke. + self.assertEqual(expanded, data, "17K random source doesn't match") def test_empty_flush(self): # Test that calling .flush() on unused objects works. @@ -421,6 +432,35 @@ class CompressObjectTestCase(BaseCompressTestCase, unittest.TestCase): dco = zlib.decompressobj() self.assertEqual(dco.flush(), b"") # Returns nothing + def test_dictionary(self): + h = HAMLET_SCENE + # Build a simulated dictionary out of the words in HAMLET. + words = h.split() + random.shuffle(words) + zdict = b''.join(words) + # Use it to compress HAMLET. + co = zlib.compressobj(zdict=zdict) + cd = co.compress(h) + co.flush() + # Verify that it will decompress with the dictionary. + dco = zlib.decompressobj(zdict=zdict) + self.assertEqual(dco.decompress(cd) + dco.flush(), h) + # Verify that it fails when not given the dictionary. + dco = zlib.decompressobj() + self.assertRaises(zlib.error, dco.decompress, cd) + + def test_dictionary_streaming(self): + # This simulates the reuse of a compressor object for compressing + # several separate data streams. + co = zlib.compressobj(zdict=HAMLET_SCENE) + do = zlib.decompressobj(zdict=HAMLET_SCENE) + piece = HAMLET_SCENE[1000:1500] + d0 = co.compress(piece) + co.flush(zlib.Z_SYNC_FLUSH) + d1 = co.compress(piece[100:]) + co.flush(zlib.Z_SYNC_FLUSH) + d2 = co.compress(piece[:-100]) + co.flush(zlib.Z_SYNC_FLUSH) + self.assertEqual(do.decompress(d0), piece) + self.assertEqual(do.decompress(d1), piece[100:]) + self.assertEqual(do.decompress(d2), piece[:-100]) + def test_decompress_incomplete_stream(self): # This is 'foo', deflated x = b'x\x9cK\xcb\xcf\x07\x00\x02\x82\x01E' @@ -434,6 +474,26 @@ class CompressObjectTestCase(BaseCompressTestCase, unittest.TestCase): y += dco.flush() self.assertEqual(y, b'foo') + def test_decompress_eof(self): + x = b'x\x9cK\xcb\xcf\x07\x00\x02\x82\x01E' # 'foo' + dco = zlib.decompressobj() + self.assertFalse(dco.eof) + dco.decompress(x[:-5]) + self.assertFalse(dco.eof) + dco.decompress(x[-5:]) + self.assertTrue(dco.eof) + dco.flush() + self.assertTrue(dco.eof) + + def test_decompress_eof_incomplete_stream(self): + x = b'x\x9cK\xcb\xcf\x07\x00\x02\x82\x01E' # 'foo' + dco = zlib.decompressobj() + self.assertFalse(dco.eof) + dco.decompress(x[:-5]) + self.assertFalse(dco.eof) + dco.flush() + self.assertFalse(dco.eof) + def test_decompress_unused_data(self): # Repeated calls to decompress() after EOF should accumulate data in # dco.unused_data, instead of just storing the arg to the last call. @@ -455,6 +515,7 @@ class CompressObjectTestCase(BaseCompressTestCase, unittest.TestCase): data += dco.decompress( dco.unconsumed_tail + x[i : i + step], maxlen) data += dco.flush() + self.assertTrue(dco.eof) self.assertEqual(data, source) self.assertEqual(dco.unconsumed_tail, b'') self.assertEqual(dco.unused_data, remainder) @@ -471,67 +532,69 @@ class CompressObjectTestCase(BaseCompressTestCase, unittest.TestCase): data = zlib.compress(input2) self.assertEqual(dco.flush(), input1[1:]) - if hasattr(zlib.compressobj(), "copy"): - def test_compresscopy(self): - # Test copying a compression object - data0 = HAMLET_SCENE - data1 = bytes(str(HAMLET_SCENE, "ascii").swapcase(), "ascii") - c0 = zlib.compressobj(zlib.Z_BEST_COMPRESSION) - bufs0 = [] - bufs0.append(c0.compress(data0)) - - c1 = c0.copy() - bufs1 = bufs0[:] - - bufs0.append(c0.compress(data0)) - bufs0.append(c0.flush()) - s0 = b''.join(bufs0) - - bufs1.append(c1.compress(data1)) - bufs1.append(c1.flush()) - s1 = b''.join(bufs1) - - self.assertEqual(zlib.decompress(s0),data0+data0) - self.assertEqual(zlib.decompress(s1),data0+data1) - - def test_badcompresscopy(self): - # Test copying a compression object in an inconsistent state - c = zlib.compressobj() - c.compress(HAMLET_SCENE) - c.flush() - self.assertRaises(ValueError, c.copy) - - if hasattr(zlib.decompressobj(), "copy"): - def test_decompresscopy(self): - # Test copying a decompression object - data = HAMLET_SCENE - comp = zlib.compress(data) - # Test type of return value - self.assertIsInstance(comp, bytes) - - d0 = zlib.decompressobj() - bufs0 = [] - bufs0.append(d0.decompress(comp[:32])) - - d1 = d0.copy() - bufs1 = bufs0[:] - - bufs0.append(d0.decompress(comp[32:])) - s0 = b''.join(bufs0) - - bufs1.append(d1.decompress(comp[32:])) - s1 = b''.join(bufs1) - - self.assertEqual(s0,s1) - self.assertEqual(s0,data) - - def test_baddecompresscopy(self): - # Test copying a compression object in an inconsistent state - data = zlib.compress(HAMLET_SCENE) - d = zlib.decompressobj() - d.decompress(data) - d.flush() - self.assertRaises(ValueError, d.copy) + @requires_Compress_copy + def test_compresscopy(self): + # Test copying a compression object + data0 = HAMLET_SCENE + data1 = bytes(str(HAMLET_SCENE, "ascii").swapcase(), "ascii") + c0 = zlib.compressobj(zlib.Z_BEST_COMPRESSION) + bufs0 = [] + bufs0.append(c0.compress(data0)) + + c1 = c0.copy() + bufs1 = bufs0[:] + + bufs0.append(c0.compress(data0)) + bufs0.append(c0.flush()) + s0 = b''.join(bufs0) + + bufs1.append(c1.compress(data1)) + bufs1.append(c1.flush()) + s1 = b''.join(bufs1) + + self.assertEqual(zlib.decompress(s0),data0+data0) + self.assertEqual(zlib.decompress(s1),data0+data1) + + @requires_Compress_copy + def test_badcompresscopy(self): + # Test copying a compression object in an inconsistent state + c = zlib.compressobj() + c.compress(HAMLET_SCENE) + c.flush() + self.assertRaises(ValueError, c.copy) + + @requires_Decompress_copy + def test_decompresscopy(self): + # Test copying a decompression object + data = HAMLET_SCENE + comp = zlib.compress(data) + # Test type of return value + self.assertIsInstance(comp, bytes) + + d0 = zlib.decompressobj() + bufs0 = [] + bufs0.append(d0.decompress(comp[:32])) + + d1 = d0.copy() + bufs1 = bufs0[:] + + bufs0.append(d0.decompress(comp[32:])) + s0 = b''.join(bufs0) + + bufs1.append(d1.decompress(comp[32:])) + s1 = b''.join(bufs1) + + self.assertEqual(s0,s1) + self.assertEqual(s0,data) + + @requires_Decompress_copy + def test_baddecompresscopy(self): + # Test copying a compression object in an inconsistent state + data = zlib.compress(HAMLET_SCENE) + d = zlib.decompressobj() + d.decompress(data) + d.flush() + self.assertRaises(ValueError, d.copy) # Memory use of the following functions takes into account overallocation @@ -547,10 +610,8 @@ class CompressObjectTestCase(BaseCompressTestCase, unittest.TestCase): decompress = lambda s: d.decompress(s) + d.flush() self.check_big_decompress_buffer(size, decompress) - @bigmemtest(size=_4G + 100, memuse=1) + @bigmemtest(size=_4G + 100, memuse=1, dry_run=False) def test_length_overflow(self, size): - if size < _4G + 100: - self.skipTest("not enough free memory, need at least 4 GB") data = b'x' * size c = zlib.compressobj(1) d = zlib.decompressobj() @@ -651,6 +712,7 @@ LAERTES def test_main(): support.run_unittest( + VersionTestCase, ChecksumTestCase, ChecksumBigBufferTestCase, ExceptionTestCase, |
