summaryrefslogtreecommitdiffstats
path: root/Lib/test/test_subprocess.py
diff options
context:
space:
mode:
Diffstat (limited to 'Lib/test/test_subprocess.py')
-rw-r--r--Lib/test/test_subprocess.py424
1 files changed, 337 insertions, 87 deletions
diff --git a/Lib/test/test_subprocess.py b/Lib/test/test_subprocess.py
index f5a70a3..3591b5e 100644
--- a/Lib/test/test_subprocess.py
+++ b/Lib/test/test_subprocess.py
@@ -11,6 +11,7 @@ import errno
import tempfile
import time
import re
+import selectors
import sysconfig
import warnings
import select
@@ -19,10 +20,6 @@ import gc
import textwrap
try:
- import resource
-except ImportError:
- resource = None
-try:
import threading
except ImportError:
threading = None
@@ -162,8 +159,28 @@ class ProcessTestCase(BaseTestCase):
stderr=subprocess.STDOUT)
self.assertIn(b'BDFL', output)
+ def test_check_output_stdin_arg(self):
+ # check_output() can be called with stdin set to a file
+ tf = tempfile.TemporaryFile()
+ self.addCleanup(tf.close)
+ tf.write(b'pear')
+ tf.seek(0)
+ output = subprocess.check_output(
+ [sys.executable, "-c",
+ "import sys; sys.stdout.write(sys.stdin.read().upper())"],
+ stdin=tf)
+ self.assertIn(b'PEAR', output)
+
+ def test_check_output_input_arg(self):
+ # check_output() can be called with input set to a string
+ output = subprocess.check_output(
+ [sys.executable, "-c",
+ "import sys; sys.stdout.write(sys.stdin.read().upper())"],
+ input=b'pear')
+ self.assertIn(b'PEAR', output)
+
def test_check_output_stdout_arg(self):
- # check_output() function stderr redirected to stdout
+ # check_output() refuses to accept 'stdout' argument
with self.assertRaises(ValueError) as c:
output = subprocess.check_output(
[sys.executable, "-c", "print('will not be run')"],
@@ -171,6 +188,20 @@ class ProcessTestCase(BaseTestCase):
self.fail("Expected ValueError when stdout arg supplied.")
self.assertIn('stdout', c.exception.args[0])
+ def test_check_output_stdin_with_input_arg(self):
+ # check_output() refuses to accept 'stdin' with 'input'
+ tf = tempfile.TemporaryFile()
+ self.addCleanup(tf.close)
+ tf.write(b'pear')
+ tf.seek(0)
+ with self.assertRaises(ValueError) as c:
+ output = subprocess.check_output(
+ [sys.executable, "-c", "print('will not be run')"],
+ stdin=tf, input=b'hare')
+ self.fail("Expected ValueError when stdin and input args supplied.")
+ self.assertIn('stdin', c.exception.args[0])
+ self.assertIn('input', c.exception.args[0])
+
def test_check_output_timeout(self):
# check_output() function with timeout arg
with self.assertRaises(subprocess.TimeoutExpired) as c:
@@ -755,6 +786,7 @@ class ProcessTestCase(BaseTestCase):
stdout=subprocess.PIPE,
universal_newlines=1)
p.stdin.write("line1\n")
+ p.stdin.flush()
self.assertEqual(p.stdout.readline(), "line1\n")
p.stdin.write("line3\n")
p.stdin.close()
@@ -853,8 +885,9 @@ class ProcessTestCase(BaseTestCase):
#
# UTF-16 and UTF-32-BE are sufficient to check both with BOM and
# without, and UTF-16 and UTF-32.
+ import _bootlocale
for encoding in ['utf-16', 'utf-32-be']:
- old_getpreferredencoding = locale.getpreferredencoding
+ old_getpreferredencoding = _bootlocale.getpreferredencoding
# Indirectly via io.TextIOWrapper, Popen() defaults to
# locale.getpreferredencoding(False) and earlier in Python 3.2 to
# locale.getpreferredencoding().
@@ -865,7 +898,7 @@ class ProcessTestCase(BaseTestCase):
encoding)
args = [sys.executable, '-c', code]
try:
- locale.getpreferredencoding = getpreferredencoding
+ _bootlocale.getpreferredencoding = getpreferredencoding
# We set stdin to be non-None because, as of this writing,
# a different code path is used when the number of pipes is
# zero or one.
@@ -874,7 +907,7 @@ class ProcessTestCase(BaseTestCase):
stdout=subprocess.PIPE)
stdout, stderr = popen.communicate(input='')
finally:
- locale.getpreferredencoding = old_getpreferredencoding
+ _bootlocale.getpreferredencoding = old_getpreferredencoding
self.assertEqual(stdout, '1\n2\n3\n4')
def test_no_leaking(self):
@@ -975,6 +1008,39 @@ class ProcessTestCase(BaseTestCase):
p = subprocess.Popen([sys.executable, "-c", "pass"], bufsize=None)
self.assertEqual(p.wait(), 0)
+ def _test_bufsize_equal_one(self, line, expected, universal_newlines):
+ # subprocess may deadlock with bufsize=1, see issue #21332
+ with subprocess.Popen([sys.executable, "-c", "import sys;"
+ "sys.stdout.write(sys.stdin.readline());"
+ "sys.stdout.flush()"],
+ stdin=subprocess.PIPE,
+ stdout=subprocess.PIPE,
+ stderr=subprocess.DEVNULL,
+ bufsize=1,
+ universal_newlines=universal_newlines) as p:
+ p.stdin.write(line) # expect that it flushes the line in text mode
+ os.close(p.stdin.fileno()) # close it without flushing the buffer
+ read_line = p.stdout.readline()
+ try:
+ p.stdin.close()
+ except OSError:
+ pass
+ p.stdin = None
+ self.assertEqual(p.returncode, 0)
+ self.assertEqual(read_line, expected)
+
+ def test_bufsize_equal_one_text_mode(self):
+ # line is flushed in text mode with bufsize=1.
+ # we should get the full line in return
+ line = "line\n"
+ self._test_bufsize_equal_one(line, line, universal_newlines=True)
+
+ def test_bufsize_equal_one_binary_mode(self):
+ # line is not flushed in binary mode with bufsize=1.
+ # we should get empty response
+ line = b'line' + os.linesep.encode() # assume ascii-based locale
+ self._test_bufsize_equal_one(line, b'', universal_newlines=False)
+
def test_leaking_fds_on_error(self):
# see bug #5179: Popen leaks file descriptors to PIPEs if
# the child fails to execute; this will eventually exhaust
@@ -982,8 +1048,7 @@ class ProcessTestCase(BaseTestCase):
# value for that limit, but Windows has 2048, so we loop
# 1024 times (each call leaked two fds).
for i in range(1024):
- # Windows raises IOError. Others raise OSError.
- with self.assertRaises(EnvironmentError) as c:
+ with self.assertRaises(OSError) as c:
subprocess.Popen(['nonexisting_i_hope'],
stdout=subprocess.PIPE,
stderr=subprocess.PIPE)
@@ -1021,6 +1086,59 @@ class ProcessTestCase(BaseTestCase):
if exc is not None:
raise exc
+ @unittest.skipIf(threading is None, "threading required")
+ def test_threadsafe_wait(self):
+ """Issue21291: Popen.wait() needs to be threadsafe for returncode."""
+ proc = subprocess.Popen([sys.executable, '-c',
+ 'import time; time.sleep(12)'])
+ self.assertEqual(proc.returncode, None)
+ results = []
+
+ def kill_proc_timer_thread():
+ results.append(('thread-start-poll-result', proc.poll()))
+ # terminate it from the thread and wait for the result.
+ proc.kill()
+ proc.wait()
+ results.append(('thread-after-kill-and-wait', proc.returncode))
+ # this wait should be a no-op given the above.
+ proc.wait()
+ results.append(('thread-after-second-wait', proc.returncode))
+
+ # This is a timing sensitive test, the failure mode is
+ # triggered when both the main thread and this thread are in
+ # the wait() call at once. The delay here is to allow the
+ # main thread to most likely be blocked in its wait() call.
+ t = threading.Timer(0.2, kill_proc_timer_thread)
+ t.start()
+
+ if mswindows:
+ expected_errorcode = 1
+ else:
+ # Should be -9 because of the proc.kill() from the thread.
+ expected_errorcode = -9
+
+ # Wait for the process to finish; the thread should kill it
+ # long before it finishes on its own. Supplying a timeout
+ # triggers a different code path for better coverage.
+ proc.wait(timeout=20)
+ self.assertEqual(proc.returncode, expected_errorcode,
+ msg="unexpected result in wait from main thread")
+
+ # This should be a no-op with no change in returncode.
+ proc.wait()
+ self.assertEqual(proc.returncode, expected_errorcode,
+ msg="unexpected result in second main wait.")
+
+ t.join()
+ # Ensure that all of the thread results are as expected.
+ # When a race condition occurs in wait(), the returncode could
+ # be set by the wrong thread that doesn't actually have it
+ # leading to an incorrect value.
+ self.assertEqual([('thread-start-poll-result', None),
+ ('thread-after-kill-and-wait', expected_errorcode),
+ ('thread-after-second-wait', expected_errorcode)],
+ results)
+
def test_issue8780(self):
# Ensure that stdout is inherited from the parent
# if stdout=PIPE is not used
@@ -1114,47 +1232,6 @@ class ProcessTestCase(BaseTestCase):
fds_after_exception = os.listdir(fd_directory)
self.assertEqual(fds_before_popen, fds_after_exception)
-
-# context manager
-class _SuppressCoreFiles(object):
- """Try to prevent core files from being created."""
- old_limit = None
-
- def __enter__(self):
- """Try to save previous ulimit, then set it to (0, 0)."""
- if resource is not None:
- try:
- self.old_limit = resource.getrlimit(resource.RLIMIT_CORE)
- resource.setrlimit(resource.RLIMIT_CORE, (0, 0))
- except (ValueError, resource.error):
- pass
-
- if sys.platform == 'darwin':
- # Check if the 'Crash Reporter' on OSX was configured
- # in 'Developer' mode and warn that it will get triggered
- # when it is.
- #
- # This assumes that this context manager is used in tests
- # that might trigger the next manager.
- value = subprocess.Popen(['/usr/bin/defaults', 'read',
- 'com.apple.CrashReporter', 'DialogType'],
- stdout=subprocess.PIPE).communicate()[0]
- if value.strip() == b'developer':
- print("this tests triggers the Crash Reporter, "
- "that is intentional", end='')
- sys.stdout.flush()
-
- def __exit__(self, *args):
- """Return core file behavior to default."""
- if self.old_limit is None:
- return
- if resource is not None:
- try:
- resource.setrlimit(resource.RLIMIT_CORE, self.old_limit)
- except (ValueError, resource.error):
- pass
-
-
@unittest.skipIf(mswindows, "POSIX specific tests")
class POSIXProcessTestCase(BaseTestCase):
@@ -1243,7 +1320,7 @@ class POSIXProcessTestCase(BaseTestCase):
def test_run_abort(self):
# returncode handles signal termination
- with _SuppressCoreFiles():
+ with support.SuppressCrashReport():
p = subprocess.Popen([sys.executable, "-c",
'import os; os.abort()'])
p.wait()
@@ -1266,7 +1343,7 @@ class POSIXProcessTestCase(BaseTestCase):
try:
p = subprocess.Popen([sys.executable, "-c", ""],
preexec_fn=raise_it)
- except RuntimeError as e:
+ except subprocess.SubprocessError as e:
self.assertTrue(
subprocess._posixsubprocess,
"Expected a ValueError from the preexec_fn")
@@ -1306,9 +1383,10 @@ class POSIXProcessTestCase(BaseTestCase):
"""Issue16140: Don't double close pipes on preexec error."""
def raise_it():
- raise RuntimeError("force the _execute_child() errpipe_data path.")
+ raise subprocess.SubprocessError(
+ "force the _execute_child() errpipe_data path.")
- with self.assertRaises(RuntimeError):
+ with self.assertRaises(subprocess.SubprocessError):
self._TestExecuteChildPopen(
self, [sys.executable, "-c", "pass"],
stdin=subprocess.PIPE, stdout=subprocess.PIPE,
@@ -1507,16 +1585,28 @@ class POSIXProcessTestCase(BaseTestCase):
# Terminating a dead process
self._kill_dead_process('terminate')
+ def _save_fds(self, save_fds):
+ fds = []
+ for fd in save_fds:
+ inheritable = os.get_inheritable(fd)
+ saved = os.dup(fd)
+ fds.append((fd, saved, inheritable))
+ return fds
+
+ def _restore_fds(self, fds):
+ for fd, saved, inheritable in fds:
+ os.dup2(saved, fd, inheritable=inheritable)
+ os.close(saved)
+
def check_close_std_fds(self, fds):
# Issue #9905: test that subprocess pipes still work properly with
# some standard fds closed
stdin = 0
- newfds = []
- for a in fds:
- b = os.dup(a)
- newfds.append(b)
- if a == 0:
- stdin = b
+ saved_fds = self._save_fds(fds)
+ for fd, saved, inheritable in saved_fds:
+ if fd == 0:
+ stdin = saved
+ break
try:
for fd in fds:
os.close(fd)
@@ -1531,10 +1621,7 @@ class POSIXProcessTestCase(BaseTestCase):
err = support.strip_python_stderr(err)
self.assertEqual((out, err), (b'apple', b'orange'))
finally:
- for b, a in zip(newfds, fds):
- os.dup2(b, a)
- for b in newfds:
- os.close(b)
+ self._restore_fds(saved_fds)
def test_close_fd_0(self):
self.check_close_std_fds([0])
@@ -1595,7 +1682,7 @@ class POSIXProcessTestCase(BaseTestCase):
os.lseek(temp_fds[1], 0, 0)
# move the standard file descriptors out of the way
- saved_fds = [os.dup(fd) for fd in range(3)]
+ saved_fds = self._save_fds(range(3))
try:
# duplicate the file objects over the standard fd's
for fd, temp_fd in enumerate(temp_fds):
@@ -1611,10 +1698,7 @@ class POSIXProcessTestCase(BaseTestCase):
stderr=temp_fds[0])
p.wait()
finally:
- # restore the original fd's underneath sys.stdin, etc.
- for std, saved in enumerate(saved_fds):
- os.dup2(saved, std)
- os.close(saved)
+ self._restore_fds(saved_fds)
for fd in temp_fds:
os.lseek(fd, 0, 0)
@@ -1638,7 +1722,7 @@ class POSIXProcessTestCase(BaseTestCase):
os.unlink(fname)
# save a copy of the standard file descriptors
- saved_fds = [os.dup(fd) for fd in range(3)]
+ saved_fds = self._save_fds(range(3))
try:
# duplicate the temp files over the standard fd's 0, 1, 2
for fd, temp_fd in enumerate(temp_fds):
@@ -1664,9 +1748,7 @@ class POSIXProcessTestCase(BaseTestCase):
out = os.read(stdout_no, 1024)
err = support.strip_python_stderr(os.read(stderr_no, 1024))
finally:
- for std, saved in enumerate(saved_fds):
- os.dup2(saved, std)
- os.close(saved)
+ self._restore_fds(saved_fds)
self.assertEqual(out, b"got STDIN")
self.assertEqual(err, b"err")
@@ -1698,40 +1780,47 @@ class POSIXProcessTestCase(BaseTestCase):
# Pure Python implementations keeps the message
self.assertIsNone(subprocess._posixsubprocess)
self.assertEqual(str(err), "surrogate:\uDCff")
- except RuntimeError as err:
+ except subprocess.SubprocessError as err:
# _posixsubprocess uses a default message
self.assertIsNotNone(subprocess._posixsubprocess)
self.assertEqual(str(err), "Exception occurred in preexec_fn.")
else:
- self.fail("Expected ValueError or RuntimeError")
+ self.fail("Expected ValueError or subprocess.SubprocessError")
def test_undecodable_env(self):
for key, value in (('test', 'abc\uDCFF'), ('test\uDCFF', '42')):
+ encoded_value = value.encode("ascii", "surrogateescape")
+
# test str with surrogates
script = "import os; print(ascii(os.getenv(%s)))" % repr(key)
env = os.environ.copy()
env[key] = value
- # Use C locale to get ascii for the locale encoding to force
+ # Use C locale to get ASCII for the locale encoding to force
# surrogate-escaping of \xFF in the child process; otherwise it can
# be decoded as-is if the default locale is latin-1.
env['LC_ALL'] = 'C'
+ if sys.platform.startswith("aix"):
+ # On AIX, the C locale uses the Latin1 encoding
+ decoded_value = encoded_value.decode("latin1", "surrogateescape")
+ else:
+ # On other UNIXes, the C locale uses the ASCII encoding
+ decoded_value = value
stdout = subprocess.check_output(
[sys.executable, "-c", script],
env=env)
stdout = stdout.rstrip(b'\n\r')
- self.assertEqual(stdout.decode('ascii'), ascii(value))
+ self.assertEqual(stdout.decode('ascii'), ascii(decoded_value))
# test bytes
key = key.encode("ascii", "surrogateescape")
- value = value.encode("ascii", "surrogateescape")
script = "import os; print(ascii(os.getenvb(%s)))" % repr(key)
env = os.environ.copy()
- env[key] = value
+ env[key] = encoded_value
stdout = subprocess.check_output(
[sys.executable, "-c", script],
env=env)
stdout = stdout.rstrip(b'\n\r')
- self.assertEqual(stdout.decode('ascii'), ascii(value))
+ self.assertEqual(stdout.decode('ascii'), ascii(encoded_value))
def test_bytes_program(self):
abs_program = os.fsencode(sys.executable)
@@ -1833,10 +1922,13 @@ class POSIXProcessTestCase(BaseTestCase):
open_fds = set(fds)
# add a bunch more fds
for _ in range(9):
- fd = os.open("/dev/null", os.O_RDONLY)
+ fd = os.open(os.devnull, os.O_RDONLY)
self.addCleanup(os.close, fd)
open_fds.add(fd)
+ for fd in open_fds:
+ os.set_inheritable(fd, True)
+
p = subprocess.Popen([sys.executable, fd_status],
stdout=subprocess.PIPE, close_fds=False)
output, ignored = p.communicate()
@@ -1867,6 +1959,86 @@ class POSIXProcessTestCase(BaseTestCase):
"Some fds not in pass_fds were left open")
self.assertIn(1, remaining_fds, "Subprocess failed")
+
+ @unittest.skipIf(sys.platform.startswith("freebsd") and
+ os.stat("/dev").st_dev == os.stat("/dev/fd").st_dev,
+ "Requires fdescfs mounted on /dev/fd on FreeBSD.")
+ def test_close_fds_when_max_fd_is_lowered(self):
+ """Confirm that issue21618 is fixed (may fail under valgrind)."""
+ fd_status = support.findfile("fd_status.py", subdir="subprocessdata")
+
+ # This launches the meat of the test in a child process to
+ # avoid messing with the larger unittest processes maximum
+ # number of file descriptors.
+ # This process launches:
+ # +--> Process that lowers its RLIMIT_NOFILE aftr setting up
+ # a bunch of high open fds above the new lower rlimit.
+ # Those are reported via stdout before launching a new
+ # process with close_fds=False to run the actual test:
+ # +--> The TEST: This one launches a fd_status.py
+ # subprocess with close_fds=True so we can find out if
+ # any of the fds above the lowered rlimit are still open.
+ p = subprocess.Popen([sys.executable, '-c', textwrap.dedent(
+ '''
+ import os, resource, subprocess, sys, textwrap
+ open_fds = set()
+ # Add a bunch more fds to pass down.
+ for _ in range(40):
+ fd = os.open(os.devnull, os.O_RDONLY)
+ open_fds.add(fd)
+
+ # Leave a two pairs of low ones available for use by the
+ # internal child error pipe and the stdout pipe.
+ # We also leave 10 more open as some Python buildbots run into
+ # "too many open files" errors during the test if we do not.
+ for fd in sorted(open_fds)[:14]:
+ os.close(fd)
+ open_fds.remove(fd)
+
+ for fd in open_fds:
+ #self.addCleanup(os.close, fd)
+ os.set_inheritable(fd, True)
+
+ max_fd_open = max(open_fds)
+
+ # Communicate the open_fds to the parent unittest.TestCase process.
+ print(','.join(map(str, sorted(open_fds))))
+ sys.stdout.flush()
+
+ rlim_cur, rlim_max = resource.getrlimit(resource.RLIMIT_NOFILE)
+ try:
+ # 29 is lower than the highest fds we are leaving open.
+ resource.setrlimit(resource.RLIMIT_NOFILE, (29, rlim_max))
+ # Launch a new Python interpreter with our low fd rlim_cur that
+ # inherits open fds above that limit. It then uses subprocess
+ # with close_fds=True to get a report of open fds in the child.
+ # An explicit list of fds to check is passed to fd_status.py as
+ # letting fd_status rely on its default logic would miss the
+ # fds above rlim_cur as it normally only checks up to that limit.
+ subprocess.Popen(
+ [sys.executable, '-c',
+ textwrap.dedent("""
+ import subprocess, sys
+ subprocess.Popen([sys.executable, %r] +
+ [str(x) for x in range({max_fd})],
+ close_fds=True).wait()
+ """.format(max_fd=max_fd_open+1))],
+ close_fds=False).wait()
+ finally:
+ resource.setrlimit(resource.RLIMIT_NOFILE, (rlim_cur, rlim_max))
+ ''' % fd_status)], stdout=subprocess.PIPE)
+
+ output, unused_stderr = p.communicate()
+ output_lines = output.splitlines()
+ self.assertEqual(len(output_lines), 2,
+ msg="expected exactly two lines of output:\n%r" % output)
+ opened_fds = set(map(int, output_lines[0].strip().split(b',')))
+ remaining_fds = set(map(int, output_lines[1].strip().split(b',')))
+
+ self.assertFalse(remaining_fds & opened_fds,
+ msg="Some fds were left open.")
+
+
# Mac OS X Tiger (10.4) has a kernel bug: sometimes, the file
# descriptor of a pipe closed in the parent process is valid in the
# child process according to fstat(), but the mode of the file
@@ -1881,6 +2053,8 @@ class POSIXProcessTestCase(BaseTestCase):
fds = os.pipe()
self.addCleanup(os.close, fds[0])
self.addCleanup(os.close, fds[1])
+ os.set_inheritable(fds[0], True)
+ os.set_inheritable(fds[1], True)
open_fds.update(fds)
for fd in open_fds:
@@ -1903,6 +2077,32 @@ class POSIXProcessTestCase(BaseTestCase):
close_fds=False, pass_fds=(fd, )))
self.assertIn('overriding close_fds', str(context.warning))
+ def test_pass_fds_inheritable(self):
+ script = support.findfile("fd_status.py", subdir="subprocessdata")
+
+ inheritable, non_inheritable = os.pipe()
+ self.addCleanup(os.close, inheritable)
+ self.addCleanup(os.close, non_inheritable)
+ os.set_inheritable(inheritable, True)
+ os.set_inheritable(non_inheritable, False)
+ pass_fds = (inheritable, non_inheritable)
+ args = [sys.executable, script]
+ args += list(map(str, pass_fds))
+
+ p = subprocess.Popen(args,
+ stdout=subprocess.PIPE, close_fds=True,
+ pass_fds=pass_fds)
+ output, ignored = p.communicate()
+ fds = set(map(int, output.split(b',')))
+
+ # the inheritable file descriptor must be inherited, so its inheritable
+ # flag must be set in the child process after fork() and before exec()
+ self.assertEqual(fds, set(pass_fds), "output=%a" % output)
+
+ # inheritable flag must not be changed in the parent process
+ self.assertEqual(os.get_inheritable(inheritable), True)
+ self.assertEqual(os.get_inheritable(non_inheritable), False)
+
def test_stdout_stdin_are_single_inout_fd(self):
with io.open(os.devnull, "r+") as inout:
p = subprocess.Popen([sys.executable, "-c", "import sys; sys.exit(0)"],
@@ -1990,7 +2190,7 @@ class POSIXProcessTestCase(BaseTestCase):
# let some time for the process to exit, and create a new Popen: this
# should trigger the wait() of p
time.sleep(0.2)
- with self.assertRaises(EnvironmentError) as c:
+ with self.assertRaises(OSError) as c:
with subprocess.Popen(['nonexisting_i_hope'],
stdout=subprocess.PIPE,
stderr=subprocess.PIPE) as proc:
@@ -2016,6 +2216,39 @@ class POSIXProcessTestCase(BaseTestCase):
self.assertNotIn(fd, remaining_fds)
+ @support.cpython_only
+ def test_fork_exec(self):
+ # Issue #22290: fork_exec() must not crash on memory allocation failure
+ # or other errors
+ import _posixsubprocess
+ gc_enabled = gc.isenabled()
+ try:
+ # Use a preexec function and enable the garbage collector
+ # to force fork_exec() to re-enable the garbage collector
+ # on error.
+ func = lambda: None
+ gc.enable()
+
+ executable_list = "exec" # error: must be a sequence
+
+ for args, exe_list, cwd, env_list in (
+ (123, [b"exe"], None, [b"env"]),
+ ([b"arg"], 123, None, [b"env"]),
+ ([b"arg"], [b"exe"], 123, [b"env"]),
+ ([b"arg"], [b"exe"], None, 123),
+ ):
+ with self.assertRaises(TypeError):
+ _posixsubprocess.fork_exec(
+ args, exe_list,
+ True, [], cwd, env_list,
+ -1, -1, -1, -1,
+ 1, 2, 3, 4,
+ True, True, func)
+ finally:
+ if not gc_enabled:
+ gc.disable()
+
+
@unittest.skipUnless(mswindows, "Windows specific tests")
class Win32ProcessTestCase(BaseTestCase):
@@ -2175,15 +2408,16 @@ class CommandTests(unittest.TestCase):
os.rmdir(dir)
-@unittest.skipUnless(getattr(subprocess, '_has_poll', False),
- "poll system call not supported")
+@unittest.skipUnless(hasattr(selectors, 'PollSelector'),
+ "Test needs selectors.PollSelector")
class ProcessTestCaseNoPoll(ProcessTestCase):
def setUp(self):
- subprocess._has_poll = False
+ self.orig_selector = subprocess._PopenSelector
+ subprocess._PopenSelector = selectors.SelectSelector
ProcessTestCase.setUp(self)
def tearDown(self):
- subprocess._has_poll = True
+ subprocess._PopenSelector = self.orig_selector
ProcessTestCase.tearDown(self)
@@ -2287,6 +2521,22 @@ class ContextManagerTests(BaseTestCase):
stderr=subprocess.PIPE) as proc:
pass
+ def test_broken_pipe_cleanup(self):
+ """Broken pipe error should not prevent wait() (Issue 21619)"""
+ proc = subprocess.Popen([sys.executable, '-c', 'pass'],
+ stdin=subprocess.PIPE,
+ bufsize=support.PIPE_MAX_SIZE*2)
+ proc = proc.__enter__()
+ # Prepare to send enough data to overflow any OS pipe buffering and
+ # guarantee a broken pipe error. Data is held in BufferedWriter
+ # buffer until closed.
+ proc.stdin.write(b'x' * support.PIPE_MAX_SIZE)
+ self.assertIsNone(proc.returncode)
+ # EPIPE expected under POSIX; EINVAL under Windows
+ self.assertRaises(OSError, proc.__exit__, None, None, None)
+ self.assertEqual(proc.returncode, 0)
+ self.assertTrue(proc.stdin.closed)
+
def test_main():
unit_tests = (ProcessTestCase,