diff options
Diffstat (limited to 'Lib/test/test_tempfile.py')
| -rw-r--r-- | Lib/test/test_tempfile.py | 145 |
1 files changed, 131 insertions, 14 deletions
diff --git a/Lib/test/test_tempfile.py b/Lib/test/test_tempfile.py index 5a97035..6641298 100644 --- a/Lib/test/test_tempfile.py +++ b/Lib/test/test_tempfile.py @@ -9,6 +9,7 @@ import re import warnings import contextlib import weakref +from unittest import mock import unittest from test import support, script_helper @@ -37,7 +38,7 @@ else: # Common functionality. class BaseTestCase(unittest.TestCase): - str_check = re.compile(r"[a-zA-Z0-9_-]{6}$") + str_check = re.compile(r"^[a-z0-9_-]{8}$") def setUp(self): self._warnings_manager = support.check_warnings() @@ -64,7 +65,7 @@ class BaseTestCase(unittest.TestCase): nbase = nbase[len(pre):len(nbase)-len(suf)] self.assertTrue(self.str_check.match(nbase), - "random string '%s' does not match /^[a-zA-Z0-9_-]{6}$/" + "random string '%s' does not match ^[a-z0-9_-]{8}$" % nbase) @@ -153,7 +154,7 @@ class TestRandomNameSequence(BaseTestCase): # via any bugs above try: os.kill(pid, signal.SIGKILL) - except EnvironmentError: + except OSError: pass os.close(read_fd) os.close(write_fd) @@ -192,7 +193,7 @@ class TestCandidateTempdirList(BaseTestCase): try: dirname = os.getcwd() - except (AttributeError, os.error): + except (AttributeError, OSError): dirname = os.curdir self.assertIn(dirname, cand) @@ -273,7 +274,39 @@ def _mock_candidate_names(*names): lambda: iter(names)) -class TestMkstempInner(BaseTestCase): +class TestBadTempdir: + + def test_read_only_directory(self): + with _inside_empty_temp_dir(): + oldmode = mode = os.stat(tempfile.tempdir).st_mode + mode &= ~(stat.S_IWUSR | stat.S_IWGRP | stat.S_IWOTH) + os.chmod(tempfile.tempdir, mode) + try: + if os.access(tempfile.tempdir, os.W_OK): + self.skipTest("can't set the directory read-only") + with self.assertRaises(PermissionError): + self.make_temp() + self.assertEqual(os.listdir(tempfile.tempdir), []) + finally: + os.chmod(tempfile.tempdir, oldmode) + + def test_nonexisting_directory(self): + with _inside_empty_temp_dir(): + tempdir = os.path.join(tempfile.tempdir, 'nonexistent') + with support.swap_attr(tempfile, 'tempdir', tempdir): + with self.assertRaises(FileNotFoundError): + self.make_temp() + + def test_non_directory(self): + with _inside_empty_temp_dir(): + tempdir = os.path.join(tempfile.tempdir, 'file') + open(tempdir, 'wb').close() + with support.swap_attr(tempfile, 'tempdir', tempdir): + with self.assertRaises((NotADirectoryError, FileNotFoundError)): + self.make_temp() + + +class TestMkstempInner(TestBadTempdir, BaseTestCase): """Test the internal function _mkstemp_inner.""" class mkstemped: @@ -332,7 +365,7 @@ class TestMkstempInner(BaseTestCase): file = self.do_create() mode = stat.S_IMODE(os.stat(file.name).st_mode) expected = 0o600 - if sys.platform in ('win32', 'os2emx'): + if sys.platform == 'win32': # There's no distinction among 'user', 'group' and 'world'; # replicate the 'user' bits. user = expected >> 6 @@ -349,6 +382,7 @@ class TestMkstempInner(BaseTestCase): v="q" file = self.do_create() + self.assertEqual(os.get_inheritable(file.fd), False) fd = "%d" % file.fd try: @@ -365,7 +399,7 @@ class TestMkstempInner(BaseTestCase): # On Windows a spawn* /path/ with embedded spaces shouldn't be quoted, # but an arg with embedded spaces should be decorated with double # quotes on each end - if sys.platform in ('win32',): + if sys.platform == 'win32': decorated = '"%s"' % sys.executable tester = '"%s"' % tester else: @@ -387,7 +421,7 @@ class TestMkstempInner(BaseTestCase): os.lseek(f.fd, 0, os.SEEK_SET) self.assertEqual(os.read(f.fd, 20), b"blat") - def default_mkstemp_inner(self): + def make_temp(self): return tempfile._mkstemp_inner(tempfile.gettempdir(), tempfile.template, '', @@ -398,11 +432,11 @@ class TestMkstempInner(BaseTestCase): # the chosen name already exists with _inside_empty_temp_dir(), \ _mock_candidate_names('aaa', 'aaa', 'bbb'): - (fd1, name1) = self.default_mkstemp_inner() + (fd1, name1) = self.make_temp() os.close(fd1) self.assertTrue(name1.endswith('aaa')) - (fd2, name2) = self.default_mkstemp_inner() + (fd2, name2) = self.make_temp() os.close(fd2) self.assertTrue(name2.endswith('bbb')) @@ -414,7 +448,7 @@ class TestMkstempInner(BaseTestCase): dir = tempfile.mkdtemp() self.assertTrue(dir.endswith('aaa')) - (fd, name) = self.default_mkstemp_inner() + (fd, name) = self.make_temp() os.close(fd) self.assertTrue(name.endswith('bbb')) @@ -475,6 +509,20 @@ class TestGetTempDir(BaseTestCase): self.assertTrue(a is b) + def test_case_sensitive(self): + # gettempdir should not flatten its case + # even on a case-insensitive file system + case_sensitive_tempdir = tempfile.mkdtemp("-Temp") + _tempdir, tempfile.tempdir = tempfile.tempdir, None + try: + with support.EnvironmentVarGuard() as env: + # Fake the first env var which is checked as a candidate + env["TMPDIR"] = case_sensitive_tempdir + self.assertEqual(tempfile.gettempdir(), case_sensitive_tempdir) + finally: + tempfile.tempdir = _tempdir + support.rmdir(case_sensitive_tempdir) + class TestMkstemp(BaseTestCase): """Test mkstemp().""" @@ -512,9 +560,12 @@ class TestMkstemp(BaseTestCase): os.rmdir(dir) -class TestMkdtemp(BaseTestCase): +class TestMkdtemp(TestBadTempdir, BaseTestCase): """Test mkdtemp().""" + def make_temp(self): + return tempfile.mkdtemp() + def do_create(self, dir=None, pre="", suf=""): if dir is None: dir = tempfile.gettempdir() @@ -563,7 +614,7 @@ class TestMkdtemp(BaseTestCase): mode = stat.S_IMODE(os.stat(dir).st_mode) mode &= 0o777 # Mask off sticky bits inherited from /tmp expected = 0o700 - if sys.platform in ('win32', 'os2emx'): + if sys.platform == 'win32': # There's no distinction among 'user', 'group' and 'world'; # replicate the 'user' bits. user = expected >> 6 @@ -691,6 +742,19 @@ class TestNamedTemporaryFile(BaseTestCase): # No reference cycle was created. self.assertIsNone(wr()) + def test_iter(self): + # Issue #23700: getting iterator from a temporary file should keep + # it alive as long as it's being iterated over + lines = [b'spam\n', b'eggs\n', b'beans\n'] + def make_file(): + f = tempfile.NamedTemporaryFile(mode='w+b') + f.write(b''.join(lines)) + f.seek(0) + return f + for i, l in enumerate(make_file()): + self.assertEqual(l, lines[i]) + self.assertEqual(i, len(lines) - 1) + def test_creates_named(self): # NamedTemporaryFile creates files with names f = tempfile.NamedTemporaryFile() @@ -743,6 +807,19 @@ class TestNamedTemporaryFile(BaseTestCase): pass self.assertRaises(ValueError, use_closed) + def test_no_leak_fd(self): + # Issue #21058: don't leak file descriptor when io.open() fails + closed = [] + os_close = os.close + def close(fd): + closed.append(fd) + os_close(fd) + + with mock.patch('os.close', side_effect=close): + with mock.patch('io.open', side_effect=ValueError): + self.assertRaises(ValueError, tempfile.NamedTemporaryFile) + self.assertEqual(len(closed), 1) + # How to test the mode and bufsize parameters? @@ -1046,6 +1123,20 @@ if tempfile.NamedTemporaryFile is not tempfile.TemporaryFile: roundtrip("\u039B", "w+", encoding="utf-16") roundtrip("foo\r\n", "w+", newline="") + def test_no_leak_fd(self): + # Issue #21058: don't leak file descriptor when io.open() fails + closed = [] + os_close = os.close + def close(fd): + closed.append(fd) + os_close(fd) + + with mock.patch('os.close', side_effect=close): + with mock.patch('io.open', side_effect=ValueError): + self.assertRaises(ValueError, tempfile.TemporaryFile) + self.assertEqual(len(closed), 1) + + # Helper for test_del_on_shutdown class NulledModules: @@ -1139,8 +1230,9 @@ class TestTemporaryDirectory(BaseTestCase): def test_del_on_shutdown(self): # A TemporaryDirectory may be cleaned up during shutdown with self.do_create() as dir: - for mod in ('os', 'shutil', 'sys', 'tempfile', 'warnings'): + for mod in ('builtins', 'os', 'shutil', 'sys', 'tempfile', 'warnings'): code = """if True: + import builtins import os import shutil import sys @@ -1165,6 +1257,31 @@ class TestTemporaryDirectory(BaseTestCase): "TemporaryDirectory %s exists after cleanup" % tmp_name) err = err.decode('utf-8', 'backslashreplace') self.assertNotIn("Exception ", err) + self.assertIn("ResourceWarning: Implicitly cleaning up", err) + + def test_exit_on_shutdown(self): + # Issue #22427 + with self.do_create() as dir: + code = """if True: + import sys + import tempfile + import warnings + + def generator(): + with tempfile.TemporaryDirectory(dir={dir!r}) as tmp: + yield tmp + g = generator() + sys.stdout.buffer.write(next(g).encode()) + + warnings.filterwarnings("always", category=ResourceWarning) + """.format(dir=dir) + rc, out, err = script_helper.assert_python_ok("-c", code) + tmp_name = out.decode().strip() + self.assertFalse(os.path.exists(tmp_name), + "TemporaryDirectory %s exists after cleanup" % tmp_name) + err = err.decode('utf-8', 'backslashreplace') + self.assertNotIn("Exception ", err) + self.assertIn("ResourceWarning: Implicitly cleaning up", err) def test_warnings_on_cleanup(self): # ResourceWarning will be triggered by __del__ |
