summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorBenjamin Peterson <benjamin@python.org>2010-10-31 17:57:22 (GMT)
committerBenjamin Peterson <benjamin@python.org>2010-10-31 17:57:22 (GMT)
commitd285bdb4434a57126f7f17dc6b8f78204c180c1f (patch)
tree00790aae4b9fbe0f54af017ac415cce34eea97ef
parentd6868b4ed45617675151d5abda7acf0080df8ca5 (diff)
downloadcpython-d285bdb4434a57126f7f17dc6b8f78204c180c1f.zip
cpython-d285bdb4434a57126f7f17dc6b8f78204c180c1f.tar.gz
cpython-d285bdb4434a57126f7f17dc6b8f78204c180c1f.tar.bz2
start banging on zipfile's file leakiness
-rw-r--r--Lib/test/test_zipfile.py57
-rw-r--r--Lib/zipfile.py3
2 files changed, 50 insertions, 10 deletions
diff --git a/Lib/test/test_zipfile.py b/Lib/test/test_zipfile.py
index 3c3ce2d..d67938f 100644
--- a/Lib/test/test_zipfile.py
+++ b/Lib/test/test_zipfile.py
@@ -99,6 +99,8 @@ class TestsWithSourceFile(unittest.TestCase):
# Check that testzip doesn't raise an exception
zipfp.testzip()
+ if not isinstance(f, str):
+ f.close()
def test_stored(self):
for f in (TESTFN2, TemporaryFile(), io.BytesIO()):
@@ -127,6 +129,8 @@ class TestsWithSourceFile(unittest.TestCase):
self.assertEqual(b''.join(zipdata1), self.data)
self.assertEqual(b''.join(zipdata2), self.data)
+ if not isinstance(f, str):
+ f.close()
def test_open_stored(self):
for f in (TESTFN2, TemporaryFile(), io.BytesIO()):
@@ -163,7 +167,8 @@ class TestsWithSourceFile(unittest.TestCase):
zipdata1.append(read_data)
self.assertEqual(b''.join(zipdata1), self.data)
-
+ if not isinstance(f, str):
+ f.close()
def test_random_open_stored(self):
for f in (TESTFN2, TemporaryFile(), io.BytesIO()):
@@ -207,6 +212,8 @@ class TestsWithSourceFile(unittest.TestCase):
self.assertEqual(data, self.data)
zipfp.close()
+ if not isinstance(f, str):
+ f.close()
def zip_readline_test(self, f, compression):
self.make_test_archive(f, compression)
@@ -217,6 +224,8 @@ class TestsWithSourceFile(unittest.TestCase):
for line in self.line_gen:
linedata = zipopen.readline()
self.assertEqual(linedata, line + '\n')
+ if not isinstance(f, str):
+ f.close()
def zip_readlines_test(self, f, compression):
self.make_test_archive(f, compression)
@@ -226,6 +235,8 @@ class TestsWithSourceFile(unittest.TestCase):
ziplines = zipfp.open(TESTFN).readlines()
for line, zipline in zip(self.line_gen, ziplines):
self.assertEqual(zipline, line + '\n')
+ if not isinstance(f, str):
+ f.close()
def zip_iterlines_test(self, f, compression):
self.make_test_archive(f, compression)
@@ -234,6 +245,8 @@ class TestsWithSourceFile(unittest.TestCase):
with zipfile.ZipFile(f, "r") as zipfp:
for line, zipline in zip(self.line_gen, zipfp.open(TESTFN)):
self.assertEqual(zipline, line + '\n')
+ if not isinstance(f, str):
+ f.close()
def test_readline_read_stored(self):
# Issue #7610: calls to readline() interleaved with calls to read().
@@ -430,6 +443,8 @@ class TestsWithSourceFile(unittest.TestCase):
with zipfile.ZipFile(f, "r") as zipfp:
zinfo = zipfp.getinfo('strfile')
self.assertEqual(zinfo.external_attr, 0o600 << 16)
+ if not isinstance(f, str):
+ f.close()
def test_writestr_permissions(self):
for f in (TESTFN2, TemporaryFile(), io.BytesIO()):
@@ -497,6 +512,8 @@ class TestZip64InSmallFiles(unittest.TestCase):
with zipfile.ZipFile(f, "w", compression) as zipfp:
self.assertRaises(zipfile.LargeZipFile,
zipfp.writestr, "another.name", self.data)
+ if not isinstance(f, str):
+ f.close()
def test_large_file_exception(self):
for f in (TESTFN2, TemporaryFile(), io.BytesIO()):
@@ -559,6 +576,8 @@ class TestZip64InSmallFiles(unittest.TestCase):
# Check that testzip doesn't raise an exception
zipfp.testzip()
+ if not isinstance(f, str):
+ f.close()
def test_stored(self):
for f in (TESTFN2, TemporaryFile(), io.BytesIO()):
@@ -1078,6 +1097,8 @@ class TestsWithRandomBinaryFiles(unittest.TestCase):
self.assertEqual(len(testdata), len(self.data))
self.assertEqual(testdata, self.data)
self.assertEqual(zipfp.read("another.name"), self.data)
+ if not isinstance(f, str):
+ f.close()
def test_stored(self):
for f in (TESTFN2, TemporaryFile(), io.BytesIO()):
@@ -1116,6 +1137,8 @@ class TestsWithRandomBinaryFiles(unittest.TestCase):
testdata2 = b''.join(zipdata2)
self.assertEqual(len(testdata2), len(self.data))
self.assertEqual(testdata2, self.data)
+ if not isinstance(f, str):
+ f.close()
def test_open_stored(self):
for f in (TESTFN2, TemporaryFile(), io.BytesIO()):
@@ -1142,6 +1165,8 @@ class TestsWithRandomBinaryFiles(unittest.TestCase):
testdata = b''.join(zipdata1)
self.assertEqual(len(testdata), len(self.data))
self.assertEqual(testdata, self.data)
+ if not isinstance(f, str):
+ f.close()
def test_random_open_stored(self):
for f in (TESTFN2, TemporaryFile(), io.BytesIO()):
@@ -1259,8 +1284,11 @@ class UniversalNewlineTests(unittest.TestCase):
# Read the ZIP archive
with zipfile.ZipFile(f, "r") as zipfp:
for sep, fn in self.arcfiles.items():
- zipdata = zipfp.open(fn, "rU").read()
+ with zipfp.open(fn, "rU") as fp:
+ zipdata = fp.read()
self.assertEqual(self.arcdata[sep], zipdata)
+ if not isinstance(f, str):
+ f.close()
def readline_read_test(self, f, compression):
self.make_test_archive(f, compression)
@@ -1281,9 +1309,12 @@ class UniversalNewlineTests(unittest.TestCase):
break
data += read
+ zipopen.close()
self.assertEqual(data, self.arcdata['\n'])
zipfp.close()
+ if not isinstance(f, str):
+ f.close()
def readline_test(self, f, compression):
self.make_test_archive(f, compression)
@@ -1291,10 +1322,12 @@ class UniversalNewlineTests(unittest.TestCase):
# Read the ZIP archive
with zipfile.ZipFile(f, "r") as zipfp:
for sep, fn in self.arcfiles.items():
- zipopen = zipfp.open(fn, "rU")
- for line in self.line_gen:
- linedata = zipopen.readline()
- self.assertEqual(linedata, line + b'\n')
+ with zipfp.open(fn, "rU") as zipopen:
+ for line in self.line_gen:
+ linedata = zipopen.readline()
+ self.assertEqual(linedata, line + b'\n')
+ if not isinstance(f, str):
+ f.close()
def readlines_test(self, f, compression):
self.make_test_archive(f, compression)
@@ -1302,9 +1335,12 @@ class UniversalNewlineTests(unittest.TestCase):
# Read the ZIP archive
with zipfile.ZipFile(f, "r") as zipfp:
for sep, fn in self.arcfiles.items():
- ziplines = zipfp.open(fn, "rU").readlines()
+ with zipfp.open(fn, "rU") as fp:
+ ziplines = fp.readlines()
for line, zipline in zip(self.line_gen, ziplines):
self.assertEqual(zipline, line + b'\n')
+ if not isinstance(f, str):
+ f.close()
def iterlines_test(self, f, compression):
self.make_test_archive(f, compression)
@@ -1312,8 +1348,11 @@ class UniversalNewlineTests(unittest.TestCase):
# Read the ZIP archive
with zipfile.ZipFile(f, "r") as zipfp:
for sep, fn in self.arcfiles.items():
- for line, zipline in zip(self.line_gen, zipfp.open(fn, "rU")):
- self.assertEqual(zipline, line + b'\n')
+ with zipfp.open(fn, "rU") as fp:
+ for line, zipline in zip(self.line_gen, fp):
+ self.assertEqual(zipline, line + b'\n')
+ if not isinstance(f, str):
+ f.close()
def test_read_stored(self):
for f in (TESTFN2, TemporaryFile(), io.BytesIO()):
diff --git a/Lib/zipfile.py b/Lib/zipfile.py
index a51dd21..184f4d2 100644
--- a/Lib/zipfile.py
+++ b/Lib/zipfile.py
@@ -874,7 +874,8 @@ class ZipFile:
def read(self, name, pwd=None):
"""Return file bytes (as a string) for name."""
- return self.open(name, "r", pwd).read()
+ with self.open(name, "r", pwd) as fp:
+ return fp.read()
def open(self, name, mode="r", pwd=None):
"""Return file-like object for 'name'."""