diff options
Diffstat (limited to 'Lib/test/test_subprocess.py')
-rw-r--r-- | Lib/test/test_subprocess.py | 274 |
1 files changed, 205 insertions, 69 deletions
diff --git a/Lib/test/test_subprocess.py b/Lib/test/test_subprocess.py index 15fb498..b260c81 100644 --- a/Lib/test/test_subprocess.py +++ b/Lib/test/test_subprocess.py @@ -16,6 +16,7 @@ import warnings import select import shutil import gc +import textwrap try: import resource @@ -62,6 +63,8 @@ class BaseTestCase(unittest.TestCase): # shutdown time. That frustrates tests trying to check stderr produced # from a spawned Python process. actual = support.strip_python_stderr(stderr) + # strip_python_stderr also strips whitespace, so we do too. + expected = expected.strip() self.assertEqual(actual, expected, msg) @@ -73,6 +76,15 @@ class ProcessTestCase(BaseTestCase): "import sys; sys.exit(47)"]) self.assertEqual(rc, 47) + def test_call_timeout(self): + # call() function with timeout argument; we want to test that the child + # process gets killed when the timeout expires. If the child isn't + # killed, this call will deadlock since subprocess.call waits for the + # child. + self.assertRaises(subprocess.TimeoutExpired, subprocess.call, + [sys.executable, "-c", "while True: pass"], + timeout=0.1) + def test_check_call_zero(self): # check_call() function with zero return code rc = subprocess.check_call([sys.executable, "-c", @@ -115,6 +127,21 @@ class ProcessTestCase(BaseTestCase): self.fail("Expected ValueError when stdout arg supplied.") self.assertIn('stdout', c.exception.args[0]) + def test_check_output_timeout(self): + # check_output() function with timeout arg + with self.assertRaises(subprocess.TimeoutExpired) as c: + output = subprocess.check_output( + [sys.executable, "-c", + "import sys, time\n" + "sys.stdout.write('BDFL')\n" + "sys.stdout.flush()\n" + "time.sleep(3600)"], + # Some heavily loaded buildbots (sparc Debian 3.x) require + # this much time to start and print. + timeout=3) + self.fail("Expected TimeoutExpired.") + self.assertEqual(c.exception.output, b'BDFL') + def test_call_kwargs(self): # call() function with keyword args newenv = os.environ.copy() @@ -165,6 +192,40 @@ class ProcessTestCase(BaseTestCase): p.wait() self.assertEqual(p.stderr, None) + def _assert_python(self, pre_args, **kwargs): + # We include sys.exit() to prevent the test runner from hanging + # whenever python is found. + args = pre_args + ["import sys; sys.exit(47)"] + p = subprocess.Popen(args, **kwargs) + p.wait() + self.assertEqual(47, p.returncode) + + def test_executable(self): + # Check that the executable argument works. + # + # On Unix (non-Mac and non-Windows), Python looks at args[0] to + # determine where its standard library is, so we need the directory + # of args[0] to be valid for the Popen() call to Python to succeed. + # See also issue #16170 and issue #7774. + doesnotexist = os.path.join(os.path.dirname(sys.executable), + "doesnotexist") + self._assert_python([doesnotexist, "-c"], executable=sys.executable) + + def test_executable_takes_precedence(self): + # Check that the executable argument takes precedence over args[0]. + # + # Verify first that the call succeeds without the executable arg. + pre_args = [sys.executable, "-c"] + self._assert_python(pre_args) + self.assertRaises(FileNotFoundError, self._assert_python, pre_args, + executable="doesnotexist") + + @unittest.skipIf(mswindows, "executable argument replaces shell") + def test_executable_replaces_shell(self): + # Check that the executable argument replaces the default shell + # when shell=True. + self._assert_python([], executable=sys.executable, shell=True) + # For use in the test_cwd* tests below. def _normalize_cwd(self, cwd): # Normalize an expected cwd (for Tru64 support). @@ -215,9 +276,9 @@ class ProcessTestCase(BaseTestCase): with support.temp_cwd() as wrong_dir: # Before calling with the correct cwd, confirm that the call fails # without cwd and with the wrong cwd. - self.assertRaises(OSError, subprocess.Popen, + self.assertRaises(FileNotFoundError, subprocess.Popen, [rel_python]) - self.assertRaises(OSError, subprocess.Popen, + self.assertRaises(FileNotFoundError, subprocess.Popen, [rel_python], cwd=wrong_dir) python_dir = self._normalize_cwd(python_dir) self._assert_cwd(python_dir, rel_python, cwd=python_dir) @@ -232,9 +293,9 @@ class ProcessTestCase(BaseTestCase): with support.temp_cwd() as wrong_dir: # Before calling with the correct cwd, confirm that the call fails # without cwd and with the wrong cwd. - self.assertRaises(OSError, subprocess.Popen, + self.assertRaises(FileNotFoundError, subprocess.Popen, [doesntexist], executable=rel_python) - self.assertRaises(OSError, subprocess.Popen, + self.assertRaises(FileNotFoundError, subprocess.Popen, [doesntexist], executable=rel_python, cwd=wrong_dir) python_dir = self._normalize_cwd(python_dir) @@ -250,17 +311,21 @@ class ProcessTestCase(BaseTestCase): with script_helper.temp_dir() as wrong_dir: # Before calling with an absolute path, confirm that using a # relative path fails. - self.assertRaises(OSError, subprocess.Popen, + self.assertRaises(FileNotFoundError, subprocess.Popen, [rel_python], cwd=wrong_dir) wrong_dir = self._normalize_cwd(wrong_dir) self._assert_cwd(wrong_dir, abs_python, cwd=wrong_dir) + @unittest.skipIf(sys.base_prefix != sys.prefix, + 'Test is not venv-compatible') def test_executable_with_cwd(self): python_dir, python_base = self._split_python_path() python_dir = self._normalize_cwd(python_dir) self._assert_cwd(python_dir, "somethingyoudonthave", executable=sys.executable, cwd=python_dir) + @unittest.skipIf(sys.base_prefix != sys.prefix, + 'Test is not venv-compatible') @unittest.skipIf(sysconfig.is_python_build(), "need an installed Python. See #7774") def test_executable_without_cwd(self): @@ -398,6 +463,31 @@ class ProcessTestCase(BaseTestCase): rc = subprocess.call([sys.executable, "-c", cmd], stdout=1) self.assertEqual(rc, 2) + def test_stdout_devnull(self): + p = subprocess.Popen([sys.executable, "-c", + 'for i in range(10240):' + 'print("x" * 1024)'], + stdout=subprocess.DEVNULL) + p.wait() + self.assertEqual(p.stdout, None) + + def test_stderr_devnull(self): + p = subprocess.Popen([sys.executable, "-c", + 'import sys\n' + 'for i in range(10240):' + 'sys.stderr.write("x" * 1024)'], + stderr=subprocess.DEVNULL) + p.wait() + self.assertEqual(p.stderr, None) + + def test_stdin_devnull(self): + p = subprocess.Popen([sys.executable, "-c", + 'import sys;' + 'sys.stdin.read(1)'], + stdin=subprocess.DEVNULL) + p.wait() + self.assertEqual(p.stdin, None) + def test_env(self): newenv = os.environ.copy() newenv["FRUIT"] = "orange" @@ -468,6 +558,41 @@ class ProcessTestCase(BaseTestCase): self.assertEqual(stdout, b"banana") self.assertStderrEqual(stderr, b"pineapple") + def test_communicate_timeout(self): + p = subprocess.Popen([sys.executable, "-c", + 'import sys,os,time;' + 'sys.stderr.write("pineapple\\n");' + 'time.sleep(1);' + 'sys.stderr.write("pear\\n");' + 'sys.stdout.write(sys.stdin.read())'], + universal_newlines=True, + stdin=subprocess.PIPE, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE) + self.assertRaises(subprocess.TimeoutExpired, p.communicate, "banana", + timeout=0.3) + # Make sure we can keep waiting for it, and that we get the whole output + # after it completes. + (stdout, stderr) = p.communicate() + self.assertEqual(stdout, "banana") + self.assertStderrEqual(stderr.encode(), b"pineapple\npear\n") + + def test_communicate_timeout_large_ouput(self): + # Test an expiring timeout while the child is outputting lots of data. + p = subprocess.Popen([sys.executable, "-c", + 'import sys,os,time;' + 'sys.stdout.write("a" * (64 * 1024));' + 'time.sleep(0.2);' + 'sys.stdout.write("a" * (64 * 1024));' + 'time.sleep(0.2);' + 'sys.stdout.write("a" * (64 * 1024));' + 'time.sleep(0.2);' + 'sys.stdout.write("a" * (64 * 1024));'], + stdout=subprocess.PIPE) + self.assertRaises(subprocess.TimeoutExpired, p.communicate, timeout=0.4) + (stdout, _) = p.communicate() + self.assertEqual(len(stdout), 4 * 64 * 1024) + # Test for the fd leak reported in http://bugs.python.org/issue2791. def test_communicate_pipe_fd_leak(self): for stdin_pipe in (False, True): @@ -504,24 +629,21 @@ class ProcessTestCase(BaseTestCase): # This test will probably deadlock rather than fail, if # communicate() does not work properly. x, y = os.pipe() - if mswindows: - pipe_buf = 512 - else: - pipe_buf = os.fpathconf(x, "PC_PIPE_BUF") os.close(x) os.close(y) p = subprocess.Popen([sys.executable, "-c", 'import sys,os;' 'sys.stdout.write(sys.stdin.read(47));' - 'sys.stderr.write("xyz"*%d);' - 'sys.stdout.write(sys.stdin.read())' % pipe_buf], + 'sys.stderr.write("x" * %d);' + 'sys.stdout.write(sys.stdin.read())' % + support.PIPE_MAX_SIZE], stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE) self.addCleanup(p.stdout.close) self.addCleanup(p.stderr.close) self.addCleanup(p.stdin.close) - string_to_write = b"abc"*pipe_buf + string_to_write = b"a" * support.PIPE_MAX_SIZE (stdout, stderr) = p.communicate(string_to_write) self.assertEqual(stdout, string_to_write) @@ -596,8 +718,6 @@ class ProcessTestCase(BaseTestCase): universal_newlines=1) self.addCleanup(p.stdout.close) self.addCleanup(p.stderr.close) - # BUG: can't give a non-empty stdin because it breaks both the - # select- and poll-based communicate() implementations. (stdout, stderr) = p.communicate() self.assertEqual(stdout, "line2\nline4\nline5\nline6\nline7\nline8") @@ -605,12 +725,12 @@ class ProcessTestCase(BaseTestCase): def test_universal_newlines_communicate_stdin(self): # universal newlines through communicate(), with only stdin p = subprocess.Popen([sys.executable, "-c", - 'import sys,os;' + SETBINARY + '''\nif True: - s = sys.stdin.readline() - assert s == "line1\\n", repr(s) - s = sys.stdin.read() - assert s == "line3\\n", repr(s) - '''], + 'import sys,os;' + SETBINARY + textwrap.dedent(''' + s = sys.stdin.readline() + assert s == "line1\\n", repr(s) + s = sys.stdin.read() + assert s == "line3\\n", repr(s) + ''')], stdin=subprocess.PIPE, universal_newlines=1) (stdout, stderr) = p.communicate("line1\nline3\n") @@ -628,6 +748,35 @@ class ProcessTestCase(BaseTestCase): p.communicate() self.assertEqual(p.returncode, 0) + def test_universal_newlines_communicate_stdin_stdout_stderr(self): + # universal newlines through communicate(), with stdin, stdout, stderr + p = subprocess.Popen([sys.executable, "-c", + 'import sys,os;' + SETBINARY + textwrap.dedent(''' + s = sys.stdin.buffer.readline() + sys.stdout.buffer.write(s) + sys.stdout.buffer.write(b"line2\\r") + sys.stderr.buffer.write(b"eline2\\n") + s = sys.stdin.buffer.read() + sys.stdout.buffer.write(s) + sys.stdout.buffer.write(b"line4\\n") + sys.stdout.buffer.write(b"line5\\r\\n") + sys.stderr.buffer.write(b"eline6\\r") + sys.stderr.buffer.write(b"eline7\\r\\nz") + ''')], + stdin=subprocess.PIPE, + stderr=subprocess.PIPE, + stdout=subprocess.PIPE, + universal_newlines=True) + self.addCleanup(p.stdout.close) + self.addCleanup(p.stderr.close) + (stdout, stderr) = p.communicate("line1\nline3\n") + self.assertEqual(p.returncode, 0) + self.assertEqual("line1\nline2\nline3\nline4\nline5\n", stdout) + # Python debug build push something like "[42442 refs]\n" + # to stderr at exit of subprocess. + # Don't use assertStderrEqual because it strips CR and LF from output. + self.assertTrue(stderr.startswith("eline2\neline6\neline7\n")) + def test_universal_newlines_communicate_encodings(self): # Check that universal newlines mode works for various encodings, # in particular for encodings in the UTF-16 and UTF-32 families. @@ -657,7 +806,6 @@ class ProcessTestCase(BaseTestCase): stdout, stderr = popen.communicate(input='') finally: locale.getpreferredencoding = old_getpreferredencoding - self.assertEqual(stdout, '1\n2\n3\n4') def test_no_leaking(self): @@ -717,30 +865,32 @@ class ProcessTestCase(BaseTestCase): self.assertEqual(subprocess.list2cmdline(['ab', '']), 'ab ""') - def test_poll(self): - p = subprocess.Popen([sys.executable, - "-c", "import time; time.sleep(1)"]) - count = 0 - while p.poll() is None: - time.sleep(0.1) - count += 1 - # We expect that the poll loop probably went around about 10 times, - # but, based on system scheduling we can't control, it's possible - # poll() never returned None. It "should be" very rare that it - # didn't go around at least twice. - self.assertGreaterEqual(count, 2) + p = subprocess.Popen([sys.executable, "-c", + "import os; os.read(0, 1)"], + stdin=subprocess.PIPE) + self.addCleanup(p.stdin.close) + self.assertIsNone(p.poll()) + os.write(p.stdin.fileno(), b'A') + p.wait() # Subsequent invocations should just return the returncode self.assertEqual(p.poll(), 0) - def test_wait(self): - p = subprocess.Popen([sys.executable, - "-c", "import time; time.sleep(2)"]) + p = subprocess.Popen([sys.executable, "-c", "pass"]) self.assertEqual(p.wait(), 0) # Subsequent invocations should just return the returncode self.assertEqual(p.wait(), 0) + def test_wait_timeout(self): + p = subprocess.Popen([sys.executable, + "-c", "import time; time.sleep(0.1)"]) + with self.assertRaises(subprocess.TimeoutExpired) as c: + p.wait(timeout=0.01) + self.assertIn("0.01", str(c.exception)) # For coverage of __str__. + # Some heavily loaded buildbots (sparc Debian 3.x) require this much + # time to start. + self.assertEqual(p.wait(timeout=3), 0) def test_invalid_bufsize(self): # an invalid type of the bufsize argument should raise @@ -819,25 +969,29 @@ class ProcessTestCase(BaseTestCase): p = subprocess.Popen([sys.executable, "-c", 'pass'], stdin=subprocess.PIPE) self.addCleanup(p.stdin.close) - time.sleep(2) + p.wait() p.communicate(b"x" * 2**20) - @unittest.skipUnless(hasattr(signal, 'SIGALRM'), - "Requires signal.SIGALRM") + @unittest.skipUnless(hasattr(signal, 'SIGUSR1'), + "Requires signal.SIGUSR1") + @unittest.skipUnless(hasattr(os, 'kill'), + "Requires os.kill") + @unittest.skipUnless(hasattr(os, 'getppid'), + "Requires os.getppid") def test_communicate_eintr(self): # Issue #12493: communicate() should handle EINTR def handler(signum, frame): pass - old_handler = signal.signal(signal.SIGALRM, handler) - self.addCleanup(signal.signal, signal.SIGALRM, old_handler) + old_handler = signal.signal(signal.SIGUSR1, handler) + self.addCleanup(signal.signal, signal.SIGUSR1, old_handler) - # the process is running for 2 seconds - args = [sys.executable, "-c", 'import time; time.sleep(2)'] + args = [sys.executable, "-c", + 'import os, signal;' + 'os.kill(os.getppid(), signal.SIGUSR1)'] for stream in ('stdout', 'stderr'): kw = {stream: subprocess.PIPE} with subprocess.Popen(args, **kw) as process: - signal.alarm(1) - # communicate() will be interrupted by SIGALRM + # communicate() will be interrupted by SIGUSR1 process.communicate() @@ -1401,6 +1555,11 @@ class POSIXProcessTestCase(BaseTestCase): exitcode = subprocess.call([abs_program, "-c", "pass"]) self.assertEqual(exitcode, 0) + # absolute bytes path as a string + cmd = b"'" + abs_program + b"' -c pass" + exitcode = subprocess.call(cmd, shell=True) + self.assertEqual(exitcode, 0) + # bytes program, unicode PATH env = os.environ.copy() env["PATH"] = path @@ -1584,7 +1743,7 @@ class POSIXProcessTestCase(BaseTestCase): stdout, stderr = p.communicate() self.assertEqual(0, p.returncode, "sigchild_ignore.py exited" " non-zero with this error:\n%s" % - stderr.decode('utf8')) + stderr.decode('utf-8')) def test_select_unbuffered(self): # Issue #11459: bufsize=0 should really set the pipes as @@ -1831,28 +1990,6 @@ class ProcessTestCaseNoPoll(ProcessTestCase): ProcessTestCase.tearDown(self) -@unittest.skipUnless(getattr(subprocess, '_posixsubprocess', False), - "_posixsubprocess extension module not found.") -class ProcessTestCasePOSIXPurePython(ProcessTestCase, POSIXProcessTestCase): - @classmethod - def setUpClass(cls): - global subprocess - assert subprocess._posixsubprocess - # Reimport subprocess while forcing _posixsubprocess to not exist. - with support.check_warnings(('.*_posixsubprocess .* not being used.*', - RuntimeWarning)): - subprocess = support.import_fresh_module( - 'subprocess', blocked=['_posixsubprocess']) - assert not subprocess._posixsubprocess - - @classmethod - def tearDownClass(cls): - global subprocess - # Reimport subprocess as it should be, restoring order to the universe. - subprocess = support.import_fresh_module('subprocess') - assert subprocess._posixsubprocess - - class HelperFunctionTests(unittest.TestCase): @unittest.skipIf(mswindows, "errno and EINTR make no sense on windows") def test_eintr_retry_call(self): @@ -1961,7 +2098,6 @@ def test_main(): unit_tests = (ProcessTestCase, POSIXProcessTestCase, Win32ProcessTestCase, - ProcessTestCasePOSIXPurePython, CommandTests, ProcessTestCaseNoPoll, HelperFunctionTests, |