summaryrefslogtreecommitdiffstats
path: root/Lib/test/test_subprocess.py
diff options
context:
space:
mode:
Diffstat (limited to 'Lib/test/test_subprocess.py')
-rw-r--r--Lib/test/test_subprocess.py274
1 files changed, 205 insertions, 69 deletions
diff --git a/Lib/test/test_subprocess.py b/Lib/test/test_subprocess.py
index 15fb498..b260c81 100644
--- a/Lib/test/test_subprocess.py
+++ b/Lib/test/test_subprocess.py
@@ -16,6 +16,7 @@ import warnings
import select
import shutil
import gc
+import textwrap
try:
import resource
@@ -62,6 +63,8 @@ class BaseTestCase(unittest.TestCase):
# shutdown time. That frustrates tests trying to check stderr produced
# from a spawned Python process.
actual = support.strip_python_stderr(stderr)
+ # strip_python_stderr also strips whitespace, so we do too.
+ expected = expected.strip()
self.assertEqual(actual, expected, msg)
@@ -73,6 +76,15 @@ class ProcessTestCase(BaseTestCase):
"import sys; sys.exit(47)"])
self.assertEqual(rc, 47)
+ def test_call_timeout(self):
+ # call() function with timeout argument; we want to test that the child
+ # process gets killed when the timeout expires. If the child isn't
+ # killed, this call will deadlock since subprocess.call waits for the
+ # child.
+ self.assertRaises(subprocess.TimeoutExpired, subprocess.call,
+ [sys.executable, "-c", "while True: pass"],
+ timeout=0.1)
+
def test_check_call_zero(self):
# check_call() function with zero return code
rc = subprocess.check_call([sys.executable, "-c",
@@ -115,6 +127,21 @@ class ProcessTestCase(BaseTestCase):
self.fail("Expected ValueError when stdout arg supplied.")
self.assertIn('stdout', c.exception.args[0])
+ def test_check_output_timeout(self):
+ # check_output() function with timeout arg
+ with self.assertRaises(subprocess.TimeoutExpired) as c:
+ output = subprocess.check_output(
+ [sys.executable, "-c",
+ "import sys, time\n"
+ "sys.stdout.write('BDFL')\n"
+ "sys.stdout.flush()\n"
+ "time.sleep(3600)"],
+ # Some heavily loaded buildbots (sparc Debian 3.x) require
+ # this much time to start and print.
+ timeout=3)
+ self.fail("Expected TimeoutExpired.")
+ self.assertEqual(c.exception.output, b'BDFL')
+
def test_call_kwargs(self):
# call() function with keyword args
newenv = os.environ.copy()
@@ -165,6 +192,40 @@ class ProcessTestCase(BaseTestCase):
p.wait()
self.assertEqual(p.stderr, None)
+ def _assert_python(self, pre_args, **kwargs):
+ # We include sys.exit() to prevent the test runner from hanging
+ # whenever python is found.
+ args = pre_args + ["import sys; sys.exit(47)"]
+ p = subprocess.Popen(args, **kwargs)
+ p.wait()
+ self.assertEqual(47, p.returncode)
+
+ def test_executable(self):
+ # Check that the executable argument works.
+ #
+ # On Unix (non-Mac and non-Windows), Python looks at args[0] to
+ # determine where its standard library is, so we need the directory
+ # of args[0] to be valid for the Popen() call to Python to succeed.
+ # See also issue #16170 and issue #7774.
+ doesnotexist = os.path.join(os.path.dirname(sys.executable),
+ "doesnotexist")
+ self._assert_python([doesnotexist, "-c"], executable=sys.executable)
+
+ def test_executable_takes_precedence(self):
+ # Check that the executable argument takes precedence over args[0].
+ #
+ # Verify first that the call succeeds without the executable arg.
+ pre_args = [sys.executable, "-c"]
+ self._assert_python(pre_args)
+ self.assertRaises(FileNotFoundError, self._assert_python, pre_args,
+ executable="doesnotexist")
+
+ @unittest.skipIf(mswindows, "executable argument replaces shell")
+ def test_executable_replaces_shell(self):
+ # Check that the executable argument replaces the default shell
+ # when shell=True.
+ self._assert_python([], executable=sys.executable, shell=True)
+
# For use in the test_cwd* tests below.
def _normalize_cwd(self, cwd):
# Normalize an expected cwd (for Tru64 support).
@@ -215,9 +276,9 @@ class ProcessTestCase(BaseTestCase):
with support.temp_cwd() as wrong_dir:
# Before calling with the correct cwd, confirm that the call fails
# without cwd and with the wrong cwd.
- self.assertRaises(OSError, subprocess.Popen,
+ self.assertRaises(FileNotFoundError, subprocess.Popen,
[rel_python])
- self.assertRaises(OSError, subprocess.Popen,
+ self.assertRaises(FileNotFoundError, subprocess.Popen,
[rel_python], cwd=wrong_dir)
python_dir = self._normalize_cwd(python_dir)
self._assert_cwd(python_dir, rel_python, cwd=python_dir)
@@ -232,9 +293,9 @@ class ProcessTestCase(BaseTestCase):
with support.temp_cwd() as wrong_dir:
# Before calling with the correct cwd, confirm that the call fails
# without cwd and with the wrong cwd.
- self.assertRaises(OSError, subprocess.Popen,
+ self.assertRaises(FileNotFoundError, subprocess.Popen,
[doesntexist], executable=rel_python)
- self.assertRaises(OSError, subprocess.Popen,
+ self.assertRaises(FileNotFoundError, subprocess.Popen,
[doesntexist], executable=rel_python,
cwd=wrong_dir)
python_dir = self._normalize_cwd(python_dir)
@@ -250,17 +311,21 @@ class ProcessTestCase(BaseTestCase):
with script_helper.temp_dir() as wrong_dir:
# Before calling with an absolute path, confirm that using a
# relative path fails.
- self.assertRaises(OSError, subprocess.Popen,
+ self.assertRaises(FileNotFoundError, subprocess.Popen,
[rel_python], cwd=wrong_dir)
wrong_dir = self._normalize_cwd(wrong_dir)
self._assert_cwd(wrong_dir, abs_python, cwd=wrong_dir)
+ @unittest.skipIf(sys.base_prefix != sys.prefix,
+ 'Test is not venv-compatible')
def test_executable_with_cwd(self):
python_dir, python_base = self._split_python_path()
python_dir = self._normalize_cwd(python_dir)
self._assert_cwd(python_dir, "somethingyoudonthave",
executable=sys.executable, cwd=python_dir)
+ @unittest.skipIf(sys.base_prefix != sys.prefix,
+ 'Test is not venv-compatible')
@unittest.skipIf(sysconfig.is_python_build(),
"need an installed Python. See #7774")
def test_executable_without_cwd(self):
@@ -398,6 +463,31 @@ class ProcessTestCase(BaseTestCase):
rc = subprocess.call([sys.executable, "-c", cmd], stdout=1)
self.assertEqual(rc, 2)
+ def test_stdout_devnull(self):
+ p = subprocess.Popen([sys.executable, "-c",
+ 'for i in range(10240):'
+ 'print("x" * 1024)'],
+ stdout=subprocess.DEVNULL)
+ p.wait()
+ self.assertEqual(p.stdout, None)
+
+ def test_stderr_devnull(self):
+ p = subprocess.Popen([sys.executable, "-c",
+ 'import sys\n'
+ 'for i in range(10240):'
+ 'sys.stderr.write("x" * 1024)'],
+ stderr=subprocess.DEVNULL)
+ p.wait()
+ self.assertEqual(p.stderr, None)
+
+ def test_stdin_devnull(self):
+ p = subprocess.Popen([sys.executable, "-c",
+ 'import sys;'
+ 'sys.stdin.read(1)'],
+ stdin=subprocess.DEVNULL)
+ p.wait()
+ self.assertEqual(p.stdin, None)
+
def test_env(self):
newenv = os.environ.copy()
newenv["FRUIT"] = "orange"
@@ -468,6 +558,41 @@ class ProcessTestCase(BaseTestCase):
self.assertEqual(stdout, b"banana")
self.assertStderrEqual(stderr, b"pineapple")
+ def test_communicate_timeout(self):
+ p = subprocess.Popen([sys.executable, "-c",
+ 'import sys,os,time;'
+ 'sys.stderr.write("pineapple\\n");'
+ 'time.sleep(1);'
+ 'sys.stderr.write("pear\\n");'
+ 'sys.stdout.write(sys.stdin.read())'],
+ universal_newlines=True,
+ stdin=subprocess.PIPE,
+ stdout=subprocess.PIPE,
+ stderr=subprocess.PIPE)
+ self.assertRaises(subprocess.TimeoutExpired, p.communicate, "banana",
+ timeout=0.3)
+ # Make sure we can keep waiting for it, and that we get the whole output
+ # after it completes.
+ (stdout, stderr) = p.communicate()
+ self.assertEqual(stdout, "banana")
+ self.assertStderrEqual(stderr.encode(), b"pineapple\npear\n")
+
+ def test_communicate_timeout_large_ouput(self):
+ # Test an expiring timeout while the child is outputting lots of data.
+ p = subprocess.Popen([sys.executable, "-c",
+ 'import sys,os,time;'
+ 'sys.stdout.write("a" * (64 * 1024));'
+ 'time.sleep(0.2);'
+ 'sys.stdout.write("a" * (64 * 1024));'
+ 'time.sleep(0.2);'
+ 'sys.stdout.write("a" * (64 * 1024));'
+ 'time.sleep(0.2);'
+ 'sys.stdout.write("a" * (64 * 1024));'],
+ stdout=subprocess.PIPE)
+ self.assertRaises(subprocess.TimeoutExpired, p.communicate, timeout=0.4)
+ (stdout, _) = p.communicate()
+ self.assertEqual(len(stdout), 4 * 64 * 1024)
+
# Test for the fd leak reported in http://bugs.python.org/issue2791.
def test_communicate_pipe_fd_leak(self):
for stdin_pipe in (False, True):
@@ -504,24 +629,21 @@ class ProcessTestCase(BaseTestCase):
# This test will probably deadlock rather than fail, if
# communicate() does not work properly.
x, y = os.pipe()
- if mswindows:
- pipe_buf = 512
- else:
- pipe_buf = os.fpathconf(x, "PC_PIPE_BUF")
os.close(x)
os.close(y)
p = subprocess.Popen([sys.executable, "-c",
'import sys,os;'
'sys.stdout.write(sys.stdin.read(47));'
- 'sys.stderr.write("xyz"*%d);'
- 'sys.stdout.write(sys.stdin.read())' % pipe_buf],
+ 'sys.stderr.write("x" * %d);'
+ 'sys.stdout.write(sys.stdin.read())' %
+ support.PIPE_MAX_SIZE],
stdin=subprocess.PIPE,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE)
self.addCleanup(p.stdout.close)
self.addCleanup(p.stderr.close)
self.addCleanup(p.stdin.close)
- string_to_write = b"abc"*pipe_buf
+ string_to_write = b"a" * support.PIPE_MAX_SIZE
(stdout, stderr) = p.communicate(string_to_write)
self.assertEqual(stdout, string_to_write)
@@ -596,8 +718,6 @@ class ProcessTestCase(BaseTestCase):
universal_newlines=1)
self.addCleanup(p.stdout.close)
self.addCleanup(p.stderr.close)
- # BUG: can't give a non-empty stdin because it breaks both the
- # select- and poll-based communicate() implementations.
(stdout, stderr) = p.communicate()
self.assertEqual(stdout,
"line2\nline4\nline5\nline6\nline7\nline8")
@@ -605,12 +725,12 @@ class ProcessTestCase(BaseTestCase):
def test_universal_newlines_communicate_stdin(self):
# universal newlines through communicate(), with only stdin
p = subprocess.Popen([sys.executable, "-c",
- 'import sys,os;' + SETBINARY + '''\nif True:
- s = sys.stdin.readline()
- assert s == "line1\\n", repr(s)
- s = sys.stdin.read()
- assert s == "line3\\n", repr(s)
- '''],
+ 'import sys,os;' + SETBINARY + textwrap.dedent('''
+ s = sys.stdin.readline()
+ assert s == "line1\\n", repr(s)
+ s = sys.stdin.read()
+ assert s == "line3\\n", repr(s)
+ ''')],
stdin=subprocess.PIPE,
universal_newlines=1)
(stdout, stderr) = p.communicate("line1\nline3\n")
@@ -628,6 +748,35 @@ class ProcessTestCase(BaseTestCase):
p.communicate()
self.assertEqual(p.returncode, 0)
+ def test_universal_newlines_communicate_stdin_stdout_stderr(self):
+ # universal newlines through communicate(), with stdin, stdout, stderr
+ p = subprocess.Popen([sys.executable, "-c",
+ 'import sys,os;' + SETBINARY + textwrap.dedent('''
+ s = sys.stdin.buffer.readline()
+ sys.stdout.buffer.write(s)
+ sys.stdout.buffer.write(b"line2\\r")
+ sys.stderr.buffer.write(b"eline2\\n")
+ s = sys.stdin.buffer.read()
+ sys.stdout.buffer.write(s)
+ sys.stdout.buffer.write(b"line4\\n")
+ sys.stdout.buffer.write(b"line5\\r\\n")
+ sys.stderr.buffer.write(b"eline6\\r")
+ sys.stderr.buffer.write(b"eline7\\r\\nz")
+ ''')],
+ stdin=subprocess.PIPE,
+ stderr=subprocess.PIPE,
+ stdout=subprocess.PIPE,
+ universal_newlines=True)
+ self.addCleanup(p.stdout.close)
+ self.addCleanup(p.stderr.close)
+ (stdout, stderr) = p.communicate("line1\nline3\n")
+ self.assertEqual(p.returncode, 0)
+ self.assertEqual("line1\nline2\nline3\nline4\nline5\n", stdout)
+ # Python debug build push something like "[42442 refs]\n"
+ # to stderr at exit of subprocess.
+ # Don't use assertStderrEqual because it strips CR and LF from output.
+ self.assertTrue(stderr.startswith("eline2\neline6\neline7\n"))
+
def test_universal_newlines_communicate_encodings(self):
# Check that universal newlines mode works for various encodings,
# in particular for encodings in the UTF-16 and UTF-32 families.
@@ -657,7 +806,6 @@ class ProcessTestCase(BaseTestCase):
stdout, stderr = popen.communicate(input='')
finally:
locale.getpreferredencoding = old_getpreferredencoding
-
self.assertEqual(stdout, '1\n2\n3\n4')
def test_no_leaking(self):
@@ -717,30 +865,32 @@ class ProcessTestCase(BaseTestCase):
self.assertEqual(subprocess.list2cmdline(['ab', '']),
'ab ""')
-
def test_poll(self):
- p = subprocess.Popen([sys.executable,
- "-c", "import time; time.sleep(1)"])
- count = 0
- while p.poll() is None:
- time.sleep(0.1)
- count += 1
- # We expect that the poll loop probably went around about 10 times,
- # but, based on system scheduling we can't control, it's possible
- # poll() never returned None. It "should be" very rare that it
- # didn't go around at least twice.
- self.assertGreaterEqual(count, 2)
+ p = subprocess.Popen([sys.executable, "-c",
+ "import os; os.read(0, 1)"],
+ stdin=subprocess.PIPE)
+ self.addCleanup(p.stdin.close)
+ self.assertIsNone(p.poll())
+ os.write(p.stdin.fileno(), b'A')
+ p.wait()
# Subsequent invocations should just return the returncode
self.assertEqual(p.poll(), 0)
-
def test_wait(self):
- p = subprocess.Popen([sys.executable,
- "-c", "import time; time.sleep(2)"])
+ p = subprocess.Popen([sys.executable, "-c", "pass"])
self.assertEqual(p.wait(), 0)
# Subsequent invocations should just return the returncode
self.assertEqual(p.wait(), 0)
+ def test_wait_timeout(self):
+ p = subprocess.Popen([sys.executable,
+ "-c", "import time; time.sleep(0.1)"])
+ with self.assertRaises(subprocess.TimeoutExpired) as c:
+ p.wait(timeout=0.01)
+ self.assertIn("0.01", str(c.exception)) # For coverage of __str__.
+ # Some heavily loaded buildbots (sparc Debian 3.x) require this much
+ # time to start.
+ self.assertEqual(p.wait(timeout=3), 0)
def test_invalid_bufsize(self):
# an invalid type of the bufsize argument should raise
@@ -819,25 +969,29 @@ class ProcessTestCase(BaseTestCase):
p = subprocess.Popen([sys.executable, "-c", 'pass'],
stdin=subprocess.PIPE)
self.addCleanup(p.stdin.close)
- time.sleep(2)
+ p.wait()
p.communicate(b"x" * 2**20)
- @unittest.skipUnless(hasattr(signal, 'SIGALRM'),
- "Requires signal.SIGALRM")
+ @unittest.skipUnless(hasattr(signal, 'SIGUSR1'),
+ "Requires signal.SIGUSR1")
+ @unittest.skipUnless(hasattr(os, 'kill'),
+ "Requires os.kill")
+ @unittest.skipUnless(hasattr(os, 'getppid'),
+ "Requires os.getppid")
def test_communicate_eintr(self):
# Issue #12493: communicate() should handle EINTR
def handler(signum, frame):
pass
- old_handler = signal.signal(signal.SIGALRM, handler)
- self.addCleanup(signal.signal, signal.SIGALRM, old_handler)
+ old_handler = signal.signal(signal.SIGUSR1, handler)
+ self.addCleanup(signal.signal, signal.SIGUSR1, old_handler)
- # the process is running for 2 seconds
- args = [sys.executable, "-c", 'import time; time.sleep(2)']
+ args = [sys.executable, "-c",
+ 'import os, signal;'
+ 'os.kill(os.getppid(), signal.SIGUSR1)']
for stream in ('stdout', 'stderr'):
kw = {stream: subprocess.PIPE}
with subprocess.Popen(args, **kw) as process:
- signal.alarm(1)
- # communicate() will be interrupted by SIGALRM
+ # communicate() will be interrupted by SIGUSR1
process.communicate()
@@ -1401,6 +1555,11 @@ class POSIXProcessTestCase(BaseTestCase):
exitcode = subprocess.call([abs_program, "-c", "pass"])
self.assertEqual(exitcode, 0)
+ # absolute bytes path as a string
+ cmd = b"'" + abs_program + b"' -c pass"
+ exitcode = subprocess.call(cmd, shell=True)
+ self.assertEqual(exitcode, 0)
+
# bytes program, unicode PATH
env = os.environ.copy()
env["PATH"] = path
@@ -1584,7 +1743,7 @@ class POSIXProcessTestCase(BaseTestCase):
stdout, stderr = p.communicate()
self.assertEqual(0, p.returncode, "sigchild_ignore.py exited"
" non-zero with this error:\n%s" %
- stderr.decode('utf8'))
+ stderr.decode('utf-8'))
def test_select_unbuffered(self):
# Issue #11459: bufsize=0 should really set the pipes as
@@ -1831,28 +1990,6 @@ class ProcessTestCaseNoPoll(ProcessTestCase):
ProcessTestCase.tearDown(self)
-@unittest.skipUnless(getattr(subprocess, '_posixsubprocess', False),
- "_posixsubprocess extension module not found.")
-class ProcessTestCasePOSIXPurePython(ProcessTestCase, POSIXProcessTestCase):
- @classmethod
- def setUpClass(cls):
- global subprocess
- assert subprocess._posixsubprocess
- # Reimport subprocess while forcing _posixsubprocess to not exist.
- with support.check_warnings(('.*_posixsubprocess .* not being used.*',
- RuntimeWarning)):
- subprocess = support.import_fresh_module(
- 'subprocess', blocked=['_posixsubprocess'])
- assert not subprocess._posixsubprocess
-
- @classmethod
- def tearDownClass(cls):
- global subprocess
- # Reimport subprocess as it should be, restoring order to the universe.
- subprocess = support.import_fresh_module('subprocess')
- assert subprocess._posixsubprocess
-
-
class HelperFunctionTests(unittest.TestCase):
@unittest.skipIf(mswindows, "errno and EINTR make no sense on windows")
def test_eintr_retry_call(self):
@@ -1961,7 +2098,6 @@ def test_main():
unit_tests = (ProcessTestCase,
POSIXProcessTestCase,
Win32ProcessTestCase,
- ProcessTestCasePOSIXPurePython,
CommandTests,
ProcessTestCaseNoPoll,
HelperFunctionTests,