From 0127de0b877600a95871e07aee8a092e9199002a Mon Sep 17 00:00:00 2001 From: Serhiy Storchaka Date: Wed, 13 Feb 2013 00:34:46 +0200 Subject: Issue #16800: tempfile.gettempdir() no longer left temporary files when the disk is full. Original patch by Amir Szekely. --- Lib/tempfile.py | 18 +++++++++------ Lib/test/test_support.py | 27 +++++++++++++++++++++++ Lib/test/test_tempfile.py | 56 ++++++++++++++++++++++++++++++++++++++++++----- Misc/ACKS | 1 + Misc/NEWS | 3 +++ 5 files changed, 93 insertions(+), 12 deletions(-) diff --git a/Lib/tempfile.py b/Lib/tempfile.py index 14e4f1c..d2af8a6 100644 --- a/Lib/tempfile.py +++ b/Lib/tempfile.py @@ -29,6 +29,7 @@ __all__ = [ # Imports. +import io as _io import os as _os import errno as _errno from random import Random as _Random @@ -193,14 +194,17 @@ def _get_default_tempdir(): name = namer.next() filename = _os.path.join(dir, name) try: - fd = _os.open(filename, flags, 0600) - fp = _os.fdopen(fd, 'w') - fp.write('blat') - fp.close() - _os.unlink(filename) - del fp, fd + fd = _os.open(filename, flags, 0o600) + try: + try: + fp = _io.open(fd, 'wb', buffering=0, closefd=False) + fp.write(b'blat') + finally: + _os.close(fd) + finally: + _os.unlink(filename) return dir - except (OSError, IOError), e: + except (OSError, IOError) as e: if e[0] != _errno.EEXIST: break # no point trying more names in this directory pass diff --git a/Lib/test/test_support.py b/Lib/test/test_support.py index 0792c45..cc74d07 100644 --- a/Lib/test/test_support.py +++ b/Lib/test/test_support.py @@ -1298,6 +1298,33 @@ def reap_children(): except: break +@contextlib.contextmanager +def swap_attr(obj, attr, new_val): + """Temporary swap out an attribute with a new object. + + Usage: + with swap_attr(obj, "attr", 5): + ... + + This will set obj.attr to 5 for the duration of the with: block, + restoring the old value at the end of the block. If `attr` doesn't + exist on `obj`, it will be created and then deleted at the end of the + block. + """ + if hasattr(obj, attr): + real_val = getattr(obj, attr) + setattr(obj, attr, new_val) + try: + yield + finally: + setattr(obj, attr, real_val) + else: + setattr(obj, attr, new_val) + try: + yield + finally: + delattr(obj, attr) + def py3k_bytes(b): """Emulate the py3k bytes() constructor. diff --git a/Lib/test/test_tempfile.py b/Lib/test/test_tempfile.py index 1b219c8..b4d23ad 100644 --- a/Lib/test/test_tempfile.py +++ b/Lib/test/test_tempfile.py @@ -1,13 +1,16 @@ # tempfile.py unit tests. import tempfile +import errno +import io import os import signal +import shutil import sys import re import warnings import unittest -from test import test_support +from test import test_support as support warnings.filterwarnings("ignore", category=RuntimeWarning, @@ -177,7 +180,7 @@ class test__candidate_tempdir_list(TC): # _candidate_tempdir_list contains the expected directories # Make sure the interesting environment variables are all set. - with test_support.EnvironmentVarGuard() as env: + with support.EnvironmentVarGuard() as env: for envname in 'TMPDIR', 'TEMP', 'TMP': dirname = os.getenv(envname) if not dirname: @@ -202,8 +205,51 @@ class test__candidate_tempdir_list(TC): test_classes.append(test__candidate_tempdir_list) +# We test _get_default_tempdir some more by testing gettempdir. -# We test _get_default_tempdir by testing gettempdir. +class TestGetDefaultTempdir(TC): + """Test _get_default_tempdir().""" + + def test_no_files_left_behind(self): + # use a private empty directory + our_temp_directory = tempfile.mkdtemp() + try: + # force _get_default_tempdir() to consider our empty directory + def our_candidate_list(): + return [our_temp_directory] + + with support.swap_attr(tempfile, "_candidate_tempdir_list", + our_candidate_list): + # verify our directory is empty after _get_default_tempdir() + tempfile._get_default_tempdir() + self.assertEqual(os.listdir(our_temp_directory), []) + + def raise_OSError(*args, **kwargs): + raise OSError(-1) + + with support.swap_attr(io, "open", raise_OSError): + # test again with failing io.open() + with self.assertRaises(IOError) as cm: + tempfile._get_default_tempdir() + self.assertEqual(cm.exception.errno, errno.ENOENT) + self.assertEqual(os.listdir(our_temp_directory), []) + + open = io.open + def bad_writer(*args, **kwargs): + fp = open(*args, **kwargs) + fp.write = raise_OSError + return fp + + with support.swap_attr(io, "open", bad_writer): + # test again with failing write() + with self.assertRaises(IOError) as cm: + tempfile._get_default_tempdir() + self.assertEqual(cm.exception.errno, errno.ENOENT) + self.assertEqual(os.listdir(our_temp_directory), []) + finally: + shutil.rmtree(our_temp_directory) + +test_classes.append(TestGetDefaultTempdir) class test__get_candidate_names(TC): @@ -299,7 +345,7 @@ class test__mkstemp_inner(TC): if not has_spawnl: return # ugh, can't use SkipTest. - if test_support.verbose: + if support.verbose: v="v" else: v="q" @@ -913,7 +959,7 @@ if tempfile.NamedTemporaryFile is not tempfile.TemporaryFile: test_classes.append(test_TemporaryFile) def test_main(): - test_support.run_unittest(*test_classes) + support.run_unittest(*test_classes) if __name__ == "__main__": test_main() diff --git a/Misc/ACKS b/Misc/ACKS index 15bf415..cd7ffee 100644 --- a/Misc/ACKS +++ b/Misc/ACKS @@ -974,6 +974,7 @@ Kalle Svensson Paul Swartz Thenault Sylvain Péter Szabó +Amir Szekely Arfrever Frehtes Taifersar Arahesis Geoff Talvola William Tanksley diff --git a/Misc/NEWS b/Misc/NEWS index b3d28fd..847ccf9 100644 --- a/Misc/NEWS +++ b/Misc/NEWS @@ -202,6 +202,9 @@ Core and Builtins Library ------- +- Issue #16800: tempfile.gettempdir() no longer left temporary files when + the disk is full. Original patch by Amir Szekely. + - Issue #13555: cPickle now supports files larger than 2 GiB. - Issue #17052: unittest discovery should use self.testLoader. -- cgit v0.12