summaryrefslogtreecommitdiffstats
path: root/Lib/test/test_tempfile.py
diff options
context:
space:
mode:
Diffstat (limited to 'Lib/test/test_tempfile.py')
-rw-r--r--Lib/test/test_tempfile.py711
1 files changed, 704 insertions, 7 deletions
diff --git a/Lib/test/test_tempfile.py b/Lib/test/test_tempfile.py
index 286e79c..d96aae7 100644
--- a/Lib/test/test_tempfile.py
+++ b/Lib/test/test_tempfile.py
@@ -1,10 +1,707 @@
-# SF bug #476138: tempfile behavior across platforms
-# Ensure that a temp file can be closed any number of times without error.
+# tempfile.py unit tests.
import tempfile
+import os
+import sys
+import re
+import errno
+import warnings
-f = tempfile.TemporaryFile("w+b")
-f.write('abc\n')
-f.close()
-f.close()
-f.close()
+import unittest
+from test import test_support
+
+if hasattr(os, 'stat'):
+ import stat
+ has_stat = 1
+else:
+ has_stat = 0
+
+has_textmode = (tempfile._text_openflags != tempfile._bin_openflags)
+
+# This is organized as one test for each chunk of code in tempfile.py,
+# in order of their appearance in the file. Testing which requires
+# threads is not done here.
+
+# Common functionality.
+class TC(unittest.TestCase):
+
+ str_check = re.compile(r"[a-zA-Z0-9_-]{6}$")
+
+ def failOnException(self, what, ei=None):
+ if ei is None:
+ ei = sys.exc_info()
+ self.fail("%s raised %s: %s" % (what, ei[0], ei[1]))
+
+ def nameCheck(self, name, dir, pre, suf):
+ (ndir, nbase) = os.path.split(name)
+ npre = nbase[:len(pre)]
+ nsuf = nbase[len(nbase)-len(suf):]
+
+ self.assertEqual(ndir, dir,
+ "file '%s' not in directory '%s'" % (name, dir))
+ self.assertEqual(npre, pre,
+ "file '%s' does not begin with '%s'" % (nbase, pre))
+ self.assertEqual(nsuf, suf,
+ "file '%s' does not end with '%s'" % (nbase, suf))
+
+ nbase = nbase[len(pre):len(nbase)-len(suf)]
+ self.assert_(self.str_check.match(nbase),
+ "random string '%s' does not match /^[a-zA-Z0-9_-]{6}$/"
+ % nbase)
+
+test_classes = []
+
+class test_exports(TC):
+ def test_exports(self):
+ """There are no surprising symbols in the tempfile module"""
+ dict = tempfile.__dict__
+
+ expected = {
+ "NamedTemporaryFile" : 1,
+ "TemporaryFile" : 1,
+ "mkstemp" : 1,
+ "mkdtemp" : 1,
+ "mktemp" : 1,
+ "TMP_MAX" : 1,
+ "gettempprefix" : 1,
+ "gettempdir" : 1,
+ "tempdir" : 1,
+ "template" : 1
+ }
+
+ unexp = []
+ for key in dict:
+ if key[0] != '_' and key not in expected:
+ unexp.append(key)
+ self.failUnless(len(unexp) == 0,
+ "unexpected keys: %s" % unexp)
+
+test_classes.append(test_exports)
+
+
+class test__once(TC):
+ """Test the internal function _once."""
+
+ def setUp(self):
+ tempfile.once_var = None
+ self.already_called = 0
+
+ def tearDown(self):
+ del tempfile.once_var
+
+ def callMeOnce(self):
+ self.failIf(self.already_called, "callMeOnce called twice")
+ self.already_called = 1
+ return 24
+
+ def do_once(self):
+ tempfile._once('once_var', self.callMeOnce)
+
+ def test_once_initializes(self):
+ """_once initializes its argument"""
+
+ self.do_once()
+
+ self.assertEqual(tempfile.once_var, 24,
+ "once_var=%d, not 24" % tempfile.once_var)
+ self.assertEqual(self.already_called, 1,
+ "already_called=%d, not 1" % self.already_called)
+
+ def test_once_means_once(self):
+ """_once calls the callback just once"""
+
+ self.do_once()
+ self.do_once()
+ self.do_once()
+ self.do_once()
+
+ def test_once_namespace_safe(self):
+ """_once does not modify anything but its argument"""
+
+ env_copy = tempfile.__dict__.copy()
+
+ self.do_once()
+
+ env = tempfile.__dict__
+
+ a = env.keys()
+ a.sort()
+ b = env_copy.keys()
+ b.sort()
+
+ self.failIf(len(a) != len(b))
+ for i in xrange(len(a)):
+ self.failIf(a[i] != b[i])
+
+ key = a[i]
+ if key != 'once_var':
+ self.failIf(env[key] != env_copy[key])
+
+test_classes.append(test__once)
+
+
+class test__RandomNameSequence(TC):
+ """Test the internal iterator object _RandomNameSequence."""
+
+ def setUp(self):
+ self.r = tempfile._RandomNameSequence()
+
+ def test_get_six_char_str(self):
+ """_RandomNameSequence returns a six-character string"""
+ s = self.r.next()
+ self.nameCheck(s, '', '', '')
+
+ def test_many(self):
+ """_RandomNameSequence returns no duplicate strings (stochastic)"""
+
+ dict = {}
+ r = self.r
+ for i in xrange(1000):
+ s = r.next()
+ self.nameCheck(s, '', '', '')
+ self.failIf(s in dict)
+ dict[s] = 1
+
+ def test_supports_iter(self):
+ """_RandomNameSequence supports the iterator protocol"""
+
+ i = 0
+ r = self.r
+ try:
+ for s in r:
+ i += 1
+ if i == 20:
+ break
+ except:
+ failOnException("iteration")
+
+test_classes.append(test__RandomNameSequence)
+
+
+class test__candidate_tempdir_list(TC):
+ """Test the internal function _candidate_tempdir_list."""
+
+ def test_nonempty_list(self):
+ """_candidate_tempdir_list returns a nonempty list of strings"""
+
+ cand = tempfile._candidate_tempdir_list()
+
+ self.failIf(len(cand) == 0)
+ for c in cand:
+ self.assert_(isinstance(c, basestring),
+ "%s is not a string" % c)
+
+ def test_wanted_dirs(self):
+ """_candidate_tempdir_list contains the expected directories"""
+
+ # Make sure the interesting environment variables are all set.
+ added = []
+ try:
+ for envname in 'TMPDIR', 'TEMP', 'TMP':
+ dirname = os.getenv(envname)
+ if not dirname:
+ os.environ[envname] = os.path.abspath(envname)
+ added.append(envname)
+
+ cand = tempfile._candidate_tempdir_list()
+
+ for envname in 'TMPDIR', 'TEMP', 'TMP':
+ dirname = os.getenv(envname)
+ if not dirname: raise ValueError
+ self.assert_(dirname in cand)
+
+ try:
+ dirname = os.getcwd()
+ except (AttributeError, os.error):
+ dirname = os.curdir
+
+ self.assert_(dirname in cand)
+
+ # Not practical to try to verify the presence of OS-specific
+ # paths in this list.
+ finally:
+ for p in added:
+ del os.environ[p]
+
+test_classes.append(test__candidate_tempdir_list)
+
+
+# We test _get_default_tempdir by testing gettempdir.
+
+
+class test__get_candidate_names(TC):
+ """Test the internal function _get_candidate_names."""
+
+ def test_retval(self):
+ """_get_candidate_names returns a _RandomNameSequence object"""
+ obj = tempfile._get_candidate_names()
+ self.assert_(isinstance(obj, tempfile._RandomNameSequence))
+
+ def test_same_thing(self):
+ """_get_candidate_names always returns the same object"""
+ a = tempfile._get_candidate_names()
+ b = tempfile._get_candidate_names()
+
+ self.assert_(a is b)
+
+test_classes.append(test__get_candidate_names)
+
+
+class test__mkstemp_inner(TC):
+ """Test the internal function _mkstemp_inner."""
+
+ class mkstemped:
+ _bflags = tempfile._bin_openflags
+ _tflags = tempfile._text_openflags
+ _close = os.close
+ _unlink = os.unlink
+
+ def __init__(self, dir, pre, suf, bin):
+ if bin: flags = self._bflags
+ else: flags = self._tflags
+
+ (self.fd, self.name) = tempfile._mkstemp_inner(dir, pre, suf, flags)
+
+ def write(self, str):
+ os.write(self.fd, str)
+
+ def __del__(self):
+ self._close(self.fd)
+ self._unlink(self.name)
+
+ def do_create(self, dir=None, pre="", suf="", bin=1):
+ if dir is None:
+ dir = tempfile.gettempdir()
+ try:
+ file = self.mkstemped(dir, pre, suf, bin)
+ except:
+ self.failOnException("_mkstemp_inner")
+
+ self.nameCheck(file.name, dir, pre, suf)
+ return file
+
+ def test_basic(self):
+ """_mkstemp_inner can create files"""
+ self.do_create().write("blat")
+ self.do_create(pre="a").write("blat")
+ self.do_create(suf="b").write("blat")
+ self.do_create(pre="a", suf="b").write("blat")
+ self.do_create(pre="aa", suf=".txt").write("blat")
+
+ def test_basic_many(self):
+ """_mkstemp_inner can create many files (stochastic)"""
+ extant = range(1000)
+ for i in extant:
+ extant[i] = self.do_create(pre="aa")
+
+ def test_choose_directory(self):
+ """_mkstemp_inner can create files in a user-selected directory"""
+ dir = tempfile.mkdtemp()
+ try:
+ self.do_create(dir=dir).write("blat")
+ finally:
+ os.rmdir(dir)
+
+ def test_file_mode(self):
+ """_mkstemp_inner creates files with the proper mode"""
+ if not has_stat:
+ return # ugh, can't use TestSkipped.
+
+ file = self.do_create()
+ mode = stat.S_IMODE(os.stat(file.name).st_mode)
+ self.assertEqual(mode, 0600)
+
+ def test_noinherit(self):
+ """_mkstemp_inner file handles are not inherited by child processes"""
+ # FIXME: Find a way to test this on Windows.
+ if os.name != 'posix':
+ return # ugh, can't use TestSkipped.
+
+ file = self.do_create()
+
+ # We have to exec something, so that FD_CLOEXEC will take
+ # effect. The sanest thing to try is /bin/sh; we can easily
+ # instruct it to attempt to write to the fd and report success
+ # or failure. Unfortunately, sh syntax does not permit use of
+ # fds numerically larger than 9; abandon this test if so.
+ if file.fd > 9:
+ raise test_support.TestSkipped, 'cannot test with fd %d' % file.fd
+
+ pid = os.fork()
+ if pid:
+ status = os.wait()[1]
+ self.failUnless(os.WIFEXITED(status),
+ "child process did not exit (status %d)" % status)
+
+ # We want the child to have exited _un_successfully, indicating
+ # failure to write to the closed fd.
+ self.failUnless(os.WEXITSTATUS(status) != 0,
+ "child process exited successfully")
+
+ else:
+ try:
+ # Throw away stderr.
+ nul = os.open('/dev/null', os.O_RDWR)
+ os.dup2(nul, 2)
+ os.execv('/bin/sh', ['sh', '-c', 'echo blat >&%d' % file.fd])
+ except:
+ os._exit(0)
+
+ def test_textmode(self):
+ """_mkstemp_inner can create files in text mode"""
+ if not has_textmode:
+ return # ugh, can't use TestSkipped.
+
+ self.do_create(bin=0).write("blat\n")
+ # XXX should test that the file really is a text file
+
+test_classes.append(test__mkstemp_inner)
+
+
+class test_gettempprefix(TC):
+ """Test gettempprefix()."""
+
+ def test_sane_template(self):
+ """gettempprefix returns a nonempty prefix string"""
+ p = tempfile.gettempprefix()
+
+ self.assert_(isinstance(p, basestring))
+ self.assert_(len(p) > 0)
+
+ def test_usable_template(self):
+ """gettempprefix returns a usable prefix string"""
+
+ # Create a temp directory, avoiding use of the prefix.
+ # Then attempt to create a file whose name is
+ # prefix + 'xxxxxx.xxx' in that directory.
+ p = tempfile.gettempprefix() + "xxxxxx.xxx"
+ d = tempfile.mkdtemp(prefix="")
+ try:
+ p = os.path.join(d, p)
+ try:
+ fd = os.open(p, os.O_RDWR | os.O_CREAT)
+ except:
+ self.failOnException("os.open")
+ os.close(fd)
+ os.unlink(p)
+ finally:
+ os.rmdir(d)
+
+test_classes.append(test_gettempprefix)
+
+
+class test_gettempdir(TC):
+ """Test gettempdir()."""
+
+ def test_directory_exists(self):
+ """gettempdir returns a directory which exists"""
+
+ dir = tempfile.gettempdir()
+ self.assert_(os.path.isabs(dir) or dir == os.curdir,
+ "%s is not an absolute path" % dir)
+ self.assert_(os.path.isdir(dir),
+ "%s is not a directory" % dir)
+
+ def test_directory_writable(self):
+ """gettempdir returns a directory writable by the user"""
+
+ # sneaky: just instantiate a NamedTemporaryFile, which
+ # defaults to writing into the directory returned by
+ # gettempdir.
+ try:
+ file = tempfile.NamedTemporaryFile()
+ file.write("blat")
+ file.close()
+ except:
+ self.failOnException("create file in %s" % tempfile.gettempdir())
+
+ def test_same_thing(self):
+ """gettempdir always returns the same object"""
+ a = tempfile.gettempdir()
+ b = tempfile.gettempdir()
+
+ self.assert_(a is b)
+
+test_classes.append(test_gettempdir)
+
+
+class test_mkstemp(TC):
+ """Test mkstemp()."""
+ def do_create(self, dir=None, pre="", suf="", ):
+ if dir is None:
+ dir = tempfile.gettempdir()
+ try:
+ (fd, name) = tempfile.mkstemp(dir=dir, prefix=pre, suffix=suf)
+ except:
+ self.failOnException("mkstemp")
+
+ try:
+ self.nameCheck(name, dir, pre, suf)
+ finally:
+ os.close(fd)
+ os.unlink(name)
+
+ def test_basic(self):
+ """mkstemp can create files"""
+ self.do_create()
+ self.do_create(pre="a")
+ self.do_create(suf="b")
+ self.do_create(pre="a", suf="b")
+ self.do_create(pre="aa", suf=".txt")
+
+ def test_choose_directory(self):
+ """mkstemp can create directories in a user-selected directory"""
+ dir = tempfile.mkdtemp()
+ try:
+ self.do_create(dir=dir)
+ finally:
+ os.rmdir(dir)
+
+test_classes.append(test_mkstemp)
+
+
+class test_mkdtemp(TC):
+ """Test mkdtemp()."""
+
+ def do_create(self, dir=None, pre="", suf=""):
+ if dir is None:
+ dir = tempfile.gettempdir()
+ try:
+ name = tempfile.mkdtemp(dir=dir, prefix=pre, suffix=suf)
+ except:
+ self.failOnException("mkdtemp")
+
+ try:
+ self.nameCheck(name, dir, pre, suf)
+ return name
+ except:
+ os.rmdir(name)
+ raise
+
+ def test_basic(self):
+ """mkdtemp can create directories"""
+ os.rmdir(self.do_create())
+ os.rmdir(self.do_create(pre="a"))
+ os.rmdir(self.do_create(suf="b"))
+ os.rmdir(self.do_create(pre="a", suf="b"))
+ os.rmdir(self.do_create(pre="aa", suf=".txt"))
+
+ def test_basic_many(self):
+ """mkdtemp can create many directories (stochastic)"""
+ extant = range(1000)
+ try:
+ for i in extant:
+ extant[i] = self.do_create(pre="aa")
+ finally:
+ for i in extant:
+ if(isinstance(i, basestring)):
+ os.rmdir(i)
+
+ def test_choose_directory(self):
+ """mkdtemp can create directories in a user-selected directory"""
+ dir = tempfile.mkdtemp()
+ try:
+ os.rmdir(self.do_create(dir=dir))
+ finally:
+ os.rmdir(dir)
+
+ def test_mode(self):
+ """mkdtemp creates directories with the proper mode"""
+ if not has_stat:
+ return # ugh, can't use TestSkipped.
+
+ dir = self.do_create()
+ try:
+ mode = stat.S_IMODE(os.stat(dir).st_mode)
+ self.assertEqual(mode, 0700)
+ finally:
+ os.rmdir(dir)
+
+test_classes.append(test_mkdtemp)
+
+
+class test_mktemp(TC):
+ """Test mktemp()."""
+
+ # For safety, all use of mktemp must occur in a private directory.
+ # We must also suppress the RuntimeWarning it generates.
+ def setUp(self):
+ self.dir = tempfile.mkdtemp()
+ warnings.filterwarnings("ignore",
+ category=RuntimeWarning,
+ message="mktemp")
+
+ def tearDown(self):
+ if self.dir:
+ os.rmdir(self.dir)
+ self.dir = None
+ # XXX This clobbers any -W options.
+ warnings.resetwarnings()
+
+ class mktemped:
+ _unlink = os.unlink
+ _bflags = tempfile._bin_openflags
+
+ def __init__(self, dir, pre, suf):
+ self.name = tempfile.mktemp(dir=dir, prefix=pre, suffix=suf)
+ # Create the file. This will raise an exception if it's
+ # mysteriously appeared in the meanwhile.
+ os.close(os.open(self.name, self._bflags, 0600))
+
+ def __del__(self):
+ self._unlink(self.name)
+
+ def do_create(self, pre="", suf=""):
+ try:
+ file = self.mktemped(self.dir, pre, suf)
+ except:
+ self.failOnException("mktemp")
+
+ self.nameCheck(file.name, self.dir, pre, suf)
+ return file
+
+ def test_basic(self):
+ """mktemp can choose usable file names"""
+ self.do_create()
+ self.do_create(pre="a")
+ self.do_create(suf="b")
+ self.do_create(pre="a", suf="b")
+ self.do_create(pre="aa", suf=".txt")
+
+ def test_many(self):
+ """mktemp can choose many usable file names (stochastic)"""
+ extant = range(1000)
+ for i in extant:
+ extant[i] = self.do_create(pre="aa")
+
+ def test_warning(self):
+ """mktemp issues a warning when used"""
+ warnings.filterwarnings("error",
+ category=RuntimeWarning,
+ message="mktemp")
+ self.assertRaises(RuntimeWarning,
+ tempfile.mktemp, (), { 'dir': self.dir })
+
+test_classes.append(test_mktemp)
+
+
+# We test _TemporaryFileWrapper by testing NamedTemporaryFile.
+
+
+class test_NamedTemporaryFile(TC):
+ """Test NamedTemporaryFile()."""
+
+ def do_create(self, dir=None, pre="", suf=""):
+ if dir is None:
+ dir = tempfile.gettempdir()
+ try:
+ file = tempfile.NamedTemporaryFile(dir=dir, prefix=pre, suffix=suf)
+ except:
+ self.failOnException("NamedTemporaryFile")
+
+ self.nameCheck(file.name, dir, pre, suf)
+ return file
+
+
+ def test_basic(self):
+ """NamedTemporaryFile can create files"""
+ self.do_create()
+ self.do_create(pre="a")
+ self.do_create(suf="b")
+ self.do_create(pre="a", suf="b")
+ self.do_create(pre="aa", suf=".txt")
+
+ def test_creates_named(self):
+ """NamedTemporaryFile creates files with names"""
+ f = tempfile.NamedTemporaryFile()
+ self.failUnless(os.path.exists(f.name),
+ "NamedTemporaryFile %s does not exist" % f.name)
+
+ def test_del_on_close(self):
+ """A NamedTemporaryFile is deleted when closed"""
+ dir = tempfile.mkdtemp()
+ try:
+ f = tempfile.NamedTemporaryFile(dir=dir)
+ f.write('blat')
+ f.close()
+ self.failIf(os.path.exists(f.name),
+ "NamedTemporaryFile %s exists after close" % f.name)
+ finally:
+ os.rmdir(dir)
+
+ def test_multiple_close(self):
+ """A NamedTemporaryFile can be closed many times without error"""
+
+ f = tempfile.NamedTemporaryFile()
+ f.write('abc\n')
+ f.close()
+ try:
+ f.close()
+ f.close()
+ except:
+ self.failOnException("close")
+
+ # How to test the mode and bufsize parameters?
+
+test_classes.append(test_NamedTemporaryFile)
+
+
+class test_TemporaryFile(TC):
+ """Test TemporaryFile()."""
+
+ def test_basic(self):
+ """TemporaryFile can create files"""
+ # No point in testing the name params - the file has no name.
+ try:
+ tempfile.TemporaryFile()
+ except:
+ self.failOnException("TemporaryFile")
+
+ def test_has_no_name(self):
+ """TemporaryFile creates files with no names (on this system)"""
+ dir = tempfile.mkdtemp()
+ f = tempfile.TemporaryFile(dir=dir)
+ f.write('blat')
+
+ # Sneaky: because this file has no name, it should not prevent
+ # us from removing the directory it was created in.
+ try:
+ os.rmdir(dir)
+ except:
+ ei = sys.exc_info()
+ # cleanup
+ f.close()
+ os.rmdir(dir)
+ self.failOnException("rmdir", ei)
+
+ def test_multiple_close(self):
+ """A TemporaryFile can be closed many times without error"""
+ f = tempfile.TemporaryFile()
+ f.write('abc\n')
+ f.close()
+ try:
+ f.close()
+ f.close()
+ except:
+ self.failOnException("close")
+
+ # How to test the mode and bufsize parameters?
+
+class dummy_test_TemporaryFile(TC):
+ def test_dummy(self):
+ """TemporaryFile and NamedTemporaryFile are the same (on this system)"""
+ pass
+
+if tempfile.NamedTemporaryFile is tempfile.TemporaryFile:
+ test_classes.append(dummy_test_TemporaryFile)
+else:
+ test_classes.append(test_TemporaryFile)
+
+def test_main():
+ suite = unittest.TestSuite()
+ for c in test_classes:
+ suite.addTest(unittest.makeSuite(c))
+ test_support.run_suite(suite)
+
+if __name__ == "__main__":
+ test_main()