summaryrefslogtreecommitdiffstats
path: root/Lib/test/test_subprocess.py
diff options
context:
space:
mode:
authorFredrik Lundh <fredrik@pythonware.com>2004-10-12 15:26:28 (GMT)
committerFredrik Lundh <fredrik@pythonware.com>2004-10-12 15:26:28 (GMT)
commit5b3687df2e6dce2c09662ec9287e8f23075c4f1d (patch)
tree5bd9cee1abd6c618abac496c4a72eba05920c297 /Lib/test/test_subprocess.py
parentabf8a56e68a38f5bcff1fa1aa3742ff35854dd45 (diff)
downloadcpython-5b3687df2e6dce2c09662ec9287e8f23075c4f1d.zip
cpython-5b3687df2e6dce2c09662ec9287e8f23075c4f1d.tar.gz
cpython-5b3687df2e6dce2c09662ec9287e8f23075c4f1d.tar.bz2
Added Peter Astrand's subprocess module.
Diffstat (limited to 'Lib/test/test_subprocess.py')
-rw-r--r--Lib/test/test_subprocess.py514
1 files changed, 514 insertions, 0 deletions
diff --git a/Lib/test/test_subprocess.py b/Lib/test/test_subprocess.py
new file mode 100644
index 0000000..348da57
--- /dev/null
+++ b/Lib/test/test_subprocess.py
@@ -0,0 +1,514 @@
+import unittest
+from test import test_support
+import subprocess
+import sys
+import signal
+import os
+import tempfile
+import time
+
+mswindows = (sys.platform == "win32")
+
+#
+# Depends on the following external programs: Python
+#
+
+if mswindows:
+ SETBINARY = 'import msvcrt; msvcrt.setmode(sys.stdout.fileno(), os.O_BINARY);'
+else:
+ SETBINARY = ''
+
+class ProcessTestCase(unittest.TestCase):
+ def mkstemp(self):
+ """wrapper for mkstemp, calling mktemp if mkstemp is not available"""
+ if hasattr(tempfile, "mkstemp"):
+ return tempfile.mkstemp()
+ else:
+ fname = tempfile.mktemp()
+ return os.open(fname, os.O_RDWR|os.O_CREAT), fname
+
+ #
+ # Generic tests
+ #
+ def test_call_seq(self):
+ """call() function with sequence argument"""
+ rc = subprocess.call([sys.executable, "-c", "import sys; sys.exit(47)"])
+ self.assertEqual(rc, 47)
+
+ def test_call_kwargs(self):
+ """call() function with keyword args"""
+ newenv = os.environ.copy()
+ newenv["FRUIT"] = "banana"
+ rc = subprocess.call([sys.executable, "-c",
+ 'import sys, os;' \
+ 'sys.exit(os.getenv("FRUIT")=="banana")'],
+ env=newenv)
+ self.assertEqual(rc, 1)
+
+ def test_stdin_none(self):
+ """.stdin is None when not redirected"""
+ p = subprocess.Popen([sys.executable, "-c", 'print "banana"'],
+ stdout=subprocess.PIPE, stderr=subprocess.PIPE)
+ p.wait()
+ self.assertEqual(p.stdin, None)
+
+ def test_stdout_none(self):
+ """.stdout is None when not redirected"""
+ p = subprocess.Popen([sys.executable, "-c", 'print "banana"'],
+ stdin=subprocess.PIPE, stderr=subprocess.PIPE)
+ p.wait()
+ self.assertEqual(p.stdout, None)
+
+ def test_stderr_none(self):
+ """.stderr is None when not redirected"""
+ p = subprocess.Popen([sys.executable, "-c", 'print "banana"'],
+ stdin=subprocess.PIPE, stdout=subprocess.PIPE)
+ p.wait()
+ self.assertEqual(p.stderr, None)
+
+ def test_executable(self):
+ """executable"""
+ p = subprocess.Popen(["somethingyoudonthave", "-c", "import sys; sys.exit(47)"],
+ executable=sys.executable)
+ p.wait()
+ self.assertEqual(p.returncode, 47)
+
+ def test_stdin_pipe(self):
+ """stdin redirection"""
+ p = subprocess.Popen([sys.executable, "-c",
+ 'import sys; sys.exit(sys.stdin.read() == "pear")'],
+ stdin=subprocess.PIPE)
+ p.stdin.write("pear")
+ p.stdin.close()
+ p.wait()
+ self.assertEqual(p.returncode, 1)
+
+ def test_stdin_filedes(self):
+ """stdin is set to open file descriptor"""
+ tf = tempfile.TemporaryFile()
+ d = tf.fileno()
+ os.write(d, "pear")
+ os.lseek(d, 0, 0)
+ p = subprocess.Popen([sys.executable, "-c",
+ 'import sys; sys.exit(sys.stdin.read() == "pear")'],
+ stdin=d)
+ p.wait()
+ self.assertEqual(p.returncode, 1)
+
+ def test_stdin_fileobj(self):
+ """stdin is set to open file object"""
+ tf = tempfile.TemporaryFile()
+ tf.write("pear")
+ tf.seek(0)
+ p = subprocess.Popen([sys.executable, "-c",
+ 'import sys; sys.exit(sys.stdin.read() == "pear")'],
+ stdin=tf)
+ p.wait()
+ self.assertEqual(p.returncode, 1)
+
+ def test_stdout_pipe(self):
+ """stdout redirection"""
+ p = subprocess.Popen([sys.executable, "-c",
+ 'import sys; sys.stdout.write("orange")'],
+ stdout=subprocess.PIPE)
+ self.assertEqual(p.stdout.read(), "orange")
+
+ def test_stdout_filedes(self):
+ """stdout is set to open file descriptor"""
+ tf = tempfile.TemporaryFile()
+ d = tf.fileno()
+ p = subprocess.Popen([sys.executable, "-c",
+ 'import sys; sys.stdout.write("orange")'],
+ stdout=d)
+ p.wait()
+ os.lseek(d, 0, 0)
+ self.assertEqual(os.read(d, 1024), "orange")
+
+ def test_stdout_fileobj(self):
+ """stdout is set to open file object"""
+ tf = tempfile.TemporaryFile()
+ p = subprocess.Popen([sys.executable, "-c",
+ 'import sys; sys.stdout.write("orange")'],
+ stdout=tf)
+ p.wait()
+ tf.seek(0)
+ self.assertEqual(tf.read(), "orange")
+
+ def test_stderr_pipe(self):
+ """stderr redirection"""
+ p = subprocess.Popen([sys.executable, "-c",
+ 'import sys; sys.stderr.write("strawberry")'],
+ stderr=subprocess.PIPE)
+ self.assertEqual(p.stderr.read(), "strawberry")
+
+ def test_stderr_filedes(self):
+ """stderr is set to open file descriptor"""
+ tf = tempfile.TemporaryFile()
+ d = tf.fileno()
+ p = subprocess.Popen([sys.executable, "-c",
+ 'import sys; sys.stderr.write("strawberry")'],
+ stderr=d)
+ p.wait()
+ os.lseek(d, 0, 0)
+ self.assertEqual(os.read(d, 1024), "strawberry")
+
+ def test_stderr_fileobj(self):
+ """stderr is set to open file object"""
+ tf = tempfile.TemporaryFile()
+ p = subprocess.Popen([sys.executable, "-c",
+ 'import sys; sys.stderr.write("strawberry")'],
+ stderr=tf)
+ p.wait()
+ tf.seek(0)
+ self.assertEqual(tf.read(), "strawberry")
+
+ def test_stdout_stderr_pipe(self):
+ """capture stdout and stderr to the same pipe"""
+ p = subprocess.Popen([sys.executable, "-c",
+ 'import sys;' \
+ 'sys.stdout.write("apple");' \
+ 'sys.stdout.flush();' \
+ 'sys.stderr.write("orange")'],
+ stdout=subprocess.PIPE,
+ stderr=subprocess.STDOUT)
+ self.assertEqual(p.stdout.read(), "appleorange")
+
+ def test_stdout_stderr_file(self):
+ """capture stdout and stderr to the same open file"""
+ tf = tempfile.TemporaryFile()
+ p = subprocess.Popen([sys.executable, "-c",
+ 'import sys;' \
+ 'sys.stdout.write("apple");' \
+ 'sys.stdout.flush();' \
+ 'sys.stderr.write("orange")'],
+ stdout=tf,
+ stderr=tf)
+ p.wait()
+ tf.seek(0)
+ self.assertEqual(tf.read(), "appleorange")
+
+ def test_cwd(self):
+ """cwd"""
+ tmpdir = os.getenv("TEMP", "/tmp")
+ tmpdir = os.path.realpath(tmpdir)
+ p = subprocess.Popen([sys.executable, "-c",
+ 'import sys,os;' \
+ 'sys.stdout.write(os.getcwd())'],
+ stdout=subprocess.PIPE,
+ cwd=tmpdir)
+ self.assertEqual(p.stdout.read(), tmpdir)
+
+ def test_env(self):
+ """env"""
+ newenv = os.environ.copy()
+ newenv["FRUIT"] = "orange"
+ p = subprocess.Popen([sys.executable, "-c",
+ 'import sys,os;' \
+ 'sys.stdout.write(os.getenv("FRUIT"))'],
+ stdout=subprocess.PIPE,
+ env=newenv)
+ self.assertEqual(p.stdout.read(), "orange")
+
+ def test_communicate(self):
+ """communicate()"""
+ p = subprocess.Popen([sys.executable, "-c",
+ 'import sys,os;' \
+ 'sys.stderr.write("pineapple");' \
+ 'sys.stdout.write(sys.stdin.read())'],
+ stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
+ (stdout, stderr) = p.communicate("banana")
+ self.assertEqual(stdout, "banana")
+ self.assertEqual(stderr, "pineapple")
+
+ def test_communicate_returns(self):
+ """communicate() should return None if no redirection is active"""
+ p = subprocess.Popen([sys.executable, "-c", "import sys; sys.exit(47)"])
+ (stdout, stderr) = p.communicate()
+ self.assertEqual(stdout, None)
+ self.assertEqual(stderr, None)
+
+ def test_communicate_pipe_buf(self):
+ """communicate() with writes larger than pipe_buf"""
+ # This test will probably deadlock rather than fail, if
+ # communicate() does not work properly.
+ x, y = os.pipe()
+ if mswindows:
+ pipe_buf = 512
+ else:
+ pipe_buf = os.fpathconf(x, "PC_PIPE_BUF")
+ os.close(x)
+ os.close(y)
+ p = subprocess.Popen([sys.executable, "-c",
+ 'import sys,os;'
+ 'sys.stdout.write(sys.stdin.read(47));' \
+ 'sys.stderr.write("xyz"*%d);' \
+ 'sys.stdout.write(sys.stdin.read())' % pipe_buf],
+ stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
+ string_to_write = "abc"*pipe_buf
+ (stdout, stderr) = p.communicate(string_to_write)
+ self.assertEqual(stdout, string_to_write)
+
+ def test_writes_before_communicate(self):
+ """stdin.write before communicate()"""
+ p = subprocess.Popen([sys.executable, "-c",
+ 'import sys,os;' \
+ 'sys.stdout.write(sys.stdin.read())'],
+ stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
+ p.stdin.write("banana")
+ (stdout, stderr) = p.communicate("split")
+ self.assertEqual(stdout, "bananasplit")
+ self.assertEqual(stderr, "")
+
+ def test_universal_newlines(self):
+ """universal newlines"""
+ p = subprocess.Popen([sys.executable, "-c",
+ 'import sys,os;' + SETBINARY + \
+ 'sys.stdout.write("line1\\n");' \
+ 'sys.stdout.flush();' \
+ 'sys.stdout.write("line2\\r");' \
+ 'sys.stdout.flush();' \
+ 'sys.stdout.write("line3\\r\\n");' \
+ 'sys.stdout.flush();' \
+ 'sys.stdout.write("line4\\r");' \
+ 'sys.stdout.flush();' \
+ 'sys.stdout.write("\\nline5");'
+ 'sys.stdout.flush();' \
+ 'sys.stdout.write("\\nline6");'],
+ stdout=subprocess.PIPE,
+ universal_newlines=1)
+ stdout = p.stdout.read()
+ if hasattr(open, 'newlines'):
+ # Interpreter with universal newline support
+ self.assertEqual(stdout, "line1\nline2\nline3\nline4\nline5\nline6")
+ else:
+ # Interpreter without universal newline support
+ self.assertEqual(stdout, "line1\nline2\rline3\r\nline4\r\nline5\nline6")
+
+ def test_universal_newlines_communicate(self):
+ """universal newlines through communicate()"""
+ p = subprocess.Popen([sys.executable, "-c",
+ 'import sys,os;' + SETBINARY + \
+ 'sys.stdout.write("line1\\n");' \
+ 'sys.stdout.flush();' \
+ 'sys.stdout.write("line2\\r");' \
+ 'sys.stdout.flush();' \
+ 'sys.stdout.write("line3\\r\\n");' \
+ 'sys.stdout.flush();' \
+ 'sys.stdout.write("line4\\r");' \
+ 'sys.stdout.flush();' \
+ 'sys.stdout.write("\\nline5");'
+ 'sys.stdout.flush();' \
+ 'sys.stdout.write("\\nline6");'],
+ stdout=subprocess.PIPE, stderr=subprocess.PIPE,
+ universal_newlines=1)
+ (stdout, stderr) = p.communicate()
+ if hasattr(open, 'newlines'):
+ # Interpreter with universal newline support
+ self.assertEqual(stdout, "line1\nline2\nline3\nline4\nline5\nline6")
+ else:
+ # Interpreter without universal newline support
+ self.assertEqual(stdout, "line1\nline2\rline3\r\nline4\r\nline5\nline6")
+
+ def test_no_leaking(self):
+ """Make sure we leak no resources"""
+ for i in range(1026):
+ p = subprocess.Popen([sys.executable, "-c", "import sys;sys.stdout.write(sys.stdin.read())"],
+ stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
+ data = p.communicate("lime")[0]
+ self.assertEqual(data, "lime")
+
+
+ def test_list2cmdline(self):
+ """list2cmdline"""
+
+ self.assertEqual(subprocess.list2cmdline(['a b c', 'd', 'e']),
+ '"a b c" d e')
+ self.assertEqual(subprocess.list2cmdline(['ab"c', '\\', 'd']),
+ 'ab\\"c \\ d')
+ self.assertEqual(subprocess.list2cmdline(['a\\\\\\b', 'de fg', 'h']),
+ 'a\\\\\\b "de fg" h')
+ self.assertEqual(subprocess.list2cmdline(['a\\"b', 'c', 'd']),
+ 'a\\\\\\"b c d')
+ self.assertEqual(subprocess.list2cmdline(['a\\\\b c', 'd', 'e']),
+ '"a\\\\b c" d e')
+ self.assertEqual(subprocess.list2cmdline(['a\\\\b\\ c', 'd', 'e']),
+ '"a\\\\b\\ c" d e')
+
+
+ def test_poll(self):
+ """poll"""
+ p = subprocess.Popen([sys.executable,
+ "-c", "import time; time.sleep(4)"])
+ while p.poll() == None:
+ sys.stdout.write(".")
+ sys.stdout.flush()
+ time.sleep(0.5)
+ # Subsequent invocations should just return the returncode
+ self.assertEqual(p.poll(), 0)
+
+
+ def test_wait(self):
+ """wait"""
+ p = subprocess.Popen([sys.executable,
+ "-c", "import time; time.sleep(2)"])
+ self.assertEqual(p.wait(), 0)
+ # Subsequent invocations should just return the returncode
+ self.assertEqual(p.wait(), 0)
+
+ #
+ # POSIX tests
+ #
+ if not mswindows:
+ def test_exceptions(self):
+ """catched & re-raised exceptions"""
+ try:
+ p = subprocess.Popen([sys.executable, "-c", ""],
+ cwd="/this/path/does/not/exist")
+ except OSError, e:
+ # The attribute child_traceback should contain "os.chdir"
+ # somewhere.
+ self.assertNotEqual(e.child_traceback.find("os.chdir"), -1)
+ else:
+ self.fail("Expected OSError")
+
+ def test_run_abort(self):
+ """returncode handles signal termination"""
+ p = subprocess.Popen([sys.executable, "-c", "import os; os.abort()"])
+ p.wait()
+ self.assertEqual(-p.returncode, signal.SIGABRT)
+
+ def test_preexec(self):
+ """preexec function"""
+ p = subprocess.Popen([sys.executable, "-c",
+ 'import sys,os;' \
+ 'sys.stdout.write(os.getenv("FRUIT"))'],
+ stdout=subprocess.PIPE,
+ preexec_fn=lambda: os.putenv("FRUIT", "apple"))
+ self.assertEqual(p.stdout.read(), "apple")
+
+ def test_close_fds(self):
+ """close_fds"""
+ # Make sure we have some fds open
+ os.pipe()
+ p = subprocess.Popen([sys.executable, "-c",
+ 'import sys,os;' \
+ 'sys.stdout.write(str(os.dup(0)))'],
+ stdout=subprocess.PIPE, close_fds=1)
+ # When all fds are closed, the next free fd should be 3.
+ self.assertEqual(p.stdout.read(), "3")
+
+ def test_args_string(self):
+ """args is a string"""
+ f, fname = self.mkstemp()
+ os.write(f, "#!/bin/sh\n")
+ os.write(f, "exec %s -c 'import sys; sys.exit(47)'\n" % sys.executable)
+ os.close(f)
+ os.chmod(fname, 0700)
+ p = subprocess.Popen(fname)
+ p.wait()
+ self.assertEqual(p.returncode, 47)
+ os.remove(fname)
+
+ def test_invalid_args(self):
+ """invalid arguments should raise ValueError"""
+ self.assertRaises(ValueError, subprocess.call,
+ [sys.executable, "-c", "import sys; sys.exit(47)"],
+ startupinfo=47)
+ self.assertRaises(ValueError, subprocess.call,
+ [sys.executable, "-c", "import sys; sys.exit(47)"],
+ creationflags=47)
+
+ def test_shell_sequence(self):
+ """Run command through the shell (sequence)"""
+ newenv = os.environ.copy()
+ newenv["FRUIT"] = "apple"
+ p = subprocess.Popen(["echo $FRUIT"], shell=1,
+ stdout=subprocess.PIPE,
+ env=newenv)
+ self.assertEqual(p.stdout.read().strip(), "apple")
+
+ def test_shell_string(self):
+ """Run command through the shell (string)"""
+ newenv = os.environ.copy()
+ newenv["FRUIT"] = "apple"
+ p = subprocess.Popen("echo $FRUIT", shell=1,
+ stdout=subprocess.PIPE,
+ env=newenv)
+ self.assertEqual(p.stdout.read().strip(), "apple")
+
+ def test_call_string(self):
+ """call() function with string argument on UNIX"""
+ f, fname = self.mkstemp()
+ os.write(f, "#!/bin/sh\n")
+ os.write(f, "exec %s -c 'import sys; sys.exit(47)'\n" % sys.executable)
+ os.close(f)
+ os.chmod(fname, 0700)
+ rc = subprocess.call(fname)
+ self.assertEqual(rc, 47)
+
+
+ #
+ # Windows tests
+ #
+ if mswindows:
+ def test_startupinfo(self):
+ """startupinfo argument"""
+ # We uses hardcoded constants, because we do not want to
+ # depend on win32all.
+ STARTF_USESHOWWINDOW = 1
+ SW_MAXIMIZE = 3
+ startupinfo = subprocess.STARTUPINFO()
+ startupinfo.dwFlags = STARTF_USESHOWWINDOW
+ startupinfo.wShowWindow = SW_MAXIMIZE
+ # Since Python is a console process, it won't be affected
+ # by wShowWindow, but the argument should be silently
+ # ignored
+ subprocess.call([sys.executable, "-c", "import sys; sys.exit(0)"],
+ startupinfo=startupinfo)
+
+ def test_creationflags(self):
+ """creationflags argument"""
+ CREATE_NEW_CONSOLE = 16
+ subprocess.call(sys.executable + ' -c "import time; time.sleep(2)"',
+ creationflags=CREATE_NEW_CONSOLE)
+
+ def test_invalid_args(self):
+ """invalid arguments should raise ValueError"""
+ self.assertRaises(ValueError, subprocess.call,
+ [sys.executable, "-c", "import sys; sys.exit(47)"],
+ preexec_fn=lambda: 1)
+ self.assertRaises(ValueError, subprocess.call,
+ [sys.executable, "-c", "import sys; sys.exit(47)"],
+ close_fds=True)
+
+ def test_shell_sequence(self):
+ """Run command through the shell (sequence)"""
+ newenv = os.environ.copy()
+ newenv["FRUIT"] = "physalis"
+ p = subprocess.Popen(["set"], shell=1,
+ stdout=subprocess.PIPE,
+ env=newenv)
+ self.assertNotEqual(p.stdout.read().find("physalis"), -1)
+
+ def test_shell_string(self):
+ """Run command through the shell (string)"""
+ newenv = os.environ.copy()
+ newenv["FRUIT"] = "physalis"
+ p = subprocess.Popen("set", shell=1,
+ stdout=subprocess.PIPE,
+ env=newenv)
+ self.assertNotEqual(p.stdout.read().find("physalis"), -1)
+
+ def test_call_string(self):
+ """call() function with string argument on Windows"""
+ rc = subprocess.call(sys.executable + ' -c "import sys; sys.exit(47)"')
+ self.assertEqual(rc, 47)
+
+
+
+def test_main():
+ test_support.run_unittest(ProcessTestCase)
+
+if __name__ == "__main__":
+ test_main()
+