diff options
author | Guido van Rossum <guido@python.org> | 2002-08-09 16:14:33 (GMT) |
---|---|---|
committer | Guido van Rossum <guido@python.org> | 2002-08-09 16:14:33 (GMT) |
commit | 0e54871f827687309e3186a4d916231cae0d8b01 (patch) | |
tree | 22ec7f00b7e02e6a027ecb5adf7612a73ea46dcd | |
parent | 0f5f0b8057e3a1368d6e3b340173ae2c5d95cac5 (diff) | |
download | cpython-0e54871f827687309e3186a4d916231cae0d8b01.zip cpython-0e54871f827687309e3186a4d916231cae0d8b01.tar.gz cpython-0e54871f827687309e3186a4d916231cae0d8b01.tar.bz2 |
Check-in of the most essential parts of SF 589982 (tempfile.py
rewrite, by Zack Weinberg). This replaces most code in tempfile.py
(please review!!!) and adds extensive unit tests for it.
This will cause some warnings in the test suite; I'll check those in
soon, and also the docs.
-rw-r--r-- | Lib/tempfile.py | 621 | ||||
-rw-r--r-- | Lib/test/test_tempfile.py | 711 |
2 files changed, 1100 insertions, 232 deletions
diff --git a/Lib/tempfile.py b/Lib/tempfile.py index eb8253c..f3cc481 100644 --- a/Lib/tempfile.py +++ b/Lib/tempfile.py @@ -1,177 +1,360 @@ -"""Temporary files and filenames.""" +"""Temporary files. -# XXX This tries to be not UNIX specific, but I don't know beans about -# how to choose a temp directory or filename on MS-DOS or other -# systems so it may have to be changed... +This module provides generic, low- and high-level interfaces for +creating temporary files and directories. The interfaces listed +as "safe" just below can be used without fear of race conditions. +Those listed as "unsafe" cannot, and are provided for backward +compatibility only. -import os +This module also provides some data items to the user: -__all__ = ["mktemp", "TemporaryFile", "tempdir", "gettempprefix"] + TMP_MAX - maximum number of names that will be tried before + giving up. + template - the default prefix for all temporary names. + You may change this to control the default prefix. + tempdir - If this is set to a string before the first use of + any routine from this module, it will be considered as + another candidate location to store temporary files. +""" + +__all__ = [ + "NamedTemporaryFile", "TemporaryFile", # high level safe interfaces + "mkstemp", "mkdtemp", # low level safe interfaces + "mktemp", # deprecated unsafe interface + "TMP_MAX", "gettempprefix", # constants + "tempdir", "gettempdir" + ] + + +# Imports. + +import os as _os +import errno as _errno +from random import Random as _Random + +if _os.name == 'mac': + import macfs as _macfs + import MACFS as _MACFS + +try: + import fcntl as _fcntl + def _set_cloexec(fd): + flags = _fcntl.fcntl(fd, _fcntl.F_GETFD, 0) + if flags >= 0: + # flags read successfully, modify + flags |= _fcntl.FD_CLOEXEC + _fcntl.fcntl(fd, _fcntl.F_SETFD, flags) +except (ImportError, AttributeError): + def _set_cloexec(fd): + pass + +try: + import thread as _thread + _allocate_lock = _thread.allocate_lock +except (ImportError, AttributeError): + class _allocate_lock: + def acquire(self): + pass + release = acquire + +_text_openflags = _os.O_RDWR | _os.O_CREAT | _os.O_EXCL +if hasattr(_os, 'O_NOINHERIT'): _text_openflags |= _os.O_NOINHERIT +if hasattr(_os, 'O_NOFOLLOW'): _text_openflags |= _os.O_NOFOLLOW + +_bin_openflags = _text_openflags +if hasattr(_os, 'O_BINARY'): _bin_openflags |= _os.O_BINARY + +if hasattr(_os, 'TMP_MAX'): + TMP_MAX = _os.TMP_MAX +else: + TMP_MAX = 10000 + +if _os.name == 'nt': + template = '~t' # cater to eight-letter limit +else: + template = "tmp" -# Parameters that the caller may set to override the defaults tempdir = None -template = None -def gettempdir(): - """Function to calculate the directory to use.""" - global tempdir - if tempdir is not None: - return tempdir - - # _gettempdir_inner deduces whether a candidate temp dir is usable by - # trying to create a file in it, and write to it. If that succeeds, - # great, it closes the file and unlinks it. There's a race, though: - # the *name* of the test file it tries is the same across all threads - # under most OSes (Linux is an exception), and letting multiple threads - # all try to open, write to, close, and unlink a single file can cause - # a variety of bogus errors (e.g., you cannot unlink a file under - # Windows if anyone has it open, and two threads cannot create the - # same file in O_EXCL mode under Unix). The simplest cure is to serialize - # calls to _gettempdir_inner. This isn't a real expense, because the - # first thread to succeed sets the global tempdir, and all subsequent - # calls to gettempdir() reuse that without trying _gettempdir_inner. - _tempdir_lock.acquire() +# Internal routines. + +_once_lock = _allocate_lock() + +def _once(var, initializer): + """Wrapper to execute an initialization operation just once, + even if multiple threads reach the same point at the same time. + + var is the name (as a string) of the variable to be entered into + the current global namespace. + + initializer is a callable which will return the appropriate initial + value for variable. It will be called only if variable is not + present in the global namespace, or its current value is None. + + Do not call _once from inside an initializer routine, it will deadlock. + """ + + vars = globals() + lock = _once_lock + + # Check first outside the lock. + if var in vars and vars[var] is not None: + return try: - return _gettempdir_inner() + lock.acquire() + # Check again inside the lock. + if var in vars and vars[var] is not None: + return + vars[var] = initializer() finally: - _tempdir_lock.release() + lock.release() + +class _RandomNameSequence: + """An instance of _RandomNameSequence generates an endless + sequence of unpredictable strings which can safely be incorporated + into file names. Each string is six characters long. Multiple + threads can safely use the same instance at the same time. + + _RandomNameSequence is an iterator.""" + + characters = ( "abcdefghijklmnopqrstuvwxyz" + + "ABCDEFGHIJKLMNOPQRSTUVWXYZ" + + "0123456789-_") + + def __init__(self): + self.mutex = _allocate_lock() + self.rng = _Random() + self.normcase = _os.path.normcase + def __iter__(self): + return self + + def next(self): + m = self.mutex + c = self.characters + r = self.rng -def _gettempdir_inner(): - """Function to calculate the directory to use.""" - global tempdir - if tempdir is not None: - return tempdir - try: - pwd = os.getcwd() - except (AttributeError, os.error): - pwd = os.curdir - attempdirs = ['/tmp', '/var/tmp', '/usr/tmp', pwd] - if os.name == 'nt': - attempdirs.insert(0, 'C:\\TEMP') - attempdirs.insert(0, '\\TEMP') - elif os.name == 'mac': - import macfs, MACFS try: - refnum, dirid = macfs.FindFolder(MACFS.kOnSystemDisk, - MACFS.kTemporaryFolderType, 1) - dirname = macfs.FSSpec((refnum, dirid, '')).as_pathname() - attempdirs.insert(0, dirname) - except macfs.error: - pass - elif os.name == 'riscos': - scrapdir = os.getenv('Wimp$ScrapDir') - if scrapdir: - attempdirs.insert(0, scrapdir) + m.acquire() + letters = ''.join([r.choice(c), r.choice(c), r.choice(c), + r.choice(c), r.choice(c), r.choice(c)]) + finally: + m.release() + + return self.normcase(letters) + +def _candidate_tempdir_list(): + """Generate a list of candidate temporary directories which + _get_default_tempdir will try.""" + + dirlist = [] + + # First, try the environment. for envname in 'TMPDIR', 'TEMP', 'TMP': - if envname in os.environ: - attempdirs.insert(0, os.environ[envname]) - testfile = gettempprefix() + 'test' - for dir in attempdirs: + dirname = _os.getenv(envname) + if dirname: dirlist.append(dirname) + + # Failing that, try OS-specific locations. + if _os.name == 'mac': try: - filename = os.path.join(dir, testfile) - if os.name == 'posix': - try: - fd = os.open(filename, - os.O_RDWR | os.O_CREAT | os.O_EXCL, 0700) - except OSError: - pass - else: - fp = os.fdopen(fd, 'w') - fp.write('blat') - fp.close() - os.unlink(filename) - del fp, fd - tempdir = dir - break - else: - fp = open(filename, 'w') + refnum, dirid = _macfs.FindFolder(_MACFS.kOnSystemDisk, + _MACFS.kTemporaryFolderType, 1) + dirname = _macfs.FSSpec((refnum, dirid, '')).as_pathname() + dirlist.append(dirname) + except _macfs.error: + pass + elif _os.name == 'riscos': + dirname = _os.getenv('Wimp$ScrapDir') + if dirname: dirlist.append(dirname) + elif _os.name == 'nt': + dirlist.extend([ r'c:\temp', r'c:\tmp', r'\temp', r'\tmp' ]) + else: + dirlist.extend([ '/tmp', '/var/tmp', '/usr/tmp' ]) + + # As a last resort, the current directory. + try: + dirlist.append(_os.getcwd()) + except (AttributeError, _os.error): + dirlist.append(_os.curdir) + + return dirlist + +def _get_default_tempdir(): + """Calculate the default directory to use for temporary files. + This routine should be called through '_once' (see above) as we + do not want multiple threads attempting this calculation simultaneously. + + We determine whether or not a candidate temp dir is usable by + trying to create and write to a file in that directory. If this + is successful, the test file is deleted. To prevent denial of + service, the name of the test file must be randomized.""" + + namer = _RandomNameSequence() + dirlist = _candidate_tempdir_list() + flags = _text_openflags + + for dir in dirlist: + if dir != _os.curdir: + dir = _os.path.normcase(_os.path.abspath(dir)) + # Try only a few names per directory. + for seq in xrange(100): + name = namer.next() + filename = _os.path.join(dir, name) + try: + fd = _os.open(filename, flags, 0600) + fp = _os.fdopen(fd, 'w') fp.write('blat') fp.close() - os.unlink(filename) - tempdir = dir - break - except IOError: - pass - if tempdir is None: - msg = "Can't find a usable temporary directory amongst " + `attempdirs` - raise IOError, msg - return tempdir + _os.unlink(filename) + del fp, fd + return dir + except (OSError, IOError), e: + if e[0] != _errno.EEXIST: + break # no point trying more names in this directory + pass + raise IOError, (_errno.ENOENT, + ("No usable temporary directory found in %s" % dirlist)) +def _get_candidate_names(): + """Common setup sequence for all user-callable interfaces.""" -# template caches the result of gettempprefix, for speed, when possible. -# XXX unclear why this isn't "_template"; left it "template" for backward -# compatibility. -if os.name == "posix": - # We don't try to cache the template on posix: the pid may change on us - # between calls due to a fork, and on Linux the pid changes even for - # another thread in the same process. Since any attempt to keep the - # cache in synch would have to call os.getpid() anyway in order to make - # sure the pid hasn't changed between calls, a cache wouldn't save any - # time. In addition, a cache is difficult to keep correct with the pid - # changing willy-nilly, and earlier attempts proved buggy (races). - template = None - -# Else the pid never changes, so gettempprefix always returns the same -# string. -elif os.name == "nt": - template = '~' + `os.getpid()` + '-' -elif os.name in ('mac', 'riscos'): - template = 'Python-Tmp-' -else: - template = 'tmp' # XXX might choose a better one + _once('_name_sequence', _RandomNameSequence) + return _name_sequence + + +def _mkstemp_inner(dir, pre, suf, flags): + """Code common to mkstemp, TemporaryFile, and NamedTemporaryFile.""" + + names = _get_candidate_names() + + for seq in xrange(TMP_MAX): + name = names.next() + file = _os.path.join(dir, pre + name + suf) + try: + fd = _os.open(file, flags, 0600) + _set_cloexec(fd) + return (fd, file) + except OSError, e: + if e.errno == _errno.EEXIST: + continue # try again + raise + + raise IOError, (_errno.EEXIST, "No usable temporary file name found") + + +# User visible interfaces. def gettempprefix(): - """Function to calculate a prefix of the filename to use. + """Accessor for tempdir.template.""" + return template + +def gettempdir(): + """Accessor for tempdir.tempdir.""" + _once('tempdir', _get_default_tempdir) + return tempdir + +def mkstemp(suffix="", prefix=template, dir=gettempdir(), binary=1): + """mkstemp([suffix, [prefix, [dir, [binary]]]]) + User-callable function to create and return a unique temporary + file. The return value is a pair (fd, name) where fd is the + file descriptor returned by os.open, and name is the filename. + + If 'suffix' is specified, the file name will end with that suffix, + otherwise there will be no suffix. + + If 'prefix' is specified, the file name will begin with that prefix, + otherwise a default prefix is used. + + If 'dir' is specified, the file will be created in that directory, + otherwise a default directory is used. + + If 'binary' is specified and false, the file is opened in binary + mode. Otherwise, the file is opened in text mode. On some + operating systems, this makes no difference. + + The file is readable and writable only by the creating user ID. + If the operating system uses permission bits to indicate whether a + file is executable, the file is executable by no one. The file + descriptor is not inherited by children of this process. - This incorporates the current process id on systems that support such a - notion, so that concurrent processes don't generate the same prefix. + Caller is responsible for deleting the file when done with it. """ - global template - if template is None: - return '@' + `os.getpid()` + '.' + if binary: + flags = _bin_openflags else: - return template + flags = _text_openflags + return _mkstemp_inner(dir, prefix, suffix, flags) -def mktemp(suffix=""): - """User-callable function to return a unique temporary file name.""" - dir = gettempdir() - pre = gettempprefix() - while 1: - i = _counter.get_next() - file = os.path.join(dir, pre + str(i) + suffix) - if not os.path.exists(file): + +def mkdtemp(suffix="", prefix=template, dir=gettempdir()): + """mkdtemp([suffix, [prefix, [dir]]]) + User-callable function to create and return a unique temporary + directory. The return value is the pathname of the directory. + + Arguments are as for mkstemp, except that the 'binary' argument is + not accepted. + + The directory is readable, writable, and searchable only by the + creating user. + + Caller is responsible for deleting the directory when done with it. + """ + + names = _get_candidate_names() + + for seq in xrange(TMP_MAX): + name = names.next() + file = _os.path.join(dir, prefix + name + suffix) + try: + _os.mkdir(file, 0700) return file + except OSError, e: + if e.errno == _errno.EEXIST: + continue # try again + raise + raise IOError, (_errno.EEXIST, "No usable temporary directory name found") -class TemporaryFileWrapper: - """Temporary file wrapper +def mktemp(suffix="", prefix=template, dir=gettempdir()): + """mktemp([suffix, [prefix, [dir]]]) + User-callable function to return a unique temporary file name. The + file is not created. - This class provides a wrapper around files opened for temporary use. - In particular, it seeks to automatically remove the file when it is - no longer needed. + Arguments are as for mkstemp, except that the 'binary' argument is + not accepted. + + This function is unsafe and should not be used. The file name + refers to a file that did not exist at some point, but by the time + you get around to creating it, someone else may have beaten you to + the punch. """ - # Cache the unlinker so we don't get spurious errors at shutdown - # when the module-level "os" is None'd out. Note that this must - # be referenced as self.unlink, because the name TemporaryFileWrapper - # may also get None'd out before __del__ is called. - unlink = os.unlink + from warnings import warn as _warn + _warn("mktemp is a potential security risk to your program", + RuntimeWarning, stacklevel=2) - def __init__(self, file, path): - self.file = file - self.path = path - self.close_called = 0 + names = _get_candidate_names() + for seq in xrange(TMP_MAX): + name = names.next() + file = _os.path.join(dir, prefix + name + suffix) + if not _os.path.exists(file): + return file + + raise IOError, (_errno.EEXIST, "No usable temporary filename found") - def close(self): - if not self.close_called: - self.close_called = 1 - self.file.close() - self.unlink(self.path) +class _TemporaryFileWrapper: + """Temporary file wrapper + + This class provides a wrapper around files opened for + temporary use. In particular, it seeks to automatically + remove the file when it is no longer needed. + """ - def __del__(self): - self.close() + def __init__(self, file, name): + self.file = file + self.name = name + self.close_called = 0 def __getattr__(self, name): file = self.__dict__['file'] @@ -180,93 +363,81 @@ class TemporaryFileWrapper: setattr(self, name, a) return a -try: - import fcntl as _fcntl - def _set_cloexec(fd, flag=_fcntl.FD_CLOEXEC): - flags = _fcntl.fcntl(fd, _fcntl.F_GETFD, 0) - if flags >= 0: - # flags read successfully, modify - flags |= flag - _fcntl.fcntl(fd, _fcntl.F_SETFD, flags) -except (ImportError, AttributeError): - def _set_cloexec(fd): - pass - -def TemporaryFile(mode='w+b', bufsize=-1, suffix=""): - """Create and return a temporary file (opened read-write by default).""" - name = mktemp(suffix) - if os.name == 'posix': - # Unix -- be very careful - fd = os.open(name, os.O_RDWR|os.O_CREAT|os.O_EXCL, 0700) - _set_cloexec(fd) - try: - os.unlink(name) - return os.fdopen(fd, mode, bufsize) - except: - os.close(fd) - raise - elif os.name == 'nt': - # Windows -- can't unlink an open file, but O_TEMPORARY creates a - # file that "deletes itself" when the last handle is closed. - # O_NOINHERIT ensures processes created via spawn() don't get a - # handle to this too. That would be a security hole, and, on my - # Win98SE box, when an O_TEMPORARY file is inherited by a spawned - # process, the fd in the spawned process seems to lack the - # O_TEMPORARY flag, so the file doesn't go away by magic then if the - # spawning process closes it first. - flags = (os.O_RDWR | os.O_CREAT | os.O_EXCL | - os.O_TEMPORARY | os.O_NOINHERIT) - if 'b' in mode: - flags |= os.O_BINARY - fd = os.open(name, flags, 0700) - return os.fdopen(fd, mode, bufsize) - else: - # Assume we can't unlink a file that's still open, or arrange for - # an automagically self-deleting file -- use wrapper. - file = open(name, mode, bufsize) - return TemporaryFileWrapper(file, name) - -# In order to generate unique names, mktemp() uses _counter.get_next(). -# This returns a unique integer on each call, in a threadsafe way (i.e., -# multiple threads will never see the same integer). The integer will -# usually be a Python int, but if _counter.get_next() is called often -# enough, it will become a Python long. -# Note that the only names that survive this next block of code -# are "_counter" and "_tempdir_lock". - -class _ThreadSafeCounter: - def __init__(self, mutex, initialvalue=0): - self.mutex = mutex - self.i = initialvalue - - def get_next(self): - self.mutex.acquire() - result = self.i - try: - newi = result + 1 - except OverflowError: - newi = long(result) + 1 - self.i = newi - self.mutex.release() - return result + # NT provides delete-on-close as a primitive, so we don't need + # the wrapper to do anything special. We still use it so that + # file.name is useful (i.e. not "(fdopen)") with NamedTemporaryFile. + if _os.name != 'nt': + + # Cache the unlinker so we don't get spurious errors at + # shutdown when the module-level "os" is None'd out. Note + # that this must be referenced as self.unlink, because the + # name TemporaryFileWrapper may also get None'd out before + # __del__ is called. + unlink = _os.unlink + + def close(self): + if not self.close_called: + self.close_called = 1 + self.file.close() + self.unlink(self.name) + + def __del__(self): + self.close() + +def NamedTemporaryFile(mode='w+b', bufsize=-1, suffix="", + prefix=template, dir=gettempdir()): + """Create and return a temporary file. + Arguments: + 'prefix', 'suffix', 'dir' -- as for mkstemp. + 'mode' -- the mode argument to os.fdopen (default "w+b"). + 'bufsize' -- the buffer size argument to os.fdopen (default -1). + The file is created as mkstemp() would do it. + + Returns a file object; the name of the file is accessible as + file.name. The file will be automatically deleted when it is + closed. + """ -try: - import thread + bin = 'b' in mode + if bin: flags = _bin_openflags + else: flags = _text_openflags -except ImportError: - class _DummyMutex: - def acquire(self): - pass + # Setting O_TEMPORARY in the flags causes the OS to delete + # the file when it is closed. This is only supported by Windows. + if _os.name == 'nt': + flags |= _os.O_TEMPORARY - release = acquire + (fd, name) = _mkstemp_inner(dir, prefix, suffix, flags) + file = _os.fdopen(fd, mode, bufsize) + return _TemporaryFileWrapper(file, name) - _counter = _ThreadSafeCounter(_DummyMutex()) - _tempdir_lock = _DummyMutex() - del _DummyMutex +if _os.name != 'posix': + # On non-POSIX systems, assume that we cannot unlink a file while + # it is open. + TemporaryFile = NamedTemporaryFile else: - _counter = _ThreadSafeCounter(thread.allocate_lock()) - _tempdir_lock = thread.allocate_lock() - del thread - -del _ThreadSafeCounter + def TemporaryFile(mode='w+b', bufsize=-1, suffix="", + prefix=template, dir=gettempdir()): + """Create and return a temporary file. + Arguments: + 'prefix', 'suffix', 'directory' -- as for mkstemp. + 'mode' -- the mode argument to os.fdopen (default "w+b"). + 'bufsize' -- the buffer size argument to os.fdopen (default -1). + The file is created as mkstemp() would do it. + + Returns a file object. The file has no name, and will cease to + exist when it is closed. + """ + + bin = 'b' in mode + if bin: flags = _bin_openflags + else: flags = _text_openflags + + (fd, name) = _mkstemp_inner(dir, prefix, suffix, flags) + try: + _os.unlink(name) + return _os.fdopen(fd, mode, bufsize) + except: + _os.close(fd) + raise diff --git a/Lib/test/test_tempfile.py b/Lib/test/test_tempfile.py index 286e79c..d96aae7 100644 --- a/Lib/test/test_tempfile.py +++ b/Lib/test/test_tempfile.py @@ -1,10 +1,707 @@ -# SF bug #476138: tempfile behavior across platforms -# Ensure that a temp file can be closed any number of times without error. +# tempfile.py unit tests. import tempfile +import os +import sys +import re +import errno +import warnings -f = tempfile.TemporaryFile("w+b") -f.write('abc\n') -f.close() -f.close() -f.close() +import unittest +from test import test_support + +if hasattr(os, 'stat'): + import stat + has_stat = 1 +else: + has_stat = 0 + +has_textmode = (tempfile._text_openflags != tempfile._bin_openflags) + +# This is organized as one test for each chunk of code in tempfile.py, +# in order of their appearance in the file. Testing which requires +# threads is not done here. + +# Common functionality. +class TC(unittest.TestCase): + + str_check = re.compile(r"[a-zA-Z0-9_-]{6}$") + + def failOnException(self, what, ei=None): + if ei is None: + ei = sys.exc_info() + self.fail("%s raised %s: %s" % (what, ei[0], ei[1])) + + def nameCheck(self, name, dir, pre, suf): + (ndir, nbase) = os.path.split(name) + npre = nbase[:len(pre)] + nsuf = nbase[len(nbase)-len(suf):] + + self.assertEqual(ndir, dir, + "file '%s' not in directory '%s'" % (name, dir)) + self.assertEqual(npre, pre, + "file '%s' does not begin with '%s'" % (nbase, pre)) + self.assertEqual(nsuf, suf, + "file '%s' does not end with '%s'" % (nbase, suf)) + + nbase = nbase[len(pre):len(nbase)-len(suf)] + self.assert_(self.str_check.match(nbase), + "random string '%s' does not match /^[a-zA-Z0-9_-]{6}$/" + % nbase) + +test_classes = [] + +class test_exports(TC): + def test_exports(self): + """There are no surprising symbols in the tempfile module""" + dict = tempfile.__dict__ + + expected = { + "NamedTemporaryFile" : 1, + "TemporaryFile" : 1, + "mkstemp" : 1, + "mkdtemp" : 1, + "mktemp" : 1, + "TMP_MAX" : 1, + "gettempprefix" : 1, + "gettempdir" : 1, + "tempdir" : 1, + "template" : 1 + } + + unexp = [] + for key in dict: + if key[0] != '_' and key not in expected: + unexp.append(key) + self.failUnless(len(unexp) == 0, + "unexpected keys: %s" % unexp) + +test_classes.append(test_exports) + + +class test__once(TC): + """Test the internal function _once.""" + + def setUp(self): + tempfile.once_var = None + self.already_called = 0 + + def tearDown(self): + del tempfile.once_var + + def callMeOnce(self): + self.failIf(self.already_called, "callMeOnce called twice") + self.already_called = 1 + return 24 + + def do_once(self): + tempfile._once('once_var', self.callMeOnce) + + def test_once_initializes(self): + """_once initializes its argument""" + + self.do_once() + + self.assertEqual(tempfile.once_var, 24, + "once_var=%d, not 24" % tempfile.once_var) + self.assertEqual(self.already_called, 1, + "already_called=%d, not 1" % self.already_called) + + def test_once_means_once(self): + """_once calls the callback just once""" + + self.do_once() + self.do_once() + self.do_once() + self.do_once() + + def test_once_namespace_safe(self): + """_once does not modify anything but its argument""" + + env_copy = tempfile.__dict__.copy() + + self.do_once() + + env = tempfile.__dict__ + + a = env.keys() + a.sort() + b = env_copy.keys() + b.sort() + + self.failIf(len(a) != len(b)) + for i in xrange(len(a)): + self.failIf(a[i] != b[i]) + + key = a[i] + if key != 'once_var': + self.failIf(env[key] != env_copy[key]) + +test_classes.append(test__once) + + +class test__RandomNameSequence(TC): + """Test the internal iterator object _RandomNameSequence.""" + + def setUp(self): + self.r = tempfile._RandomNameSequence() + + def test_get_six_char_str(self): + """_RandomNameSequence returns a six-character string""" + s = self.r.next() + self.nameCheck(s, '', '', '') + + def test_many(self): + """_RandomNameSequence returns no duplicate strings (stochastic)""" + + dict = {} + r = self.r + for i in xrange(1000): + s = r.next() + self.nameCheck(s, '', '', '') + self.failIf(s in dict) + dict[s] = 1 + + def test_supports_iter(self): + """_RandomNameSequence supports the iterator protocol""" + + i = 0 + r = self.r + try: + for s in r: + i += 1 + if i == 20: + break + except: + failOnException("iteration") + +test_classes.append(test__RandomNameSequence) + + +class test__candidate_tempdir_list(TC): + """Test the internal function _candidate_tempdir_list.""" + + def test_nonempty_list(self): + """_candidate_tempdir_list returns a nonempty list of strings""" + + cand = tempfile._candidate_tempdir_list() + + self.failIf(len(cand) == 0) + for c in cand: + self.assert_(isinstance(c, basestring), + "%s is not a string" % c) + + def test_wanted_dirs(self): + """_candidate_tempdir_list contains the expected directories""" + + # Make sure the interesting environment variables are all set. + added = [] + try: + for envname in 'TMPDIR', 'TEMP', 'TMP': + dirname = os.getenv(envname) + if not dirname: + os.environ[envname] = os.path.abspath(envname) + added.append(envname) + + cand = tempfile._candidate_tempdir_list() + + for envname in 'TMPDIR', 'TEMP', 'TMP': + dirname = os.getenv(envname) + if not dirname: raise ValueError + self.assert_(dirname in cand) + + try: + dirname = os.getcwd() + except (AttributeError, os.error): + dirname = os.curdir + + self.assert_(dirname in cand) + + # Not practical to try to verify the presence of OS-specific + # paths in this list. + finally: + for p in added: + del os.environ[p] + +test_classes.append(test__candidate_tempdir_list) + + +# We test _get_default_tempdir by testing gettempdir. + + +class test__get_candidate_names(TC): + """Test the internal function _get_candidate_names.""" + + def test_retval(self): + """_get_candidate_names returns a _RandomNameSequence object""" + obj = tempfile._get_candidate_names() + self.assert_(isinstance(obj, tempfile._RandomNameSequence)) + + def test_same_thing(self): + """_get_candidate_names always returns the same object""" + a = tempfile._get_candidate_names() + b = tempfile._get_candidate_names() + + self.assert_(a is b) + +test_classes.append(test__get_candidate_names) + + +class test__mkstemp_inner(TC): + """Test the internal function _mkstemp_inner.""" + + class mkstemped: + _bflags = tempfile._bin_openflags + _tflags = tempfile._text_openflags + _close = os.close + _unlink = os.unlink + + def __init__(self, dir, pre, suf, bin): + if bin: flags = self._bflags + else: flags = self._tflags + + (self.fd, self.name) = tempfile._mkstemp_inner(dir, pre, suf, flags) + + def write(self, str): + os.write(self.fd, str) + + def __del__(self): + self._close(self.fd) + self._unlink(self.name) + + def do_create(self, dir=None, pre="", suf="", bin=1): + if dir is None: + dir = tempfile.gettempdir() + try: + file = self.mkstemped(dir, pre, suf, bin) + except: + self.failOnException("_mkstemp_inner") + + self.nameCheck(file.name, dir, pre, suf) + return file + + def test_basic(self): + """_mkstemp_inner can create files""" + self.do_create().write("blat") + self.do_create(pre="a").write("blat") + self.do_create(suf="b").write("blat") + self.do_create(pre="a", suf="b").write("blat") + self.do_create(pre="aa", suf=".txt").write("blat") + + def test_basic_many(self): + """_mkstemp_inner can create many files (stochastic)""" + extant = range(1000) + for i in extant: + extant[i] = self.do_create(pre="aa") + + def test_choose_directory(self): + """_mkstemp_inner can create files in a user-selected directory""" + dir = tempfile.mkdtemp() + try: + self.do_create(dir=dir).write("blat") + finally: + os.rmdir(dir) + + def test_file_mode(self): + """_mkstemp_inner creates files with the proper mode""" + if not has_stat: + return # ugh, can't use TestSkipped. + + file = self.do_create() + mode = stat.S_IMODE(os.stat(file.name).st_mode) + self.assertEqual(mode, 0600) + + def test_noinherit(self): + """_mkstemp_inner file handles are not inherited by child processes""" + # FIXME: Find a way to test this on Windows. + if os.name != 'posix': + return # ugh, can't use TestSkipped. + + file = self.do_create() + + # We have to exec something, so that FD_CLOEXEC will take + # effect. The sanest thing to try is /bin/sh; we can easily + # instruct it to attempt to write to the fd and report success + # or failure. Unfortunately, sh syntax does not permit use of + # fds numerically larger than 9; abandon this test if so. + if file.fd > 9: + raise test_support.TestSkipped, 'cannot test with fd %d' % file.fd + + pid = os.fork() + if pid: + status = os.wait()[1] + self.failUnless(os.WIFEXITED(status), + "child process did not exit (status %d)" % status) + + # We want the child to have exited _un_successfully, indicating + # failure to write to the closed fd. + self.failUnless(os.WEXITSTATUS(status) != 0, + "child process exited successfully") + + else: + try: + # Throw away stderr. + nul = os.open('/dev/null', os.O_RDWR) + os.dup2(nul, 2) + os.execv('/bin/sh', ['sh', '-c', 'echo blat >&%d' % file.fd]) + except: + os._exit(0) + + def test_textmode(self): + """_mkstemp_inner can create files in text mode""" + if not has_textmode: + return # ugh, can't use TestSkipped. + + self.do_create(bin=0).write("blat\n") + # XXX should test that the file really is a text file + +test_classes.append(test__mkstemp_inner) + + +class test_gettempprefix(TC): + """Test gettempprefix().""" + + def test_sane_template(self): + """gettempprefix returns a nonempty prefix string""" + p = tempfile.gettempprefix() + + self.assert_(isinstance(p, basestring)) + self.assert_(len(p) > 0) + + def test_usable_template(self): + """gettempprefix returns a usable prefix string""" + + # Create a temp directory, avoiding use of the prefix. + # Then attempt to create a file whose name is + # prefix + 'xxxxxx.xxx' in that directory. + p = tempfile.gettempprefix() + "xxxxxx.xxx" + d = tempfile.mkdtemp(prefix="") + try: + p = os.path.join(d, p) + try: + fd = os.open(p, os.O_RDWR | os.O_CREAT) + except: + self.failOnException("os.open") + os.close(fd) + os.unlink(p) + finally: + os.rmdir(d) + +test_classes.append(test_gettempprefix) + + +class test_gettempdir(TC): + """Test gettempdir().""" + + def test_directory_exists(self): + """gettempdir returns a directory which exists""" + + dir = tempfile.gettempdir() + self.assert_(os.path.isabs(dir) or dir == os.curdir, + "%s is not an absolute path" % dir) + self.assert_(os.path.isdir(dir), + "%s is not a directory" % dir) + + def test_directory_writable(self): + """gettempdir returns a directory writable by the user""" + + # sneaky: just instantiate a NamedTemporaryFile, which + # defaults to writing into the directory returned by + # gettempdir. + try: + file = tempfile.NamedTemporaryFile() + file.write("blat") + file.close() + except: + self.failOnException("create file in %s" % tempfile.gettempdir()) + + def test_same_thing(self): + """gettempdir always returns the same object""" + a = tempfile.gettempdir() + b = tempfile.gettempdir() + + self.assert_(a is b) + +test_classes.append(test_gettempdir) + + +class test_mkstemp(TC): + """Test mkstemp().""" + def do_create(self, dir=None, pre="", suf="", ): + if dir is None: + dir = tempfile.gettempdir() + try: + (fd, name) = tempfile.mkstemp(dir=dir, prefix=pre, suffix=suf) + except: + self.failOnException("mkstemp") + + try: + self.nameCheck(name, dir, pre, suf) + finally: + os.close(fd) + os.unlink(name) + + def test_basic(self): + """mkstemp can create files""" + self.do_create() + self.do_create(pre="a") + self.do_create(suf="b") + self.do_create(pre="a", suf="b") + self.do_create(pre="aa", suf=".txt") + + def test_choose_directory(self): + """mkstemp can create directories in a user-selected directory""" + dir = tempfile.mkdtemp() + try: + self.do_create(dir=dir) + finally: + os.rmdir(dir) + +test_classes.append(test_mkstemp) + + +class test_mkdtemp(TC): + """Test mkdtemp().""" + + def do_create(self, dir=None, pre="", suf=""): + if dir is None: + dir = tempfile.gettempdir() + try: + name = tempfile.mkdtemp(dir=dir, prefix=pre, suffix=suf) + except: + self.failOnException("mkdtemp") + + try: + self.nameCheck(name, dir, pre, suf) + return name + except: + os.rmdir(name) + raise + + def test_basic(self): + """mkdtemp can create directories""" + os.rmdir(self.do_create()) + os.rmdir(self.do_create(pre="a")) + os.rmdir(self.do_create(suf="b")) + os.rmdir(self.do_create(pre="a", suf="b")) + os.rmdir(self.do_create(pre="aa", suf=".txt")) + + def test_basic_many(self): + """mkdtemp can create many directories (stochastic)""" + extant = range(1000) + try: + for i in extant: + extant[i] = self.do_create(pre="aa") + finally: + for i in extant: + if(isinstance(i, basestring)): + os.rmdir(i) + + def test_choose_directory(self): + """mkdtemp can create directories in a user-selected directory""" + dir = tempfile.mkdtemp() + try: + os.rmdir(self.do_create(dir=dir)) + finally: + os.rmdir(dir) + + def test_mode(self): + """mkdtemp creates directories with the proper mode""" + if not has_stat: + return # ugh, can't use TestSkipped. + + dir = self.do_create() + try: + mode = stat.S_IMODE(os.stat(dir).st_mode) + self.assertEqual(mode, 0700) + finally: + os.rmdir(dir) + +test_classes.append(test_mkdtemp) + + +class test_mktemp(TC): + """Test mktemp().""" + + # For safety, all use of mktemp must occur in a private directory. + # We must also suppress the RuntimeWarning it generates. + def setUp(self): + self.dir = tempfile.mkdtemp() + warnings.filterwarnings("ignore", + category=RuntimeWarning, + message="mktemp") + + def tearDown(self): + if self.dir: + os.rmdir(self.dir) + self.dir = None + # XXX This clobbers any -W options. + warnings.resetwarnings() + + class mktemped: + _unlink = os.unlink + _bflags = tempfile._bin_openflags + + def __init__(self, dir, pre, suf): + self.name = tempfile.mktemp(dir=dir, prefix=pre, suffix=suf) + # Create the file. This will raise an exception if it's + # mysteriously appeared in the meanwhile. + os.close(os.open(self.name, self._bflags, 0600)) + + def __del__(self): + self._unlink(self.name) + + def do_create(self, pre="", suf=""): + try: + file = self.mktemped(self.dir, pre, suf) + except: + self.failOnException("mktemp") + + self.nameCheck(file.name, self.dir, pre, suf) + return file + + def test_basic(self): + """mktemp can choose usable file names""" + self.do_create() + self.do_create(pre="a") + self.do_create(suf="b") + self.do_create(pre="a", suf="b") + self.do_create(pre="aa", suf=".txt") + + def test_many(self): + """mktemp can choose many usable file names (stochastic)""" + extant = range(1000) + for i in extant: + extant[i] = self.do_create(pre="aa") + + def test_warning(self): + """mktemp issues a warning when used""" + warnings.filterwarnings("error", + category=RuntimeWarning, + message="mktemp") + self.assertRaises(RuntimeWarning, + tempfile.mktemp, (), { 'dir': self.dir }) + +test_classes.append(test_mktemp) + + +# We test _TemporaryFileWrapper by testing NamedTemporaryFile. + + +class test_NamedTemporaryFile(TC): + """Test NamedTemporaryFile().""" + + def do_create(self, dir=None, pre="", suf=""): + if dir is None: + dir = tempfile.gettempdir() + try: + file = tempfile.NamedTemporaryFile(dir=dir, prefix=pre, suffix=suf) + except: + self.failOnException("NamedTemporaryFile") + + self.nameCheck(file.name, dir, pre, suf) + return file + + + def test_basic(self): + """NamedTemporaryFile can create files""" + self.do_create() + self.do_create(pre="a") + self.do_create(suf="b") + self.do_create(pre="a", suf="b") + self.do_create(pre="aa", suf=".txt") + + def test_creates_named(self): + """NamedTemporaryFile creates files with names""" + f = tempfile.NamedTemporaryFile() + self.failUnless(os.path.exists(f.name), + "NamedTemporaryFile %s does not exist" % f.name) + + def test_del_on_close(self): + """A NamedTemporaryFile is deleted when closed""" + dir = tempfile.mkdtemp() + try: + f = tempfile.NamedTemporaryFile(dir=dir) + f.write('blat') + f.close() + self.failIf(os.path.exists(f.name), + "NamedTemporaryFile %s exists after close" % f.name) + finally: + os.rmdir(dir) + + def test_multiple_close(self): + """A NamedTemporaryFile can be closed many times without error""" + + f = tempfile.NamedTemporaryFile() + f.write('abc\n') + f.close() + try: + f.close() + f.close() + except: + self.failOnException("close") + + # How to test the mode and bufsize parameters? + +test_classes.append(test_NamedTemporaryFile) + + +class test_TemporaryFile(TC): + """Test TemporaryFile().""" + + def test_basic(self): + """TemporaryFile can create files""" + # No point in testing the name params - the file has no name. + try: + tempfile.TemporaryFile() + except: + self.failOnException("TemporaryFile") + + def test_has_no_name(self): + """TemporaryFile creates files with no names (on this system)""" + dir = tempfile.mkdtemp() + f = tempfile.TemporaryFile(dir=dir) + f.write('blat') + + # Sneaky: because this file has no name, it should not prevent + # us from removing the directory it was created in. + try: + os.rmdir(dir) + except: + ei = sys.exc_info() + # cleanup + f.close() + os.rmdir(dir) + self.failOnException("rmdir", ei) + + def test_multiple_close(self): + """A TemporaryFile can be closed many times without error""" + f = tempfile.TemporaryFile() + f.write('abc\n') + f.close() + try: + f.close() + f.close() + except: + self.failOnException("close") + + # How to test the mode and bufsize parameters? + +class dummy_test_TemporaryFile(TC): + def test_dummy(self): + """TemporaryFile and NamedTemporaryFile are the same (on this system)""" + pass + +if tempfile.NamedTemporaryFile is tempfile.TemporaryFile: + test_classes.append(dummy_test_TemporaryFile) +else: + test_classes.append(test_TemporaryFile) + +def test_main(): + suite = unittest.TestSuite() + for c in test_classes: + suite.addTest(unittest.makeSuite(c)) + test_support.run_suite(suite) + +if __name__ == "__main__": + test_main() |