diff options
author | Serhiy Storchaka <storchaka@gmail.com> | 2013-02-12 22:34:46 (GMT) |
---|---|---|
committer | Serhiy Storchaka <storchaka@gmail.com> | 2013-02-12 22:34:46 (GMT) |
commit | 0127de0b877600a95871e07aee8a092e9199002a (patch) | |
tree | 3a94dc1b1e70e1958e8e6ccccffb510befc0532a /Lib | |
parent | cdc7a91dde9221f5cf857268f7900e7a7d64619b (diff) | |
download | cpython-0127de0b877600a95871e07aee8a092e9199002a.zip cpython-0127de0b877600a95871e07aee8a092e9199002a.tar.gz cpython-0127de0b877600a95871e07aee8a092e9199002a.tar.bz2 |
Issue #16800: tempfile.gettempdir() no longer left temporary files when
the disk is full. Original patch by Amir Szekely.
Diffstat (limited to 'Lib')
-rw-r--r-- | Lib/tempfile.py | 18 | ||||
-rw-r--r-- | Lib/test/test_support.py | 27 | ||||
-rw-r--r-- | Lib/test/test_tempfile.py | 56 |
3 files changed, 89 insertions, 12 deletions
diff --git a/Lib/tempfile.py b/Lib/tempfile.py index 14e4f1c..d2af8a6 100644 --- a/Lib/tempfile.py +++ b/Lib/tempfile.py @@ -29,6 +29,7 @@ __all__ = [ # Imports. +import io as _io import os as _os import errno as _errno from random import Random as _Random @@ -193,14 +194,17 @@ def _get_default_tempdir(): name = namer.next() filename = _os.path.join(dir, name) try: - fd = _os.open(filename, flags, 0600) - fp = _os.fdopen(fd, 'w') - fp.write('blat') - fp.close() - _os.unlink(filename) - del fp, fd + fd = _os.open(filename, flags, 0o600) + try: + try: + fp = _io.open(fd, 'wb', buffering=0, closefd=False) + fp.write(b'blat') + finally: + _os.close(fd) + finally: + _os.unlink(filename) return dir - except (OSError, IOError), e: + except (OSError, IOError) as e: if e[0] != _errno.EEXIST: break # no point trying more names in this directory pass diff --git a/Lib/test/test_support.py b/Lib/test/test_support.py index 0792c45..cc74d07 100644 --- a/Lib/test/test_support.py +++ b/Lib/test/test_support.py @@ -1298,6 +1298,33 @@ def reap_children(): except: break +@contextlib.contextmanager +def swap_attr(obj, attr, new_val): + """Temporary swap out an attribute with a new object. + + Usage: + with swap_attr(obj, "attr", 5): + ... + + This will set obj.attr to 5 for the duration of the with: block, + restoring the old value at the end of the block. If `attr` doesn't + exist on `obj`, it will be created and then deleted at the end of the + block. + """ + if hasattr(obj, attr): + real_val = getattr(obj, attr) + setattr(obj, attr, new_val) + try: + yield + finally: + setattr(obj, attr, real_val) + else: + setattr(obj, attr, new_val) + try: + yield + finally: + delattr(obj, attr) + def py3k_bytes(b): """Emulate the py3k bytes() constructor. diff --git a/Lib/test/test_tempfile.py b/Lib/test/test_tempfile.py index 1b219c8..b4d23ad 100644 --- a/Lib/test/test_tempfile.py +++ b/Lib/test/test_tempfile.py @@ -1,13 +1,16 @@ # tempfile.py unit tests. import tempfile +import errno +import io import os import signal +import shutil import sys import re import warnings import unittest -from test import test_support +from test import test_support as support warnings.filterwarnings("ignore", category=RuntimeWarning, @@ -177,7 +180,7 @@ class test__candidate_tempdir_list(TC): # _candidate_tempdir_list contains the expected directories # Make sure the interesting environment variables are all set. - with test_support.EnvironmentVarGuard() as env: + with support.EnvironmentVarGuard() as env: for envname in 'TMPDIR', 'TEMP', 'TMP': dirname = os.getenv(envname) if not dirname: @@ -202,8 +205,51 @@ class test__candidate_tempdir_list(TC): test_classes.append(test__candidate_tempdir_list) +# We test _get_default_tempdir some more by testing gettempdir. -# We test _get_default_tempdir by testing gettempdir. +class TestGetDefaultTempdir(TC): + """Test _get_default_tempdir().""" + + def test_no_files_left_behind(self): + # use a private empty directory + our_temp_directory = tempfile.mkdtemp() + try: + # force _get_default_tempdir() to consider our empty directory + def our_candidate_list(): + return [our_temp_directory] + + with support.swap_attr(tempfile, "_candidate_tempdir_list", + our_candidate_list): + # verify our directory is empty after _get_default_tempdir() + tempfile._get_default_tempdir() + self.assertEqual(os.listdir(our_temp_directory), []) + + def raise_OSError(*args, **kwargs): + raise OSError(-1) + + with support.swap_attr(io, "open", raise_OSError): + # test again with failing io.open() + with self.assertRaises(IOError) as cm: + tempfile._get_default_tempdir() + self.assertEqual(cm.exception.errno, errno.ENOENT) + self.assertEqual(os.listdir(our_temp_directory), []) + + open = io.open + def bad_writer(*args, **kwargs): + fp = open(*args, **kwargs) + fp.write = raise_OSError + return fp + + with support.swap_attr(io, "open", bad_writer): + # test again with failing write() + with self.assertRaises(IOError) as cm: + tempfile._get_default_tempdir() + self.assertEqual(cm.exception.errno, errno.ENOENT) + self.assertEqual(os.listdir(our_temp_directory), []) + finally: + shutil.rmtree(our_temp_directory) + +test_classes.append(TestGetDefaultTempdir) class test__get_candidate_names(TC): @@ -299,7 +345,7 @@ class test__mkstemp_inner(TC): if not has_spawnl: return # ugh, can't use SkipTest. - if test_support.verbose: + if support.verbose: v="v" else: v="q" @@ -913,7 +959,7 @@ if tempfile.NamedTemporaryFile is not tempfile.TemporaryFile: test_classes.append(test_TemporaryFile) def test_main(): - test_support.run_unittest(*test_classes) + support.run_unittest(*test_classes) if __name__ == "__main__": test_main() |