summaryrefslogtreecommitdiffstats
path: root/Lib/test/test_bz2.py
diff options
context:
space:
mode:
Diffstat (limited to 'Lib/test/test_bz2.py')
-rw-r--r--Lib/test/test_bz2.py229
1 files changed, 100 insertions, 129 deletions
diff --git a/Lib/test/test_bz2.py b/Lib/test/test_bz2.py
index fbf8fff..be35580 100644
--- a/Lib/test/test_bz2.py
+++ b/Lib/test/test_bz2.py
@@ -1,4 +1,4 @@
-#!/usr/bin/python
+#!/usr/bin/env python3
from test import support
from test.support import TESTFN
@@ -7,11 +7,14 @@ from io import BytesIO
import os
import subprocess
import sys
-import threading
+
+try:
+ import threading
+except ImportError:
+ threading = None
# Skip tests if the bz2 module doesn't exist.
bz2 = support.import_module('bz2')
-
from bz2 import BZ2File, BZ2Compressor, BZ2Decompressor
has_cmdline_bunzip2 = sys.platform not in ("win32", "os2emx")
@@ -52,76 +55,68 @@ class BZ2FileTest(BaseTest):
os.unlink(self.filename)
def createTempFile(self, crlf=0):
- f = open(self.filename, "wb")
- if crlf:
- data = self.DATA_CRLF
- else:
- data = self.DATA
- f.write(data)
- f.close()
+ with open(self.filename, "wb") as f:
+ if crlf:
+ data = self.DATA_CRLF
+ else:
+ data = self.DATA
+ f.write(data)
def testRead(self):
# "Test BZ2File.read()"
self.createTempFile()
- bz2f = BZ2File(self.filename)
- self.assertRaises(TypeError, bz2f.read, None)
- self.assertEqual(bz2f.read(), self.TEXT)
- bz2f.close()
+ with BZ2File(self.filename) as bz2f:
+ self.assertRaises(TypeError, bz2f.read, None)
+ self.assertEqual(bz2f.read(), self.TEXT)
def testRead0(self):
# Test BBZ2File.read(0)"
self.createTempFile()
- bz2f = BZ2File(self.filename)
- self.assertRaises(TypeError, bz2f.read, None)
- self.assertEqual(bz2f.read(0), b"")
- bz2f.close()
+ with BZ2File(self.filename) as bz2f:
+ self.assertRaises(TypeError, bz2f.read, None)
+ self.assertEqual(bz2f.read(0), b"")
def testReadChunk10(self):
# "Test BZ2File.read() in chunks of 10 bytes"
self.createTempFile()
- bz2f = BZ2File(self.filename)
- text = b''
- while 1:
- str = bz2f.read(10)
- if not str:
- break
- text += str
- self.assertEqual(text, self.TEXT)
- bz2f.close()
+ with BZ2File(self.filename) as bz2f:
+ text = b''
+ while 1:
+ str = bz2f.read(10)
+ if not str:
+ break
+ text += str
+ self.assertEqual(text, self.TEXT)
def testRead100(self):
# "Test BZ2File.read(100)"
self.createTempFile()
- bz2f = BZ2File(self.filename)
- self.assertEqual(bz2f.read(100), self.TEXT[:100])
- bz2f.close()
+ with BZ2File(self.filename) as bz2f:
+ self.assertEqual(bz2f.read(100), self.TEXT[:100])
def testReadLine(self):
# "Test BZ2File.readline()"
self.createTempFile()
- bz2f = BZ2File(self.filename)
- self.assertRaises(TypeError, bz2f.readline, None)
- sio = BytesIO(self.TEXT)
- for line in sio.readlines():
- self.assertEqual(bz2f.readline(), line)
- bz2f.close()
+ with BZ2File(self.filename) as bz2f:
+ self.assertRaises(TypeError, bz2f.readline, None)
+ sio = BytesIO(self.TEXT)
+ for line in sio.readlines():
+ self.assertEqual(bz2f.readline(), line)
def testReadLines(self):
# "Test BZ2File.readlines()"
self.createTempFile()
- bz2f = BZ2File(self.filename)
- self.assertRaises(TypeError, bz2f.readlines, None)
- sio = BytesIO(self.TEXT)
- self.assertEqual(bz2f.readlines(), sio.readlines())
- bz2f.close()
+ with BZ2File(self.filename) as bz2f:
+ self.assertRaises(TypeError, bz2f.readlines, None)
+ sio = BytesIO(self.TEXT)
+ self.assertEqual(bz2f.readlines(), sio.readlines())
def testIterator(self):
# "Test iter(BZ2File)"
self.createTempFile()
- bz2f = BZ2File(self.filename)
- sio = BytesIO(self.TEXT)
- self.assertEqual(list(iter(bz2f)), sio.readlines())
- bz2f.close()
+ with BZ2File(self.filename) as bz2f:
+ sio = BytesIO(self.TEXT)
+ self.assertEqual(list(iter(bz2f)), sio.readlines())
def testClosedIteratorDeadlock(self):
# "Test that iteration on a closed bz2file releases the lock."
@@ -136,104 +131,91 @@ class BZ2FileTest(BaseTest):
def testWrite(self):
# "Test BZ2File.write()"
- bz2f = BZ2File(self.filename, "w")
- self.assertRaises(TypeError, bz2f.write)
- bz2f.write(self.TEXT)
- bz2f.close()
- f = open(self.filename, 'rb')
- self.assertEqual(self.decompress(f.read()), self.TEXT)
- f.close()
+ with BZ2File(self.filename, "w") as bz2f:
+ self.assertRaises(TypeError, bz2f.write)
+ bz2f.write(self.TEXT)
+ with open(self.filename, 'rb') as f:
+ self.assertEqual(self.decompress(f.read()), self.TEXT)
def testWriteChunks10(self):
# "Test BZ2File.write() with chunks of 10 bytes"
- bz2f = BZ2File(self.filename, "w")
- n = 0
- while 1:
- str = self.TEXT[n*10:(n+1)*10]
- if not str:
- break
- bz2f.write(str)
- n += 1
- bz2f.close()
- f = open(self.filename, 'rb')
- self.assertEqual(self.decompress(f.read()), self.TEXT)
- f.close()
+ with BZ2File(self.filename, "w") as bz2f:
+ n = 0
+ while 1:
+ str = self.TEXT[n*10:(n+1)*10]
+ if not str:
+ break
+ bz2f.write(str)
+ n += 1
+ with open(self.filename, 'rb') as f:
+ self.assertEqual(self.decompress(f.read()), self.TEXT)
def testWriteLines(self):
# "Test BZ2File.writelines()"
- bz2f = BZ2File(self.filename, "w")
- self.assertRaises(TypeError, bz2f.writelines)
- sio = BytesIO(self.TEXT)
- bz2f.writelines(sio.readlines())
- bz2f.close()
+ with BZ2File(self.filename, "w") as bz2f:
+ self.assertRaises(TypeError, bz2f.writelines)
+ sio = BytesIO(self.TEXT)
+ bz2f.writelines(sio.readlines())
# patch #1535500
self.assertRaises(ValueError, bz2f.writelines, ["a"])
- f = open(self.filename, 'rb')
- self.assertEqual(self.decompress(f.read()), self.TEXT)
- f.close()
+ with open(self.filename, 'rb') as f:
+ self.assertEqual(self.decompress(f.read()), self.TEXT)
def testWriteMethodsOnReadOnlyFile(self):
- bz2f = BZ2File(self.filename, "w")
- bz2f.write(b"abc")
- bz2f.close()
+ with BZ2File(self.filename, "w") as bz2f:
+ bz2f.write(b"abc")
- bz2f = BZ2File(self.filename, "r")
- self.assertRaises(IOError, bz2f.write, b"a")
- self.assertRaises(IOError, bz2f.writelines, [b"a"])
+ with BZ2File(self.filename, "r") as bz2f:
+ self.assertRaises(IOError, bz2f.write, b"a")
+ self.assertRaises(IOError, bz2f.writelines, [b"a"])
def testSeekForward(self):
# "Test BZ2File.seek(150, 0)"
self.createTempFile()
- bz2f = BZ2File(self.filename)
- self.assertRaises(TypeError, bz2f.seek)
- bz2f.seek(150)
- self.assertEqual(bz2f.read(), self.TEXT[150:])
- bz2f.close()
+ with BZ2File(self.filename) as bz2f:
+ self.assertRaises(TypeError, bz2f.seek)
+ bz2f.seek(150)
+ self.assertEqual(bz2f.read(), self.TEXT[150:])
def testSeekBackwards(self):
# "Test BZ2File.seek(-150, 1)"
self.createTempFile()
- bz2f = BZ2File(self.filename)
- bz2f.read(500)
- bz2f.seek(-150, 1)
- self.assertEqual(bz2f.read(), self.TEXT[500-150:])
- bz2f.close()
+ with BZ2File(self.filename) as bz2f:
+ bz2f.read(500)
+ bz2f.seek(-150, 1)
+ self.assertEqual(bz2f.read(), self.TEXT[500-150:])
def testSeekBackwardsFromEnd(self):
# "Test BZ2File.seek(-150, 2)"
self.createTempFile()
- bz2f = BZ2File(self.filename)
- bz2f.seek(-150, 2)
- self.assertEqual(bz2f.read(), self.TEXT[len(self.TEXT)-150:])
- bz2f.close()
+ with BZ2File(self.filename) as bz2f:
+ bz2f.seek(-150, 2)
+ self.assertEqual(bz2f.read(), self.TEXT[len(self.TEXT)-150:])
def testSeekPostEnd(self):
# "Test BZ2File.seek(150000)"
self.createTempFile()
- bz2f = BZ2File(self.filename)
- bz2f.seek(150000)
- self.assertEqual(bz2f.tell(), len(self.TEXT))
- self.assertEqual(bz2f.read(), b"")
- bz2f.close()
+ with BZ2File(self.filename) as bz2f:
+ bz2f.seek(150000)
+ self.assertEqual(bz2f.tell(), len(self.TEXT))
+ self.assertEqual(bz2f.read(), b"")
def testSeekPostEndTwice(self):
# "Test BZ2File.seek(150000) twice"
self.createTempFile()
- bz2f = BZ2File(self.filename)
- bz2f.seek(150000)
- bz2f.seek(150000)
- self.assertEqual(bz2f.tell(), len(self.TEXT))
- self.assertEqual(bz2f.read(), b"")
- bz2f.close()
+ with BZ2File(self.filename) as bz2f:
+ bz2f.seek(150000)
+ bz2f.seek(150000)
+ self.assertEqual(bz2f.tell(), len(self.TEXT))
+ self.assertEqual(bz2f.read(), b"")
def testSeekPreStart(self):
# "Test BZ2File.seek(-150, 0)"
self.createTempFile()
- bz2f = BZ2File(self.filename)
- bz2f.seek(-150)
- self.assertEqual(bz2f.tell(), 0)
- self.assertEqual(bz2f.read(), self.TEXT)
- bz2f.close()
+ with BZ2File(self.filename) as bz2f:
+ bz2f.seek(-150)
+ self.assertEqual(bz2f.tell(), 0)
+ self.assertEqual(bz2f.read(), self.TEXT)
def testOpenDel(self):
# "Test opening and deleting a file many times"
@@ -249,16 +231,13 @@ class BZ2FileTest(BaseTest):
def testBug1191043(self):
# readlines() for files containing no newline
data = b'BZh91AY&SY\xd9b\x89]\x00\x00\x00\x03\x80\x04\x00\x02\x00\x0c\x00 \x00!\x9ah3M\x13<]\xc9\x14\xe1BCe\x8a%t'
- f = open(self.filename, "wb")
- f.write(data)
- f.close()
- bz2f = BZ2File(self.filename)
- lines = bz2f.readlines()
- bz2f.close()
+ with open(self.filename, "wb") as f:
+ f.write(data)
+ with BZ2File(self.filename) as bz2f:
+ lines = bz2f.readlines()
self.assertEqual(lines, [b'Test'])
- bz2f = BZ2File(self.filename)
- xlines = list(bz2f.readlines())
- bz2f.close()
+ with BZ2File(self.filename) as bz2f:
+ xlines = list(bz2f.readlines())
self.assertEqual(xlines, [b'Test'])
def testContextProtocol(self):
@@ -283,12 +262,12 @@ class BZ2FileTest(BaseTest):
else:
self.fail("1/0 didn't raise an exception")
+ @unittest.skipUnless(threading, 'Threading required for this test.')
def testThreading(self):
# Using a BZ2File from several threads doesn't deadlock (issue #7205).
data = b"1" * 2**20
nthreads = 10
- f = bz2.BZ2File(self.filename, 'wb')
- try:
+ with bz2.BZ2File(self.filename, 'wb') as f:
def comp():
for i in range(5):
f.write(data)
@@ -297,27 +276,19 @@ class BZ2FileTest(BaseTest):
t.start()
for t in threads:
t.join()
- finally:
- f.close()
def testMixedIterationReads(self):
# Issue #8397: mixed iteration and reads should be forbidden.
- f = bz2.BZ2File(self.filename, 'wb')
- try:
+ with bz2.BZ2File(self.filename, 'wb') as f:
# The internal buffer size is hard-wired to 8192 bytes, we must
# write out more than that for the test to stop half through
# the buffer.
f.write(self.TEXT * 100)
- finally:
- f.close()
- f = bz2.BZ2File(self.filename, 'rb')
- try:
+ with bz2.BZ2File(self.filename, 'rb') as f:
next(f)
self.assertRaises(ValueError, f.read)
self.assertRaises(ValueError, f.readline)
self.assertRaises(ValueError, f.readlines)
- finally:
- f.close()
class BZ2CompressorTest(BaseTest):
def testCompress(self):