summaryrefslogtreecommitdiffstats
path: root/Lib/test/test_subprocess.py
diff options
context:
space:
mode:
Diffstat (limited to 'Lib/test/test_subprocess.py')
-rw-r--r--Lib/test/test_subprocess.py392
1 files changed, 289 insertions, 103 deletions
diff --git a/Lib/test/test_subprocess.py b/Lib/test/test_subprocess.py
index 1a50de3..f5a70a3 100644
--- a/Lib/test/test_subprocess.py
+++ b/Lib/test/test_subprocess.py
@@ -16,11 +16,16 @@ import warnings
import select
import shutil
import gc
+import textwrap
try:
import resource
except ImportError:
resource = None
+try:
+ import threading
+except ImportError:
+ threading = None
mswindows = (sys.platform == "win32")
@@ -62,6 +67,8 @@ class BaseTestCase(unittest.TestCase):
# shutdown time. That frustrates tests trying to check stderr produced
# from a spawned Python process.
actual = support.strip_python_stderr(stderr)
+ # strip_python_stderr also strips whitespace, so we do too.
+ expected = expected.strip()
self.assertEqual(actual, expected, msg)
@@ -88,6 +95,9 @@ class ProcessTestCase(BaseTestCase):
self.assertIsInstance(p.stdout, io.BufferedIOBase)
self.assertIsInstance(p.stderr, io.BufferedIOBase)
finally:
+ p.stdin.close()
+ p.stdout.close()
+ p.stderr.close()
p.wait()
def test_io_unbuffered_works(self):
@@ -99,6 +109,9 @@ class ProcessTestCase(BaseTestCase):
self.assertIsInstance(p.stdout, io.RawIOBase)
self.assertIsInstance(p.stderr, io.RawIOBase)
finally:
+ p.stdin.close()
+ p.stdout.close()
+ p.stderr.close()
p.wait()
def test_call_seq(self):
@@ -107,6 +120,15 @@ class ProcessTestCase(BaseTestCase):
"import sys; sys.exit(47)"])
self.assertEqual(rc, 47)
+ def test_call_timeout(self):
+ # call() function with timeout argument; we want to test that the child
+ # process gets killed when the timeout expires. If the child isn't
+ # killed, this call will deadlock since subprocess.call waits for the
+ # child.
+ self.assertRaises(subprocess.TimeoutExpired, subprocess.call,
+ [sys.executable, "-c", "while True: pass"],
+ timeout=0.1)
+
def test_check_call_zero(self):
# check_call() function with zero return code
rc = subprocess.check_call([sys.executable, "-c",
@@ -149,6 +171,21 @@ class ProcessTestCase(BaseTestCase):
self.fail("Expected ValueError when stdout arg supplied.")
self.assertIn('stdout', c.exception.args[0])
+ def test_check_output_timeout(self):
+ # check_output() function with timeout arg
+ with self.assertRaises(subprocess.TimeoutExpired) as c:
+ output = subprocess.check_output(
+ [sys.executable, "-c",
+ "import sys, time\n"
+ "sys.stdout.write('BDFL')\n"
+ "sys.stdout.flush()\n"
+ "time.sleep(3600)"],
+ # Some heavily loaded buildbots (sparc Debian 3.x) require
+ # this much time to start and print.
+ timeout=3)
+ self.fail("Expected TimeoutExpired.")
+ self.assertEqual(c.exception.output, b'BDFL')
+
def test_call_kwargs(self):
# call() function with keyword args
newenv = os.environ.copy()
@@ -210,6 +247,40 @@ class ProcessTestCase(BaseTestCase):
p.wait()
self.assertEqual(p.stderr, None)
+ def _assert_python(self, pre_args, **kwargs):
+ # We include sys.exit() to prevent the test runner from hanging
+ # whenever python is found.
+ args = pre_args + ["import sys; sys.exit(47)"]
+ p = subprocess.Popen(args, **kwargs)
+ p.wait()
+ self.assertEqual(47, p.returncode)
+
+ def test_executable(self):
+ # Check that the executable argument works.
+ #
+ # On Unix (non-Mac and non-Windows), Python looks at args[0] to
+ # determine where its standard library is, so we need the directory
+ # of args[0] to be valid for the Popen() call to Python to succeed.
+ # See also issue #16170 and issue #7774.
+ doesnotexist = os.path.join(os.path.dirname(sys.executable),
+ "doesnotexist")
+ self._assert_python([doesnotexist, "-c"], executable=sys.executable)
+
+ def test_executable_takes_precedence(self):
+ # Check that the executable argument takes precedence over args[0].
+ #
+ # Verify first that the call succeeds without the executable arg.
+ pre_args = [sys.executable, "-c"]
+ self._assert_python(pre_args)
+ self.assertRaises(FileNotFoundError, self._assert_python, pre_args,
+ executable="doesnotexist")
+
+ @unittest.skipIf(mswindows, "executable argument replaces shell")
+ def test_executable_replaces_shell(self):
+ # Check that the executable argument replaces the default shell
+ # when shell=True.
+ self._assert_python([], executable=sys.executable, shell=True)
+
# For use in the test_cwd* tests below.
def _normalize_cwd(self, cwd):
# Normalize an expected cwd (for Tru64 support).
@@ -260,9 +331,9 @@ class ProcessTestCase(BaseTestCase):
with support.temp_cwd() as wrong_dir:
# Before calling with the correct cwd, confirm that the call fails
# without cwd and with the wrong cwd.
- self.assertRaises(OSError, subprocess.Popen,
+ self.assertRaises(FileNotFoundError, subprocess.Popen,
[rel_python])
- self.assertRaises(OSError, subprocess.Popen,
+ self.assertRaises(FileNotFoundError, subprocess.Popen,
[rel_python], cwd=wrong_dir)
python_dir = self._normalize_cwd(python_dir)
self._assert_cwd(python_dir, rel_python, cwd=python_dir)
@@ -277,9 +348,9 @@ class ProcessTestCase(BaseTestCase):
with support.temp_cwd() as wrong_dir:
# Before calling with the correct cwd, confirm that the call fails
# without cwd and with the wrong cwd.
- self.assertRaises(OSError, subprocess.Popen,
+ self.assertRaises(FileNotFoundError, subprocess.Popen,
[doesntexist], executable=rel_python)
- self.assertRaises(OSError, subprocess.Popen,
+ self.assertRaises(FileNotFoundError, subprocess.Popen,
[doesntexist], executable=rel_python,
cwd=wrong_dir)
python_dir = self._normalize_cwd(python_dir)
@@ -295,23 +366,28 @@ class ProcessTestCase(BaseTestCase):
with script_helper.temp_dir() as wrong_dir:
# Before calling with an absolute path, confirm that using a
# relative path fails.
- self.assertRaises(OSError, subprocess.Popen,
+ self.assertRaises(FileNotFoundError, subprocess.Popen,
[rel_python], cwd=wrong_dir)
wrong_dir = self._normalize_cwd(wrong_dir)
self._assert_cwd(wrong_dir, abs_python, cwd=wrong_dir)
+ @unittest.skipIf(sys.base_prefix != sys.prefix,
+ 'Test is not venv-compatible')
def test_executable_with_cwd(self):
python_dir, python_base = self._split_python_path()
python_dir = self._normalize_cwd(python_dir)
self._assert_cwd(python_dir, "somethingyoudonthave",
executable=sys.executable, cwd=python_dir)
+ @unittest.skipIf(sys.base_prefix != sys.prefix,
+ 'Test is not venv-compatible')
@unittest.skipIf(sysconfig.is_python_build(),
"need an installed Python. See #7774")
def test_executable_without_cwd(self):
# For a normal installation, it should work without 'cwd'
# argument. For test runs in the build directory, see #7774.
- self._assert_cwd('', "somethingyoudonthave", executable=sys.executable)
+ self._assert_cwd(os.getcwd(), "somethingyoudonthave",
+ executable=sys.executable)
def test_stdin_pipe(self):
# stdin redirection
@@ -456,6 +532,31 @@ class ProcessTestCase(BaseTestCase):
self.assertEqual(p.returncode, 0, err)
self.assertEqual(out.rstrip(), b'test with stdout=1')
+ def test_stdout_devnull(self):
+ p = subprocess.Popen([sys.executable, "-c",
+ 'for i in range(10240):'
+ 'print("x" * 1024)'],
+ stdout=subprocess.DEVNULL)
+ p.wait()
+ self.assertEqual(p.stdout, None)
+
+ def test_stderr_devnull(self):
+ p = subprocess.Popen([sys.executable, "-c",
+ 'import sys\n'
+ 'for i in range(10240):'
+ 'sys.stderr.write("x" * 1024)'],
+ stderr=subprocess.DEVNULL)
+ p.wait()
+ self.assertEqual(p.stderr, None)
+
+ def test_stdin_devnull(self):
+ p = subprocess.Popen([sys.executable, "-c",
+ 'import sys;'
+ 'sys.stdin.read(1)'],
+ stdin=subprocess.DEVNULL)
+ p.wait()
+ self.assertEqual(p.stdin, None)
+
def test_env(self):
newenv = os.environ.copy()
newenv["FRUIT"] = "orange"
@@ -526,6 +627,41 @@ class ProcessTestCase(BaseTestCase):
self.assertEqual(stdout, b"banana")
self.assertStderrEqual(stderr, b"pineapple")
+ def test_communicate_timeout(self):
+ p = subprocess.Popen([sys.executable, "-c",
+ 'import sys,os,time;'
+ 'sys.stderr.write("pineapple\\n");'
+ 'time.sleep(1);'
+ 'sys.stderr.write("pear\\n");'
+ 'sys.stdout.write(sys.stdin.read())'],
+ universal_newlines=True,
+ stdin=subprocess.PIPE,
+ stdout=subprocess.PIPE,
+ stderr=subprocess.PIPE)
+ self.assertRaises(subprocess.TimeoutExpired, p.communicate, "banana",
+ timeout=0.3)
+ # Make sure we can keep waiting for it, and that we get the whole output
+ # after it completes.
+ (stdout, stderr) = p.communicate()
+ self.assertEqual(stdout, "banana")
+ self.assertStderrEqual(stderr.encode(), b"pineapple\npear\n")
+
+ def test_communicate_timeout_large_ouput(self):
+ # Test an expiring timeout while the child is outputting lots of data.
+ p = subprocess.Popen([sys.executable, "-c",
+ 'import sys,os,time;'
+ 'sys.stdout.write("a" * (64 * 1024));'
+ 'time.sleep(0.2);'
+ 'sys.stdout.write("a" * (64 * 1024));'
+ 'time.sleep(0.2);'
+ 'sys.stdout.write("a" * (64 * 1024));'
+ 'time.sleep(0.2);'
+ 'sys.stdout.write("a" * (64 * 1024));'],
+ stdout=subprocess.PIPE)
+ self.assertRaises(subprocess.TimeoutExpired, p.communicate, timeout=0.4)
+ (stdout, _) = p.communicate()
+ self.assertEqual(len(stdout), 4 * 64 * 1024)
+
# Test for the fd leak reported in http://bugs.python.org/issue2791.
def test_communicate_pipe_fd_leak(self):
for stdin_pipe in (False, True):
@@ -562,24 +698,21 @@ class ProcessTestCase(BaseTestCase):
# This test will probably deadlock rather than fail, if
# communicate() does not work properly.
x, y = os.pipe()
- if mswindows:
- pipe_buf = 512
- else:
- pipe_buf = os.fpathconf(x, "PC_PIPE_BUF")
os.close(x)
os.close(y)
p = subprocess.Popen([sys.executable, "-c",
'import sys,os;'
'sys.stdout.write(sys.stdin.read(47));'
- 'sys.stderr.write("xyz"*%d);'
- 'sys.stdout.write(sys.stdin.read())' % pipe_buf],
+ 'sys.stderr.write("x" * %d);'
+ 'sys.stdout.write(sys.stdin.read())' %
+ support.PIPE_MAX_SIZE],
stdin=subprocess.PIPE,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE)
self.addCleanup(p.stdout.close)
self.addCleanup(p.stderr.close)
self.addCleanup(p.stdin.close)
- string_to_write = b"abc"*pipe_buf
+ string_to_write = b"a" * support.PIPE_MAX_SIZE
(stdout, stderr) = p.communicate(string_to_write)
self.assertEqual(stdout, string_to_write)
@@ -661,12 +794,12 @@ class ProcessTestCase(BaseTestCase):
def test_universal_newlines_communicate_stdin(self):
# universal newlines through communicate(), with only stdin
p = subprocess.Popen([sys.executable, "-c",
- 'import sys,os;' + SETBINARY + '''\nif True:
- s = sys.stdin.readline()
- assert s == "line1\\n", repr(s)
- s = sys.stdin.read()
- assert s == "line3\\n", repr(s)
- '''],
+ 'import sys,os;' + SETBINARY + textwrap.dedent('''
+ s = sys.stdin.readline()
+ assert s == "line1\\n", repr(s)
+ s = sys.stdin.read()
+ assert s == "line3\\n", repr(s)
+ ''')],
stdin=subprocess.PIPE,
universal_newlines=1)
(stdout, stderr) = p.communicate("line1\nline3\n")
@@ -687,18 +820,18 @@ class ProcessTestCase(BaseTestCase):
def test_universal_newlines_communicate_stdin_stdout_stderr(self):
# universal newlines through communicate(), with stdin, stdout, stderr
p = subprocess.Popen([sys.executable, "-c",
- 'import sys,os;' + SETBINARY + '''\nif True:
- s = sys.stdin.buffer.readline()
- sys.stdout.buffer.write(s)
- sys.stdout.buffer.write(b"line2\\r")
- sys.stderr.buffer.write(b"eline2\\n")
- s = sys.stdin.buffer.read()
- sys.stdout.buffer.write(s)
- sys.stdout.buffer.write(b"line4\\n")
- sys.stdout.buffer.write(b"line5\\r\\n")
- sys.stderr.buffer.write(b"eline6\\r")
- sys.stderr.buffer.write(b"eline7\\r\\nz")
- '''],
+ 'import sys,os;' + SETBINARY + textwrap.dedent('''
+ s = sys.stdin.buffer.readline()
+ sys.stdout.buffer.write(s)
+ sys.stdout.buffer.write(b"line2\\r")
+ sys.stderr.buffer.write(b"eline2\\n")
+ s = sys.stdin.buffer.read()
+ sys.stdout.buffer.write(s)
+ sys.stdout.buffer.write(b"line4\\n")
+ sys.stdout.buffer.write(b"line5\\r\\n")
+ sys.stderr.buffer.write(b"eline6\\r")
+ sys.stderr.buffer.write(b"eline7\\r\\nz")
+ ''')],
stdin=subprocess.PIPE,
stderr=subprocess.PIPE,
stdout=subprocess.PIPE,
@@ -742,7 +875,6 @@ class ProcessTestCase(BaseTestCase):
stdout, stderr = popen.communicate(input='')
finally:
locale.getpreferredencoding = old_getpreferredencoding
-
self.assertEqual(stdout, '1\n2\n3\n4')
def test_no_leaking(self):
@@ -802,30 +934,32 @@ class ProcessTestCase(BaseTestCase):
self.assertEqual(subprocess.list2cmdline(['ab', '']),
'ab ""')
-
def test_poll(self):
- p = subprocess.Popen([sys.executable,
- "-c", "import time; time.sleep(1)"])
- count = 0
- while p.poll() is None:
- time.sleep(0.1)
- count += 1
- # We expect that the poll loop probably went around about 10 times,
- # but, based on system scheduling we can't control, it's possible
- # poll() never returned None. It "should be" very rare that it
- # didn't go around at least twice.
- self.assertGreaterEqual(count, 2)
+ p = subprocess.Popen([sys.executable, "-c",
+ "import os; os.read(0, 1)"],
+ stdin=subprocess.PIPE)
+ self.addCleanup(p.stdin.close)
+ self.assertIsNone(p.poll())
+ os.write(p.stdin.fileno(), b'A')
+ p.wait()
# Subsequent invocations should just return the returncode
self.assertEqual(p.poll(), 0)
-
def test_wait(self):
- p = subprocess.Popen([sys.executable,
- "-c", "import time; time.sleep(2)"])
+ p = subprocess.Popen([sys.executable, "-c", "pass"])
self.assertEqual(p.wait(), 0)
# Subsequent invocations should just return the returncode
self.assertEqual(p.wait(), 0)
+ def test_wait_timeout(self):
+ p = subprocess.Popen([sys.executable,
+ "-c", "import time; time.sleep(0.3)"])
+ with self.assertRaises(subprocess.TimeoutExpired) as c:
+ p.wait(timeout=0.0001)
+ self.assertIn("0.0001", str(c.exception)) # For coverage of __str__.
+ # Some heavily loaded buildbots (sparc Debian 3.x) require this much
+ # time to start.
+ self.assertEqual(p.wait(timeout=3), 0)
def test_invalid_bufsize(self):
# an invalid type of the bufsize argument should raise
@@ -857,6 +991,36 @@ class ProcessTestCase(BaseTestCase):
if c.exception.errno not in (errno.ENOENT, errno.EACCES):
raise c.exception
+ @unittest.skipIf(threading is None, "threading required")
+ def test_double_close_on_error(self):
+ # Issue #18851
+ fds = []
+ def open_fds():
+ for i in range(20):
+ fds.extend(os.pipe())
+ time.sleep(0.001)
+ t = threading.Thread(target=open_fds)
+ t.start()
+ try:
+ with self.assertRaises(EnvironmentError):
+ subprocess.Popen(['nonexisting_i_hope'],
+ stdin=subprocess.PIPE,
+ stdout=subprocess.PIPE,
+ stderr=subprocess.PIPE)
+ finally:
+ t.join()
+ exc = None
+ for fd in fds:
+ # If a double close occurred, some of those fds will
+ # already have been closed by mistake, and os.close()
+ # here will raise.
+ try:
+ os.close(fd)
+ except OSError as e:
+ exc = e
+ if exc is not None:
+ raise exc
+
def test_issue8780(self):
# Ensure that stdout is inherited from the parent
# if stdout=PIPE is not used
@@ -904,25 +1068,29 @@ class ProcessTestCase(BaseTestCase):
p = subprocess.Popen([sys.executable, "-c", 'pass'],
stdin=subprocess.PIPE)
self.addCleanup(p.stdin.close)
- time.sleep(2)
+ p.wait()
p.communicate(b"x" * 2**20)
- @unittest.skipUnless(hasattr(signal, 'SIGALRM'),
- "Requires signal.SIGALRM")
+ @unittest.skipUnless(hasattr(signal, 'SIGUSR1'),
+ "Requires signal.SIGUSR1")
+ @unittest.skipUnless(hasattr(os, 'kill'),
+ "Requires os.kill")
+ @unittest.skipUnless(hasattr(os, 'getppid'),
+ "Requires os.getppid")
def test_communicate_eintr(self):
# Issue #12493: communicate() should handle EINTR
def handler(signum, frame):
pass
- old_handler = signal.signal(signal.SIGALRM, handler)
- self.addCleanup(signal.signal, signal.SIGALRM, old_handler)
+ old_handler = signal.signal(signal.SIGUSR1, handler)
+ self.addCleanup(signal.signal, signal.SIGUSR1, old_handler)
- # the process is running for 2 seconds
- args = [sys.executable, "-c", 'import time; time.sleep(2)']
+ args = [sys.executable, "-c",
+ 'import os, signal;'
+ 'os.kill(os.getppid(), signal.SIGUSR1)']
for stream in ('stdout', 'stderr'):
kw = {stream: subprocess.PIPE}
with subprocess.Popen(args, **kw) as process:
- signal.alarm(1)
- # communicate() will be interrupted by SIGALRM
+ # communicate() will be interrupted by SIGUSR1
process.communicate()
@@ -1130,7 +1298,8 @@ class POSIXProcessTestCase(BaseTestCase):
self.stderr.fileno()),
msg="At least one fd was closed early.")
finally:
- map(os.close, devzero_fds)
+ for fd in devzero_fds:
+ os.close(fd)
@unittest.skipIf(not os.path.exists("/dev/zero"), "/dev/zero required.")
def test_preexec_errpipe_does_not_double_close_pipes(self):
@@ -1263,16 +1432,22 @@ class POSIXProcessTestCase(BaseTestCase):
def _kill_process(self, method, *args):
# Do not inherit file handles from the parent.
# It should fix failures on some platforms.
- p = subprocess.Popen([sys.executable, "-c", """if 1:
- import sys, time
- sys.stdout.write('x\\n')
- sys.stdout.flush()
- time.sleep(30)
- """],
- close_fds=True,
- stdin=subprocess.PIPE,
- stdout=subprocess.PIPE,
- stderr=subprocess.PIPE)
+ # Also set the SIGINT handler to the default to make sure it's not
+ # being ignored (some tests rely on that.)
+ old_handler = signal.signal(signal.SIGINT, signal.default_int_handler)
+ try:
+ p = subprocess.Popen([sys.executable, "-c", """if 1:
+ import sys, time
+ sys.stdout.write('x\\n')
+ sys.stdout.flush()
+ time.sleep(30)
+ """],
+ close_fds=True,
+ stdin=subprocess.PIPE,
+ stdout=subprocess.PIPE,
+ stderr=subprocess.PIPE)
+ finally:
+ signal.signal(signal.SIGINT, old_handler)
# Wait for the interpreter to be completely initialized before
# sending any signal.
p.stdout.read(1)
@@ -1384,6 +1559,27 @@ class POSIXProcessTestCase(BaseTestCase):
# all standard fds closed.
self.check_close_std_fds([0, 1, 2])
+ def test_small_errpipe_write_fd(self):
+ """Issue #15798: Popen should work when stdio fds are available."""
+ new_stdin = os.dup(0)
+ new_stdout = os.dup(1)
+ try:
+ os.close(0)
+ os.close(1)
+
+ # Side test: if errpipe_write fails to have its CLOEXEC
+ # flag set this should cause the parent to think the exec
+ # failed. Extremely unlikely: everyone supports CLOEXEC.
+ subprocess.Popen([
+ sys.executable, "-c",
+ "print('AssertionError:0:CLOEXEC failure.')"]).wait()
+ finally:
+ # Restore original stdin and stdout
+ os.dup2(new_stdin, 0)
+ os.dup2(new_stdout, 1)
+ os.close(new_stdin)
+ os.close(new_stdout)
+
def test_remapping_std_fds(self):
# open up some temporary files
temps = [mkstemp() for i in range(3)]
@@ -1546,6 +1742,11 @@ class POSIXProcessTestCase(BaseTestCase):
exitcode = subprocess.call([abs_program, "-c", "pass"])
self.assertEqual(exitcode, 0)
+ # absolute bytes path as a string
+ cmd = b"'" + abs_program + b"' -c pass"
+ exitcode = subprocess.call(cmd, shell=True)
+ self.assertEqual(exitcode, 0)
+
# bytes program, unicode PATH
env = os.environ.copy()
env["PATH"] = path
@@ -1729,7 +1930,7 @@ class POSIXProcessTestCase(BaseTestCase):
stdout, stderr = p.communicate()
self.assertEqual(0, p.returncode, "sigchild_ignore.py exited"
" non-zero with this error:\n%s" %
- stderr.decode('utf8'))
+ stderr.decode('utf-8'))
def test_select_unbuffered(self):
# Issue #11459: bufsize=0 should really set the pipes as
@@ -1798,6 +1999,23 @@ class POSIXProcessTestCase(BaseTestCase):
self.assertRaises(OSError, os.waitpid, pid, 0)
self.assertNotIn(ident, [id(o) for o in subprocess._active])
+ def test_close_fds_after_preexec(self):
+ fd_status = support.findfile("fd_status.py", subdir="subprocessdata")
+
+ # this FD is used as dup2() target by preexec_fn, and should be closed
+ # in the child process
+ fd = os.dup(1)
+ self.addCleanup(os.close, fd)
+
+ p = subprocess.Popen([sys.executable, fd_status],
+ stdout=subprocess.PIPE, close_fds=True,
+ preexec_fn=lambda: os.dup2(1, fd))
+ output, ignored = p.communicate()
+
+ remaining_fds = set(map(int, output.split(b',')))
+
+ self.assertNotIn(fd, remaining_fds)
+
@unittest.skipUnless(mswindows, "Windows specific tests")
class Win32ProcessTestCase(BaseTestCase):
@@ -1936,13 +2154,6 @@ class Win32ProcessTestCase(BaseTestCase):
def test_terminate_dead(self):
self._kill_dead_process('terminate')
-
-# The module says:
-# "NB This only works (and is only relevant) for UNIX."
-#
-# Actually, getoutput should work on any platform with an os.popen, but
-# I'll take the comment as given, and skip this suite.
-@unittest.skipUnless(os.name == 'posix', "only relevant for UNIX")
class CommandTests(unittest.TestCase):
def test_getoutput(self):
self.assertEqual(subprocess.getoutput('echo xyzzy'), 'xyzzy')
@@ -1956,8 +2167,8 @@ class CommandTests(unittest.TestCase):
try:
dir = tempfile.mkdtemp()
name = os.path.join(dir, "foo")
-
- status, output = subprocess.getstatusoutput('cat ' + name)
+ status, output = subprocess.getstatusoutput(
+ ("type " if mswindows else "cat ") + name)
self.assertNotEqual(status, 0)
finally:
if dir is not None:
@@ -1976,28 +2187,6 @@ class ProcessTestCaseNoPoll(ProcessTestCase):
ProcessTestCase.tearDown(self)
-@unittest.skipUnless(getattr(subprocess, '_posixsubprocess', False),
- "_posixsubprocess extension module not found.")
-class ProcessTestCasePOSIXPurePython(ProcessTestCase, POSIXProcessTestCase):
- @classmethod
- def setUpClass(cls):
- global subprocess
- assert subprocess._posixsubprocess
- # Reimport subprocess while forcing _posixsubprocess to not exist.
- with support.check_warnings(('.*_posixsubprocess .* not being used.*',
- RuntimeWarning)):
- subprocess = support.import_fresh_module(
- 'subprocess', blocked=['_posixsubprocess'])
- assert not subprocess._posixsubprocess
-
- @classmethod
- def tearDownClass(cls):
- global subprocess
- # Reimport subprocess as it should be, restoring order to the universe.
- subprocess = support.import_fresh_module('subprocess')
- assert subprocess._posixsubprocess
-
-
class HelperFunctionTests(unittest.TestCase):
@unittest.skipIf(mswindows, "errno and EINTR make no sense on windows")
def test_eintr_retry_call(self):
@@ -2092,20 +2281,17 @@ class ContextManagerTests(BaseTestCase):
self.assertEqual(proc.returncode, 1)
def test_invalid_args(self):
- with self.assertRaises(EnvironmentError) as c:
+ with self.assertRaises(FileNotFoundError) as c:
with subprocess.Popen(['nonexisting_i_hope'],
stdout=subprocess.PIPE,
stderr=subprocess.PIPE) as proc:
pass
- self.assertEqual(c.exception.errno, errno.ENOENT)
-
def test_main():
unit_tests = (ProcessTestCase,
POSIXProcessTestCase,
Win32ProcessTestCase,
- ProcessTestCasePOSIXPurePython,
CommandTests,
ProcessTestCaseNoPoll,
HelperFunctionTests,