summaryrefslogtreecommitdiffstats
path: root/Lib/test/test_tempfile.py
diff options
context:
space:
mode:
Diffstat (limited to 'Lib/test/test_tempfile.py')
-rw-r--r--Lib/test/test_tempfile.py145
1 files changed, 131 insertions, 14 deletions
diff --git a/Lib/test/test_tempfile.py b/Lib/test/test_tempfile.py
index 5a97035..6641298 100644
--- a/Lib/test/test_tempfile.py
+++ b/Lib/test/test_tempfile.py
@@ -9,6 +9,7 @@ import re
import warnings
import contextlib
import weakref
+from unittest import mock
import unittest
from test import support, script_helper
@@ -37,7 +38,7 @@ else:
# Common functionality.
class BaseTestCase(unittest.TestCase):
- str_check = re.compile(r"[a-zA-Z0-9_-]{6}$")
+ str_check = re.compile(r"^[a-z0-9_-]{8}$")
def setUp(self):
self._warnings_manager = support.check_warnings()
@@ -64,7 +65,7 @@ class BaseTestCase(unittest.TestCase):
nbase = nbase[len(pre):len(nbase)-len(suf)]
self.assertTrue(self.str_check.match(nbase),
- "random string '%s' does not match /^[a-zA-Z0-9_-]{6}$/"
+ "random string '%s' does not match ^[a-z0-9_-]{8}$"
% nbase)
@@ -153,7 +154,7 @@ class TestRandomNameSequence(BaseTestCase):
# via any bugs above
try:
os.kill(pid, signal.SIGKILL)
- except EnvironmentError:
+ except OSError:
pass
os.close(read_fd)
os.close(write_fd)
@@ -192,7 +193,7 @@ class TestCandidateTempdirList(BaseTestCase):
try:
dirname = os.getcwd()
- except (AttributeError, os.error):
+ except (AttributeError, OSError):
dirname = os.curdir
self.assertIn(dirname, cand)
@@ -273,7 +274,39 @@ def _mock_candidate_names(*names):
lambda: iter(names))
-class TestMkstempInner(BaseTestCase):
+class TestBadTempdir:
+
+ def test_read_only_directory(self):
+ with _inside_empty_temp_dir():
+ oldmode = mode = os.stat(tempfile.tempdir).st_mode
+ mode &= ~(stat.S_IWUSR | stat.S_IWGRP | stat.S_IWOTH)
+ os.chmod(tempfile.tempdir, mode)
+ try:
+ if os.access(tempfile.tempdir, os.W_OK):
+ self.skipTest("can't set the directory read-only")
+ with self.assertRaises(PermissionError):
+ self.make_temp()
+ self.assertEqual(os.listdir(tempfile.tempdir), [])
+ finally:
+ os.chmod(tempfile.tempdir, oldmode)
+
+ def test_nonexisting_directory(self):
+ with _inside_empty_temp_dir():
+ tempdir = os.path.join(tempfile.tempdir, 'nonexistent')
+ with support.swap_attr(tempfile, 'tempdir', tempdir):
+ with self.assertRaises(FileNotFoundError):
+ self.make_temp()
+
+ def test_non_directory(self):
+ with _inside_empty_temp_dir():
+ tempdir = os.path.join(tempfile.tempdir, 'file')
+ open(tempdir, 'wb').close()
+ with support.swap_attr(tempfile, 'tempdir', tempdir):
+ with self.assertRaises((NotADirectoryError, FileNotFoundError)):
+ self.make_temp()
+
+
+class TestMkstempInner(TestBadTempdir, BaseTestCase):
"""Test the internal function _mkstemp_inner."""
class mkstemped:
@@ -332,7 +365,7 @@ class TestMkstempInner(BaseTestCase):
file = self.do_create()
mode = stat.S_IMODE(os.stat(file.name).st_mode)
expected = 0o600
- if sys.platform in ('win32', 'os2emx'):
+ if sys.platform == 'win32':
# There's no distinction among 'user', 'group' and 'world';
# replicate the 'user' bits.
user = expected >> 6
@@ -349,6 +382,7 @@ class TestMkstempInner(BaseTestCase):
v="q"
file = self.do_create()
+ self.assertEqual(os.get_inheritable(file.fd), False)
fd = "%d" % file.fd
try:
@@ -365,7 +399,7 @@ class TestMkstempInner(BaseTestCase):
# On Windows a spawn* /path/ with embedded spaces shouldn't be quoted,
# but an arg with embedded spaces should be decorated with double
# quotes on each end
- if sys.platform in ('win32',):
+ if sys.platform == 'win32':
decorated = '"%s"' % sys.executable
tester = '"%s"' % tester
else:
@@ -387,7 +421,7 @@ class TestMkstempInner(BaseTestCase):
os.lseek(f.fd, 0, os.SEEK_SET)
self.assertEqual(os.read(f.fd, 20), b"blat")
- def default_mkstemp_inner(self):
+ def make_temp(self):
return tempfile._mkstemp_inner(tempfile.gettempdir(),
tempfile.template,
'',
@@ -398,11 +432,11 @@ class TestMkstempInner(BaseTestCase):
# the chosen name already exists
with _inside_empty_temp_dir(), \
_mock_candidate_names('aaa', 'aaa', 'bbb'):
- (fd1, name1) = self.default_mkstemp_inner()
+ (fd1, name1) = self.make_temp()
os.close(fd1)
self.assertTrue(name1.endswith('aaa'))
- (fd2, name2) = self.default_mkstemp_inner()
+ (fd2, name2) = self.make_temp()
os.close(fd2)
self.assertTrue(name2.endswith('bbb'))
@@ -414,7 +448,7 @@ class TestMkstempInner(BaseTestCase):
dir = tempfile.mkdtemp()
self.assertTrue(dir.endswith('aaa'))
- (fd, name) = self.default_mkstemp_inner()
+ (fd, name) = self.make_temp()
os.close(fd)
self.assertTrue(name.endswith('bbb'))
@@ -475,6 +509,20 @@ class TestGetTempDir(BaseTestCase):
self.assertTrue(a is b)
+ def test_case_sensitive(self):
+ # gettempdir should not flatten its case
+ # even on a case-insensitive file system
+ case_sensitive_tempdir = tempfile.mkdtemp("-Temp")
+ _tempdir, tempfile.tempdir = tempfile.tempdir, None
+ try:
+ with support.EnvironmentVarGuard() as env:
+ # Fake the first env var which is checked as a candidate
+ env["TMPDIR"] = case_sensitive_tempdir
+ self.assertEqual(tempfile.gettempdir(), case_sensitive_tempdir)
+ finally:
+ tempfile.tempdir = _tempdir
+ support.rmdir(case_sensitive_tempdir)
+
class TestMkstemp(BaseTestCase):
"""Test mkstemp()."""
@@ -512,9 +560,12 @@ class TestMkstemp(BaseTestCase):
os.rmdir(dir)
-class TestMkdtemp(BaseTestCase):
+class TestMkdtemp(TestBadTempdir, BaseTestCase):
"""Test mkdtemp()."""
+ def make_temp(self):
+ return tempfile.mkdtemp()
+
def do_create(self, dir=None, pre="", suf=""):
if dir is None:
dir = tempfile.gettempdir()
@@ -563,7 +614,7 @@ class TestMkdtemp(BaseTestCase):
mode = stat.S_IMODE(os.stat(dir).st_mode)
mode &= 0o777 # Mask off sticky bits inherited from /tmp
expected = 0o700
- if sys.platform in ('win32', 'os2emx'):
+ if sys.platform == 'win32':
# There's no distinction among 'user', 'group' and 'world';
# replicate the 'user' bits.
user = expected >> 6
@@ -691,6 +742,19 @@ class TestNamedTemporaryFile(BaseTestCase):
# No reference cycle was created.
self.assertIsNone(wr())
+ def test_iter(self):
+ # Issue #23700: getting iterator from a temporary file should keep
+ # it alive as long as it's being iterated over
+ lines = [b'spam\n', b'eggs\n', b'beans\n']
+ def make_file():
+ f = tempfile.NamedTemporaryFile(mode='w+b')
+ f.write(b''.join(lines))
+ f.seek(0)
+ return f
+ for i, l in enumerate(make_file()):
+ self.assertEqual(l, lines[i])
+ self.assertEqual(i, len(lines) - 1)
+
def test_creates_named(self):
# NamedTemporaryFile creates files with names
f = tempfile.NamedTemporaryFile()
@@ -743,6 +807,19 @@ class TestNamedTemporaryFile(BaseTestCase):
pass
self.assertRaises(ValueError, use_closed)
+ def test_no_leak_fd(self):
+ # Issue #21058: don't leak file descriptor when io.open() fails
+ closed = []
+ os_close = os.close
+ def close(fd):
+ closed.append(fd)
+ os_close(fd)
+
+ with mock.patch('os.close', side_effect=close):
+ with mock.patch('io.open', side_effect=ValueError):
+ self.assertRaises(ValueError, tempfile.NamedTemporaryFile)
+ self.assertEqual(len(closed), 1)
+
# How to test the mode and bufsize parameters?
@@ -1046,6 +1123,20 @@ if tempfile.NamedTemporaryFile is not tempfile.TemporaryFile:
roundtrip("\u039B", "w+", encoding="utf-16")
roundtrip("foo\r\n", "w+", newline="")
+ def test_no_leak_fd(self):
+ # Issue #21058: don't leak file descriptor when io.open() fails
+ closed = []
+ os_close = os.close
+ def close(fd):
+ closed.append(fd)
+ os_close(fd)
+
+ with mock.patch('os.close', side_effect=close):
+ with mock.patch('io.open', side_effect=ValueError):
+ self.assertRaises(ValueError, tempfile.TemporaryFile)
+ self.assertEqual(len(closed), 1)
+
+
# Helper for test_del_on_shutdown
class NulledModules:
@@ -1139,8 +1230,9 @@ class TestTemporaryDirectory(BaseTestCase):
def test_del_on_shutdown(self):
# A TemporaryDirectory may be cleaned up during shutdown
with self.do_create() as dir:
- for mod in ('os', 'shutil', 'sys', 'tempfile', 'warnings'):
+ for mod in ('builtins', 'os', 'shutil', 'sys', 'tempfile', 'warnings'):
code = """if True:
+ import builtins
import os
import shutil
import sys
@@ -1165,6 +1257,31 @@ class TestTemporaryDirectory(BaseTestCase):
"TemporaryDirectory %s exists after cleanup" % tmp_name)
err = err.decode('utf-8', 'backslashreplace')
self.assertNotIn("Exception ", err)
+ self.assertIn("ResourceWarning: Implicitly cleaning up", err)
+
+ def test_exit_on_shutdown(self):
+ # Issue #22427
+ with self.do_create() as dir:
+ code = """if True:
+ import sys
+ import tempfile
+ import warnings
+
+ def generator():
+ with tempfile.TemporaryDirectory(dir={dir!r}) as tmp:
+ yield tmp
+ g = generator()
+ sys.stdout.buffer.write(next(g).encode())
+
+ warnings.filterwarnings("always", category=ResourceWarning)
+ """.format(dir=dir)
+ rc, out, err = script_helper.assert_python_ok("-c", code)
+ tmp_name = out.decode().strip()
+ self.assertFalse(os.path.exists(tmp_name),
+ "TemporaryDirectory %s exists after cleanup" % tmp_name)
+ err = err.decode('utf-8', 'backslashreplace')
+ self.assertNotIn("Exception ", err)
+ self.assertIn("ResourceWarning: Implicitly cleaning up", err)
def test_warnings_on_cleanup(self):
# ResourceWarning will be triggered by __del__