diff options
Diffstat (limited to 'Lib/test/test_subprocess.py')
-rw-r--r-- | Lib/test/test_subprocess.py | 220 |
1 files changed, 132 insertions, 88 deletions
diff --git a/Lib/test/test_subprocess.py b/Lib/test/test_subprocess.py index 758e094..73da195 100644 --- a/Lib/test/test_subprocess.py +++ b/Lib/test/test_subprocess.py @@ -1,20 +1,16 @@ import unittest from unittest import mock -from test.support import script_helper from test import support import subprocess import sys import signal import io -import locale import os import errno import tempfile import time -import re import selectors import sysconfig -import warnings import select import shutil import gc @@ -25,6 +21,9 @@ try: except ImportError: threading = None +if support.PGO: + raise unittest.SkipTest("test is not helpful for PGO") + mswindows = (sys.platform == "win32") # @@ -294,7 +293,8 @@ class ProcessTestCase(BaseTestCase): # Verify first that the call succeeds without the executable arg. pre_args = [sys.executable, "-c"] self._assert_python(pre_args) - self.assertRaises(FileNotFoundError, self._assert_python, pre_args, + self.assertRaises((FileNotFoundError, PermissionError), + self._assert_python, pre_args, executable="doesnotexist") @unittest.skipIf(mswindows, "executable argument replaces shell") @@ -448,8 +448,8 @@ class ProcessTestCase(BaseTestCase): p = subprocess.Popen([sys.executable, "-c", 'import sys; sys.stdout.write("orange")'], stdout=subprocess.PIPE) - self.addCleanup(p.stdout.close) - self.assertEqual(p.stdout.read(), b"orange") + with p: + self.assertEqual(p.stdout.read(), b"orange") def test_stdout_filedes(self): # stdout is set to open file descriptor @@ -479,8 +479,8 @@ class ProcessTestCase(BaseTestCase): p = subprocess.Popen([sys.executable, "-c", 'import sys; sys.stderr.write("strawberry")'], stderr=subprocess.PIPE) - self.addCleanup(p.stderr.close) - self.assertStderrEqual(p.stderr.read(), b"strawberry") + with p: + self.assertStderrEqual(p.stderr.read(), b"strawberry") def test_stderr_filedes(self): # stderr is set to open file descriptor @@ -535,8 +535,8 @@ class ProcessTestCase(BaseTestCase): 'sys.stderr.write("orange")'], stdout=subprocess.PIPE, stderr=subprocess.STDOUT) - self.addCleanup(p.stdout.close) - self.assertStderrEqual(p.stdout.read(), b"appleorange") + with p: + self.assertStderrEqual(p.stdout.read(), b"appleorange") def test_stdout_stderr_file(self): # capture stdout and stderr to the same open file @@ -794,18 +794,19 @@ class ProcessTestCase(BaseTestCase): stdin=subprocess.PIPE, stdout=subprocess.PIPE, universal_newlines=1) - p.stdin.write("line1\n") - p.stdin.flush() - self.assertEqual(p.stdout.readline(), "line1\n") - p.stdin.write("line3\n") - p.stdin.close() - self.addCleanup(p.stdout.close) - self.assertEqual(p.stdout.readline(), - "line2\n") - self.assertEqual(p.stdout.read(6), - "line3\n") - self.assertEqual(p.stdout.read(), - "line4\nline5\nline6\nline7\nline8") + with p: + p.stdin.write("line1\n") + p.stdin.flush() + self.assertEqual(p.stdout.readline(), "line1\n") + p.stdin.write("line3\n") + p.stdin.close() + self.addCleanup(p.stdout.close) + self.assertEqual(p.stdout.readline(), + "line2\n") + self.assertEqual(p.stdout.read(6), + "line3\n") + self.assertEqual(p.stdout.read(), + "line4\nline5\nline6\nline7\nline8") def test_universal_newlines_communicate(self): # universal newlines through communicate() @@ -894,31 +895,42 @@ class ProcessTestCase(BaseTestCase): # # UTF-16 and UTF-32-BE are sufficient to check both with BOM and # without, and UTF-16 and UTF-32. - import _bootlocale for encoding in ['utf-16', 'utf-32-be']: - old_getpreferredencoding = _bootlocale.getpreferredencoding - # Indirectly via io.TextIOWrapper, Popen() defaults to - # locale.getpreferredencoding(False) and earlier in Python 3.2 to - # locale.getpreferredencoding(). - def getpreferredencoding(do_setlocale=True): - return encoding code = ("import sys; " r"sys.stdout.buffer.write('1\r\n2\r3\n4'.encode('%s'))" % encoding) args = [sys.executable, '-c', code] - try: - _bootlocale.getpreferredencoding = getpreferredencoding - # We set stdin to be non-None because, as of this writing, - # a different code path is used when the number of pipes is - # zero or one. - popen = subprocess.Popen(args, universal_newlines=True, - stdin=subprocess.PIPE, - stdout=subprocess.PIPE) - stdout, stderr = popen.communicate(input='') - finally: - _bootlocale.getpreferredencoding = old_getpreferredencoding + # We set stdin to be non-None because, as of this writing, + # a different code path is used when the number of pipes is + # zero or one. + popen = subprocess.Popen(args, + stdin=subprocess.PIPE, + stdout=subprocess.PIPE, + encoding=encoding) + stdout, stderr = popen.communicate(input='') self.assertEqual(stdout, '1\n2\n3\n4') + def test_communicate_errors(self): + for errors, expected in [ + ('ignore', ''), + ('replace', '\ufffd\ufffd'), + ('surrogateescape', '\udc80\udc80'), + ('backslashreplace', '\\x80\\x80'), + ]: + code = ("import sys; " + r"sys.stdout.buffer.write(b'[\x80\x80]')") + args = [sys.executable, '-c', code] + # We set stdin to be non-None because, as of this writing, + # a different code path is used when the number of pipes is + # zero or one. + popen = subprocess.Popen(args, + stdin=subprocess.PIPE, + stdout=subprocess.PIPE, + encoding='utf-8', + errors=errors) + stdout, stderr = popen.communicate(input='') + self.assertEqual(stdout, '[{}]'.format(expected)) + def test_no_leaking(self): # Make sure we leak no resources if not mswindows: @@ -1431,6 +1443,27 @@ class POSIXProcessTestCase(BaseTestCase): p.wait() self.assertEqual(-p.returncode, signal.SIGABRT) + def test_CalledProcessError_str_signal(self): + err = subprocess.CalledProcessError(-int(signal.SIGABRT), "fake cmd") + error_string = str(err) + # We're relying on the repr() of the signal.Signals intenum to provide + # the word signal, the signal name and the numeric value. + self.assertIn("signal", error_string.lower()) + # We're not being specific about the signal name as some signals have + # multiple names and which name is revealed can vary. + self.assertIn("SIG", error_string) + self.assertIn(str(signal.SIGABRT), error_string) + + def test_CalledProcessError_str_unknown_signal(self): + err = subprocess.CalledProcessError(-9876543, "fake cmd") + error_string = str(err) + self.assertIn("unknown signal 9876543.", error_string) + + def test_CalledProcessError_str_non_zero(self): + err = subprocess.CalledProcessError(2, "fake cmd") + error_string = str(err) + self.assertIn("non-zero exit status 2.", error_string) + def test_preexec(self): # DISCLAIMER: Setting environment variables is *not* a good use # of a preexec_fn. This is merely a test. @@ -1439,8 +1472,8 @@ class POSIXProcessTestCase(BaseTestCase): 'sys.stdout.write(os.getenv("FRUIT"))'], stdout=subprocess.PIPE, preexec_fn=lambda: os.putenv("FRUIT", "apple")) - self.addCleanup(p.stdout.close) - self.assertEqual(p.stdout.read(), b"apple") + with p: + self.assertEqual(p.stdout.read(), b"apple") def test_preexec_exception(self): def raise_it(): @@ -1561,7 +1594,7 @@ class POSIXProcessTestCase(BaseTestCase): fd, fname = tempfile.mkstemp() # reopen in text mode with open(fd, "w", errors="surrogateescape") as fobj: - fobj.write("#!/bin/sh\n") + fobj.write("#!%s\n" % support.unix_shell) fobj.write("exec '%s' -c 'import sys; sys.exit(47)'\n" % sys.executable) os.chmod(fname, 0o700) @@ -1588,8 +1621,8 @@ class POSIXProcessTestCase(BaseTestCase): p = subprocess.Popen(["echo $FRUIT"], shell=1, stdout=subprocess.PIPE, env=newenv) - self.addCleanup(p.stdout.close) - self.assertEqual(p.stdout.read().strip(b" \t\r\n\f"), b"apple") + with p: + self.assertEqual(p.stdout.read().strip(b" \t\r\n\f"), b"apple") def test_shell_string(self): # Run command through the shell (string) @@ -1598,15 +1631,15 @@ class POSIXProcessTestCase(BaseTestCase): p = subprocess.Popen("echo $FRUIT", shell=1, stdout=subprocess.PIPE, env=newenv) - self.addCleanup(p.stdout.close) - self.assertEqual(p.stdout.read().strip(b" \t\r\n\f"), b"apple") + with p: + self.assertEqual(p.stdout.read().strip(b" \t\r\n\f"), b"apple") def test_call_string(self): # call() function with string argument on UNIX fd, fname = tempfile.mkstemp() # reopen in text mode with open(fd, "w", errors="surrogateescape") as fobj: - fobj.write("#!/bin/sh\n") + fobj.write("#!%s\n" % support.unix_shell) fobj.write("exec '%s' -c 'import sys; sys.exit(47)'\n" % sys.executable) os.chmod(fname, 0o700) @@ -1631,8 +1664,8 @@ class POSIXProcessTestCase(BaseTestCase): for sh in shells: p = subprocess.Popen("echo $0", executable=sh, shell=True, stdout=subprocess.PIPE) - self.addCleanup(p.stdout.close) - self.assertEqual(p.stdout.read().strip(), bytes(sh, 'ascii')) + with p: + self.assertEqual(p.stdout.read().strip(), bytes(sh, 'ascii')) def _kill_process(self, method, *args): # Do not inherit file handles from the parent. @@ -2290,7 +2323,9 @@ class POSIXProcessTestCase(BaseTestCase): self.addCleanup(p.stderr.close) ident = id(p) pid = p.pid - del p + with support.check_warnings(('', ResourceWarning)): + p = None + # check that p is in the active processes list self.assertIn(ident, [id(o) for o in subprocess._active]) @@ -2309,7 +2344,9 @@ class POSIXProcessTestCase(BaseTestCase): self.addCleanup(p.stderr.close) ident = id(p) pid = p.pid - del p + with support.check_warnings(('', ResourceWarning)): + p = None + os.kill(pid, signal.SIGKILL) # check that p is in the active processes list self.assertIn(ident, [id(o) for o in subprocess._active]) @@ -2501,8 +2538,8 @@ class Win32ProcessTestCase(BaseTestCase): p = subprocess.Popen(["set"], shell=1, stdout=subprocess.PIPE, env=newenv) - self.addCleanup(p.stdout.close) - self.assertIn(b"physalis", p.stdout.read()) + with p: + self.assertIn(b"physalis", p.stdout.read()) def test_shell_string(self): # Run command through the shell (string) @@ -2511,8 +2548,20 @@ class Win32ProcessTestCase(BaseTestCase): p = subprocess.Popen("set", shell=1, stdout=subprocess.PIPE, env=newenv) - self.addCleanup(p.stdout.close) - self.assertIn(b"physalis", p.stdout.read()) + with p: + self.assertIn(b"physalis", p.stdout.read()) + + def test_shell_encodings(self): + # Run command through the shell (string) + for enc in ['ansi', 'oem']: + newenv = os.environ.copy() + newenv["FRUIT"] = "physalis" + p = subprocess.Popen("set", shell=1, + stdout=subprocess.PIPE, + env=newenv, + encoding=enc) + with p: + self.assertIn("physalis", p.stdout.read(), enc) def test_call_string(self): # call() function with string argument on Windows @@ -2531,16 +2580,14 @@ class Win32ProcessTestCase(BaseTestCase): stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE) - self.addCleanup(p.stdout.close) - self.addCleanup(p.stderr.close) - self.addCleanup(p.stdin.close) - # Wait for the interpreter to be completely initialized before - # sending any signal. - p.stdout.read(1) - getattr(p, method)(*args) - _, stderr = p.communicate() - self.assertStderrEqual(stderr, b'') - returncode = p.wait() + with p: + # Wait for the interpreter to be completely initialized before + # sending any signal. + p.stdout.read(1) + getattr(p, method)(*args) + _, stderr = p.communicate() + self.assertStderrEqual(stderr, b'') + returncode = p.wait() self.assertNotEqual(returncode, 0) def _kill_dead_process(self, method, *args): @@ -2553,19 +2600,17 @@ class Win32ProcessTestCase(BaseTestCase): stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE) - self.addCleanup(p.stdout.close) - self.addCleanup(p.stderr.close) - self.addCleanup(p.stdin.close) - # Wait for the interpreter to be completely initialized before - # sending any signal. - p.stdout.read(1) - # The process should end after this - time.sleep(1) - # This shouldn't raise even though the child is now dead - getattr(p, method)(*args) - _, stderr = p.communicate() - self.assertStderrEqual(stderr, b'') - rc = p.wait() + with p: + # Wait for the interpreter to be completely initialized before + # sending any signal. + p.stdout.read(1) + # The process should end after this + time.sleep(1) + # This shouldn't raise even though the child is now dead + getattr(p, method)(*args) + _, stderr = p.communicate() + self.assertStderrEqual(stderr, b'') + rc = p.wait() self.assertEqual(rc, 42) def test_send_signal(self): @@ -2608,8 +2653,7 @@ class MiscTests(unittest.TestCase): def test__all__(self): """Ensure that __all__ is populated properly.""" - # STARTUPINFO added to __all__ in 3.6 - intentionally_excluded = {"list2cmdline", "STARTUPINFO", "Handle"} + intentionally_excluded = {"list2cmdline", "Handle"} exported = set(subprocess.__all__) possible_exports = set() import types @@ -2654,11 +2698,11 @@ class CommandsWithSpaces (BaseTestCase): def with_spaces(self, *args, **kwargs): kwargs['stdout'] = subprocess.PIPE p = subprocess.Popen(*args, **kwargs) - self.addCleanup(p.stdout.close) - self.assertEqual( - p.stdout.read ().decode("mbcs"), - "2 [%r, 'ab cd']" % self.fname - ) + with p: + self.assertEqual( + p.stdout.read ().decode("mbcs"), + "2 [%r, 'ab cd']" % self.fname + ) def test_shell_string_with_spaces(self): # call() function with string argument with spaces on Windows @@ -2710,7 +2754,7 @@ class ContextManagerTests(BaseTestCase): self.assertEqual(proc.returncode, 1) def test_invalid_args(self): - with self.assertRaises(FileNotFoundError) as c: + with self.assertRaises((FileNotFoundError, PermissionError)) as c: with subprocess.Popen(['nonexisting_i_hope'], stdout=subprocess.PIPE, stderr=subprocess.PIPE) as proc: |