summaryrefslogtreecommitdiffstats
path: root/Lib/test/test_subprocess.py
diff options
context:
space:
mode:
Diffstat (limited to 'Lib/test/test_subprocess.py')
-rw-r--r--Lib/test/test_subprocess.py250
1 files changed, 220 insertions, 30 deletions
diff --git a/Lib/test/test_subprocess.py b/Lib/test/test_subprocess.py
index 4719cc0..758e094 100644
--- a/Lib/test/test_subprocess.py
+++ b/Lib/test/test_subprocess.py
@@ -1,5 +1,6 @@
import unittest
-from test import script_helper
+from unittest import mock
+from test.support import script_helper
from test import support
import subprocess
import sys
@@ -381,7 +382,7 @@ class ProcessTestCase(BaseTestCase):
python_dir, python_base = self._split_python_path()
abs_python = os.path.join(python_dir, python_base)
rel_python = os.path.join(os.curdir, python_base)
- with script_helper.temp_dir() as wrong_dir:
+ with support.temp_dir() as wrong_dir:
# Before calling with an absolute path, confirm that using a
# relative path fails.
self.assertRaises(FileNotFoundError, subprocess.Popen,
@@ -504,6 +505,27 @@ class ProcessTestCase(BaseTestCase):
tf.seek(0)
self.assertStderrEqual(tf.read(), b"strawberry")
+ def test_stderr_redirect_with_no_stdout_redirect(self):
+ # test stderr=STDOUT while stdout=None (not set)
+
+ # - grandchild prints to stderr
+ # - child redirects grandchild's stderr to its stdout
+ # - the parent should get grandchild's stderr in child's stdout
+ p = subprocess.Popen([sys.executable, "-c",
+ 'import sys, subprocess;'
+ 'rc = subprocess.call([sys.executable, "-c",'
+ ' "import sys;"'
+ ' "sys.stderr.write(\'42\')"],'
+ ' stderr=subprocess.STDOUT);'
+ 'sys.exit(rc)'],
+ stdout=subprocess.PIPE,
+ stderr=subprocess.PIPE)
+ stdout, stderr = p.communicate()
+ #NOTE: stdout should get stderr from grandchild
+ self.assertStderrEqual(stdout, b'42')
+ self.assertStderrEqual(stderr, b'') # should be empty
+ self.assertEqual(p.returncode, 0)
+
def test_stdout_stderr_pipe(self):
# capture stdout and stderr to the same pipe
p = subprocess.Popen([sys.executable, "-c",
@@ -664,7 +686,7 @@ class ProcessTestCase(BaseTestCase):
self.assertEqual(stdout, "banana")
self.assertStderrEqual(stderr.encode(), b"pineapple\npear\n")
- def test_communicate_timeout_large_ouput(self):
+ def test_communicate_timeout_large_output(self):
# Test an expiring timeout while the child is outputting lots of data.
p = subprocess.Popen([sys.executable, "-c",
'import sys,os,time;'
@@ -1219,6 +1241,102 @@ class ProcessTestCase(BaseTestCase):
fds_after_exception = os.listdir(fd_directory)
self.assertEqual(fds_before_popen, fds_after_exception)
+
+class RunFuncTestCase(BaseTestCase):
+ def run_python(self, code, **kwargs):
+ """Run Python code in a subprocess using subprocess.run"""
+ argv = [sys.executable, "-c", code]
+ return subprocess.run(argv, **kwargs)
+
+ def test_returncode(self):
+ # call() function with sequence argument
+ cp = self.run_python("import sys; sys.exit(47)")
+ self.assertEqual(cp.returncode, 47)
+ with self.assertRaises(subprocess.CalledProcessError):
+ cp.check_returncode()
+
+ def test_check(self):
+ with self.assertRaises(subprocess.CalledProcessError) as c:
+ self.run_python("import sys; sys.exit(47)", check=True)
+ self.assertEqual(c.exception.returncode, 47)
+
+ def test_check_zero(self):
+ # check_returncode shouldn't raise when returncode is zero
+ cp = self.run_python("import sys; sys.exit(0)", check=True)
+ self.assertEqual(cp.returncode, 0)
+
+ def test_timeout(self):
+ # run() function with timeout argument; we want to test that the child
+ # process gets killed when the timeout expires. If the child isn't
+ # killed, this call will deadlock since subprocess.run waits for the
+ # child.
+ with self.assertRaises(subprocess.TimeoutExpired):
+ self.run_python("while True: pass", timeout=0.0001)
+
+ def test_capture_stdout(self):
+ # capture stdout with zero return code
+ cp = self.run_python("print('BDFL')", stdout=subprocess.PIPE)
+ self.assertIn(b'BDFL', cp.stdout)
+
+ def test_capture_stderr(self):
+ cp = self.run_python("import sys; sys.stderr.write('BDFL')",
+ stderr=subprocess.PIPE)
+ self.assertIn(b'BDFL', cp.stderr)
+
+ def test_check_output_stdin_arg(self):
+ # run() can be called with stdin set to a file
+ tf = tempfile.TemporaryFile()
+ self.addCleanup(tf.close)
+ tf.write(b'pear')
+ tf.seek(0)
+ cp = self.run_python(
+ "import sys; sys.stdout.write(sys.stdin.read().upper())",
+ stdin=tf, stdout=subprocess.PIPE)
+ self.assertIn(b'PEAR', cp.stdout)
+
+ def test_check_output_input_arg(self):
+ # check_output() can be called with input set to a string
+ cp = self.run_python(
+ "import sys; sys.stdout.write(sys.stdin.read().upper())",
+ input=b'pear', stdout=subprocess.PIPE)
+ self.assertIn(b'PEAR', cp.stdout)
+
+ def test_check_output_stdin_with_input_arg(self):
+ # run() refuses to accept 'stdin' with 'input'
+ tf = tempfile.TemporaryFile()
+ self.addCleanup(tf.close)
+ tf.write(b'pear')
+ tf.seek(0)
+ with self.assertRaises(ValueError,
+ msg="Expected ValueError when stdin and input args supplied.") as c:
+ output = self.run_python("print('will not be run')",
+ stdin=tf, input=b'hare')
+ self.assertIn('stdin', c.exception.args[0])
+ self.assertIn('input', c.exception.args[0])
+
+ def test_check_output_timeout(self):
+ with self.assertRaises(subprocess.TimeoutExpired) as c:
+ cp = self.run_python((
+ "import sys, time\n"
+ "sys.stdout.write('BDFL')\n"
+ "sys.stdout.flush()\n"
+ "time.sleep(3600)"),
+ # Some heavily loaded buildbots (sparc Debian 3.x) require
+ # this much time to start and print.
+ timeout=3, stdout=subprocess.PIPE)
+ self.assertEqual(c.exception.output, b'BDFL')
+ # output is aliased to stdout
+ self.assertEqual(c.exception.stdout, b'BDFL')
+
+ def test_run_kwargs(self):
+ newenv = os.environ.copy()
+ newenv["FRUIT"] = "banana"
+ cp = self.run_python(('import sys, os;'
+ 'sys.exit(33 if os.getenv("FRUIT")=="banana" else 31)'),
+ env=newenv)
+ self.assertEqual(cp.returncode, 33)
+
+
@unittest.skipIf(mswindows, "POSIX specific tests")
class POSIXProcessTestCase(BaseTestCase):
@@ -1236,7 +1354,7 @@ class POSIXProcessTestCase(BaseTestCase):
desired_exception = e
desired_exception.strerror += ': ' + repr(self._nonexistent_dir)
else:
- self.fail("chdir to nonexistant directory %s succeeded." %
+ self.fail("chdir to nonexistent directory %s succeeded." %
self._nonexistent_dir)
return desired_exception
@@ -1429,10 +1547,14 @@ class POSIXProcessTestCase(BaseTestCase):
[_, hard] = limits
setrlimit(RLIMIT_NPROC, (0, hard))
self.addCleanup(setrlimit, RLIMIT_NPROC, limits)
- # Forking should raise EAGAIN, translated to BlockingIOError
- with self.assertRaises(BlockingIOError):
+ try:
subprocess.call([sys.executable, '-c', ''],
preexec_fn=lambda: None)
+ except BlockingIOError:
+ # Forking should raise EAGAIN, translated to BlockingIOError
+ pass
+ else:
+ self.skipTest('RLIMIT_NPROC had no effect; probably superuser')
def test_args_string(self):
# args is a string
@@ -2234,8 +2356,6 @@ class POSIXProcessTestCase(BaseTestCase):
func = lambda: None
gc.enable()
- executable_list = "exec" # error: must be a sequence
-
for args, exe_list, cwd, env_list in (
(123, [b"exe"], None, [b"env"]),
([b"arg"], 123, None, [b"env"]),
@@ -2253,6 +2373,80 @@ class POSIXProcessTestCase(BaseTestCase):
if not gc_enabled:
gc.disable()
+ @support.cpython_only
+ def test_fork_exec_sorted_fd_sanity_check(self):
+ # Issue #23564: sanity check the fork_exec() fds_to_keep sanity check.
+ import _posixsubprocess
+ gc_enabled = gc.isenabled()
+ try:
+ gc.enable()
+
+ for fds_to_keep in (
+ (-1, 2, 3, 4, 5), # Negative number.
+ ('str', 4), # Not an int.
+ (18, 23, 42, 2**63), # Out of range.
+ (5, 4), # Not sorted.
+ (6, 7, 7, 8), # Duplicate.
+ ):
+ with self.assertRaises(
+ ValueError,
+ msg='fds_to_keep={}'.format(fds_to_keep)) as c:
+ _posixsubprocess.fork_exec(
+ [b"false"], [b"false"],
+ True, fds_to_keep, None, [b"env"],
+ -1, -1, -1, -1,
+ 1, 2, 3, 4,
+ True, True, None)
+ self.assertIn('fds_to_keep', str(c.exception))
+ finally:
+ if not gc_enabled:
+ gc.disable()
+
+ def test_communicate_BrokenPipeError_stdin_close(self):
+ # By not setting stdout or stderr or a timeout we force the fast path
+ # that just calls _stdin_write() internally due to our mock.
+ proc = subprocess.Popen([sys.executable, '-c', 'pass'])
+ with proc, mock.patch.object(proc, 'stdin') as mock_proc_stdin:
+ mock_proc_stdin.close.side_effect = BrokenPipeError
+ proc.communicate() # Should swallow BrokenPipeError from close.
+ mock_proc_stdin.close.assert_called_with()
+
+ def test_communicate_BrokenPipeError_stdin_write(self):
+ # By not setting stdout or stderr or a timeout we force the fast path
+ # that just calls _stdin_write() internally due to our mock.
+ proc = subprocess.Popen([sys.executable, '-c', 'pass'])
+ with proc, mock.patch.object(proc, 'stdin') as mock_proc_stdin:
+ mock_proc_stdin.write.side_effect = BrokenPipeError
+ proc.communicate(b'stuff') # Should swallow the BrokenPipeError.
+ mock_proc_stdin.write.assert_called_once_with(b'stuff')
+ mock_proc_stdin.close.assert_called_once_with()
+
+ def test_communicate_BrokenPipeError_stdin_flush(self):
+ # Setting stdin and stdout forces the ._communicate() code path.
+ # python -h exits faster than python -c pass (but spams stdout).
+ proc = subprocess.Popen([sys.executable, '-h'],
+ stdin=subprocess.PIPE,
+ stdout=subprocess.PIPE)
+ with proc, mock.patch.object(proc, 'stdin') as mock_proc_stdin, \
+ open(os.devnull, 'wb') as dev_null:
+ mock_proc_stdin.flush.side_effect = BrokenPipeError
+ # because _communicate registers a selector using proc.stdin...
+ mock_proc_stdin.fileno.return_value = dev_null.fileno()
+ # _communicate() should swallow BrokenPipeError from flush.
+ proc.communicate(b'stuff')
+ mock_proc_stdin.flush.assert_called_once_with()
+
+ def test_communicate_BrokenPipeError_stdin_close_with_timeout(self):
+ # Setting stdin and stdout forces the ._communicate() code path.
+ # python -h exits faster than python -c pass (but spams stdout).
+ proc = subprocess.Popen([sys.executable, '-h'],
+ stdin=subprocess.PIPE,
+ stdout=subprocess.PIPE)
+ with proc, mock.patch.object(proc, 'stdin') as mock_proc_stdin:
+ mock_proc_stdin.close.side_effect = BrokenPipeError
+ # _communicate() should swallow BrokenPipeError from close.
+ proc.communicate(timeout=999)
+ mock_proc_stdin.close.assert_called_once_with()
@unittest.skipUnless(mswindows, "Windows specific tests")
@@ -2392,7 +2586,7 @@ class Win32ProcessTestCase(BaseTestCase):
def test_terminate_dead(self):
self._kill_dead_process('terminate')
-class CommandTests(unittest.TestCase):
+class MiscTests(unittest.TestCase):
def test_getoutput(self):
self.assertEqual(subprocess.getoutput('echo xyzzy'), 'xyzzy')
self.assertEqual(subprocess.getstatusoutput('echo xyzzy'),
@@ -2412,6 +2606,21 @@ class CommandTests(unittest.TestCase):
if dir is not None:
os.rmdir(dir)
+ def test__all__(self):
+ """Ensure that __all__ is populated properly."""
+ # STARTUPINFO added to __all__ in 3.6
+ intentionally_excluded = {"list2cmdline", "STARTUPINFO", "Handle"}
+ exported = set(subprocess.__all__)
+ possible_exports = set()
+ import types
+ for name, value in subprocess.__dict__.items():
+ if name.startswith('_'):
+ continue
+ if isinstance(value, (types.ModuleType,)):
+ continue
+ possible_exports.add(name)
+ self.assertEqual(exported, possible_exports - intentionally_excluded)
+
@unittest.skipUnless(hasattr(selectors, 'PollSelector'),
"Test needs selectors.PollSelector")
@@ -2426,25 +2635,6 @@ class ProcessTestCaseNoPoll(ProcessTestCase):
ProcessTestCase.tearDown(self)
-class HelperFunctionTests(unittest.TestCase):
- @unittest.skipIf(mswindows, "errno and EINTR make no sense on windows")
- def test_eintr_retry_call(self):
- record_calls = []
- def fake_os_func(*args):
- record_calls.append(args)
- if len(record_calls) == 2:
- raise OSError(errno.EINTR, "fake interrupted system call")
- return tuple(reversed(args))
-
- self.assertEqual((999, 256),
- subprocess._eintr_retry_call(fake_os_func, 256, 999))
- self.assertEqual([(256, 999)], record_calls)
- # This time there will be an EINTR so it will loop once.
- self.assertEqual((666,),
- subprocess._eintr_retry_call(fake_os_func, 666))
- self.assertEqual([(256, 999), (666,), (666,)], record_calls)
-
-
@unittest.skipUnless(mswindows, "Windows-specific tests")
class CommandsWithSpaces (BaseTestCase):
@@ -2547,11 +2737,11 @@ def test_main():
unit_tests = (ProcessTestCase,
POSIXProcessTestCase,
Win32ProcessTestCase,
- CommandTests,
+ MiscTests,
ProcessTestCaseNoPoll,
- HelperFunctionTests,
CommandsWithSpaces,
ContextManagerTests,
+ RunFuncTestCase,
)
support.run_unittest(*unit_tests)