diff options
Diffstat (limited to 'Lib/test/test_bz2.py')
-rw-r--r-- | Lib/test/test_bz2.py | 229 |
1 files changed, 100 insertions, 129 deletions
diff --git a/Lib/test/test_bz2.py b/Lib/test/test_bz2.py index fbf8fff..be35580 100644 --- a/Lib/test/test_bz2.py +++ b/Lib/test/test_bz2.py @@ -1,4 +1,4 @@ -#!/usr/bin/python +#!/usr/bin/env python3 from test import support from test.support import TESTFN @@ -7,11 +7,14 @@ from io import BytesIO import os import subprocess import sys -import threading + +try: + import threading +except ImportError: + threading = None # Skip tests if the bz2 module doesn't exist. bz2 = support.import_module('bz2') - from bz2 import BZ2File, BZ2Compressor, BZ2Decompressor has_cmdline_bunzip2 = sys.platform not in ("win32", "os2emx") @@ -52,76 +55,68 @@ class BZ2FileTest(BaseTest): os.unlink(self.filename) def createTempFile(self, crlf=0): - f = open(self.filename, "wb") - if crlf: - data = self.DATA_CRLF - else: - data = self.DATA - f.write(data) - f.close() + with open(self.filename, "wb") as f: + if crlf: + data = self.DATA_CRLF + else: + data = self.DATA + f.write(data) def testRead(self): # "Test BZ2File.read()" self.createTempFile() - bz2f = BZ2File(self.filename) - self.assertRaises(TypeError, bz2f.read, None) - self.assertEqual(bz2f.read(), self.TEXT) - bz2f.close() + with BZ2File(self.filename) as bz2f: + self.assertRaises(TypeError, bz2f.read, None) + self.assertEqual(bz2f.read(), self.TEXT) def testRead0(self): # Test BBZ2File.read(0)" self.createTempFile() - bz2f = BZ2File(self.filename) - self.assertRaises(TypeError, bz2f.read, None) - self.assertEqual(bz2f.read(0), b"") - bz2f.close() + with BZ2File(self.filename) as bz2f: + self.assertRaises(TypeError, bz2f.read, None) + self.assertEqual(bz2f.read(0), b"") def testReadChunk10(self): # "Test BZ2File.read() in chunks of 10 bytes" self.createTempFile() - bz2f = BZ2File(self.filename) - text = b'' - while 1: - str = bz2f.read(10) - if not str: - break - text += str - self.assertEqual(text, self.TEXT) - bz2f.close() + with BZ2File(self.filename) as bz2f: + text = b'' + while 1: + str = bz2f.read(10) + if not str: + break + text += str + self.assertEqual(text, self.TEXT) def testRead100(self): # "Test BZ2File.read(100)" self.createTempFile() - bz2f = BZ2File(self.filename) - self.assertEqual(bz2f.read(100), self.TEXT[:100]) - bz2f.close() + with BZ2File(self.filename) as bz2f: + self.assertEqual(bz2f.read(100), self.TEXT[:100]) def testReadLine(self): # "Test BZ2File.readline()" self.createTempFile() - bz2f = BZ2File(self.filename) - self.assertRaises(TypeError, bz2f.readline, None) - sio = BytesIO(self.TEXT) - for line in sio.readlines(): - self.assertEqual(bz2f.readline(), line) - bz2f.close() + with BZ2File(self.filename) as bz2f: + self.assertRaises(TypeError, bz2f.readline, None) + sio = BytesIO(self.TEXT) + for line in sio.readlines(): + self.assertEqual(bz2f.readline(), line) def testReadLines(self): # "Test BZ2File.readlines()" self.createTempFile() - bz2f = BZ2File(self.filename) - self.assertRaises(TypeError, bz2f.readlines, None) - sio = BytesIO(self.TEXT) - self.assertEqual(bz2f.readlines(), sio.readlines()) - bz2f.close() + with BZ2File(self.filename) as bz2f: + self.assertRaises(TypeError, bz2f.readlines, None) + sio = BytesIO(self.TEXT) + self.assertEqual(bz2f.readlines(), sio.readlines()) def testIterator(self): # "Test iter(BZ2File)" self.createTempFile() - bz2f = BZ2File(self.filename) - sio = BytesIO(self.TEXT) - self.assertEqual(list(iter(bz2f)), sio.readlines()) - bz2f.close() + with BZ2File(self.filename) as bz2f: + sio = BytesIO(self.TEXT) + self.assertEqual(list(iter(bz2f)), sio.readlines()) def testClosedIteratorDeadlock(self): # "Test that iteration on a closed bz2file releases the lock." @@ -136,104 +131,91 @@ class BZ2FileTest(BaseTest): def testWrite(self): # "Test BZ2File.write()" - bz2f = BZ2File(self.filename, "w") - self.assertRaises(TypeError, bz2f.write) - bz2f.write(self.TEXT) - bz2f.close() - f = open(self.filename, 'rb') - self.assertEqual(self.decompress(f.read()), self.TEXT) - f.close() + with BZ2File(self.filename, "w") as bz2f: + self.assertRaises(TypeError, bz2f.write) + bz2f.write(self.TEXT) + with open(self.filename, 'rb') as f: + self.assertEqual(self.decompress(f.read()), self.TEXT) def testWriteChunks10(self): # "Test BZ2File.write() with chunks of 10 bytes" - bz2f = BZ2File(self.filename, "w") - n = 0 - while 1: - str = self.TEXT[n*10:(n+1)*10] - if not str: - break - bz2f.write(str) - n += 1 - bz2f.close() - f = open(self.filename, 'rb') - self.assertEqual(self.decompress(f.read()), self.TEXT) - f.close() + with BZ2File(self.filename, "w") as bz2f: + n = 0 + while 1: + str = self.TEXT[n*10:(n+1)*10] + if not str: + break + bz2f.write(str) + n += 1 + with open(self.filename, 'rb') as f: + self.assertEqual(self.decompress(f.read()), self.TEXT) def testWriteLines(self): # "Test BZ2File.writelines()" - bz2f = BZ2File(self.filename, "w") - self.assertRaises(TypeError, bz2f.writelines) - sio = BytesIO(self.TEXT) - bz2f.writelines(sio.readlines()) - bz2f.close() + with BZ2File(self.filename, "w") as bz2f: + self.assertRaises(TypeError, bz2f.writelines) + sio = BytesIO(self.TEXT) + bz2f.writelines(sio.readlines()) # patch #1535500 self.assertRaises(ValueError, bz2f.writelines, ["a"]) - f = open(self.filename, 'rb') - self.assertEqual(self.decompress(f.read()), self.TEXT) - f.close() + with open(self.filename, 'rb') as f: + self.assertEqual(self.decompress(f.read()), self.TEXT) def testWriteMethodsOnReadOnlyFile(self): - bz2f = BZ2File(self.filename, "w") - bz2f.write(b"abc") - bz2f.close() + with BZ2File(self.filename, "w") as bz2f: + bz2f.write(b"abc") - bz2f = BZ2File(self.filename, "r") - self.assertRaises(IOError, bz2f.write, b"a") - self.assertRaises(IOError, bz2f.writelines, [b"a"]) + with BZ2File(self.filename, "r") as bz2f: + self.assertRaises(IOError, bz2f.write, b"a") + self.assertRaises(IOError, bz2f.writelines, [b"a"]) def testSeekForward(self): # "Test BZ2File.seek(150, 0)" self.createTempFile() - bz2f = BZ2File(self.filename) - self.assertRaises(TypeError, bz2f.seek) - bz2f.seek(150) - self.assertEqual(bz2f.read(), self.TEXT[150:]) - bz2f.close() + with BZ2File(self.filename) as bz2f: + self.assertRaises(TypeError, bz2f.seek) + bz2f.seek(150) + self.assertEqual(bz2f.read(), self.TEXT[150:]) def testSeekBackwards(self): # "Test BZ2File.seek(-150, 1)" self.createTempFile() - bz2f = BZ2File(self.filename) - bz2f.read(500) - bz2f.seek(-150, 1) - self.assertEqual(bz2f.read(), self.TEXT[500-150:]) - bz2f.close() + with BZ2File(self.filename) as bz2f: + bz2f.read(500) + bz2f.seek(-150, 1) + self.assertEqual(bz2f.read(), self.TEXT[500-150:]) def testSeekBackwardsFromEnd(self): # "Test BZ2File.seek(-150, 2)" self.createTempFile() - bz2f = BZ2File(self.filename) - bz2f.seek(-150, 2) - self.assertEqual(bz2f.read(), self.TEXT[len(self.TEXT)-150:]) - bz2f.close() + with BZ2File(self.filename) as bz2f: + bz2f.seek(-150, 2) + self.assertEqual(bz2f.read(), self.TEXT[len(self.TEXT)-150:]) def testSeekPostEnd(self): # "Test BZ2File.seek(150000)" self.createTempFile() - bz2f = BZ2File(self.filename) - bz2f.seek(150000) - self.assertEqual(bz2f.tell(), len(self.TEXT)) - self.assertEqual(bz2f.read(), b"") - bz2f.close() + with BZ2File(self.filename) as bz2f: + bz2f.seek(150000) + self.assertEqual(bz2f.tell(), len(self.TEXT)) + self.assertEqual(bz2f.read(), b"") def testSeekPostEndTwice(self): # "Test BZ2File.seek(150000) twice" self.createTempFile() - bz2f = BZ2File(self.filename) - bz2f.seek(150000) - bz2f.seek(150000) - self.assertEqual(bz2f.tell(), len(self.TEXT)) - self.assertEqual(bz2f.read(), b"") - bz2f.close() + with BZ2File(self.filename) as bz2f: + bz2f.seek(150000) + bz2f.seek(150000) + self.assertEqual(bz2f.tell(), len(self.TEXT)) + self.assertEqual(bz2f.read(), b"") def testSeekPreStart(self): # "Test BZ2File.seek(-150, 0)" self.createTempFile() - bz2f = BZ2File(self.filename) - bz2f.seek(-150) - self.assertEqual(bz2f.tell(), 0) - self.assertEqual(bz2f.read(), self.TEXT) - bz2f.close() + with BZ2File(self.filename) as bz2f: + bz2f.seek(-150) + self.assertEqual(bz2f.tell(), 0) + self.assertEqual(bz2f.read(), self.TEXT) def testOpenDel(self): # "Test opening and deleting a file many times" @@ -249,16 +231,13 @@ class BZ2FileTest(BaseTest): def testBug1191043(self): # readlines() for files containing no newline data = b'BZh91AY&SY\xd9b\x89]\x00\x00\x00\x03\x80\x04\x00\x02\x00\x0c\x00 \x00!\x9ah3M\x13<]\xc9\x14\xe1BCe\x8a%t' - f = open(self.filename, "wb") - f.write(data) - f.close() - bz2f = BZ2File(self.filename) - lines = bz2f.readlines() - bz2f.close() + with open(self.filename, "wb") as f: + f.write(data) + with BZ2File(self.filename) as bz2f: + lines = bz2f.readlines() self.assertEqual(lines, [b'Test']) - bz2f = BZ2File(self.filename) - xlines = list(bz2f.readlines()) - bz2f.close() + with BZ2File(self.filename) as bz2f: + xlines = list(bz2f.readlines()) self.assertEqual(xlines, [b'Test']) def testContextProtocol(self): @@ -283,12 +262,12 @@ class BZ2FileTest(BaseTest): else: self.fail("1/0 didn't raise an exception") + @unittest.skipUnless(threading, 'Threading required for this test.') def testThreading(self): # Using a BZ2File from several threads doesn't deadlock (issue #7205). data = b"1" * 2**20 nthreads = 10 - f = bz2.BZ2File(self.filename, 'wb') - try: + with bz2.BZ2File(self.filename, 'wb') as f: def comp(): for i in range(5): f.write(data) @@ -297,27 +276,19 @@ class BZ2FileTest(BaseTest): t.start() for t in threads: t.join() - finally: - f.close() def testMixedIterationReads(self): # Issue #8397: mixed iteration and reads should be forbidden. - f = bz2.BZ2File(self.filename, 'wb') - try: + with bz2.BZ2File(self.filename, 'wb') as f: # The internal buffer size is hard-wired to 8192 bytes, we must # write out more than that for the test to stop half through # the buffer. f.write(self.TEXT * 100) - finally: - f.close() - f = bz2.BZ2File(self.filename, 'rb') - try: + with bz2.BZ2File(self.filename, 'rb') as f: next(f) self.assertRaises(ValueError, f.read) self.assertRaises(ValueError, f.readline) self.assertRaises(ValueError, f.readlines) - finally: - f.close() class BZ2CompressorTest(BaseTest): def testCompress(self): |