diff options
| author | Antoine Pitrou <solipsis@pitrou.net> | 2015-04-10 22:31:01 (GMT) |
|---|---|---|
| committer | Antoine Pitrou <solipsis@pitrou.net> | 2015-04-10 22:31:01 (GMT) |
| commit | 2dbc6e6bce0a29757acddd8000d55f7c844295a2 (patch) | |
| tree | f1510e3a93b2527308dd6400a8b0544607e072db /Lib/test | |
| parent | 2ce11d296cee8d71d2bf2451c7dba4ffa119d9d3 (diff) | |
| download | cpython-2dbc6e6bce0a29757acddd8000d55f7c844295a2.zip cpython-2dbc6e6bce0a29757acddd8000d55f7c844295a2.tar.gz cpython-2dbc6e6bce0a29757acddd8000d55f7c844295a2.tar.bz2 | |
Issue #23529: Limit the size of decompressed data when reading from
GzipFile, BZ2File or LZMAFile. This defeats denial of service attacks
using compressed bombs (i.e. compressed payloads which decompress to a huge
size).
Patch by Martin Panter and Nikolaus Rath.
Diffstat (limited to 'Lib/test')
| -rw-r--r-- | Lib/test/test_bz2.py | 30 | ||||
| -rw-r--r-- | Lib/test/test_gzip.py | 23 | ||||
| -rw-r--r-- | Lib/test/test_lzma.py | 25 |
3 files changed, 61 insertions, 17 deletions
diff --git a/Lib/test/test_bz2.py b/Lib/test/test_bz2.py index bf9887b..a1e4b8d 100644 --- a/Lib/test/test_bz2.py +++ b/Lib/test/test_bz2.py @@ -2,7 +2,7 @@ from test import support from test.support import bigmemtest, _4G import unittest -from io import BytesIO +from io import BytesIO, DEFAULT_BUFFER_SIZE import os import pickle import glob @@ -10,6 +10,7 @@ import random import subprocess import sys from test.support import unlink +import _compression try: import threading @@ -110,7 +111,7 @@ class BZ2FileTest(BaseTest): def testRead(self): self.createTempFile() with BZ2File(self.filename) as bz2f: - self.assertRaises(TypeError, bz2f.read, None) + self.assertRaises(TypeError, bz2f.read, float()) self.assertEqual(bz2f.read(), self.TEXT) def testReadBadFile(self): @@ -121,21 +122,21 @@ class BZ2FileTest(BaseTest): def testReadMultiStream(self): self.createTempFile(streams=5) with BZ2File(self.filename) as bz2f: - self.assertRaises(TypeError, bz2f.read, None) + self.assertRaises(TypeError, bz2f.read, float()) self.assertEqual(bz2f.read(), self.TEXT * 5) def testReadMonkeyMultiStream(self): # Test BZ2File.read() on a multi-stream archive where a stream # boundary coincides with the end of the raw read buffer. - buffer_size = bz2._BUFFER_SIZE - bz2._BUFFER_SIZE = len(self.DATA) + buffer_size = _compression.BUFFER_SIZE + _compression.BUFFER_SIZE = len(self.DATA) try: self.createTempFile(streams=5) with BZ2File(self.filename) as bz2f: - self.assertRaises(TypeError, bz2f.read, None) + self.assertRaises(TypeError, bz2f.read, float()) self.assertEqual(bz2f.read(), self.TEXT * 5) finally: - bz2._BUFFER_SIZE = buffer_size + _compression.BUFFER_SIZE = buffer_size def testReadTrailingJunk(self): self.createTempFile(suffix=self.BAD_DATA) @@ -150,7 +151,7 @@ class BZ2FileTest(BaseTest): def testRead0(self): self.createTempFile() with BZ2File(self.filename) as bz2f: - self.assertRaises(TypeError, bz2f.read, None) + self.assertRaises(TypeError, bz2f.read, float()) self.assertEqual(bz2f.read(0), b"") def testReadChunk10(self): @@ -559,13 +560,24 @@ class BZ2FileTest(BaseTest): with BZ2File(str_filename, "rb") as f: self.assertEqual(f.read(), self.DATA) + def testDecompressLimited(self): + """Decompressed data buffering should be limited""" + bomb = bz2.compress(bytes(int(2e6)), compresslevel=9) + self.assertLess(len(bomb), _compression.BUFFER_SIZE) + + decomp = BZ2File(BytesIO(bomb)) + self.assertEqual(bytes(1), decomp.read(1)) + max_decomp = 1 + DEFAULT_BUFFER_SIZE + self.assertLessEqual(decomp._buffer.raw.tell(), max_decomp, + "Excessive amount of data was decompressed") + # Tests for a BZ2File wrapping another file object: def testReadBytesIO(self): with BytesIO(self.DATA) as bio: with BZ2File(bio) as bz2f: - self.assertRaises(TypeError, bz2f.read, None) + self.assertRaises(TypeError, bz2f.read, float()) self.assertEqual(bz2f.read(), self.TEXT) self.assertFalse(bio.closed) diff --git a/Lib/test/test_gzip.py b/Lib/test/test_gzip.py index c0be3a1..d8408e1 100644 --- a/Lib/test/test_gzip.py +++ b/Lib/test/test_gzip.py @@ -123,7 +123,10 @@ class TestGzip(BaseTest): # Write to a file, open it for reading, then close it. self.test_write() f = gzip.GzipFile(self.filename, 'r') + fileobj = f.fileobj + self.assertFalse(fileobj.closed) f.close() + self.assertTrue(fileobj.closed) with self.assertRaises(ValueError): f.read(1) with self.assertRaises(ValueError): @@ -132,7 +135,10 @@ class TestGzip(BaseTest): f.tell() # Open the file for writing, then close it. f = gzip.GzipFile(self.filename, 'w') + fileobj = f.fileobj + self.assertFalse(fileobj.closed) f.close() + self.assertTrue(fileobj.closed) with self.assertRaises(ValueError): f.write(b'') with self.assertRaises(ValueError): @@ -271,9 +277,10 @@ class TestGzip(BaseTest): with gzip.GzipFile(self.filename, 'w', mtime = mtime) as fWrite: fWrite.write(data1) with gzip.GzipFile(self.filename) as fRead: + self.assertTrue(hasattr(fRead, 'mtime')) + self.assertIsNone(fRead.mtime) dataRead = fRead.read() self.assertEqual(dataRead, data1) - self.assertTrue(hasattr(fRead, 'mtime')) self.assertEqual(fRead.mtime, mtime) def test_metadata(self): @@ -416,6 +423,18 @@ class TestGzip(BaseTest): with gzip.GzipFile(str_filename, "rb") as f: self.assertEqual(f.read(), data1 * 50) + def test_decompress_limited(self): + """Decompressed data buffering should be limited""" + bomb = gzip.compress(bytes(int(2e6)), compresslevel=9) + self.assertLess(len(bomb), io.DEFAULT_BUFFER_SIZE) + + bomb = io.BytesIO(bomb) + decomp = gzip.GzipFile(fileobj=bomb) + self.assertEqual(bytes(1), decomp.read(1)) + max_decomp = 1 + io.DEFAULT_BUFFER_SIZE + self.assertLessEqual(decomp._buffer.raw.tell(), max_decomp, + "Excessive amount of data was decompressed") + # Testing compress/decompress shortcut functions def test_compress(self): @@ -463,7 +482,7 @@ class TestGzip(BaseTest): with gzip.open(self.filename, "wb") as f: f.write(data1) with gzip.open(self.filename, "rb") as f: - f.fileobj.prepend() + f._buffer.raw._fp.prepend() class TestOpen(BaseTest): def test_binary_modes(self): diff --git a/Lib/test/test_lzma.py b/Lib/test/test_lzma.py index cded28c..2d39099 100644 --- a/Lib/test/test_lzma.py +++ b/Lib/test/test_lzma.py @@ -1,4 +1,5 @@ -from io import BytesIO, UnsupportedOperation +import _compression +from io import BytesIO, UnsupportedOperation, DEFAULT_BUFFER_SIZE import os import pickle import random @@ -772,13 +773,13 @@ class FileTestCase(unittest.TestCase): def test_read_multistream_buffer_size_aligned(self): # Test the case where a stream boundary coincides with the end # of the raw read buffer. - saved_buffer_size = lzma._BUFFER_SIZE - lzma._BUFFER_SIZE = len(COMPRESSED_XZ) + saved_buffer_size = _compression.BUFFER_SIZE + _compression.BUFFER_SIZE = len(COMPRESSED_XZ) try: with LZMAFile(BytesIO(COMPRESSED_XZ * 5)) as f: self.assertEqual(f.read(), INPUT * 5) finally: - lzma._BUFFER_SIZE = saved_buffer_size + _compression.BUFFER_SIZE = saved_buffer_size def test_read_trailing_junk(self): with LZMAFile(BytesIO(COMPRESSED_XZ + COMPRESSED_BOGUS)) as f: @@ -829,7 +830,7 @@ class FileTestCase(unittest.TestCase): with LZMAFile(BytesIO(), "w") as f: self.assertRaises(ValueError, f.read) with LZMAFile(BytesIO(COMPRESSED_XZ)) as f: - self.assertRaises(TypeError, f.read, None) + self.assertRaises(TypeError, f.read, float()) def test_read_bad_data(self): with LZMAFile(BytesIO(COMPRESSED_BOGUS)) as f: @@ -925,6 +926,17 @@ class FileTestCase(unittest.TestCase): with LZMAFile(BytesIO(COMPRESSED_XZ)) as f: self.assertListEqual(f.readlines(), lines) + def test_decompress_limited(self): + """Decompressed data buffering should be limited""" + bomb = lzma.compress(bytes(int(2e6)), preset=6) + self.assertLess(len(bomb), _compression.BUFFER_SIZE) + + decomp = LZMAFile(BytesIO(bomb)) + self.assertEqual(bytes(1), decomp.read(1)) + max_decomp = 1 + DEFAULT_BUFFER_SIZE + self.assertLessEqual(decomp._buffer.raw.tell(), max_decomp, + "Excessive amount of data was decompressed") + def test_write(self): with BytesIO() as dst: with LZMAFile(dst, "w") as f: @@ -1090,7 +1102,8 @@ class FileTestCase(unittest.TestCase): self.assertRaises(ValueError, f.seek, 0) with LZMAFile(BytesIO(COMPRESSED_XZ)) as f: self.assertRaises(ValueError, f.seek, 0, 3) - self.assertRaises(ValueError, f.seek, 9, ()) + # io.BufferedReader raises TypeError instead of ValueError + self.assertRaises((TypeError, ValueError), f.seek, 9, ()) self.assertRaises(TypeError, f.seek, None) self.assertRaises(TypeError, f.seek, b"derp") |
