summaryrefslogtreecommitdiffstats
path: root/Lib
diff options
context:
space:
mode:
Diffstat (limited to 'Lib')
-rw-r--r--Lib/tempfile.py621
-rw-r--r--Lib/test/test_tempfile.py711
2 files changed, 1100 insertions, 232 deletions
diff --git a/Lib/tempfile.py b/Lib/tempfile.py
index eb8253c..f3cc481 100644
--- a/Lib/tempfile.py
+++ b/Lib/tempfile.py
@@ -1,177 +1,360 @@
-"""Temporary files and filenames."""
+"""Temporary files.
-# XXX This tries to be not UNIX specific, but I don't know beans about
-# how to choose a temp directory or filename on MS-DOS or other
-# systems so it may have to be changed...
+This module provides generic, low- and high-level interfaces for
+creating temporary files and directories. The interfaces listed
+as "safe" just below can be used without fear of race conditions.
+Those listed as "unsafe" cannot, and are provided for backward
+compatibility only.
-import os
+This module also provides some data items to the user:
-__all__ = ["mktemp", "TemporaryFile", "tempdir", "gettempprefix"]
+ TMP_MAX - maximum number of names that will be tried before
+ giving up.
+ template - the default prefix for all temporary names.
+ You may change this to control the default prefix.
+ tempdir - If this is set to a string before the first use of
+ any routine from this module, it will be considered as
+ another candidate location to store temporary files.
+"""
+
+__all__ = [
+ "NamedTemporaryFile", "TemporaryFile", # high level safe interfaces
+ "mkstemp", "mkdtemp", # low level safe interfaces
+ "mktemp", # deprecated unsafe interface
+ "TMP_MAX", "gettempprefix", # constants
+ "tempdir", "gettempdir"
+ ]
+
+
+# Imports.
+
+import os as _os
+import errno as _errno
+from random import Random as _Random
+
+if _os.name == 'mac':
+ import macfs as _macfs
+ import MACFS as _MACFS
+
+try:
+ import fcntl as _fcntl
+ def _set_cloexec(fd):
+ flags = _fcntl.fcntl(fd, _fcntl.F_GETFD, 0)
+ if flags >= 0:
+ # flags read successfully, modify
+ flags |= _fcntl.FD_CLOEXEC
+ _fcntl.fcntl(fd, _fcntl.F_SETFD, flags)
+except (ImportError, AttributeError):
+ def _set_cloexec(fd):
+ pass
+
+try:
+ import thread as _thread
+ _allocate_lock = _thread.allocate_lock
+except (ImportError, AttributeError):
+ class _allocate_lock:
+ def acquire(self):
+ pass
+ release = acquire
+
+_text_openflags = _os.O_RDWR | _os.O_CREAT | _os.O_EXCL
+if hasattr(_os, 'O_NOINHERIT'): _text_openflags |= _os.O_NOINHERIT
+if hasattr(_os, 'O_NOFOLLOW'): _text_openflags |= _os.O_NOFOLLOW
+
+_bin_openflags = _text_openflags
+if hasattr(_os, 'O_BINARY'): _bin_openflags |= _os.O_BINARY
+
+if hasattr(_os, 'TMP_MAX'):
+ TMP_MAX = _os.TMP_MAX
+else:
+ TMP_MAX = 10000
+
+if _os.name == 'nt':
+ template = '~t' # cater to eight-letter limit
+else:
+ template = "tmp"
-# Parameters that the caller may set to override the defaults
tempdir = None
-template = None
-def gettempdir():
- """Function to calculate the directory to use."""
- global tempdir
- if tempdir is not None:
- return tempdir
-
- # _gettempdir_inner deduces whether a candidate temp dir is usable by
- # trying to create a file in it, and write to it. If that succeeds,
- # great, it closes the file and unlinks it. There's a race, though:
- # the *name* of the test file it tries is the same across all threads
- # under most OSes (Linux is an exception), and letting multiple threads
- # all try to open, write to, close, and unlink a single file can cause
- # a variety of bogus errors (e.g., you cannot unlink a file under
- # Windows if anyone has it open, and two threads cannot create the
- # same file in O_EXCL mode under Unix). The simplest cure is to serialize
- # calls to _gettempdir_inner. This isn't a real expense, because the
- # first thread to succeed sets the global tempdir, and all subsequent
- # calls to gettempdir() reuse that without trying _gettempdir_inner.
- _tempdir_lock.acquire()
+# Internal routines.
+
+_once_lock = _allocate_lock()
+
+def _once(var, initializer):
+ """Wrapper to execute an initialization operation just once,
+ even if multiple threads reach the same point at the same time.
+
+ var is the name (as a string) of the variable to be entered into
+ the current global namespace.
+
+ initializer is a callable which will return the appropriate initial
+ value for variable. It will be called only if variable is not
+ present in the global namespace, or its current value is None.
+
+ Do not call _once from inside an initializer routine, it will deadlock.
+ """
+
+ vars = globals()
+ lock = _once_lock
+
+ # Check first outside the lock.
+ if var in vars and vars[var] is not None:
+ return
try:
- return _gettempdir_inner()
+ lock.acquire()
+ # Check again inside the lock.
+ if var in vars and vars[var] is not None:
+ return
+ vars[var] = initializer()
finally:
- _tempdir_lock.release()
+ lock.release()
+
+class _RandomNameSequence:
+ """An instance of _RandomNameSequence generates an endless
+ sequence of unpredictable strings which can safely be incorporated
+ into file names. Each string is six characters long. Multiple
+ threads can safely use the same instance at the same time.
+
+ _RandomNameSequence is an iterator."""
+
+ characters = ( "abcdefghijklmnopqrstuvwxyz"
+ + "ABCDEFGHIJKLMNOPQRSTUVWXYZ"
+ + "0123456789-_")
+
+ def __init__(self):
+ self.mutex = _allocate_lock()
+ self.rng = _Random()
+ self.normcase = _os.path.normcase
+ def __iter__(self):
+ return self
+
+ def next(self):
+ m = self.mutex
+ c = self.characters
+ r = self.rng
-def _gettempdir_inner():
- """Function to calculate the directory to use."""
- global tempdir
- if tempdir is not None:
- return tempdir
- try:
- pwd = os.getcwd()
- except (AttributeError, os.error):
- pwd = os.curdir
- attempdirs = ['/tmp', '/var/tmp', '/usr/tmp', pwd]
- if os.name == 'nt':
- attempdirs.insert(0, 'C:\\TEMP')
- attempdirs.insert(0, '\\TEMP')
- elif os.name == 'mac':
- import macfs, MACFS
try:
- refnum, dirid = macfs.FindFolder(MACFS.kOnSystemDisk,
- MACFS.kTemporaryFolderType, 1)
- dirname = macfs.FSSpec((refnum, dirid, '')).as_pathname()
- attempdirs.insert(0, dirname)
- except macfs.error:
- pass
- elif os.name == 'riscos':
- scrapdir = os.getenv('Wimp$ScrapDir')
- if scrapdir:
- attempdirs.insert(0, scrapdir)
+ m.acquire()
+ letters = ''.join([r.choice(c), r.choice(c), r.choice(c),
+ r.choice(c), r.choice(c), r.choice(c)])
+ finally:
+ m.release()
+
+ return self.normcase(letters)
+
+def _candidate_tempdir_list():
+ """Generate a list of candidate temporary directories which
+ _get_default_tempdir will try."""
+
+ dirlist = []
+
+ # First, try the environment.
for envname in 'TMPDIR', 'TEMP', 'TMP':
- if envname in os.environ:
- attempdirs.insert(0, os.environ[envname])
- testfile = gettempprefix() + 'test'
- for dir in attempdirs:
+ dirname = _os.getenv(envname)
+ if dirname: dirlist.append(dirname)
+
+ # Failing that, try OS-specific locations.
+ if _os.name == 'mac':
try:
- filename = os.path.join(dir, testfile)
- if os.name == 'posix':
- try:
- fd = os.open(filename,
- os.O_RDWR | os.O_CREAT | os.O_EXCL, 0700)
- except OSError:
- pass
- else:
- fp = os.fdopen(fd, 'w')
- fp.write('blat')
- fp.close()
- os.unlink(filename)
- del fp, fd
- tempdir = dir
- break
- else:
- fp = open(filename, 'w')
+ refnum, dirid = _macfs.FindFolder(_MACFS.kOnSystemDisk,
+ _MACFS.kTemporaryFolderType, 1)
+ dirname = _macfs.FSSpec((refnum, dirid, '')).as_pathname()
+ dirlist.append(dirname)
+ except _macfs.error:
+ pass
+ elif _os.name == 'riscos':
+ dirname = _os.getenv('Wimp$ScrapDir')
+ if dirname: dirlist.append(dirname)
+ elif _os.name == 'nt':
+ dirlist.extend([ r'c:\temp', r'c:\tmp', r'\temp', r'\tmp' ])
+ else:
+ dirlist.extend([ '/tmp', '/var/tmp', '/usr/tmp' ])
+
+ # As a last resort, the current directory.
+ try:
+ dirlist.append(_os.getcwd())
+ except (AttributeError, _os.error):
+ dirlist.append(_os.curdir)
+
+ return dirlist
+
+def _get_default_tempdir():
+ """Calculate the default directory to use for temporary files.
+ This routine should be called through '_once' (see above) as we
+ do not want multiple threads attempting this calculation simultaneously.
+
+ We determine whether or not a candidate temp dir is usable by
+ trying to create and write to a file in that directory. If this
+ is successful, the test file is deleted. To prevent denial of
+ service, the name of the test file must be randomized."""
+
+ namer = _RandomNameSequence()
+ dirlist = _candidate_tempdir_list()
+ flags = _text_openflags
+
+ for dir in dirlist:
+ if dir != _os.curdir:
+ dir = _os.path.normcase(_os.path.abspath(dir))
+ # Try only a few names per directory.
+ for seq in xrange(100):
+ name = namer.next()
+ filename = _os.path.join(dir, name)
+ try:
+ fd = _os.open(filename, flags, 0600)
+ fp = _os.fdopen(fd, 'w')
fp.write('blat')
fp.close()
- os.unlink(filename)
- tempdir = dir
- break
- except IOError:
- pass
- if tempdir is None:
- msg = "Can't find a usable temporary directory amongst " + `attempdirs`
- raise IOError, msg
- return tempdir
+ _os.unlink(filename)
+ del fp, fd
+ return dir
+ except (OSError, IOError), e:
+ if e[0] != _errno.EEXIST:
+ break # no point trying more names in this directory
+ pass
+ raise IOError, (_errno.ENOENT,
+ ("No usable temporary directory found in %s" % dirlist))
+def _get_candidate_names():
+ """Common setup sequence for all user-callable interfaces."""
-# template caches the result of gettempprefix, for speed, when possible.
-# XXX unclear why this isn't "_template"; left it "template" for backward
-# compatibility.
-if os.name == "posix":
- # We don't try to cache the template on posix: the pid may change on us
- # between calls due to a fork, and on Linux the pid changes even for
- # another thread in the same process. Since any attempt to keep the
- # cache in synch would have to call os.getpid() anyway in order to make
- # sure the pid hasn't changed between calls, a cache wouldn't save any
- # time. In addition, a cache is difficult to keep correct with the pid
- # changing willy-nilly, and earlier attempts proved buggy (races).
- template = None
-
-# Else the pid never changes, so gettempprefix always returns the same
-# string.
-elif os.name == "nt":
- template = '~' + `os.getpid()` + '-'
-elif os.name in ('mac', 'riscos'):
- template = 'Python-Tmp-'
-else:
- template = 'tmp' # XXX might choose a better one
+ _once('_name_sequence', _RandomNameSequence)
+ return _name_sequence
+
+
+def _mkstemp_inner(dir, pre, suf, flags):
+ """Code common to mkstemp, TemporaryFile, and NamedTemporaryFile."""
+
+ names = _get_candidate_names()
+
+ for seq in xrange(TMP_MAX):
+ name = names.next()
+ file = _os.path.join(dir, pre + name + suf)
+ try:
+ fd = _os.open(file, flags, 0600)
+ _set_cloexec(fd)
+ return (fd, file)
+ except OSError, e:
+ if e.errno == _errno.EEXIST:
+ continue # try again
+ raise
+
+ raise IOError, (_errno.EEXIST, "No usable temporary file name found")
+
+
+# User visible interfaces.
def gettempprefix():
- """Function to calculate a prefix of the filename to use.
+ """Accessor for tempdir.template."""
+ return template
+
+def gettempdir():
+ """Accessor for tempdir.tempdir."""
+ _once('tempdir', _get_default_tempdir)
+ return tempdir
+
+def mkstemp(suffix="", prefix=template, dir=gettempdir(), binary=1):
+ """mkstemp([suffix, [prefix, [dir, [binary]]]])
+ User-callable function to create and return a unique temporary
+ file. The return value is a pair (fd, name) where fd is the
+ file descriptor returned by os.open, and name is the filename.
+
+ If 'suffix' is specified, the file name will end with that suffix,
+ otherwise there will be no suffix.
+
+ If 'prefix' is specified, the file name will begin with that prefix,
+ otherwise a default prefix is used.
+
+ If 'dir' is specified, the file will be created in that directory,
+ otherwise a default directory is used.
+
+ If 'binary' is specified and false, the file is opened in binary
+ mode. Otherwise, the file is opened in text mode. On some
+ operating systems, this makes no difference.
+
+ The file is readable and writable only by the creating user ID.
+ If the operating system uses permission bits to indicate whether a
+ file is executable, the file is executable by no one. The file
+ descriptor is not inherited by children of this process.
- This incorporates the current process id on systems that support such a
- notion, so that concurrent processes don't generate the same prefix.
+ Caller is responsible for deleting the file when done with it.
"""
- global template
- if template is None:
- return '@' + `os.getpid()` + '.'
+ if binary:
+ flags = _bin_openflags
else:
- return template
+ flags = _text_openflags
+ return _mkstemp_inner(dir, prefix, suffix, flags)
-def mktemp(suffix=""):
- """User-callable function to return a unique temporary file name."""
- dir = gettempdir()
- pre = gettempprefix()
- while 1:
- i = _counter.get_next()
- file = os.path.join(dir, pre + str(i) + suffix)
- if not os.path.exists(file):
+
+def mkdtemp(suffix="", prefix=template, dir=gettempdir()):
+ """mkdtemp([suffix, [prefix, [dir]]])
+ User-callable function to create and return a unique temporary
+ directory. The return value is the pathname of the directory.
+
+ Arguments are as for mkstemp, except that the 'binary' argument is
+ not accepted.
+
+ The directory is readable, writable, and searchable only by the
+ creating user.
+
+ Caller is responsible for deleting the directory when done with it.
+ """
+
+ names = _get_candidate_names()
+
+ for seq in xrange(TMP_MAX):
+ name = names.next()
+ file = _os.path.join(dir, prefix + name + suffix)
+ try:
+ _os.mkdir(file, 0700)
return file
+ except OSError, e:
+ if e.errno == _errno.EEXIST:
+ continue # try again
+ raise
+ raise IOError, (_errno.EEXIST, "No usable temporary directory name found")
-class TemporaryFileWrapper:
- """Temporary file wrapper
+def mktemp(suffix="", prefix=template, dir=gettempdir()):
+ """mktemp([suffix, [prefix, [dir]]])
+ User-callable function to return a unique temporary file name. The
+ file is not created.
- This class provides a wrapper around files opened for temporary use.
- In particular, it seeks to automatically remove the file when it is
- no longer needed.
+ Arguments are as for mkstemp, except that the 'binary' argument is
+ not accepted.
+
+ This function is unsafe and should not be used. The file name
+ refers to a file that did not exist at some point, but by the time
+ you get around to creating it, someone else may have beaten you to
+ the punch.
"""
- # Cache the unlinker so we don't get spurious errors at shutdown
- # when the module-level "os" is None'd out. Note that this must
- # be referenced as self.unlink, because the name TemporaryFileWrapper
- # may also get None'd out before __del__ is called.
- unlink = os.unlink
+ from warnings import warn as _warn
+ _warn("mktemp is a potential security risk to your program",
+ RuntimeWarning, stacklevel=2)
- def __init__(self, file, path):
- self.file = file
- self.path = path
- self.close_called = 0
+ names = _get_candidate_names()
+ for seq in xrange(TMP_MAX):
+ name = names.next()
+ file = _os.path.join(dir, prefix + name + suffix)
+ if not _os.path.exists(file):
+ return file
+
+ raise IOError, (_errno.EEXIST, "No usable temporary filename found")
- def close(self):
- if not self.close_called:
- self.close_called = 1
- self.file.close()
- self.unlink(self.path)
+class _TemporaryFileWrapper:
+ """Temporary file wrapper
+
+ This class provides a wrapper around files opened for
+ temporary use. In particular, it seeks to automatically
+ remove the file when it is no longer needed.
+ """
- def __del__(self):
- self.close()
+ def __init__(self, file, name):
+ self.file = file
+ self.name = name
+ self.close_called = 0
def __getattr__(self, name):
file = self.__dict__['file']
@@ -180,93 +363,81 @@ class TemporaryFileWrapper:
setattr(self, name, a)
return a
-try:
- import fcntl as _fcntl
- def _set_cloexec(fd, flag=_fcntl.FD_CLOEXEC):
- flags = _fcntl.fcntl(fd, _fcntl.F_GETFD, 0)
- if flags >= 0:
- # flags read successfully, modify
- flags |= flag
- _fcntl.fcntl(fd, _fcntl.F_SETFD, flags)
-except (ImportError, AttributeError):
- def _set_cloexec(fd):
- pass
-
-def TemporaryFile(mode='w+b', bufsize=-1, suffix=""):
- """Create and return a temporary file (opened read-write by default)."""
- name = mktemp(suffix)
- if os.name == 'posix':
- # Unix -- be very careful
- fd = os.open(name, os.O_RDWR|os.O_CREAT|os.O_EXCL, 0700)
- _set_cloexec(fd)
- try:
- os.unlink(name)
- return os.fdopen(fd, mode, bufsize)
- except:
- os.close(fd)
- raise
- elif os.name == 'nt':
- # Windows -- can't unlink an open file, but O_TEMPORARY creates a
- # file that "deletes itself" when the last handle is closed.
- # O_NOINHERIT ensures processes created via spawn() don't get a
- # handle to this too. That would be a security hole, and, on my
- # Win98SE box, when an O_TEMPORARY file is inherited by a spawned
- # process, the fd in the spawned process seems to lack the
- # O_TEMPORARY flag, so the file doesn't go away by magic then if the
- # spawning process closes it first.
- flags = (os.O_RDWR | os.O_CREAT | os.O_EXCL |
- os.O_TEMPORARY | os.O_NOINHERIT)
- if 'b' in mode:
- flags |= os.O_BINARY
- fd = os.open(name, flags, 0700)
- return os.fdopen(fd, mode, bufsize)
- else:
- # Assume we can't unlink a file that's still open, or arrange for
- # an automagically self-deleting file -- use wrapper.
- file = open(name, mode, bufsize)
- return TemporaryFileWrapper(file, name)
-
-# In order to generate unique names, mktemp() uses _counter.get_next().
-# This returns a unique integer on each call, in a threadsafe way (i.e.,
-# multiple threads will never see the same integer). The integer will
-# usually be a Python int, but if _counter.get_next() is called often
-# enough, it will become a Python long.
-# Note that the only names that survive this next block of code
-# are "_counter" and "_tempdir_lock".
-
-class _ThreadSafeCounter:
- def __init__(self, mutex, initialvalue=0):
- self.mutex = mutex
- self.i = initialvalue
-
- def get_next(self):
- self.mutex.acquire()
- result = self.i
- try:
- newi = result + 1
- except OverflowError:
- newi = long(result) + 1
- self.i = newi
- self.mutex.release()
- return result
+ # NT provides delete-on-close as a primitive, so we don't need
+ # the wrapper to do anything special. We still use it so that
+ # file.name is useful (i.e. not "(fdopen)") with NamedTemporaryFile.
+ if _os.name != 'nt':
+
+ # Cache the unlinker so we don't get spurious errors at
+ # shutdown when the module-level "os" is None'd out. Note
+ # that this must be referenced as self.unlink, because the
+ # name TemporaryFileWrapper may also get None'd out before
+ # __del__ is called.
+ unlink = _os.unlink
+
+ def close(self):
+ if not self.close_called:
+ self.close_called = 1
+ self.file.close()
+ self.unlink(self.name)
+
+ def __del__(self):
+ self.close()
+
+def NamedTemporaryFile(mode='w+b', bufsize=-1, suffix="",
+ prefix=template, dir=gettempdir()):
+ """Create and return a temporary file.
+ Arguments:
+ 'prefix', 'suffix', 'dir' -- as for mkstemp.
+ 'mode' -- the mode argument to os.fdopen (default "w+b").
+ 'bufsize' -- the buffer size argument to os.fdopen (default -1).
+ The file is created as mkstemp() would do it.
+
+ Returns a file object; the name of the file is accessible as
+ file.name. The file will be automatically deleted when it is
+ closed.
+ """
-try:
- import thread
+ bin = 'b' in mode
+ if bin: flags = _bin_openflags
+ else: flags = _text_openflags
-except ImportError:
- class _DummyMutex:
- def acquire(self):
- pass
+ # Setting O_TEMPORARY in the flags causes the OS to delete
+ # the file when it is closed. This is only supported by Windows.
+ if _os.name == 'nt':
+ flags |= _os.O_TEMPORARY
- release = acquire
+ (fd, name) = _mkstemp_inner(dir, prefix, suffix, flags)
+ file = _os.fdopen(fd, mode, bufsize)
+ return _TemporaryFileWrapper(file, name)
- _counter = _ThreadSafeCounter(_DummyMutex())
- _tempdir_lock = _DummyMutex()
- del _DummyMutex
+if _os.name != 'posix':
+ # On non-POSIX systems, assume that we cannot unlink a file while
+ # it is open.
+ TemporaryFile = NamedTemporaryFile
else:
- _counter = _ThreadSafeCounter(thread.allocate_lock())
- _tempdir_lock = thread.allocate_lock()
- del thread
-
-del _ThreadSafeCounter
+ def TemporaryFile(mode='w+b', bufsize=-1, suffix="",
+ prefix=template, dir=gettempdir()):
+ """Create and return a temporary file.
+ Arguments:
+ 'prefix', 'suffix', 'directory' -- as for mkstemp.
+ 'mode' -- the mode argument to os.fdopen (default "w+b").
+ 'bufsize' -- the buffer size argument to os.fdopen (default -1).
+ The file is created as mkstemp() would do it.
+
+ Returns a file object. The file has no name, and will cease to
+ exist when it is closed.
+ """
+
+ bin = 'b' in mode
+ if bin: flags = _bin_openflags
+ else: flags = _text_openflags
+
+ (fd, name) = _mkstemp_inner(dir, prefix, suffix, flags)
+ try:
+ _os.unlink(name)
+ return _os.fdopen(fd, mode, bufsize)
+ except:
+ _os.close(fd)
+ raise
diff --git a/Lib/test/test_tempfile.py b/Lib/test/test_tempfile.py
index 286e79c..d96aae7 100644
--- a/Lib/test/test_tempfile.py
+++ b/Lib/test/test_tempfile.py
@@ -1,10 +1,707 @@
-# SF bug #476138: tempfile behavior across platforms
-# Ensure that a temp file can be closed any number of times without error.
+# tempfile.py unit tests.
import tempfile
+import os
+import sys
+import re
+import errno
+import warnings
-f = tempfile.TemporaryFile("w+b")
-f.write('abc\n')
-f.close()
-f.close()
-f.close()
+import unittest
+from test import test_support
+
+if hasattr(os, 'stat'):
+ import stat
+ has_stat = 1
+else:
+ has_stat = 0
+
+has_textmode = (tempfile._text_openflags != tempfile._bin_openflags)
+
+# This is organized as one test for each chunk of code in tempfile.py,
+# in order of their appearance in the file. Testing which requires
+# threads is not done here.
+
+# Common functionality.
+class TC(unittest.TestCase):
+
+ str_check = re.compile(r"[a-zA-Z0-9_-]{6}$")
+
+ def failOnException(self, what, ei=None):
+ if ei is None:
+ ei = sys.exc_info()
+ self.fail("%s raised %s: %s" % (what, ei[0], ei[1]))
+
+ def nameCheck(self, name, dir, pre, suf):
+ (ndir, nbase) = os.path.split(name)
+ npre = nbase[:len(pre)]
+ nsuf = nbase[len(nbase)-len(suf):]
+
+ self.assertEqual(ndir, dir,
+ "file '%s' not in directory '%s'" % (name, dir))
+ self.assertEqual(npre, pre,
+ "file '%s' does not begin with '%s'" % (nbase, pre))
+ self.assertEqual(nsuf, suf,
+ "file '%s' does not end with '%s'" % (nbase, suf))
+
+ nbase = nbase[len(pre):len(nbase)-len(suf)]
+ self.assert_(self.str_check.match(nbase),
+ "random string '%s' does not match /^[a-zA-Z0-9_-]{6}$/"
+ % nbase)
+
+test_classes = []
+
+class test_exports(TC):
+ def test_exports(self):
+ """There are no surprising symbols in the tempfile module"""
+ dict = tempfile.__dict__
+
+ expected = {
+ "NamedTemporaryFile" : 1,
+ "TemporaryFile" : 1,
+ "mkstemp" : 1,
+ "mkdtemp" : 1,
+ "mktemp" : 1,
+ "TMP_MAX" : 1,
+ "gettempprefix" : 1,
+ "gettempdir" : 1,
+ "tempdir" : 1,
+ "template" : 1
+ }
+
+ unexp = []
+ for key in dict:
+ if key[0] != '_' and key not in expected:
+ unexp.append(key)
+ self.failUnless(len(unexp) == 0,
+ "unexpected keys: %s" % unexp)
+
+test_classes.append(test_exports)
+
+
+class test__once(TC):
+ """Test the internal function _once."""
+
+ def setUp(self):
+ tempfile.once_var = None
+ self.already_called = 0
+
+ def tearDown(self):
+ del tempfile.once_var
+
+ def callMeOnce(self):
+ self.failIf(self.already_called, "callMeOnce called twice")
+ self.already_called = 1
+ return 24
+
+ def do_once(self):
+ tempfile._once('once_var', self.callMeOnce)
+
+ def test_once_initializes(self):
+ """_once initializes its argument"""
+
+ self.do_once()
+
+ self.assertEqual(tempfile.once_var, 24,
+ "once_var=%d, not 24" % tempfile.once_var)
+ self.assertEqual(self.already_called, 1,
+ "already_called=%d, not 1" % self.already_called)
+
+ def test_once_means_once(self):
+ """_once calls the callback just once"""
+
+ self.do_once()
+ self.do_once()
+ self.do_once()
+ self.do_once()
+
+ def test_once_namespace_safe(self):
+ """_once does not modify anything but its argument"""
+
+ env_copy = tempfile.__dict__.copy()
+
+ self.do_once()
+
+ env = tempfile.__dict__
+
+ a = env.keys()
+ a.sort()
+ b = env_copy.keys()
+ b.sort()
+
+ self.failIf(len(a) != len(b))
+ for i in xrange(len(a)):
+ self.failIf(a[i] != b[i])
+
+ key = a[i]
+ if key != 'once_var':
+ self.failIf(env[key] != env_copy[key])
+
+test_classes.append(test__once)
+
+
+class test__RandomNameSequence(TC):
+ """Test the internal iterator object _RandomNameSequence."""
+
+ def setUp(self):
+ self.r = tempfile._RandomNameSequence()
+
+ def test_get_six_char_str(self):
+ """_RandomNameSequence returns a six-character string"""
+ s = self.r.next()
+ self.nameCheck(s, '', '', '')
+
+ def test_many(self):
+ """_RandomNameSequence returns no duplicate strings (stochastic)"""
+
+ dict = {}
+ r = self.r
+ for i in xrange(1000):
+ s = r.next()
+ self.nameCheck(s, '', '', '')
+ self.failIf(s in dict)
+ dict[s] = 1
+
+ def test_supports_iter(self):
+ """_RandomNameSequence supports the iterator protocol"""
+
+ i = 0
+ r = self.r
+ try:
+ for s in r:
+ i += 1
+ if i == 20:
+ break
+ except:
+ failOnException("iteration")
+
+test_classes.append(test__RandomNameSequence)
+
+
+class test__candidate_tempdir_list(TC):
+ """Test the internal function _candidate_tempdir_list."""
+
+ def test_nonempty_list(self):
+ """_candidate_tempdir_list returns a nonempty list of strings"""
+
+ cand = tempfile._candidate_tempdir_list()
+
+ self.failIf(len(cand) == 0)
+ for c in cand:
+ self.assert_(isinstance(c, basestring),
+ "%s is not a string" % c)
+
+ def test_wanted_dirs(self):
+ """_candidate_tempdir_list contains the expected directories"""
+
+ # Make sure the interesting environment variables are all set.
+ added = []
+ try:
+ for envname in 'TMPDIR', 'TEMP', 'TMP':
+ dirname = os.getenv(envname)
+ if not dirname:
+ os.environ[envname] = os.path.abspath(envname)
+ added.append(envname)
+
+ cand = tempfile._candidate_tempdir_list()
+
+ for envname in 'TMPDIR', 'TEMP', 'TMP':
+ dirname = os.getenv(envname)
+ if not dirname: raise ValueError
+ self.assert_(dirname in cand)
+
+ try:
+ dirname = os.getcwd()
+ except (AttributeError, os.error):
+ dirname = os.curdir
+
+ self.assert_(dirname in cand)
+
+ # Not practical to try to verify the presence of OS-specific
+ # paths in this list.
+ finally:
+ for p in added:
+ del os.environ[p]
+
+test_classes.append(test__candidate_tempdir_list)
+
+
+# We test _get_default_tempdir by testing gettempdir.
+
+
+class test__get_candidate_names(TC):
+ """Test the internal function _get_candidate_names."""
+
+ def test_retval(self):
+ """_get_candidate_names returns a _RandomNameSequence object"""
+ obj = tempfile._get_candidate_names()
+ self.assert_(isinstance(obj, tempfile._RandomNameSequence))
+
+ def test_same_thing(self):
+ """_get_candidate_names always returns the same object"""
+ a = tempfile._get_candidate_names()
+ b = tempfile._get_candidate_names()
+
+ self.assert_(a is b)
+
+test_classes.append(test__get_candidate_names)
+
+
+class test__mkstemp_inner(TC):
+ """Test the internal function _mkstemp_inner."""
+
+ class mkstemped:
+ _bflags = tempfile._bin_openflags
+ _tflags = tempfile._text_openflags
+ _close = os.close
+ _unlink = os.unlink
+
+ def __init__(self, dir, pre, suf, bin):
+ if bin: flags = self._bflags
+ else: flags = self._tflags
+
+ (self.fd, self.name) = tempfile._mkstemp_inner(dir, pre, suf, flags)
+
+ def write(self, str):
+ os.write(self.fd, str)
+
+ def __del__(self):
+ self._close(self.fd)
+ self._unlink(self.name)
+
+ def do_create(self, dir=None, pre="", suf="", bin=1):
+ if dir is None:
+ dir = tempfile.gettempdir()
+ try:
+ file = self.mkstemped(dir, pre, suf, bin)
+ except:
+ self.failOnException("_mkstemp_inner")
+
+ self.nameCheck(file.name, dir, pre, suf)
+ return file
+
+ def test_basic(self):
+ """_mkstemp_inner can create files"""
+ self.do_create().write("blat")
+ self.do_create(pre="a").write("blat")
+ self.do_create(suf="b").write("blat")
+ self.do_create(pre="a", suf="b").write("blat")
+ self.do_create(pre="aa", suf=".txt").write("blat")
+
+ def test_basic_many(self):
+ """_mkstemp_inner can create many files (stochastic)"""
+ extant = range(1000)
+ for i in extant:
+ extant[i] = self.do_create(pre="aa")
+
+ def test_choose_directory(self):
+ """_mkstemp_inner can create files in a user-selected directory"""
+ dir = tempfile.mkdtemp()
+ try:
+ self.do_create(dir=dir).write("blat")
+ finally:
+ os.rmdir(dir)
+
+ def test_file_mode(self):
+ """_mkstemp_inner creates files with the proper mode"""
+ if not has_stat:
+ return # ugh, can't use TestSkipped.
+
+ file = self.do_create()
+ mode = stat.S_IMODE(os.stat(file.name).st_mode)
+ self.assertEqual(mode, 0600)
+
+ def test_noinherit(self):
+ """_mkstemp_inner file handles are not inherited by child processes"""
+ # FIXME: Find a way to test this on Windows.
+ if os.name != 'posix':
+ return # ugh, can't use TestSkipped.
+
+ file = self.do_create()
+
+ # We have to exec something, so that FD_CLOEXEC will take
+ # effect. The sanest thing to try is /bin/sh; we can easily
+ # instruct it to attempt to write to the fd and report success
+ # or failure. Unfortunately, sh syntax does not permit use of
+ # fds numerically larger than 9; abandon this test if so.
+ if file.fd > 9:
+ raise test_support.TestSkipped, 'cannot test with fd %d' % file.fd
+
+ pid = os.fork()
+ if pid:
+ status = os.wait()[1]
+ self.failUnless(os.WIFEXITED(status),
+ "child process did not exit (status %d)" % status)
+
+ # We want the child to have exited _un_successfully, indicating
+ # failure to write to the closed fd.
+ self.failUnless(os.WEXITSTATUS(status) != 0,
+ "child process exited successfully")
+
+ else:
+ try:
+ # Throw away stderr.
+ nul = os.open('/dev/null', os.O_RDWR)
+ os.dup2(nul, 2)
+ os.execv('/bin/sh', ['sh', '-c', 'echo blat >&%d' % file.fd])
+ except:
+ os._exit(0)
+
+ def test_textmode(self):
+ """_mkstemp_inner can create files in text mode"""
+ if not has_textmode:
+ return # ugh, can't use TestSkipped.
+
+ self.do_create(bin=0).write("blat\n")
+ # XXX should test that the file really is a text file
+
+test_classes.append(test__mkstemp_inner)
+
+
+class test_gettempprefix(TC):
+ """Test gettempprefix()."""
+
+ def test_sane_template(self):
+ """gettempprefix returns a nonempty prefix string"""
+ p = tempfile.gettempprefix()
+
+ self.assert_(isinstance(p, basestring))
+ self.assert_(len(p) > 0)
+
+ def test_usable_template(self):
+ """gettempprefix returns a usable prefix string"""
+
+ # Create a temp directory, avoiding use of the prefix.
+ # Then attempt to create a file whose name is
+ # prefix + 'xxxxxx.xxx' in that directory.
+ p = tempfile.gettempprefix() + "xxxxxx.xxx"
+ d = tempfile.mkdtemp(prefix="")
+ try:
+ p = os.path.join(d, p)
+ try:
+ fd = os.open(p, os.O_RDWR | os.O_CREAT)
+ except:
+ self.failOnException("os.open")
+ os.close(fd)
+ os.unlink(p)
+ finally:
+ os.rmdir(d)
+
+test_classes.append(test_gettempprefix)
+
+
+class test_gettempdir(TC):
+ """Test gettempdir()."""
+
+ def test_directory_exists(self):
+ """gettempdir returns a directory which exists"""
+
+ dir = tempfile.gettempdir()
+ self.assert_(os.path.isabs(dir) or dir == os.curdir,
+ "%s is not an absolute path" % dir)
+ self.assert_(os.path.isdir(dir),
+ "%s is not a directory" % dir)
+
+ def test_directory_writable(self):
+ """gettempdir returns a directory writable by the user"""
+
+ # sneaky: just instantiate a NamedTemporaryFile, which
+ # defaults to writing into the directory returned by
+ # gettempdir.
+ try:
+ file = tempfile.NamedTemporaryFile()
+ file.write("blat")
+ file.close()
+ except:
+ self.failOnException("create file in %s" % tempfile.gettempdir())
+
+ def test_same_thing(self):
+ """gettempdir always returns the same object"""
+ a = tempfile.gettempdir()
+ b = tempfile.gettempdir()
+
+ self.assert_(a is b)
+
+test_classes.append(test_gettempdir)
+
+
+class test_mkstemp(TC):
+ """Test mkstemp()."""
+ def do_create(self, dir=None, pre="", suf="", ):
+ if dir is None:
+ dir = tempfile.gettempdir()
+ try:
+ (fd, name) = tempfile.mkstemp(dir=dir, prefix=pre, suffix=suf)
+ except:
+ self.failOnException("mkstemp")
+
+ try:
+ self.nameCheck(name, dir, pre, suf)
+ finally:
+ os.close(fd)
+ os.unlink(name)
+
+ def test_basic(self):
+ """mkstemp can create files"""
+ self.do_create()
+ self.do_create(pre="a")
+ self.do_create(suf="b")
+ self.do_create(pre="a", suf="b")
+ self.do_create(pre="aa", suf=".txt")
+
+ def test_choose_directory(self):
+ """mkstemp can create directories in a user-selected directory"""
+ dir = tempfile.mkdtemp()
+ try:
+ self.do_create(dir=dir)
+ finally:
+ os.rmdir(dir)
+
+test_classes.append(test_mkstemp)
+
+
+class test_mkdtemp(TC):
+ """Test mkdtemp()."""
+
+ def do_create(self, dir=None, pre="", suf=""):
+ if dir is None:
+ dir = tempfile.gettempdir()
+ try:
+ name = tempfile.mkdtemp(dir=dir, prefix=pre, suffix=suf)
+ except:
+ self.failOnException("mkdtemp")
+
+ try:
+ self.nameCheck(name, dir, pre, suf)
+ return name
+ except:
+ os.rmdir(name)
+ raise
+
+ def test_basic(self):
+ """mkdtemp can create directories"""
+ os.rmdir(self.do_create())
+ os.rmdir(self.do_create(pre="a"))
+ os.rmdir(self.do_create(suf="b"))
+ os.rmdir(self.do_create(pre="a", suf="b"))
+ os.rmdir(self.do_create(pre="aa", suf=".txt"))
+
+ def test_basic_many(self):
+ """mkdtemp can create many directories (stochastic)"""
+ extant = range(1000)
+ try:
+ for i in extant:
+ extant[i] = self.do_create(pre="aa")
+ finally:
+ for i in extant:
+ if(isinstance(i, basestring)):
+ os.rmdir(i)
+
+ def test_choose_directory(self):
+ """mkdtemp can create directories in a user-selected directory"""
+ dir = tempfile.mkdtemp()
+ try:
+ os.rmdir(self.do_create(dir=dir))
+ finally:
+ os.rmdir(dir)
+
+ def test_mode(self):
+ """mkdtemp creates directories with the proper mode"""
+ if not has_stat:
+ return # ugh, can't use TestSkipped.
+
+ dir = self.do_create()
+ try:
+ mode = stat.S_IMODE(os.stat(dir).st_mode)
+ self.assertEqual(mode, 0700)
+ finally:
+ os.rmdir(dir)
+
+test_classes.append(test_mkdtemp)
+
+
+class test_mktemp(TC):
+ """Test mktemp()."""
+
+ # For safety, all use of mktemp must occur in a private directory.
+ # We must also suppress the RuntimeWarning it generates.
+ def setUp(self):
+ self.dir = tempfile.mkdtemp()
+ warnings.filterwarnings("ignore",
+ category=RuntimeWarning,
+ message="mktemp")
+
+ def tearDown(self):
+ if self.dir:
+ os.rmdir(self.dir)
+ self.dir = None
+ # XXX This clobbers any -W options.
+ warnings.resetwarnings()
+
+ class mktemped:
+ _unlink = os.unlink
+ _bflags = tempfile._bin_openflags
+
+ def __init__(self, dir, pre, suf):
+ self.name = tempfile.mktemp(dir=dir, prefix=pre, suffix=suf)
+ # Create the file. This will raise an exception if it's
+ # mysteriously appeared in the meanwhile.
+ os.close(os.open(self.name, self._bflags, 0600))
+
+ def __del__(self):
+ self._unlink(self.name)
+
+ def do_create(self, pre="", suf=""):
+ try:
+ file = self.mktemped(self.dir, pre, suf)
+ except:
+ self.failOnException("mktemp")
+
+ self.nameCheck(file.name, self.dir, pre, suf)
+ return file
+
+ def test_basic(self):
+ """mktemp can choose usable file names"""
+ self.do_create()
+ self.do_create(pre="a")
+ self.do_create(suf="b")
+ self.do_create(pre="a", suf="b")
+ self.do_create(pre="aa", suf=".txt")
+
+ def test_many(self):
+ """mktemp can choose many usable file names (stochastic)"""
+ extant = range(1000)
+ for i in extant:
+ extant[i] = self.do_create(pre="aa")
+
+ def test_warning(self):
+ """mktemp issues a warning when used"""
+ warnings.filterwarnings("error",
+ category=RuntimeWarning,
+ message="mktemp")
+ self.assertRaises(RuntimeWarning,
+ tempfile.mktemp, (), { 'dir': self.dir })
+
+test_classes.append(test_mktemp)
+
+
+# We test _TemporaryFileWrapper by testing NamedTemporaryFile.
+
+
+class test_NamedTemporaryFile(TC):
+ """Test NamedTemporaryFile()."""
+
+ def do_create(self, dir=None, pre="", suf=""):
+ if dir is None:
+ dir = tempfile.gettempdir()
+ try:
+ file = tempfile.NamedTemporaryFile(dir=dir, prefix=pre, suffix=suf)
+ except:
+ self.failOnException("NamedTemporaryFile")
+
+ self.nameCheck(file.name, dir, pre, suf)
+ return file
+
+
+ def test_basic(self):
+ """NamedTemporaryFile can create files"""
+ self.do_create()
+ self.do_create(pre="a")
+ self.do_create(suf="b")
+ self.do_create(pre="a", suf="b")
+ self.do_create(pre="aa", suf=".txt")
+
+ def test_creates_named(self):
+ """NamedTemporaryFile creates files with names"""
+ f = tempfile.NamedTemporaryFile()
+ self.failUnless(os.path.exists(f.name),
+ "NamedTemporaryFile %s does not exist" % f.name)
+
+ def test_del_on_close(self):
+ """A NamedTemporaryFile is deleted when closed"""
+ dir = tempfile.mkdtemp()
+ try:
+ f = tempfile.NamedTemporaryFile(dir=dir)
+ f.write('blat')
+ f.close()
+ self.failIf(os.path.exists(f.name),
+ "NamedTemporaryFile %s exists after close" % f.name)
+ finally:
+ os.rmdir(dir)
+
+ def test_multiple_close(self):
+ """A NamedTemporaryFile can be closed many times without error"""
+
+ f = tempfile.NamedTemporaryFile()
+ f.write('abc\n')
+ f.close()
+ try:
+ f.close()
+ f.close()
+ except:
+ self.failOnException("close")
+
+ # How to test the mode and bufsize parameters?
+
+test_classes.append(test_NamedTemporaryFile)
+
+
+class test_TemporaryFile(TC):
+ """Test TemporaryFile()."""
+
+ def test_basic(self):
+ """TemporaryFile can create files"""
+ # No point in testing the name params - the file has no name.
+ try:
+ tempfile.TemporaryFile()
+ except:
+ self.failOnException("TemporaryFile")
+
+ def test_has_no_name(self):
+ """TemporaryFile creates files with no names (on this system)"""
+ dir = tempfile.mkdtemp()
+ f = tempfile.TemporaryFile(dir=dir)
+ f.write('blat')
+
+ # Sneaky: because this file has no name, it should not prevent
+ # us from removing the directory it was created in.
+ try:
+ os.rmdir(dir)
+ except:
+ ei = sys.exc_info()
+ # cleanup
+ f.close()
+ os.rmdir(dir)
+ self.failOnException("rmdir", ei)
+
+ def test_multiple_close(self):
+ """A TemporaryFile can be closed many times without error"""
+ f = tempfile.TemporaryFile()
+ f.write('abc\n')
+ f.close()
+ try:
+ f.close()
+ f.close()
+ except:
+ self.failOnException("close")
+
+ # How to test the mode and bufsize parameters?
+
+class dummy_test_TemporaryFile(TC):
+ def test_dummy(self):
+ """TemporaryFile and NamedTemporaryFile are the same (on this system)"""
+ pass
+
+if tempfile.NamedTemporaryFile is tempfile.TemporaryFile:
+ test_classes.append(dummy_test_TemporaryFile)
+else:
+ test_classes.append(test_TemporaryFile)
+
+def test_main():
+ suite = unittest.TestSuite()
+ for c in test_classes:
+ suite.addTest(unittest.makeSuite(c))
+ test_support.run_suite(suite)
+
+if __name__ == "__main__":
+ test_main()