diff options
Diffstat (limited to 'Lib/tempfile.py')
-rw-r--r-- | Lib/tempfile.py | 97 |
1 files changed, 23 insertions, 74 deletions
diff --git a/Lib/tempfile.py b/Lib/tempfile.py index 6bc842f..05d4cab 100644 --- a/Lib/tempfile.py +++ b/Lib/tempfile.py @@ -27,7 +27,6 @@ __all__ = [ # Imports. -import atexit as _atexit import functools as _functools import warnings as _warnings import io as _io @@ -35,23 +34,7 @@ import os as _os import shutil as _shutil import errno as _errno from random import Random as _Random - -try: - import fcntl as _fcntl -except ImportError: - def _set_cloexec(fd): - pass -else: - def _set_cloexec(fd): - try: - flags = _fcntl.fcntl(fd, _fcntl.F_GETFD, 0) - except OSError: - pass - else: - # flags read successfully, modify - flags |= _fcntl.FD_CLOEXEC - _fcntl.fcntl(fd, _fcntl.F_SETFD, flags) - +import weakref as _weakref try: import _thread @@ -60,8 +43,6 @@ except ImportError: _allocate_lock = _thread.allocate_lock _text_openflags = _os.O_RDWR | _os.O_CREAT | _os.O_EXCL -if hasattr(_os, 'O_NOINHERIT'): - _text_openflags |= _os.O_NOINHERIT if hasattr(_os, 'O_NOFOLLOW'): _text_openflags |= _os.O_NOFOLLOW @@ -90,8 +71,8 @@ else: # Fallback. All we need is something that raises OSError if the # file doesn't exist. def _stat(fn): - f = open(fn) - f.close() + fd = _os.open(fn, _os.O_RDONLY) + os.close(fd) def _exists(fn): try: @@ -125,7 +106,7 @@ class _RandomNameSequence: def __next__(self): c = self.characters choose = self.rng.choice - letters = [choose(c) for dummy in "123456"] + letters = [choose(c) for dummy in range(8)] return ''.join(letters) def _candidate_tempdir_list(): @@ -167,7 +148,7 @@ def _get_default_tempdir(): for dir in dirlist: if dir != _os.curdir: - dir = _os.path.normcase(_os.path.abspath(dir)) + dir = _os.path.abspath(dir) # Try only a few names per directory. for seq in range(100): name = next(namer) @@ -217,7 +198,6 @@ def _mkstemp_inner(dir, pre, suf, flags): file = _os.path.join(dir, pre + name + suf) try: fd = _os.open(file, flags, 0o600) - _set_cloexec(fd) return (fd, _os.path.abspath(file)) except FileExistsError: continue # try again @@ -356,8 +336,7 @@ class _TemporaryFileCloser: underlying file object, without adding a __del__ method to the temporary file.""" - # Set here since __del__ checks it - file = None + file = None # Set here since __del__ checks it close_called = False def __init__(self, file, name, delete=True): @@ -682,10 +661,21 @@ class TemporaryDirectory(object): # Handle mkdtemp raising an exception name = None + _finalizer = None _closed = False def __init__(self, suffix="", prefix=template, dir=None): self.name = mkdtemp(suffix, prefix, dir) + self._finalizer = _weakref.finalize( + self, self._cleanup, self.name, + warn_message="Implicitly cleaning up {!r}".format(self)) + + @classmethod + def _cleanup(cls, name, warn_message=None): + _shutil.rmtree(name) + if warn_message is not None: + _warnings.warn(warn_message, ResourceWarning) + def __repr__(self): return "<{} {!r}>".format(self.__class__.__name__, self.name) @@ -693,53 +683,12 @@ class TemporaryDirectory(object): def __enter__(self): return self.name - def cleanup(self, _warn=False, _warnings=_warnings): - if self.name and not self._closed: - try: - _shutil.rmtree(self.name) - except (TypeError, AttributeError) as ex: - if "None" not in '%s' % (ex,): - raise - self._rmtree(self.name) - self._closed = True - if _warn and _warnings.warn: - try: - _warnings.warn("Implicitly cleaning up {!r}".format(self), - ResourceWarning) - except: - if _is_running: - raise - # Don't raise an exception if modules needed for emitting - # a warning are already cleaned in shutdown process. - def __exit__(self, exc, value, tb): self.cleanup() - def __del__(self): - # Issue a ResourceWarning if implicit cleanup needed - self.cleanup(_warn=True) - - def _rmtree(self, path, _OSError=OSError, _sep=_os.path.sep, - _listdir=_os.listdir, _remove=_os.remove, _rmdir=_os.rmdir): - # Essentially a stripped down version of shutil.rmtree. We can't - # use globals because they may be None'ed out at shutdown. - if not isinstance(path, str): - _sep = _sep.encode() - try: - for name in _listdir(path): - fullname = path + _sep + name - try: - _remove(fullname) - except _OSError: - self._rmtree(fullname) - _rmdir(path) - except _OSError: - pass - -_is_running = True - -def _on_shutdown(): - global _is_running - _is_running = False - -_atexit.register(_on_shutdown) + def cleanup(self): + if self._finalizer is not None: + self._finalizer.detach() + if self.name is not None and not self._closed: + _shutil.rmtree(self.name) + self._closed = True |