diff options
author | Ev2geny <ev2geny@gmail.com> | 2022-10-04 22:37:33 (GMT) |
---|---|---|
committer | GitHub <noreply@github.com> | 2022-10-04 22:37:33 (GMT) |
commit | 743453a5541d3a6f19046459d22a7991ecb88223 (patch) | |
tree | bbecc400935d6ea5564292bba5c963e03520141a /Lib | |
parent | bbc7cd649a6ef56eb09278f3e746ca89b9d592c9 (diff) | |
download | cpython-743453a5541d3a6f19046459d22a7991ecb88223.zip cpython-743453a5541d3a6f19046459d22a7991ecb88223.tar.gz cpython-743453a5541d3a6f19046459d22a7991ecb88223.tar.bz2 |
gh-58451: Add optional delete_on_close parameter to NamedTemporaryFile (GH-97015)
Diffstat (limited to 'Lib')
-rw-r--r-- | Lib/tempfile.py | 76 | ||||
-rw-r--r-- | Lib/test/test_tempfile.py | 100 |
2 files changed, 139 insertions, 37 deletions
diff --git a/Lib/tempfile.py b/Lib/tempfile.py index 480c172..bb18d60 100644 --- a/Lib/tempfile.py +++ b/Lib/tempfile.py @@ -418,42 +418,42 @@ class _TemporaryFileCloser: underlying file object, without adding a __del__ method to the temporary file.""" - file = None # Set here since __del__ checks it + cleanup_called = False close_called = False - def __init__(self, file, name, delete=True): + def __init__(self, file, name, delete=True, delete_on_close=True): self.file = file self.name = name self.delete = delete + self.delete_on_close = delete_on_close - # NT provides delete-on-close as a primitive, so we don't need - # the wrapper to do anything special. We still use it so that - # file.name is useful (i.e. not "(fdopen)") with NamedTemporaryFile. - if _os.name != 'nt': - # Cache the unlinker so we don't get spurious errors at - # shutdown when the module-level "os" is None'd out. Note - # that this must be referenced as self.unlink, because the - # name TemporaryFileWrapper may also get None'd out before - # __del__ is called. - - def close(self, unlink=_os.unlink): - if not self.close_called and self.file is not None: - self.close_called = True - try: + def cleanup(self, windows=(_os.name == 'nt'), unlink=_os.unlink): + if not self.cleanup_called: + self.cleanup_called = True + try: + if not self.close_called: + self.close_called = True self.file.close() - finally: - if self.delete: + finally: + # Windows provides delete-on-close as a primitive, in which + # case the file was deleted by self.file.close(). + if self.delete and not (windows and self.delete_on_close): + try: unlink(self.name) + except FileNotFoundError: + pass - # Need to ensure the file is deleted on __del__ - def __del__(self): - self.close() - - else: - def close(self): - if not self.close_called: - self.close_called = True + def close(self): + if not self.close_called: + self.close_called = True + try: self.file.close() + finally: + if self.delete and self.delete_on_close: + self.cleanup() + + def __del__(self): + self.cleanup() class _TemporaryFileWrapper: @@ -464,11 +464,11 @@ class _TemporaryFileWrapper: remove the file when it is no longer needed. """ - def __init__(self, file, name, delete=True): + def __init__(self, file, name, delete=True, delete_on_close=True): self.file = file self.name = name - self.delete = delete - self._closer = _TemporaryFileCloser(file, name, delete) + self._closer = _TemporaryFileCloser(file, name, delete, + delete_on_close) def __getattr__(self, name): # Attribute lookups are delegated to the underlying file @@ -499,7 +499,7 @@ class _TemporaryFileWrapper: # deleted when used in a with statement def __exit__(self, exc, value, tb): result = self.file.__exit__(exc, value, tb) - self.close() + self._closer.cleanup() return result def close(self): @@ -518,10 +518,10 @@ class _TemporaryFileWrapper: for line in self.file: yield line - def NamedTemporaryFile(mode='w+b', buffering=-1, encoding=None, newline=None, suffix=None, prefix=None, - dir=None, delete=True, *, errors=None): + dir=None, delete=True, *, errors=None, + delete_on_close=True): """Create and return a temporary file. Arguments: 'prefix', 'suffix', 'dir' -- as for mkstemp. @@ -529,7 +529,10 @@ def NamedTemporaryFile(mode='w+b', buffering=-1, encoding=None, 'buffering' -- the buffer size argument to io.open (default -1). 'encoding' -- the encoding argument to io.open (default None) 'newline' -- the newline argument to io.open (default None) - 'delete' -- whether the file is deleted on close (default True). + 'delete' -- whether the file is automatically deleted (default True). + 'delete_on_close' -- if 'delete', whether the file is deleted on close + (default True) or otherwise either on context manager exit + (if context manager was used) or on object finalization. . 'errors' -- the errors argument to io.open (default None) The file is created as mkstemp() would do it. @@ -548,7 +551,7 @@ def NamedTemporaryFile(mode='w+b', buffering=-1, encoding=None, # Setting O_TEMPORARY in the flags causes the OS to delete # the file when it is closed. This is only supported by Windows. - if _os.name == 'nt' and delete: + if _os.name == 'nt' and delete and delete_on_close: flags |= _os.O_TEMPORARY if "b" not in mode: @@ -567,12 +570,13 @@ def NamedTemporaryFile(mode='w+b', buffering=-1, encoding=None, raw = getattr(file, 'buffer', file) raw = getattr(raw, 'raw', raw) raw.name = name - return _TemporaryFileWrapper(file, name, delete) + return _TemporaryFileWrapper(file, name, delete, delete_on_close) except: file.close() raise except: - if name is not None and not (_os.name == 'nt' and delete): + if name is not None and not ( + _os.name == 'nt' and delete and delete_on_close): _os.unlink(name) raise diff --git a/Lib/test/test_tempfile.py b/Lib/test/test_tempfile.py index ccf7ea0..7c2c8de 100644 --- a/Lib/test/test_tempfile.py +++ b/Lib/test/test_tempfile.py @@ -11,6 +11,7 @@ import contextlib import stat import types import weakref +import gc from unittest import mock import unittest @@ -1013,6 +1014,102 @@ class TestNamedTemporaryFile(BaseTestCase): pass self.assertRaises(ValueError, use_closed) + def test_context_man_not_del_on_close_if_delete_on_close_false(self): + # Issue gh-58451: tempfile.NamedTemporaryFile is not particulary useful + # on Windows + # A NamedTemporaryFile is NOT deleted when closed if + # delete_on_close=False, but is deleted on context manager exit + dir = tempfile.mkdtemp() + try: + with tempfile.NamedTemporaryFile(dir=dir, + delete=True, + delete_on_close=False) as f: + f.write(b'blat') + f_name = f.name + f.close() + with self.subTest(): + # Testing that file is not deleted on close + self.assertTrue(os.path.exists(f.name), + f"NamedTemporaryFile {f.name!r} is incorrectly " + f"deleted on closure when delete_on_close=False") + + with self.subTest(): + # Testing that file is deleted on context manager exit + self.assertFalse(os.path.exists(f.name), + f"NamedTemporaryFile {f.name!r} exists " + f"after context manager exit") + + finally: + os.rmdir(dir) + + def test_context_man_ok_to_delete_manually(self): + # In the case of delete=True, a NamedTemporaryFile can be manually + # deleted in a with-statement context without causing an error. + dir = tempfile.mkdtemp() + try: + with tempfile.NamedTemporaryFile(dir=dir, + delete=True, + delete_on_close=False) as f: + f.write(b'blat') + f.close() + os.unlink(f.name) + + finally: + os.rmdir(dir) + + def test_context_man_not_del_if_delete_false(self): + # A NamedTemporaryFile is not deleted if delete = False + dir = tempfile.mkdtemp() + f_name = "" + try: + # Test that delete_on_close=True has no effect if delete=False. + with tempfile.NamedTemporaryFile(dir=dir, delete=False, + delete_on_close=True) as f: + f.write(b'blat') + f_name = f.name + self.assertTrue(os.path.exists(f.name), + f"NamedTemporaryFile {f.name!r} exists after close") + finally: + os.unlink(f_name) + os.rmdir(dir) + + def test_del_by_finalizer(self): + # A NamedTemporaryFile is deleted when finalized in the case of + # delete=True, delete_on_close=False, and no with-statement is used. + def my_func(dir): + f = tempfile.NamedTemporaryFile(dir=dir, delete=True, + delete_on_close=False) + tmp_name = f.name + f.write(b'blat') + # Testing extreme case, where the file is not explicitly closed + # f.close() + return tmp_name + # Make sure that the garbage collector has finalized the file object. + gc.collect() + dir = tempfile.mkdtemp() + try: + tmp_name = my_func(dir) + self.assertFalse(os.path.exists(tmp_name), + f"NamedTemporaryFile {tmp_name!r} " + f"exists after finalizer ") + finally: + os.rmdir(dir) + + def test_correct_finalizer_work_if_already_deleted(self): + # There should be no error in the case of delete=True, + # delete_on_close=False, no with-statement is used, and the file is + # deleted manually. + def my_func(dir)->str: + f = tempfile.NamedTemporaryFile(dir=dir, delete=True, + delete_on_close=False) + tmp_name = f.name + f.write(b'blat') + f.close() + os.unlink(tmp_name) + return tmp_name + # Make sure that the garbage collector has finalized the file object. + gc.collect() + def test_bad_mode(self): dir = tempfile.mkdtemp() self.addCleanup(os_helper.rmtree, dir) @@ -1081,7 +1178,8 @@ class TestSpooledTemporaryFile(BaseTestCase): missing_attrs = iobase_attrs - spooledtempfile_attrs self.assertFalse( missing_attrs, - 'SpooledTemporaryFile missing attributes from IOBase/BufferedIOBase/TextIOBase' + 'SpooledTemporaryFile missing attributes from ' + 'IOBase/BufferedIOBase/TextIOBase' ) def test_del_on_close(self): |