diff options
Diffstat (limited to 'Lib/test/test_tempfile.py')
| -rw-r--r-- | Lib/test/test_tempfile.py | 189 |
1 files changed, 171 insertions, 18 deletions
diff --git a/Lib/test/test_tempfile.py b/Lib/test/test_tempfile.py index f72b0b1..2d29885 100644 --- a/Lib/test/test_tempfile.py +++ b/Lib/test/test_tempfile.py @@ -3,15 +3,11 @@ import tempfile import os import sys import re -import errno import warnings import unittest from test import support -warnings.filterwarnings("ignore", - category=RuntimeWarning, - message="mktemp", module=__name__) if hasattr(os, 'stat'): import stat @@ -24,9 +20,7 @@ has_spawnl = hasattr(os, 'spawnl') # TEST_FILES may need to be tweaked for systems depending on the maximum # number of files that can be opened at one time (see ulimit -n) -if sys.platform == 'mac': - TEST_FILES = 32 -elif sys.platform in ('openbsd3', 'openbsd4'): +if sys.platform in ('openbsd3', 'openbsd4'): TEST_FILES = 48 else: TEST_FILES = 100 @@ -40,6 +34,16 @@ class TC(unittest.TestCase): str_check = re.compile(r"[a-zA-Z0-9_-]{6}$") + def setUp(self): + self._warnings_manager = support.check_warnings() + self._warnings_manager.__enter__() + warnings.filterwarnings("ignore", category=RuntimeWarning, + message="mktemp", module=__name__) + + def tearDown(self): + self._warnings_manager.__exit__(None, None, None) + + def failOnException(self, what, ei=None): if ei is None: ei = sys.exc_info() @@ -81,7 +85,8 @@ class test_exports(TC): "gettempdir" : 1, "tempdir" : 1, "template" : 1, - "SpooledTemporaryFile" : 1 + "SpooledTemporaryFile" : 1, + "TemporaryDirectory" : 1, } unexp = [] @@ -99,6 +104,7 @@ class test__RandomNameSequence(TC): def setUp(self): self.r = tempfile._RandomNameSequence() + super().setUp() def test_get_six_char_str(self): # _RandomNameSequence returns a six-character string @@ -113,7 +119,7 @@ class test__RandomNameSequence(TC): for i in range(TEST_FILES): s = next(r) self.nameCheck(s, '', '', '') - self.assertFalse(s in dict) + self.assertNotIn(s, dict) dict[s] = 1 def supports_iter(self): @@ -127,7 +133,7 @@ class test__RandomNameSequence(TC): if i == 20: break except: - failOnException("iteration") + self.failOnException("iteration") test_classes.append(test__RandomNameSequence) @@ -142,8 +148,7 @@ class test__candidate_tempdir_list(TC): self.assertFalse(len(cand) == 0) for c in cand: - self.assertTrue(isinstance(c, str), - "%s is not a string" % c) + self.assertIsInstance(c, str) def test_wanted_dirs(self): # _candidate_tempdir_list contains the expected directories @@ -160,14 +165,14 @@ class test__candidate_tempdir_list(TC): for envname in 'TMPDIR', 'TEMP', 'TMP': dirname = os.getenv(envname) if not dirname: raise ValueError - self.assertTrue(dirname in cand) + self.assertIn(dirname, cand) try: dirname = os.getcwd() except (AttributeError, os.error): dirname = os.curdir - self.assertTrue(dirname in cand) + self.assertIn(dirname, cand) # Not practical to try to verify the presence of OS-specific # paths in this list. @@ -184,7 +189,7 @@ class test__get_candidate_names(TC): def test_retval(self): # _get_candidate_names returns a _RandomNameSequence object obj = tempfile._get_candidate_names() - self.assertTrue(isinstance(obj, tempfile._RandomNameSequence)) + self.assertIsInstance(obj, tempfile._RandomNameSequence) def test_same_thing(self): # _get_candidate_names always returns the same object @@ -259,7 +264,7 @@ class test__mkstemp_inner(TC): file = self.do_create() mode = stat.S_IMODE(os.stat(file.name).st_mode) expected = 0o600 - if sys.platform in ('win32', 'os2emx', 'mac'): + if sys.platform in ('win32', 'os2emx'): # There's no distinction among 'user', 'group' and 'world'; # replicate the 'user' bits. user = expected >> 6 @@ -326,7 +331,7 @@ class test_gettempprefix(TC): # gettempprefix returns a nonempty prefix string p = tempfile.gettempprefix() - self.assertTrue(isinstance(p, str)) + self.assertIsInstance(p, str) self.assertTrue(len(p) > 0) def test_usable_template(self): @@ -482,7 +487,7 @@ class test_mkdtemp(TC): mode = stat.S_IMODE(os.stat(dir).st_mode) mode &= 0o777 # Mask off sticky bits inherited from /tmp expected = 0o700 - if sys.platform in ('win32', 'os2emx', 'mac'): + if sys.platform in ('win32', 'os2emx'): # There's no distinction among 'user', 'group' and 'world'; # replicate the 'user' bits. user = expected >> 6 @@ -501,11 +506,13 @@ class test_mktemp(TC): # We must also suppress the RuntimeWarning it generates. def setUp(self): self.dir = tempfile.mkdtemp() + super().setUp() def tearDown(self): if self.dir: os.rmdir(self.dir) self.dir = None + super().tearDown() class mktemped: _unlink = os.unlink @@ -900,6 +907,152 @@ class test_TemporaryFile(TC): if tempfile.NamedTemporaryFile is not tempfile.TemporaryFile: test_classes.append(test_TemporaryFile) + +# Helper for test_del_on_shutdown +class NulledModules: + def __init__(self, *modules): + self.refs = [mod.__dict__ for mod in modules] + self.contents = [ref.copy() for ref in self.refs] + + def __enter__(self): + for d in self.refs: + for key in d: + d[key] = None + + def __exit__(self, *exc_info): + for d, c in zip(self.refs, self.contents): + d.clear() + d.update(c) + +class test_TemporaryDirectory(TC): + """Test TemporaryDirectory().""" + + def do_create(self, dir=None, pre="", suf="", recurse=1): + if dir is None: + dir = tempfile.gettempdir() + try: + tmp = tempfile.TemporaryDirectory(dir=dir, prefix=pre, suffix=suf) + except: + self.failOnException("TemporaryDirectory") + self.nameCheck(tmp.name, dir, pre, suf) + # Create a subdirectory and some files + if recurse: + self.do_create(tmp.name, pre, suf, recurse-1) + with open(os.path.join(tmp.name, "test.txt"), "wb") as f: + f.write(b"Hello world!") + return tmp + + def test_mkdtemp_failure(self): + # Check no additional exception if mkdtemp fails + # Previously would raise AttributeError instead + # (noted as part of Issue #10188) + with tempfile.TemporaryDirectory() as nonexistent: + pass + with self.assertRaises(os.error): + tempfile.TemporaryDirectory(dir=nonexistent) + + def test_explicit_cleanup(self): + # A TemporaryDirectory is deleted when cleaned up + dir = tempfile.mkdtemp() + try: + d = self.do_create(dir=dir) + self.assertTrue(os.path.exists(d.name), + "TemporaryDirectory %s does not exist" % d.name) + d.cleanup() + self.assertFalse(os.path.exists(d.name), + "TemporaryDirectory %s exists after cleanup" % d.name) + finally: + os.rmdir(dir) + + @support.cpython_only + def test_del_on_collection(self): + # A TemporaryDirectory is deleted when garbage collected + dir = tempfile.mkdtemp() + try: + d = self.do_create(dir=dir) + name = d.name + del d # Rely on refcounting to invoke __del__ + self.assertFalse(os.path.exists(name), + "TemporaryDirectory %s exists after __del__" % name) + finally: + os.rmdir(dir) + + @unittest.expectedFailure # See issue #10188 + def test_del_on_shutdown(self): + # A TemporaryDirectory may be cleaned up during shutdown + # Make sure it works with the relevant modules nulled out + with self.do_create() as dir: + d = self.do_create(dir=dir) + # Mimic the nulling out of modules that + # occurs during system shutdown + modules = [os, os.path] + if has_stat: + modules.append(stat) + # Currently broken, so suppress the warning + # that is otherwise emitted on stdout + with support.captured_stderr() as err: + with NulledModules(*modules): + d.cleanup() + # Currently broken, so stop spurious exception by + # indicating the object has already been closed + d._closed = True + # And this assert will fail, as expected by the + # unittest decorator... + self.assertFalse(os.path.exists(d.name), + "TemporaryDirectory %s exists after cleanup" % d.name) + + def test_warnings_on_cleanup(self): + # Two kinds of warning on shutdown + # Issue 10888: may write to stderr if modules are nulled out + # ResourceWarning will be triggered by __del__ + with self.do_create() as dir: + if os.sep != '\\': + # Embed a backslash in order to make sure string escaping + # in the displayed error message is dealt with correctly + suffix = '\\check_backslash_handling' + else: + suffix = '' + d = self.do_create(dir=dir, suf=suffix) + + #Check for the Issue 10888 message + modules = [os, os.path] + if has_stat: + modules.append(stat) + with support.captured_stderr() as err: + with NulledModules(*modules): + d.cleanup() + message = err.getvalue().replace('\\\\', '\\') + self.assertIn("while cleaning up", message) + self.assertIn(d.name, message) + + # Check for the resource warning + with support.check_warnings(('Implicitly', ResourceWarning), quiet=False): + warnings.filterwarnings("always", category=ResourceWarning) + d.__del__() + self.assertFalse(os.path.exists(d.name), + "TemporaryDirectory %s exists after __del__" % d.name) + + def test_multiple_close(self): + # Can be cleaned-up many times without error + d = self.do_create() + d.cleanup() + try: + d.cleanup() + d.cleanup() + except: + self.failOnException("cleanup") + + def test_context_manager(self): + # Can be used as a context manager + d = self.do_create() + with d as name: + self.assertTrue(os.path.exists(name)) + self.assertEqual(name, d.name) + self.assertFalse(os.path.exists(name)) + + +test_classes.append(test_TemporaryDirectory) + def test_main(): support.run_unittest(*test_classes) |
