diff options
Diffstat (limited to 'Lib/test/test_lzma.py')
-rw-r--r-- | Lib/test/test_lzma.py | 116 |
1 files changed, 110 insertions, 6 deletions
diff --git a/Lib/test/test_lzma.py b/Lib/test/test_lzma.py index 07fadbd..2d39099 100644 --- a/Lib/test/test_lzma.py +++ b/Lib/test/test_lzma.py @@ -1,4 +1,5 @@ -from io import BytesIO, UnsupportedOperation +import _compression +from io import BytesIO, UnsupportedOperation, DEFAULT_BUFFER_SIZE import os import pickle import random @@ -135,6 +136,97 @@ class CompressorDecompressorTestCase(unittest.TestCase): self.assertTrue(lzd.eof) self.assertEqual(lzd.unused_data, b"") + def test_decompressor_chunks_maxsize(self): + lzd = LZMADecompressor() + max_length = 100 + out = [] + + # Feed first half the input + len_ = len(COMPRESSED_XZ) // 2 + out.append(lzd.decompress(COMPRESSED_XZ[:len_], + max_length=max_length)) + self.assertFalse(lzd.needs_input) + self.assertEqual(len(out[-1]), max_length) + + # Retrieve more data without providing more input + out.append(lzd.decompress(b'', max_length=max_length)) + self.assertFalse(lzd.needs_input) + self.assertEqual(len(out[-1]), max_length) + + # Retrieve more data while providing more input + out.append(lzd.decompress(COMPRESSED_XZ[len_:], + max_length=max_length)) + self.assertLessEqual(len(out[-1]), max_length) + + # Retrieve remaining uncompressed data + while not lzd.eof: + out.append(lzd.decompress(b'', max_length=max_length)) + self.assertLessEqual(len(out[-1]), max_length) + + out = b"".join(out) + self.assertEqual(out, INPUT) + self.assertEqual(lzd.check, lzma.CHECK_CRC64) + self.assertEqual(lzd.unused_data, b"") + + def test_decompressor_inputbuf_1(self): + # Test reusing input buffer after moving existing + # contents to beginning + lzd = LZMADecompressor() + out = [] + + # Create input buffer and fill it + self.assertEqual(lzd.decompress(COMPRESSED_XZ[:100], + max_length=0), b'') + + # Retrieve some results, freeing capacity at beginning + # of input buffer + out.append(lzd.decompress(b'', 2)) + + # Add more data that fits into input buffer after + # moving existing data to beginning + out.append(lzd.decompress(COMPRESSED_XZ[100:105], 15)) + + # Decompress rest of data + out.append(lzd.decompress(COMPRESSED_XZ[105:])) + self.assertEqual(b''.join(out), INPUT) + + def test_decompressor_inputbuf_2(self): + # Test reusing input buffer by appending data at the + # end right away + lzd = LZMADecompressor() + out = [] + + # Create input buffer and empty it + self.assertEqual(lzd.decompress(COMPRESSED_XZ[:200], + max_length=0), b'') + out.append(lzd.decompress(b'')) + + # Fill buffer with new data + out.append(lzd.decompress(COMPRESSED_XZ[200:280], 2)) + + # Append some more data, not enough to require resize + out.append(lzd.decompress(COMPRESSED_XZ[280:300], 2)) + + # Decompress rest of data + out.append(lzd.decompress(COMPRESSED_XZ[300:])) + self.assertEqual(b''.join(out), INPUT) + + def test_decompressor_inputbuf_3(self): + # Test reusing input buffer after extending it + + lzd = LZMADecompressor() + out = [] + + # Create almost full input buffer + out.append(lzd.decompress(COMPRESSED_XZ[:200], 5)) + + # Add even more data to it, requiring resize + out.append(lzd.decompress(COMPRESSED_XZ[200:300], 5)) + + # Decompress rest of data + out.append(lzd.decompress(COMPRESSED_XZ[300:])) + self.assertEqual(b''.join(out), INPUT) + def test_decompressor_unused_data(self): lzd = LZMADecompressor() extra = b"fooblibar" @@ -681,13 +773,13 @@ class FileTestCase(unittest.TestCase): def test_read_multistream_buffer_size_aligned(self): # Test the case where a stream boundary coincides with the end # of the raw read buffer. - saved_buffer_size = lzma._BUFFER_SIZE - lzma._BUFFER_SIZE = len(COMPRESSED_XZ) + saved_buffer_size = _compression.BUFFER_SIZE + _compression.BUFFER_SIZE = len(COMPRESSED_XZ) try: with LZMAFile(BytesIO(COMPRESSED_XZ * 5)) as f: self.assertEqual(f.read(), INPUT * 5) finally: - lzma._BUFFER_SIZE = saved_buffer_size + _compression.BUFFER_SIZE = saved_buffer_size def test_read_trailing_junk(self): with LZMAFile(BytesIO(COMPRESSED_XZ + COMPRESSED_BOGUS)) as f: @@ -738,7 +830,7 @@ class FileTestCase(unittest.TestCase): with LZMAFile(BytesIO(), "w") as f: self.assertRaises(ValueError, f.read) with LZMAFile(BytesIO(COMPRESSED_XZ)) as f: - self.assertRaises(TypeError, f.read, None) + self.assertRaises(TypeError, f.read, float()) def test_read_bad_data(self): with LZMAFile(BytesIO(COMPRESSED_BOGUS)) as f: @@ -834,6 +926,17 @@ class FileTestCase(unittest.TestCase): with LZMAFile(BytesIO(COMPRESSED_XZ)) as f: self.assertListEqual(f.readlines(), lines) + def test_decompress_limited(self): + """Decompressed data buffering should be limited""" + bomb = lzma.compress(bytes(int(2e6)), preset=6) + self.assertLess(len(bomb), _compression.BUFFER_SIZE) + + decomp = LZMAFile(BytesIO(bomb)) + self.assertEqual(bytes(1), decomp.read(1)) + max_decomp = 1 + DEFAULT_BUFFER_SIZE + self.assertLessEqual(decomp._buffer.raw.tell(), max_decomp, + "Excessive amount of data was decompressed") + def test_write(self): with BytesIO() as dst: with LZMAFile(dst, "w") as f: @@ -999,7 +1102,8 @@ class FileTestCase(unittest.TestCase): self.assertRaises(ValueError, f.seek, 0) with LZMAFile(BytesIO(COMPRESSED_XZ)) as f: self.assertRaises(ValueError, f.seek, 0, 3) - self.assertRaises(ValueError, f.seek, 9, ()) + # io.BufferedReader raises TypeError instead of ValueError + self.assertRaises((TypeError, ValueError), f.seek, 9, ()) self.assertRaises(TypeError, f.seek, None) self.assertRaises(TypeError, f.seek, b"derp") |