diff options
author | Antoine Pitrou <solipsis@pitrou.net> | 2015-01-17 15:22:18 (GMT) |
---|---|---|
committer | Antoine Pitrou <solipsis@pitrou.net> | 2015-01-17 15:22:18 (GMT) |
commit | 26795baaa812c74087e97f9119ec143451e23daa (patch) | |
tree | 2f9926c2fa66ab7d22fa0c842860b42cd38f7726 /Lib/test/test_lzma.py | |
parent | e262074ede78087c079094e28bb1d09a40072471 (diff) | |
download | cpython-26795baaa812c74087e97f9119ec143451e23daa.zip cpython-26795baaa812c74087e97f9119ec143451e23daa.tar.gz cpython-26795baaa812c74087e97f9119ec143451e23daa.tar.bz2 |
Issue #15955: Add an option to limit output size when decompressing LZMA data.
Patch by Nikolaus Rath and Martin Panter.
Diffstat (limited to 'Lib/test/test_lzma.py')
-rw-r--r-- | Lib/test/test_lzma.py | 91 |
1 files changed, 91 insertions, 0 deletions
diff --git a/Lib/test/test_lzma.py b/Lib/test/test_lzma.py index 07fadbd..cded28c 100644 --- a/Lib/test/test_lzma.py +++ b/Lib/test/test_lzma.py @@ -135,6 +135,97 @@ class CompressorDecompressorTestCase(unittest.TestCase): self.assertTrue(lzd.eof) self.assertEqual(lzd.unused_data, b"") + def test_decompressor_chunks_maxsize(self): + lzd = LZMADecompressor() + max_length = 100 + out = [] + + # Feed first half the input + len_ = len(COMPRESSED_XZ) // 2 + out.append(lzd.decompress(COMPRESSED_XZ[:len_], + max_length=max_length)) + self.assertFalse(lzd.needs_input) + self.assertEqual(len(out[-1]), max_length) + + # Retrieve more data without providing more input + out.append(lzd.decompress(b'', max_length=max_length)) + self.assertFalse(lzd.needs_input) + self.assertEqual(len(out[-1]), max_length) + + # Retrieve more data while providing more input + out.append(lzd.decompress(COMPRESSED_XZ[len_:], + max_length=max_length)) + self.assertLessEqual(len(out[-1]), max_length) + + # Retrieve remaining uncompressed data + while not lzd.eof: + out.append(lzd.decompress(b'', max_length=max_length)) + self.assertLessEqual(len(out[-1]), max_length) + + out = b"".join(out) + self.assertEqual(out, INPUT) + self.assertEqual(lzd.check, lzma.CHECK_CRC64) + self.assertEqual(lzd.unused_data, b"") + + def test_decompressor_inputbuf_1(self): + # Test reusing input buffer after moving existing + # contents to beginning + lzd = LZMADecompressor() + out = [] + + # Create input buffer and fill it + self.assertEqual(lzd.decompress(COMPRESSED_XZ[:100], + max_length=0), b'') + + # Retrieve some results, freeing capacity at beginning + # of input buffer + out.append(lzd.decompress(b'', 2)) + + # Add more data that fits into input buffer after + # moving existing data to beginning + out.append(lzd.decompress(COMPRESSED_XZ[100:105], 15)) + + # Decompress rest of data + out.append(lzd.decompress(COMPRESSED_XZ[105:])) + self.assertEqual(b''.join(out), INPUT) + + def test_decompressor_inputbuf_2(self): + # Test reusing input buffer by appending data at the + # end right away + lzd = LZMADecompressor() + out = [] + + # Create input buffer and empty it + self.assertEqual(lzd.decompress(COMPRESSED_XZ[:200], + max_length=0), b'') + out.append(lzd.decompress(b'')) + + # Fill buffer with new data + out.append(lzd.decompress(COMPRESSED_XZ[200:280], 2)) + + # Append some more data, not enough to require resize + out.append(lzd.decompress(COMPRESSED_XZ[280:300], 2)) + + # Decompress rest of data + out.append(lzd.decompress(COMPRESSED_XZ[300:])) + self.assertEqual(b''.join(out), INPUT) + + def test_decompressor_inputbuf_3(self): + # Test reusing input buffer after extending it + + lzd = LZMADecompressor() + out = [] + + # Create almost full input buffer + out.append(lzd.decompress(COMPRESSED_XZ[:200], 5)) + + # Add even more data to it, requiring resize + out.append(lzd.decompress(COMPRESSED_XZ[200:300], 5)) + + # Decompress rest of data + out.append(lzd.decompress(COMPRESSED_XZ[300:])) + self.assertEqual(b''.join(out), INPUT) + def test_decompressor_unused_data(self): lzd = LZMADecompressor() extra = b"fooblibar" |