diff options
Diffstat (limited to 'Lib/test/test_subprocess.py')
| -rw-r--r-- | Lib/test/test_subprocess.py | 250 |
1 files changed, 220 insertions, 30 deletions
diff --git a/Lib/test/test_subprocess.py b/Lib/test/test_subprocess.py index 4719cc0..758e094 100644 --- a/Lib/test/test_subprocess.py +++ b/Lib/test/test_subprocess.py @@ -1,5 +1,6 @@ import unittest -from test import script_helper +from unittest import mock +from test.support import script_helper from test import support import subprocess import sys @@ -381,7 +382,7 @@ class ProcessTestCase(BaseTestCase): python_dir, python_base = self._split_python_path() abs_python = os.path.join(python_dir, python_base) rel_python = os.path.join(os.curdir, python_base) - with script_helper.temp_dir() as wrong_dir: + with support.temp_dir() as wrong_dir: # Before calling with an absolute path, confirm that using a # relative path fails. self.assertRaises(FileNotFoundError, subprocess.Popen, @@ -504,6 +505,27 @@ class ProcessTestCase(BaseTestCase): tf.seek(0) self.assertStderrEqual(tf.read(), b"strawberry") + def test_stderr_redirect_with_no_stdout_redirect(self): + # test stderr=STDOUT while stdout=None (not set) + + # - grandchild prints to stderr + # - child redirects grandchild's stderr to its stdout + # - the parent should get grandchild's stderr in child's stdout + p = subprocess.Popen([sys.executable, "-c", + 'import sys, subprocess;' + 'rc = subprocess.call([sys.executable, "-c",' + ' "import sys;"' + ' "sys.stderr.write(\'42\')"],' + ' stderr=subprocess.STDOUT);' + 'sys.exit(rc)'], + stdout=subprocess.PIPE, + stderr=subprocess.PIPE) + stdout, stderr = p.communicate() + #NOTE: stdout should get stderr from grandchild + self.assertStderrEqual(stdout, b'42') + self.assertStderrEqual(stderr, b'') # should be empty + self.assertEqual(p.returncode, 0) + def test_stdout_stderr_pipe(self): # capture stdout and stderr to the same pipe p = subprocess.Popen([sys.executable, "-c", @@ -664,7 +686,7 @@ class ProcessTestCase(BaseTestCase): self.assertEqual(stdout, "banana") self.assertStderrEqual(stderr.encode(), b"pineapple\npear\n") - def test_communicate_timeout_large_ouput(self): + def test_communicate_timeout_large_output(self): # Test an expiring timeout while the child is outputting lots of data. p = subprocess.Popen([sys.executable, "-c", 'import sys,os,time;' @@ -1219,6 +1241,102 @@ class ProcessTestCase(BaseTestCase): fds_after_exception = os.listdir(fd_directory) self.assertEqual(fds_before_popen, fds_after_exception) + +class RunFuncTestCase(BaseTestCase): + def run_python(self, code, **kwargs): + """Run Python code in a subprocess using subprocess.run""" + argv = [sys.executable, "-c", code] + return subprocess.run(argv, **kwargs) + + def test_returncode(self): + # call() function with sequence argument + cp = self.run_python("import sys; sys.exit(47)") + self.assertEqual(cp.returncode, 47) + with self.assertRaises(subprocess.CalledProcessError): + cp.check_returncode() + + def test_check(self): + with self.assertRaises(subprocess.CalledProcessError) as c: + self.run_python("import sys; sys.exit(47)", check=True) + self.assertEqual(c.exception.returncode, 47) + + def test_check_zero(self): + # check_returncode shouldn't raise when returncode is zero + cp = self.run_python("import sys; sys.exit(0)", check=True) + self.assertEqual(cp.returncode, 0) + + def test_timeout(self): + # run() function with timeout argument; we want to test that the child + # process gets killed when the timeout expires. If the child isn't + # killed, this call will deadlock since subprocess.run waits for the + # child. + with self.assertRaises(subprocess.TimeoutExpired): + self.run_python("while True: pass", timeout=0.0001) + + def test_capture_stdout(self): + # capture stdout with zero return code + cp = self.run_python("print('BDFL')", stdout=subprocess.PIPE) + self.assertIn(b'BDFL', cp.stdout) + + def test_capture_stderr(self): + cp = self.run_python("import sys; sys.stderr.write('BDFL')", + stderr=subprocess.PIPE) + self.assertIn(b'BDFL', cp.stderr) + + def test_check_output_stdin_arg(self): + # run() can be called with stdin set to a file + tf = tempfile.TemporaryFile() + self.addCleanup(tf.close) + tf.write(b'pear') + tf.seek(0) + cp = self.run_python( + "import sys; sys.stdout.write(sys.stdin.read().upper())", + stdin=tf, stdout=subprocess.PIPE) + self.assertIn(b'PEAR', cp.stdout) + + def test_check_output_input_arg(self): + # check_output() can be called with input set to a string + cp = self.run_python( + "import sys; sys.stdout.write(sys.stdin.read().upper())", + input=b'pear', stdout=subprocess.PIPE) + self.assertIn(b'PEAR', cp.stdout) + + def test_check_output_stdin_with_input_arg(self): + # run() refuses to accept 'stdin' with 'input' + tf = tempfile.TemporaryFile() + self.addCleanup(tf.close) + tf.write(b'pear') + tf.seek(0) + with self.assertRaises(ValueError, + msg="Expected ValueError when stdin and input args supplied.") as c: + output = self.run_python("print('will not be run')", + stdin=tf, input=b'hare') + self.assertIn('stdin', c.exception.args[0]) + self.assertIn('input', c.exception.args[0]) + + def test_check_output_timeout(self): + with self.assertRaises(subprocess.TimeoutExpired) as c: + cp = self.run_python(( + "import sys, time\n" + "sys.stdout.write('BDFL')\n" + "sys.stdout.flush()\n" + "time.sleep(3600)"), + # Some heavily loaded buildbots (sparc Debian 3.x) require + # this much time to start and print. + timeout=3, stdout=subprocess.PIPE) + self.assertEqual(c.exception.output, b'BDFL') + # output is aliased to stdout + self.assertEqual(c.exception.stdout, b'BDFL') + + def test_run_kwargs(self): + newenv = os.environ.copy() + newenv["FRUIT"] = "banana" + cp = self.run_python(('import sys, os;' + 'sys.exit(33 if os.getenv("FRUIT")=="banana" else 31)'), + env=newenv) + self.assertEqual(cp.returncode, 33) + + @unittest.skipIf(mswindows, "POSIX specific tests") class POSIXProcessTestCase(BaseTestCase): @@ -1236,7 +1354,7 @@ class POSIXProcessTestCase(BaseTestCase): desired_exception = e desired_exception.strerror += ': ' + repr(self._nonexistent_dir) else: - self.fail("chdir to nonexistant directory %s succeeded." % + self.fail("chdir to nonexistent directory %s succeeded." % self._nonexistent_dir) return desired_exception @@ -1429,10 +1547,14 @@ class POSIXProcessTestCase(BaseTestCase): [_, hard] = limits setrlimit(RLIMIT_NPROC, (0, hard)) self.addCleanup(setrlimit, RLIMIT_NPROC, limits) - # Forking should raise EAGAIN, translated to BlockingIOError - with self.assertRaises(BlockingIOError): + try: subprocess.call([sys.executable, '-c', ''], preexec_fn=lambda: None) + except BlockingIOError: + # Forking should raise EAGAIN, translated to BlockingIOError + pass + else: + self.skipTest('RLIMIT_NPROC had no effect; probably superuser') def test_args_string(self): # args is a string @@ -2234,8 +2356,6 @@ class POSIXProcessTestCase(BaseTestCase): func = lambda: None gc.enable() - executable_list = "exec" # error: must be a sequence - for args, exe_list, cwd, env_list in ( (123, [b"exe"], None, [b"env"]), ([b"arg"], 123, None, [b"env"]), @@ -2253,6 +2373,80 @@ class POSIXProcessTestCase(BaseTestCase): if not gc_enabled: gc.disable() + @support.cpython_only + def test_fork_exec_sorted_fd_sanity_check(self): + # Issue #23564: sanity check the fork_exec() fds_to_keep sanity check. + import _posixsubprocess + gc_enabled = gc.isenabled() + try: + gc.enable() + + for fds_to_keep in ( + (-1, 2, 3, 4, 5), # Negative number. + ('str', 4), # Not an int. + (18, 23, 42, 2**63), # Out of range. + (5, 4), # Not sorted. + (6, 7, 7, 8), # Duplicate. + ): + with self.assertRaises( + ValueError, + msg='fds_to_keep={}'.format(fds_to_keep)) as c: + _posixsubprocess.fork_exec( + [b"false"], [b"false"], + True, fds_to_keep, None, [b"env"], + -1, -1, -1, -1, + 1, 2, 3, 4, + True, True, None) + self.assertIn('fds_to_keep', str(c.exception)) + finally: + if not gc_enabled: + gc.disable() + + def test_communicate_BrokenPipeError_stdin_close(self): + # By not setting stdout or stderr or a timeout we force the fast path + # that just calls _stdin_write() internally due to our mock. + proc = subprocess.Popen([sys.executable, '-c', 'pass']) + with proc, mock.patch.object(proc, 'stdin') as mock_proc_stdin: + mock_proc_stdin.close.side_effect = BrokenPipeError + proc.communicate() # Should swallow BrokenPipeError from close. + mock_proc_stdin.close.assert_called_with() + + def test_communicate_BrokenPipeError_stdin_write(self): + # By not setting stdout or stderr or a timeout we force the fast path + # that just calls _stdin_write() internally due to our mock. + proc = subprocess.Popen([sys.executable, '-c', 'pass']) + with proc, mock.patch.object(proc, 'stdin') as mock_proc_stdin: + mock_proc_stdin.write.side_effect = BrokenPipeError + proc.communicate(b'stuff') # Should swallow the BrokenPipeError. + mock_proc_stdin.write.assert_called_once_with(b'stuff') + mock_proc_stdin.close.assert_called_once_with() + + def test_communicate_BrokenPipeError_stdin_flush(self): + # Setting stdin and stdout forces the ._communicate() code path. + # python -h exits faster than python -c pass (but spams stdout). + proc = subprocess.Popen([sys.executable, '-h'], + stdin=subprocess.PIPE, + stdout=subprocess.PIPE) + with proc, mock.patch.object(proc, 'stdin') as mock_proc_stdin, \ + open(os.devnull, 'wb') as dev_null: + mock_proc_stdin.flush.side_effect = BrokenPipeError + # because _communicate registers a selector using proc.stdin... + mock_proc_stdin.fileno.return_value = dev_null.fileno() + # _communicate() should swallow BrokenPipeError from flush. + proc.communicate(b'stuff') + mock_proc_stdin.flush.assert_called_once_with() + + def test_communicate_BrokenPipeError_stdin_close_with_timeout(self): + # Setting stdin and stdout forces the ._communicate() code path. + # python -h exits faster than python -c pass (but spams stdout). + proc = subprocess.Popen([sys.executable, '-h'], + stdin=subprocess.PIPE, + stdout=subprocess.PIPE) + with proc, mock.patch.object(proc, 'stdin') as mock_proc_stdin: + mock_proc_stdin.close.side_effect = BrokenPipeError + # _communicate() should swallow BrokenPipeError from close. + proc.communicate(timeout=999) + mock_proc_stdin.close.assert_called_once_with() @unittest.skipUnless(mswindows, "Windows specific tests") @@ -2392,7 +2586,7 @@ class Win32ProcessTestCase(BaseTestCase): def test_terminate_dead(self): self._kill_dead_process('terminate') -class CommandTests(unittest.TestCase): +class MiscTests(unittest.TestCase): def test_getoutput(self): self.assertEqual(subprocess.getoutput('echo xyzzy'), 'xyzzy') self.assertEqual(subprocess.getstatusoutput('echo xyzzy'), @@ -2412,6 +2606,21 @@ class CommandTests(unittest.TestCase): if dir is not None: os.rmdir(dir) + def test__all__(self): + """Ensure that __all__ is populated properly.""" + # STARTUPINFO added to __all__ in 3.6 + intentionally_excluded = {"list2cmdline", "STARTUPINFO", "Handle"} + exported = set(subprocess.__all__) + possible_exports = set() + import types + for name, value in subprocess.__dict__.items(): + if name.startswith('_'): + continue + if isinstance(value, (types.ModuleType,)): + continue + possible_exports.add(name) + self.assertEqual(exported, possible_exports - intentionally_excluded) + @unittest.skipUnless(hasattr(selectors, 'PollSelector'), "Test needs selectors.PollSelector") @@ -2426,25 +2635,6 @@ class ProcessTestCaseNoPoll(ProcessTestCase): ProcessTestCase.tearDown(self) -class HelperFunctionTests(unittest.TestCase): - @unittest.skipIf(mswindows, "errno and EINTR make no sense on windows") - def test_eintr_retry_call(self): - record_calls = [] - def fake_os_func(*args): - record_calls.append(args) - if len(record_calls) == 2: - raise OSError(errno.EINTR, "fake interrupted system call") - return tuple(reversed(args)) - - self.assertEqual((999, 256), - subprocess._eintr_retry_call(fake_os_func, 256, 999)) - self.assertEqual([(256, 999)], record_calls) - # This time there will be an EINTR so it will loop once. - self.assertEqual((666,), - subprocess._eintr_retry_call(fake_os_func, 666)) - self.assertEqual([(256, 999), (666,), (666,)], record_calls) - - @unittest.skipUnless(mswindows, "Windows-specific tests") class CommandsWithSpaces (BaseTestCase): @@ -2547,11 +2737,11 @@ def test_main(): unit_tests = (ProcessTestCase, POSIXProcessTestCase, Win32ProcessTestCase, - CommandTests, + MiscTests, ProcessTestCaseNoPoll, - HelperFunctionTests, CommandsWithSpaces, ContextManagerTests, + RunFuncTestCase, ) support.run_unittest(*unit_tests) |
