summaryrefslogtreecommitdiffstats
path: root/Lib/test/test_lzma.py
diff options
context:
space:
mode:
Diffstat (limited to 'Lib/test/test_lzma.py')
-rw-r--r--Lib/test/test_lzma.py116
1 files changed, 110 insertions, 6 deletions
diff --git a/Lib/test/test_lzma.py b/Lib/test/test_lzma.py
index 07fadbd..2d39099 100644
--- a/Lib/test/test_lzma.py
+++ b/Lib/test/test_lzma.py
@@ -1,4 +1,5 @@
-from io import BytesIO, UnsupportedOperation
+import _compression
+from io import BytesIO, UnsupportedOperation, DEFAULT_BUFFER_SIZE
import os
import pickle
import random
@@ -135,6 +136,97 @@ class CompressorDecompressorTestCase(unittest.TestCase):
self.assertTrue(lzd.eof)
self.assertEqual(lzd.unused_data, b"")
+ def test_decompressor_chunks_maxsize(self):
+ lzd = LZMADecompressor()
+ max_length = 100
+ out = []
+
+ # Feed first half the input
+ len_ = len(COMPRESSED_XZ) // 2
+ out.append(lzd.decompress(COMPRESSED_XZ[:len_],
+ max_length=max_length))
+ self.assertFalse(lzd.needs_input)
+ self.assertEqual(len(out[-1]), max_length)
+
+ # Retrieve more data without providing more input
+ out.append(lzd.decompress(b'', max_length=max_length))
+ self.assertFalse(lzd.needs_input)
+ self.assertEqual(len(out[-1]), max_length)
+
+ # Retrieve more data while providing more input
+ out.append(lzd.decompress(COMPRESSED_XZ[len_:],
+ max_length=max_length))
+ self.assertLessEqual(len(out[-1]), max_length)
+
+ # Retrieve remaining uncompressed data
+ while not lzd.eof:
+ out.append(lzd.decompress(b'', max_length=max_length))
+ self.assertLessEqual(len(out[-1]), max_length)
+
+ out = b"".join(out)
+ self.assertEqual(out, INPUT)
+ self.assertEqual(lzd.check, lzma.CHECK_CRC64)
+ self.assertEqual(lzd.unused_data, b"")
+
+ def test_decompressor_inputbuf_1(self):
+ # Test reusing input buffer after moving existing
+ # contents to beginning
+ lzd = LZMADecompressor()
+ out = []
+
+ # Create input buffer and fill it
+ self.assertEqual(lzd.decompress(COMPRESSED_XZ[:100],
+ max_length=0), b'')
+
+ # Retrieve some results, freeing capacity at beginning
+ # of input buffer
+ out.append(lzd.decompress(b'', 2))
+
+ # Add more data that fits into input buffer after
+ # moving existing data to beginning
+ out.append(lzd.decompress(COMPRESSED_XZ[100:105], 15))
+
+ # Decompress rest of data
+ out.append(lzd.decompress(COMPRESSED_XZ[105:]))
+ self.assertEqual(b''.join(out), INPUT)
+
+ def test_decompressor_inputbuf_2(self):
+ # Test reusing input buffer by appending data at the
+ # end right away
+ lzd = LZMADecompressor()
+ out = []
+
+ # Create input buffer and empty it
+ self.assertEqual(lzd.decompress(COMPRESSED_XZ[:200],
+ max_length=0), b'')
+ out.append(lzd.decompress(b''))
+
+ # Fill buffer with new data
+ out.append(lzd.decompress(COMPRESSED_XZ[200:280], 2))
+
+ # Append some more data, not enough to require resize
+ out.append(lzd.decompress(COMPRESSED_XZ[280:300], 2))
+
+ # Decompress rest of data
+ out.append(lzd.decompress(COMPRESSED_XZ[300:]))
+ self.assertEqual(b''.join(out), INPUT)
+
+ def test_decompressor_inputbuf_3(self):
+ # Test reusing input buffer after extending it
+
+ lzd = LZMADecompressor()
+ out = []
+
+ # Create almost full input buffer
+ out.append(lzd.decompress(COMPRESSED_XZ[:200], 5))
+
+ # Add even more data to it, requiring resize
+ out.append(lzd.decompress(COMPRESSED_XZ[200:300], 5))
+
+ # Decompress rest of data
+ out.append(lzd.decompress(COMPRESSED_XZ[300:]))
+ self.assertEqual(b''.join(out), INPUT)
+
def test_decompressor_unused_data(self):
lzd = LZMADecompressor()
extra = b"fooblibar"
@@ -681,13 +773,13 @@ class FileTestCase(unittest.TestCase):
def test_read_multistream_buffer_size_aligned(self):
# Test the case where a stream boundary coincides with the end
# of the raw read buffer.
- saved_buffer_size = lzma._BUFFER_SIZE
- lzma._BUFFER_SIZE = len(COMPRESSED_XZ)
+ saved_buffer_size = _compression.BUFFER_SIZE
+ _compression.BUFFER_SIZE = len(COMPRESSED_XZ)
try:
with LZMAFile(BytesIO(COMPRESSED_XZ * 5)) as f:
self.assertEqual(f.read(), INPUT * 5)
finally:
- lzma._BUFFER_SIZE = saved_buffer_size
+ _compression.BUFFER_SIZE = saved_buffer_size
def test_read_trailing_junk(self):
with LZMAFile(BytesIO(COMPRESSED_XZ + COMPRESSED_BOGUS)) as f:
@@ -738,7 +830,7 @@ class FileTestCase(unittest.TestCase):
with LZMAFile(BytesIO(), "w") as f:
self.assertRaises(ValueError, f.read)
with LZMAFile(BytesIO(COMPRESSED_XZ)) as f:
- self.assertRaises(TypeError, f.read, None)
+ self.assertRaises(TypeError, f.read, float())
def test_read_bad_data(self):
with LZMAFile(BytesIO(COMPRESSED_BOGUS)) as f:
@@ -834,6 +926,17 @@ class FileTestCase(unittest.TestCase):
with LZMAFile(BytesIO(COMPRESSED_XZ)) as f:
self.assertListEqual(f.readlines(), lines)
+ def test_decompress_limited(self):
+ """Decompressed data buffering should be limited"""
+ bomb = lzma.compress(bytes(int(2e6)), preset=6)
+ self.assertLess(len(bomb), _compression.BUFFER_SIZE)
+
+ decomp = LZMAFile(BytesIO(bomb))
+ self.assertEqual(bytes(1), decomp.read(1))
+ max_decomp = 1 + DEFAULT_BUFFER_SIZE
+ self.assertLessEqual(decomp._buffer.raw.tell(), max_decomp,
+ "Excessive amount of data was decompressed")
+
def test_write(self):
with BytesIO() as dst:
with LZMAFile(dst, "w") as f:
@@ -999,7 +1102,8 @@ class FileTestCase(unittest.TestCase):
self.assertRaises(ValueError, f.seek, 0)
with LZMAFile(BytesIO(COMPRESSED_XZ)) as f:
self.assertRaises(ValueError, f.seek, 0, 3)
- self.assertRaises(ValueError, f.seek, 9, ())
+ # io.BufferedReader raises TypeError instead of ValueError
+ self.assertRaises((TypeError, ValueError), f.seek, 9, ())
self.assertRaises(TypeError, f.seek, None)
self.assertRaises(TypeError, f.seek, b"derp")