diff options
author | Gregory P. Smith <greg@krypto.org> | 2015-04-14 23:14:25 (GMT) |
---|---|---|
committer | Gregory P. Smith <greg@krypto.org> | 2015-04-14 23:14:25 (GMT) |
commit | 6e73000723640121ce7529ec91a01323bd7b76b5 (patch) | |
tree | c989873ac2aa10fe16570b1b67b7cfea0569d513 /Lib | |
parent | a8723a02ea109beabe2dfe1bbe18958afc5b7af9 (diff) | |
download | cpython-6e73000723640121ce7529ec91a01323bd7b76b5.zip cpython-6e73000723640121ce7529ec91a01323bd7b76b5.tar.gz cpython-6e73000723640121ce7529ec91a01323bd7b76b5.tar.bz2 |
Add a subprocess.run() function than returns a CalledProcess instance for a
more consistent API than the existing call* functions.
(enhancement from issue 23342)
Diffstat (limited to 'Lib')
-rw-r--r-- | Lib/subprocess.py | 124 | ||||
-rw-r--r-- | Lib/test/test_subprocess.py | 97 |
2 files changed, 205 insertions, 16 deletions
diff --git a/Lib/subprocess.py b/Lib/subprocess.py index e92928e..b6c4374 100644 --- a/Lib/subprocess.py +++ b/Lib/subprocess.py @@ -377,27 +377,51 @@ class CalledProcessError(SubprocessError): The exit status will be stored in the returncode attribute; check_output() will also store the output in the output attribute. """ - def __init__(self, returncode, cmd, output=None): + def __init__(self, returncode, cmd, output=None, stderr=None): self.returncode = returncode self.cmd = cmd self.output = output + self.stderr = stderr + def __str__(self): return "Command '%s' returned non-zero exit status %d" % (self.cmd, self.returncode) + @property + def stdout(self): + """Alias for output attribute, to match stderr""" + return self.output + + @stdout.setter + def stdout(self, value): + # There's no obvious reason to set this, but allow it anyway so + # .stdout is a transparent alias for .output + self.output = value + class TimeoutExpired(SubprocessError): """This exception is raised when the timeout expires while waiting for a child process. """ - def __init__(self, cmd, timeout, output=None): + def __init__(self, cmd, timeout, output=None, stderr=None): self.cmd = cmd self.timeout = timeout self.output = output + self.stderr = stderr def __str__(self): return ("Command '%s' timed out after %s seconds" % (self.cmd, self.timeout)) + @property + def stdout(self): + return self.output + + @stdout.setter + def stdout(self, value): + # There's no obvious reason to set this, but allow it anyway so + # .stdout is a transparent alias for .output + self.output = value + if _mswindows: import threading @@ -433,8 +457,8 @@ else: __all__ = ["Popen", "PIPE", "STDOUT", "call", "check_call", "getstatusoutput", - "getoutput", "check_output", "CalledProcessError", "DEVNULL", - "SubprocessError", "TimeoutExpired"] + "getoutput", "check_output", "run", "CalledProcessError", "DEVNULL", + "SubprocessError", "TimeoutExpired", "CompletedProcess"] # NOTE: We intentionally exclude list2cmdline as it is # considered an internal implementation detail. issue10838. @@ -595,29 +619,97 @@ def check_output(*popenargs, timeout=None, **kwargs): """ if 'stdout' in kwargs: raise ValueError('stdout argument not allowed, it will be overridden.') - if 'input' in kwargs: + + if 'input' in kwargs and kwargs['input'] is None: + # Explicitly passing input=None was previously equivalent to passing an + # empty string. That is maintained here for backwards compatibility. + kwargs['input'] = '' if kwargs.get('universal_newlines', False) else b'' + + return run(*popenargs, stdout=PIPE, timeout=timeout, check=True, + **kwargs).stdout + + +class CompletedProcess(object): + """A process that has finished running. + + This is returned by run(). + + Attributes: + args: The list or str args passed to run(). + returncode: The exit code of the process, negative for signals. + stdout: The standard output (None if not captured). + stderr: The standard error (None if not captured). + """ + def __init__(self, args, returncode, stdout=None, stderr=None): + self.args = args + self.returncode = returncode + self.stdout = stdout + self.stderr = stderr + + def __repr__(self): + args = ['args={!r}'.format(self.args), + 'returncode={!r}'.format(self.returncode)] + if self.stdout is not None: + args.append('stdout={!r}'.format(self.stdout)) + if self.stderr is not None: + args.append('stderr={!r}'.format(self.stderr)) + return "{}({})".format(type(self).__name__, ', '.join(args)) + + def check_returncode(self): + """Raise CalledProcessError if the exit code is non-zero.""" + if self.returncode: + raise CalledProcessError(self.returncode, self.args, self.stdout, + self.stderr) + + +def run(*popenargs, input=None, timeout=None, check=False, **kwargs): + """Run command with arguments and return a CompletedProcess instance. + + The returned instance will have attributes args, returncode, stdout and + stderr. By default, stdout and stderr are not captured, and those attributes + will be None. Pass stdout=PIPE and/or stderr=PIPE in order to capture them. + + If check is True and the exit code was non-zero, it raises a + CalledProcessError. The CalledProcessError object will have the return code + in the returncode attribute, and output & stderr attributes if those streams + were captured. + + If timeout is given, and the process takes too long, a TimeoutExpired + exception will be raised. + + There is an optional argument "input", allowing you to + pass a string to the subprocess's stdin. If you use this argument + you may not also use the Popen constructor's "stdin" argument, as + it will be used internally. + + The other arguments are the same as for the Popen constructor. + + If universal_newlines=True is passed, the "input" argument must be a + string and stdout/stderr in the returned object will be strings rather than + bytes. + """ + if input is not None: if 'stdin' in kwargs: raise ValueError('stdin and input arguments may not both be used.') - inputdata = kwargs['input'] - del kwargs['input'] kwargs['stdin'] = PIPE - else: - inputdata = None - with Popen(*popenargs, stdout=PIPE, **kwargs) as process: + + with Popen(*popenargs, **kwargs) as process: try: - output, unused_err = process.communicate(inputdata, timeout=timeout) + stdout, stderr = process.communicate(input, timeout=timeout) except TimeoutExpired: process.kill() - output, unused_err = process.communicate() - raise TimeoutExpired(process.args, timeout, output=output) + stdout, stderr = process.communicate() + raise TimeoutExpired(process.args, timeout, output=stdout, + stderr=stderr) except: process.kill() process.wait() raise retcode = process.poll() - if retcode: - raise CalledProcessError(retcode, process.args, output=output) - return output + if check and retcode: + raise CalledProcessError(retcode, process.args, + output=stdout, stderr=stderr) + return CompletedProcess(process.args, retcode, stdout, stderr) def list2cmdline(seq): diff --git a/Lib/test/test_subprocess.py b/Lib/test/test_subprocess.py index 7b66945..7398bdc 100644 --- a/Lib/test/test_subprocess.py +++ b/Lib/test/test_subprocess.py @@ -1232,6 +1232,102 @@ class ProcessTestCase(BaseTestCase): fds_after_exception = os.listdir(fd_directory) self.assertEqual(fds_before_popen, fds_after_exception) + +class RunFuncTestCase(BaseTestCase): + def run_python(self, code, **kwargs): + """Run Python code in a subprocess using subprocess.run""" + argv = [sys.executable, "-c", code] + return subprocess.run(argv, **kwargs) + + def test_returncode(self): + # call() function with sequence argument + cp = self.run_python("import sys; sys.exit(47)") + self.assertEqual(cp.returncode, 47) + with self.assertRaises(subprocess.CalledProcessError): + cp.check_returncode() + + def test_check(self): + with self.assertRaises(subprocess.CalledProcessError) as c: + self.run_python("import sys; sys.exit(47)", check=True) + self.assertEqual(c.exception.returncode, 47) + + def test_check_zero(self): + # check_returncode shouldn't raise when returncode is zero + cp = self.run_python("import sys; sys.exit(0)", check=True) + self.assertEqual(cp.returncode, 0) + + def test_timeout(self): + # run() function with timeout argument; we want to test that the child + # process gets killed when the timeout expires. If the child isn't + # killed, this call will deadlock since subprocess.run waits for the + # child. + with self.assertRaises(subprocess.TimeoutExpired): + self.run_python("while True: pass", timeout=0.0001) + + def test_capture_stdout(self): + # capture stdout with zero return code + cp = self.run_python("print('BDFL')", stdout=subprocess.PIPE) + self.assertIn(b'BDFL', cp.stdout) + + def test_capture_stderr(self): + cp = self.run_python("import sys; sys.stderr.write('BDFL')", + stderr=subprocess.PIPE) + self.assertIn(b'BDFL', cp.stderr) + + def test_check_output_stdin_arg(self): + # run() can be called with stdin set to a file + tf = tempfile.TemporaryFile() + self.addCleanup(tf.close) + tf.write(b'pear') + tf.seek(0) + cp = self.run_python( + "import sys; sys.stdout.write(sys.stdin.read().upper())", + stdin=tf, stdout=subprocess.PIPE) + self.assertIn(b'PEAR', cp.stdout) + + def test_check_output_input_arg(self): + # check_output() can be called with input set to a string + cp = self.run_python( + "import sys; sys.stdout.write(sys.stdin.read().upper())", + input=b'pear', stdout=subprocess.PIPE) + self.assertIn(b'PEAR', cp.stdout) + + def test_check_output_stdin_with_input_arg(self): + # run() refuses to accept 'stdin' with 'input' + tf = tempfile.TemporaryFile() + self.addCleanup(tf.close) + tf.write(b'pear') + tf.seek(0) + with self.assertRaises(ValueError, + msg="Expected ValueError when stdin and input args supplied.") as c: + output = self.run_python("print('will not be run')", + stdin=tf, input=b'hare') + self.assertIn('stdin', c.exception.args[0]) + self.assertIn('input', c.exception.args[0]) + + def test_check_output_timeout(self): + with self.assertRaises(subprocess.TimeoutExpired) as c: + cp = self.run_python(( + "import sys, time\n" + "sys.stdout.write('BDFL')\n" + "sys.stdout.flush()\n" + "time.sleep(3600)"), + # Some heavily loaded buildbots (sparc Debian 3.x) require + # this much time to start and print. + timeout=3, stdout=subprocess.PIPE) + self.assertEqual(c.exception.output, b'BDFL') + # output is aliased to stdout + self.assertEqual(c.exception.stdout, b'BDFL') + + def test_run_kwargs(self): + newenv = os.environ.copy() + newenv["FRUIT"] = "banana" + cp = self.run_python(('import sys, os;' + 'sys.exit(33 if os.getenv("FRUIT")=="banana" else 31)'), + env=newenv) + self.assertEqual(cp.returncode, 33) + + @unittest.skipIf(mswindows, "POSIX specific tests") class POSIXProcessTestCase(BaseTestCase): @@ -2542,6 +2638,7 @@ def test_main(): ProcessTestCaseNoPoll, CommandsWithSpaces, ContextManagerTests, + RunFuncTestCase, ) support.run_unittest(*unit_tests) |