diff options
author | Antoine Pitrou <solipsis@pitrou.net> | 2015-02-26 12:08:07 (GMT) |
---|---|---|
committer | Antoine Pitrou <solipsis@pitrou.net> | 2015-02-26 12:08:07 (GMT) |
commit | e71258a0e67cf744f5f2c0bca15f1d66871ce050 (patch) | |
tree | df8d2e81cd048157d5f8a8fe7c7f32e268352c9f /Lib/test/test_bz2.py | |
parent | 87f50158ee72bb2ff29c5f44f0b0efbb83845d46 (diff) | |
download | cpython-e71258a0e67cf744f5f2c0bca15f1d66871ce050.zip cpython-e71258a0e67cf744f5f2c0bca15f1d66871ce050.tar.gz cpython-e71258a0e67cf744f5f2c0bca15f1d66871ce050.tar.bz2 |
Issue #15955: Add an option to limit the output size in bz2.decompress().
Patch by Nikolaus Rath.
Diffstat (limited to 'Lib/test/test_bz2.py')
-rw-r--r-- | Lib/test/test_bz2.py | 103 |
1 files changed, 103 insertions, 0 deletions
diff --git a/Lib/test/test_bz2.py b/Lib/test/test_bz2.py index 1535e8e..2fe3f2a 100644 --- a/Lib/test/test_bz2.py +++ b/Lib/test/test_bz2.py @@ -5,6 +5,7 @@ import unittest from io import BytesIO import os import pickle +import glob import random import subprocess import sys @@ -51,6 +52,19 @@ class BaseTest(unittest.TestCase): EMPTY_DATA = b'BZh9\x17rE8P\x90\x00\x00\x00\x00' BAD_DATA = b'this is not a valid bzip2 file' + # Some tests need more than one block of uncompressed data. Since one block + # is at least 100 kB, we gather some data dynamically and compress it. + # Note that this assumes that compression works correctly, so we cannot + # simply use the bigger test data for all tests. + test_size = 0 + BIG_TEXT = bytearray(128*1024) + for fname in glob.glob(os.path.join(os.path.dirname(__file__), '*.py')): + with open(fname, 'rb') as fh: + test_size += fh.readinto(memoryview(BIG_TEXT)[test_size:]) + if test_size > 128*1024: + break + BIG_DATA = bz2.compress(BIG_TEXT, compresslevel=1) + def setUp(self): self.filename = support.TESTFN @@ -707,6 +721,95 @@ class BZ2DecompressorTest(BaseTest): with self.assertRaises(TypeError): pickle.dumps(BZ2Decompressor(), proto) + def testDecompressorChunksMaxsize(self): + bzd = BZ2Decompressor() + max_length = 100 + out = [] + + # Feed some input + len_ = len(self.BIG_DATA) - 64 + out.append(bzd.decompress(self.BIG_DATA[:len_], + max_length=max_length)) + self.assertFalse(bzd.needs_input) + self.assertEqual(len(out[-1]), max_length) + + # Retrieve more data without providing more input + out.append(bzd.decompress(b'', max_length=max_length)) + self.assertFalse(bzd.needs_input) + self.assertEqual(len(out[-1]), max_length) + + # Retrieve more data while providing more input + out.append(bzd.decompress(self.BIG_DATA[len_:], + max_length=max_length)) + self.assertLessEqual(len(out[-1]), max_length) + + # Retrieve remaining uncompressed data + while not bzd.eof: + out.append(bzd.decompress(b'', max_length=max_length)) + self.assertLessEqual(len(out[-1]), max_length) + + out = b"".join(out) + self.assertEqual(out, self.BIG_TEXT) + self.assertEqual(bzd.unused_data, b"") + + def test_decompressor_inputbuf_1(self): + # Test reusing input buffer after moving existing + # contents to beginning + bzd = BZ2Decompressor() + out = [] + + # Create input buffer and fill it + self.assertEqual(bzd.decompress(self.DATA[:100], + max_length=0), b'') + + # Retrieve some results, freeing capacity at beginning + # of input buffer + out.append(bzd.decompress(b'', 2)) + + # Add more data that fits into input buffer after + # moving existing data to beginning + out.append(bzd.decompress(self.DATA[100:105], 15)) + + # Decompress rest of data + out.append(bzd.decompress(self.DATA[105:])) + self.assertEqual(b''.join(out), self.TEXT) + + def test_decompressor_inputbuf_2(self): + # Test reusing input buffer by appending data at the + # end right away + bzd = BZ2Decompressor() + out = [] + + # Create input buffer and empty it + self.assertEqual(bzd.decompress(self.DATA[:200], + max_length=0), b'') + out.append(bzd.decompress(b'')) + + # Fill buffer with new data + out.append(bzd.decompress(self.DATA[200:280], 2)) + + # Append some more data, not enough to require resize + out.append(bzd.decompress(self.DATA[280:300], 2)) + + # Decompress rest of data + out.append(bzd.decompress(self.DATA[300:])) + self.assertEqual(b''.join(out), self.TEXT) + + def test_decompressor_inputbuf_3(self): + # Test reusing input buffer after extending it + + bzd = BZ2Decompressor() + out = [] + + # Create almost full input buffer + out.append(bzd.decompress(self.DATA[:200], 5)) + + # Add even more data to it, requiring resize + out.append(bzd.decompress(self.DATA[200:300], 5)) + + # Decompress rest of data + out.append(bzd.decompress(self.DATA[300:])) + self.assertEqual(b''.join(out), self.TEXT) class CompressDecompressTest(BaseTest): def testCompress(self): |