summaryrefslogtreecommitdiffstats
path: root/Lib
diff options
context:
space:
mode:
authorSerhiy Storchaka <storchaka@gmail.com>2013-02-12 22:34:46 (GMT)
committerSerhiy Storchaka <storchaka@gmail.com>2013-02-12 22:34:46 (GMT)
commit0127de0b877600a95871e07aee8a092e9199002a (patch)
tree3a94dc1b1e70e1958e8e6ccccffb510befc0532a /Lib
parentcdc7a91dde9221f5cf857268f7900e7a7d64619b (diff)
downloadcpython-0127de0b877600a95871e07aee8a092e9199002a.zip
cpython-0127de0b877600a95871e07aee8a092e9199002a.tar.gz
cpython-0127de0b877600a95871e07aee8a092e9199002a.tar.bz2
Issue #16800: tempfile.gettempdir() no longer left temporary files when
the disk is full. Original patch by Amir Szekely.
Diffstat (limited to 'Lib')
-rw-r--r--Lib/tempfile.py18
-rw-r--r--Lib/test/test_support.py27
-rw-r--r--Lib/test/test_tempfile.py56
3 files changed, 89 insertions, 12 deletions
diff --git a/Lib/tempfile.py b/Lib/tempfile.py
index 14e4f1c..d2af8a6 100644
--- a/Lib/tempfile.py
+++ b/Lib/tempfile.py
@@ -29,6 +29,7 @@ __all__ = [
# Imports.
+import io as _io
import os as _os
import errno as _errno
from random import Random as _Random
@@ -193,14 +194,17 @@ def _get_default_tempdir():
name = namer.next()
filename = _os.path.join(dir, name)
try:
- fd = _os.open(filename, flags, 0600)
- fp = _os.fdopen(fd, 'w')
- fp.write('blat')
- fp.close()
- _os.unlink(filename)
- del fp, fd
+ fd = _os.open(filename, flags, 0o600)
+ try:
+ try:
+ fp = _io.open(fd, 'wb', buffering=0, closefd=False)
+ fp.write(b'blat')
+ finally:
+ _os.close(fd)
+ finally:
+ _os.unlink(filename)
return dir
- except (OSError, IOError), e:
+ except (OSError, IOError) as e:
if e[0] != _errno.EEXIST:
break # no point trying more names in this directory
pass
diff --git a/Lib/test/test_support.py b/Lib/test/test_support.py
index 0792c45..cc74d07 100644
--- a/Lib/test/test_support.py
+++ b/Lib/test/test_support.py
@@ -1298,6 +1298,33 @@ def reap_children():
except:
break
+@contextlib.contextmanager
+def swap_attr(obj, attr, new_val):
+ """Temporary swap out an attribute with a new object.
+
+ Usage:
+ with swap_attr(obj, "attr", 5):
+ ...
+
+ This will set obj.attr to 5 for the duration of the with: block,
+ restoring the old value at the end of the block. If `attr` doesn't
+ exist on `obj`, it will be created and then deleted at the end of the
+ block.
+ """
+ if hasattr(obj, attr):
+ real_val = getattr(obj, attr)
+ setattr(obj, attr, new_val)
+ try:
+ yield
+ finally:
+ setattr(obj, attr, real_val)
+ else:
+ setattr(obj, attr, new_val)
+ try:
+ yield
+ finally:
+ delattr(obj, attr)
+
def py3k_bytes(b):
"""Emulate the py3k bytes() constructor.
diff --git a/Lib/test/test_tempfile.py b/Lib/test/test_tempfile.py
index 1b219c8..b4d23ad 100644
--- a/Lib/test/test_tempfile.py
+++ b/Lib/test/test_tempfile.py
@@ -1,13 +1,16 @@
# tempfile.py unit tests.
import tempfile
+import errno
+import io
import os
import signal
+import shutil
import sys
import re
import warnings
import unittest
-from test import test_support
+from test import test_support as support
warnings.filterwarnings("ignore",
category=RuntimeWarning,
@@ -177,7 +180,7 @@ class test__candidate_tempdir_list(TC):
# _candidate_tempdir_list contains the expected directories
# Make sure the interesting environment variables are all set.
- with test_support.EnvironmentVarGuard() as env:
+ with support.EnvironmentVarGuard() as env:
for envname in 'TMPDIR', 'TEMP', 'TMP':
dirname = os.getenv(envname)
if not dirname:
@@ -202,8 +205,51 @@ class test__candidate_tempdir_list(TC):
test_classes.append(test__candidate_tempdir_list)
+# We test _get_default_tempdir some more by testing gettempdir.
-# We test _get_default_tempdir by testing gettempdir.
+class TestGetDefaultTempdir(TC):
+ """Test _get_default_tempdir()."""
+
+ def test_no_files_left_behind(self):
+ # use a private empty directory
+ our_temp_directory = tempfile.mkdtemp()
+ try:
+ # force _get_default_tempdir() to consider our empty directory
+ def our_candidate_list():
+ return [our_temp_directory]
+
+ with support.swap_attr(tempfile, "_candidate_tempdir_list",
+ our_candidate_list):
+ # verify our directory is empty after _get_default_tempdir()
+ tempfile._get_default_tempdir()
+ self.assertEqual(os.listdir(our_temp_directory), [])
+
+ def raise_OSError(*args, **kwargs):
+ raise OSError(-1)
+
+ with support.swap_attr(io, "open", raise_OSError):
+ # test again with failing io.open()
+ with self.assertRaises(IOError) as cm:
+ tempfile._get_default_tempdir()
+ self.assertEqual(cm.exception.errno, errno.ENOENT)
+ self.assertEqual(os.listdir(our_temp_directory), [])
+
+ open = io.open
+ def bad_writer(*args, **kwargs):
+ fp = open(*args, **kwargs)
+ fp.write = raise_OSError
+ return fp
+
+ with support.swap_attr(io, "open", bad_writer):
+ # test again with failing write()
+ with self.assertRaises(IOError) as cm:
+ tempfile._get_default_tempdir()
+ self.assertEqual(cm.exception.errno, errno.ENOENT)
+ self.assertEqual(os.listdir(our_temp_directory), [])
+ finally:
+ shutil.rmtree(our_temp_directory)
+
+test_classes.append(TestGetDefaultTempdir)
class test__get_candidate_names(TC):
@@ -299,7 +345,7 @@ class test__mkstemp_inner(TC):
if not has_spawnl:
return # ugh, can't use SkipTest.
- if test_support.verbose:
+ if support.verbose:
v="v"
else:
v="q"
@@ -913,7 +959,7 @@ if tempfile.NamedTemporaryFile is not tempfile.TemporaryFile:
test_classes.append(test_TemporaryFile)
def test_main():
- test_support.run_unittest(*test_classes)
+ support.run_unittest(*test_classes)
if __name__ == "__main__":
test_main()