diff options
Diffstat (limited to 'Lib/test/test_subprocess.py')
| -rw-r--r-- | Lib/test/test_subprocess.py | 475 |
1 files changed, 365 insertions, 110 deletions
diff --git a/Lib/test/test_subprocess.py b/Lib/test/test_subprocess.py index f5a70a3..4719cc0 100644 --- a/Lib/test/test_subprocess.py +++ b/Lib/test/test_subprocess.py @@ -11,6 +11,7 @@ import errno import tempfile import time import re +import selectors import sysconfig import warnings import select @@ -19,10 +20,6 @@ import gc import textwrap try: - import resource -except ImportError: - resource = None -try: import threading except ImportError: threading = None @@ -40,16 +37,6 @@ else: SETBINARY = '' -try: - mkstemp = tempfile.mkstemp -except AttributeError: - # tempfile.mkstemp is not available - def mkstemp(): - """Replacement for mkstemp, calling mktemp.""" - fname = tempfile.mktemp() - return os.open(fname, os.O_RDWR|os.O_CREAT), fname - - class BaseTestCase(unittest.TestCase): def setUp(self): # Try to minimize the number of children we have so this test @@ -162,8 +149,28 @@ class ProcessTestCase(BaseTestCase): stderr=subprocess.STDOUT) self.assertIn(b'BDFL', output) + def test_check_output_stdin_arg(self): + # check_output() can be called with stdin set to a file + tf = tempfile.TemporaryFile() + self.addCleanup(tf.close) + tf.write(b'pear') + tf.seek(0) + output = subprocess.check_output( + [sys.executable, "-c", + "import sys; sys.stdout.write(sys.stdin.read().upper())"], + stdin=tf) + self.assertIn(b'PEAR', output) + + def test_check_output_input_arg(self): + # check_output() can be called with input set to a string + output = subprocess.check_output( + [sys.executable, "-c", + "import sys; sys.stdout.write(sys.stdin.read().upper())"], + input=b'pear') + self.assertIn(b'PEAR', output) + def test_check_output_stdout_arg(self): - # check_output() function stderr redirected to stdout + # check_output() refuses to accept 'stdout' argument with self.assertRaises(ValueError) as c: output = subprocess.check_output( [sys.executable, "-c", "print('will not be run')"], @@ -171,6 +178,20 @@ class ProcessTestCase(BaseTestCase): self.fail("Expected ValueError when stdout arg supplied.") self.assertIn('stdout', c.exception.args[0]) + def test_check_output_stdin_with_input_arg(self): + # check_output() refuses to accept 'stdin' with 'input' + tf = tempfile.TemporaryFile() + self.addCleanup(tf.close) + tf.write(b'pear') + tf.seek(0) + with self.assertRaises(ValueError) as c: + output = subprocess.check_output( + [sys.executable, "-c", "print('will not be run')"], + stdin=tf, input=b'hare') + self.fail("Expected ValueError when stdin and input args supplied.") + self.assertIn('stdin', c.exception.args[0]) + self.assertIn('input', c.exception.args[0]) + def test_check_output_timeout(self): # check_output() function with timeout arg with self.assertRaises(subprocess.TimeoutExpired) as c: @@ -286,11 +307,8 @@ class ProcessTestCase(BaseTestCase): # Normalize an expected cwd (for Tru64 support). # We can't use os.path.realpath since it doesn't expand Tru64 {memb} # strings. See bug #1063571. - original_cwd = os.getcwd() - os.chdir(cwd) - cwd = os.getcwd() - os.chdir(original_cwd) - return cwd + with support.change_cwd(cwd): + return os.getcwd() # For use in the test_cwd* tests below. def _split_python_path(self): @@ -755,6 +773,7 @@ class ProcessTestCase(BaseTestCase): stdout=subprocess.PIPE, universal_newlines=1) p.stdin.write("line1\n") + p.stdin.flush() self.assertEqual(p.stdout.readline(), "line1\n") p.stdin.write("line3\n") p.stdin.close() @@ -853,8 +872,9 @@ class ProcessTestCase(BaseTestCase): # # UTF-16 and UTF-32-BE are sufficient to check both with BOM and # without, and UTF-16 and UTF-32. + import _bootlocale for encoding in ['utf-16', 'utf-32-be']: - old_getpreferredencoding = locale.getpreferredencoding + old_getpreferredencoding = _bootlocale.getpreferredencoding # Indirectly via io.TextIOWrapper, Popen() defaults to # locale.getpreferredencoding(False) and earlier in Python 3.2 to # locale.getpreferredencoding(). @@ -865,7 +885,7 @@ class ProcessTestCase(BaseTestCase): encoding) args = [sys.executable, '-c', code] try: - locale.getpreferredencoding = getpreferredencoding + _bootlocale.getpreferredencoding = getpreferredencoding # We set stdin to be non-None because, as of this writing, # a different code path is used when the number of pipes is # zero or one. @@ -874,7 +894,7 @@ class ProcessTestCase(BaseTestCase): stdout=subprocess.PIPE) stdout, stderr = popen.communicate(input='') finally: - locale.getpreferredencoding = old_getpreferredencoding + _bootlocale.getpreferredencoding = old_getpreferredencoding self.assertEqual(stdout, '1\n2\n3\n4') def test_no_leaking(self): @@ -975,6 +995,39 @@ class ProcessTestCase(BaseTestCase): p = subprocess.Popen([sys.executable, "-c", "pass"], bufsize=None) self.assertEqual(p.wait(), 0) + def _test_bufsize_equal_one(self, line, expected, universal_newlines): + # subprocess may deadlock with bufsize=1, see issue #21332 + with subprocess.Popen([sys.executable, "-c", "import sys;" + "sys.stdout.write(sys.stdin.readline());" + "sys.stdout.flush()"], + stdin=subprocess.PIPE, + stdout=subprocess.PIPE, + stderr=subprocess.DEVNULL, + bufsize=1, + universal_newlines=universal_newlines) as p: + p.stdin.write(line) # expect that it flushes the line in text mode + os.close(p.stdin.fileno()) # close it without flushing the buffer + read_line = p.stdout.readline() + try: + p.stdin.close() + except OSError: + pass + p.stdin = None + self.assertEqual(p.returncode, 0) + self.assertEqual(read_line, expected) + + def test_bufsize_equal_one_text_mode(self): + # line is flushed in text mode with bufsize=1. + # we should get the full line in return + line = "line\n" + self._test_bufsize_equal_one(line, line, universal_newlines=True) + + def test_bufsize_equal_one_binary_mode(self): + # line is not flushed in binary mode with bufsize=1. + # we should get empty response + line = b'line' + os.linesep.encode() # assume ascii-based locale + self._test_bufsize_equal_one(line, b'', universal_newlines=False) + def test_leaking_fds_on_error(self): # see bug #5179: Popen leaks file descriptors to PIPEs if # the child fails to execute; this will eventually exhaust @@ -982,8 +1035,7 @@ class ProcessTestCase(BaseTestCase): # value for that limit, but Windows has 2048, so we loop # 1024 times (each call leaked two fds). for i in range(1024): - # Windows raises IOError. Others raise OSError. - with self.assertRaises(EnvironmentError) as c: + with self.assertRaises(OSError) as c: subprocess.Popen(['nonexisting_i_hope'], stdout=subprocess.PIPE, stderr=subprocess.PIPE) @@ -1021,6 +1073,59 @@ class ProcessTestCase(BaseTestCase): if exc is not None: raise exc + @unittest.skipIf(threading is None, "threading required") + def test_threadsafe_wait(self): + """Issue21291: Popen.wait() needs to be threadsafe for returncode.""" + proc = subprocess.Popen([sys.executable, '-c', + 'import time; time.sleep(12)']) + self.assertEqual(proc.returncode, None) + results = [] + + def kill_proc_timer_thread(): + results.append(('thread-start-poll-result', proc.poll())) + # terminate it from the thread and wait for the result. + proc.kill() + proc.wait() + results.append(('thread-after-kill-and-wait', proc.returncode)) + # this wait should be a no-op given the above. + proc.wait() + results.append(('thread-after-second-wait', proc.returncode)) + + # This is a timing sensitive test, the failure mode is + # triggered when both the main thread and this thread are in + # the wait() call at once. The delay here is to allow the + # main thread to most likely be blocked in its wait() call. + t = threading.Timer(0.2, kill_proc_timer_thread) + t.start() + + if mswindows: + expected_errorcode = 1 + else: + # Should be -9 because of the proc.kill() from the thread. + expected_errorcode = -9 + + # Wait for the process to finish; the thread should kill it + # long before it finishes on its own. Supplying a timeout + # triggers a different code path for better coverage. + proc.wait(timeout=20) + self.assertEqual(proc.returncode, expected_errorcode, + msg="unexpected result in wait from main thread") + + # This should be a no-op with no change in returncode. + proc.wait() + self.assertEqual(proc.returncode, expected_errorcode, + msg="unexpected result in second main wait.") + + t.join() + # Ensure that all of the thread results are as expected. + # When a race condition occurs in wait(), the returncode could + # be set by the wrong thread that doesn't actually have it + # leading to an incorrect value. + self.assertEqual([('thread-start-poll-result', None), + ('thread-after-kill-and-wait', expected_errorcode), + ('thread-after-second-wait', expected_errorcode)], + results) + def test_issue8780(self): # Ensure that stdout is inherited from the parent # if stdout=PIPE is not used @@ -1035,9 +1140,9 @@ class ProcessTestCase(BaseTestCase): def test_handles_closed_on_exception(self): # If CreateProcess exits with an error, ensure the # duplicate output handles are released - ifhandle, ifname = mkstemp() - ofhandle, ofname = mkstemp() - efhandle, efname = mkstemp() + ifhandle, ifname = tempfile.mkstemp() + ofhandle, ofname = tempfile.mkstemp() + efhandle, efname = tempfile.mkstemp() try: subprocess.Popen (["*"], stdin=ifhandle, stdout=ofhandle, stderr=efhandle) @@ -1114,47 +1219,6 @@ class ProcessTestCase(BaseTestCase): fds_after_exception = os.listdir(fd_directory) self.assertEqual(fds_before_popen, fds_after_exception) - -# context manager -class _SuppressCoreFiles(object): - """Try to prevent core files from being created.""" - old_limit = None - - def __enter__(self): - """Try to save previous ulimit, then set it to (0, 0).""" - if resource is not None: - try: - self.old_limit = resource.getrlimit(resource.RLIMIT_CORE) - resource.setrlimit(resource.RLIMIT_CORE, (0, 0)) - except (ValueError, resource.error): - pass - - if sys.platform == 'darwin': - # Check if the 'Crash Reporter' on OSX was configured - # in 'Developer' mode and warn that it will get triggered - # when it is. - # - # This assumes that this context manager is used in tests - # that might trigger the next manager. - value = subprocess.Popen(['/usr/bin/defaults', 'read', - 'com.apple.CrashReporter', 'DialogType'], - stdout=subprocess.PIPE).communicate()[0] - if value.strip() == b'developer': - print("this tests triggers the Crash Reporter, " - "that is intentional", end='') - sys.stdout.flush() - - def __exit__(self, *args): - """Return core file behavior to default.""" - if self.old_limit is None: - return - if resource is not None: - try: - resource.setrlimit(resource.RLIMIT_CORE, self.old_limit) - except (ValueError, resource.error): - pass - - @unittest.skipIf(mswindows, "POSIX specific tests") class POSIXProcessTestCase(BaseTestCase): @@ -1243,7 +1307,7 @@ class POSIXProcessTestCase(BaseTestCase): def test_run_abort(self): # returncode handles signal termination - with _SuppressCoreFiles(): + with support.SuppressCrashReport(): p = subprocess.Popen([sys.executable, "-c", 'import os; os.abort()']) p.wait() @@ -1266,7 +1330,7 @@ class POSIXProcessTestCase(BaseTestCase): try: p = subprocess.Popen([sys.executable, "-c", ""], preexec_fn=raise_it) - except RuntimeError as e: + except subprocess.SubprocessError as e: self.assertTrue( subprocess._posixsubprocess, "Expected a ValueError from the preexec_fn") @@ -1306,9 +1370,10 @@ class POSIXProcessTestCase(BaseTestCase): """Issue16140: Don't double close pipes on preexec error.""" def raise_it(): - raise RuntimeError("force the _execute_child() errpipe_data path.") + raise subprocess.SubprocessError( + "force the _execute_child() errpipe_data path.") - with self.assertRaises(RuntimeError): + with self.assertRaises(subprocess.SubprocessError): self._TestExecuteChildPopen( self, [sys.executable, "-c", "pass"], stdin=subprocess.PIPE, stdout=subprocess.PIPE, @@ -1351,9 +1416,27 @@ class POSIXProcessTestCase(BaseTestCase): if not enabled: gc.disable() + @unittest.skipIf( + sys.platform == 'darwin', 'setrlimit() seems to fail on OS X') + def test_preexec_fork_failure(self): + # The internal code did not preserve the previous exception when + # re-enabling garbage collection + try: + from resource import getrlimit, setrlimit, RLIMIT_NPROC + except ImportError as err: + self.skipTest(err) # RLIMIT_NPROC is specific to Linux and BSD + limits = getrlimit(RLIMIT_NPROC) + [_, hard] = limits + setrlimit(RLIMIT_NPROC, (0, hard)) + self.addCleanup(setrlimit, RLIMIT_NPROC, limits) + # Forking should raise EAGAIN, translated to BlockingIOError + with self.assertRaises(BlockingIOError): + subprocess.call([sys.executable, '-c', ''], + preexec_fn=lambda: None) + def test_args_string(self): # args is a string - fd, fname = mkstemp() + fd, fname = tempfile.mkstemp() # reopen in text mode with open(fd, "w", errors="surrogateescape") as fobj: fobj.write("#!/bin/sh\n") @@ -1398,7 +1481,7 @@ class POSIXProcessTestCase(BaseTestCase): def test_call_string(self): # call() function with string argument on UNIX - fd, fname = mkstemp() + fd, fname = tempfile.mkstemp() # reopen in text mode with open(fd, "w", errors="surrogateescape") as fobj: fobj.write("#!/bin/sh\n") @@ -1507,16 +1590,28 @@ class POSIXProcessTestCase(BaseTestCase): # Terminating a dead process self._kill_dead_process('terminate') + def _save_fds(self, save_fds): + fds = [] + for fd in save_fds: + inheritable = os.get_inheritable(fd) + saved = os.dup(fd) + fds.append((fd, saved, inheritable)) + return fds + + def _restore_fds(self, fds): + for fd, saved, inheritable in fds: + os.dup2(saved, fd, inheritable=inheritable) + os.close(saved) + def check_close_std_fds(self, fds): # Issue #9905: test that subprocess pipes still work properly with # some standard fds closed stdin = 0 - newfds = [] - for a in fds: - b = os.dup(a) - newfds.append(b) - if a == 0: - stdin = b + saved_fds = self._save_fds(fds) + for fd, saved, inheritable in saved_fds: + if fd == 0: + stdin = saved + break try: for fd in fds: os.close(fd) @@ -1531,10 +1626,7 @@ class POSIXProcessTestCase(BaseTestCase): err = support.strip_python_stderr(err) self.assertEqual((out, err), (b'apple', b'orange')) finally: - for b, a in zip(newfds, fds): - os.dup2(b, a) - for b in newfds: - os.close(b) + self._restore_fds(saved_fds) def test_close_fd_0(self): self.check_close_std_fds([0]) @@ -1582,7 +1674,7 @@ class POSIXProcessTestCase(BaseTestCase): def test_remapping_std_fds(self): # open up some temporary files - temps = [mkstemp() for i in range(3)] + temps = [tempfile.mkstemp() for i in range(3)] try: temp_fds = [fd for fd, fname in temps] @@ -1595,7 +1687,7 @@ class POSIXProcessTestCase(BaseTestCase): os.lseek(temp_fds[1], 0, 0) # move the standard file descriptors out of the way - saved_fds = [os.dup(fd) for fd in range(3)] + saved_fds = self._save_fds(range(3)) try: # duplicate the file objects over the standard fd's for fd, temp_fd in enumerate(temp_fds): @@ -1611,10 +1703,7 @@ class POSIXProcessTestCase(BaseTestCase): stderr=temp_fds[0]) p.wait() finally: - # restore the original fd's underneath sys.stdin, etc. - for std, saved in enumerate(saved_fds): - os.dup2(saved, std) - os.close(saved) + self._restore_fds(saved_fds) for fd in temp_fds: os.lseek(fd, 0, 0) @@ -1630,7 +1719,7 @@ class POSIXProcessTestCase(BaseTestCase): def check_swap_fds(self, stdin_no, stdout_no, stderr_no): # open up some temporary files - temps = [mkstemp() for i in range(3)] + temps = [tempfile.mkstemp() for i in range(3)] temp_fds = [fd for fd, fname in temps] try: # unlink the files -- we won't need to reopen them @@ -1638,7 +1727,7 @@ class POSIXProcessTestCase(BaseTestCase): os.unlink(fname) # save a copy of the standard file descriptors - saved_fds = [os.dup(fd) for fd in range(3)] + saved_fds = self._save_fds(range(3)) try: # duplicate the temp files over the standard fd's 0, 1, 2 for fd, temp_fd in enumerate(temp_fds): @@ -1664,9 +1753,7 @@ class POSIXProcessTestCase(BaseTestCase): out = os.read(stdout_no, 1024) err = support.strip_python_stderr(os.read(stderr_no, 1024)) finally: - for std, saved in enumerate(saved_fds): - os.dup2(saved, std) - os.close(saved) + self._restore_fds(saved_fds) self.assertEqual(out, b"got STDIN") self.assertEqual(err, b"err") @@ -1698,40 +1785,47 @@ class POSIXProcessTestCase(BaseTestCase): # Pure Python implementations keeps the message self.assertIsNone(subprocess._posixsubprocess) self.assertEqual(str(err), "surrogate:\uDCff") - except RuntimeError as err: + except subprocess.SubprocessError as err: # _posixsubprocess uses a default message self.assertIsNotNone(subprocess._posixsubprocess) self.assertEqual(str(err), "Exception occurred in preexec_fn.") else: - self.fail("Expected ValueError or RuntimeError") + self.fail("Expected ValueError or subprocess.SubprocessError") def test_undecodable_env(self): for key, value in (('test', 'abc\uDCFF'), ('test\uDCFF', '42')): + encoded_value = value.encode("ascii", "surrogateescape") + # test str with surrogates script = "import os; print(ascii(os.getenv(%s)))" % repr(key) env = os.environ.copy() env[key] = value - # Use C locale to get ascii for the locale encoding to force + # Use C locale to get ASCII for the locale encoding to force # surrogate-escaping of \xFF in the child process; otherwise it can # be decoded as-is if the default locale is latin-1. env['LC_ALL'] = 'C' + if sys.platform.startswith("aix"): + # On AIX, the C locale uses the Latin1 encoding + decoded_value = encoded_value.decode("latin1", "surrogateescape") + else: + # On other UNIXes, the C locale uses the ASCII encoding + decoded_value = value stdout = subprocess.check_output( [sys.executable, "-c", script], env=env) stdout = stdout.rstrip(b'\n\r') - self.assertEqual(stdout.decode('ascii'), ascii(value)) + self.assertEqual(stdout.decode('ascii'), ascii(decoded_value)) # test bytes key = key.encode("ascii", "surrogateescape") - value = value.encode("ascii", "surrogateescape") script = "import os; print(ascii(os.getenvb(%s)))" % repr(key) env = os.environ.copy() - env[key] = value + env[key] = encoded_value stdout = subprocess.check_output( [sys.executable, "-c", script], env=env) stdout = stdout.rstrip(b'\n\r') - self.assertEqual(stdout.decode('ascii'), ascii(value)) + self.assertEqual(stdout.decode('ascii'), ascii(encoded_value)) def test_bytes_program(self): abs_program = os.fsencode(sys.executable) @@ -1833,10 +1927,13 @@ class POSIXProcessTestCase(BaseTestCase): open_fds = set(fds) # add a bunch more fds for _ in range(9): - fd = os.open("/dev/null", os.O_RDONLY) + fd = os.open(os.devnull, os.O_RDONLY) self.addCleanup(os.close, fd) open_fds.add(fd) + for fd in open_fds: + os.set_inheritable(fd, True) + p = subprocess.Popen([sys.executable, fd_status], stdout=subprocess.PIPE, close_fds=False) output, ignored = p.communicate() @@ -1867,6 +1964,86 @@ class POSIXProcessTestCase(BaseTestCase): "Some fds not in pass_fds were left open") self.assertIn(1, remaining_fds, "Subprocess failed") + + @unittest.skipIf(sys.platform.startswith("freebsd") and + os.stat("/dev").st_dev == os.stat("/dev/fd").st_dev, + "Requires fdescfs mounted on /dev/fd on FreeBSD.") + def test_close_fds_when_max_fd_is_lowered(self): + """Confirm that issue21618 is fixed (may fail under valgrind).""" + fd_status = support.findfile("fd_status.py", subdir="subprocessdata") + + # This launches the meat of the test in a child process to + # avoid messing with the larger unittest processes maximum + # number of file descriptors. + # This process launches: + # +--> Process that lowers its RLIMIT_NOFILE aftr setting up + # a bunch of high open fds above the new lower rlimit. + # Those are reported via stdout before launching a new + # process with close_fds=False to run the actual test: + # +--> The TEST: This one launches a fd_status.py + # subprocess with close_fds=True so we can find out if + # any of the fds above the lowered rlimit are still open. + p = subprocess.Popen([sys.executable, '-c', textwrap.dedent( + ''' + import os, resource, subprocess, sys, textwrap + open_fds = set() + # Add a bunch more fds to pass down. + for _ in range(40): + fd = os.open(os.devnull, os.O_RDONLY) + open_fds.add(fd) + + # Leave a two pairs of low ones available for use by the + # internal child error pipe and the stdout pipe. + # We also leave 10 more open as some Python buildbots run into + # "too many open files" errors during the test if we do not. + for fd in sorted(open_fds)[:14]: + os.close(fd) + open_fds.remove(fd) + + for fd in open_fds: + #self.addCleanup(os.close, fd) + os.set_inheritable(fd, True) + + max_fd_open = max(open_fds) + + # Communicate the open_fds to the parent unittest.TestCase process. + print(','.join(map(str, sorted(open_fds)))) + sys.stdout.flush() + + rlim_cur, rlim_max = resource.getrlimit(resource.RLIMIT_NOFILE) + try: + # 29 is lower than the highest fds we are leaving open. + resource.setrlimit(resource.RLIMIT_NOFILE, (29, rlim_max)) + # Launch a new Python interpreter with our low fd rlim_cur that + # inherits open fds above that limit. It then uses subprocess + # with close_fds=True to get a report of open fds in the child. + # An explicit list of fds to check is passed to fd_status.py as + # letting fd_status rely on its default logic would miss the + # fds above rlim_cur as it normally only checks up to that limit. + subprocess.Popen( + [sys.executable, '-c', + textwrap.dedent(""" + import subprocess, sys + subprocess.Popen([sys.executable, %r] + + [str(x) for x in range({max_fd})], + close_fds=True).wait() + """.format(max_fd=max_fd_open+1))], + close_fds=False).wait() + finally: + resource.setrlimit(resource.RLIMIT_NOFILE, (rlim_cur, rlim_max)) + ''' % fd_status)], stdout=subprocess.PIPE) + + output, unused_stderr = p.communicate() + output_lines = output.splitlines() + self.assertEqual(len(output_lines), 2, + msg="expected exactly two lines of output:\n%r" % output) + opened_fds = set(map(int, output_lines[0].strip().split(b','))) + remaining_fds = set(map(int, output_lines[1].strip().split(b','))) + + self.assertFalse(remaining_fds & opened_fds, + msg="Some fds were left open.") + + # Mac OS X Tiger (10.4) has a kernel bug: sometimes, the file # descriptor of a pipe closed in the parent process is valid in the # child process according to fstat(), but the mode of the file @@ -1881,6 +2058,8 @@ class POSIXProcessTestCase(BaseTestCase): fds = os.pipe() self.addCleanup(os.close, fds[0]) self.addCleanup(os.close, fds[1]) + os.set_inheritable(fds[0], True) + os.set_inheritable(fds[1], True) open_fds.update(fds) for fd in open_fds: @@ -1903,6 +2082,32 @@ class POSIXProcessTestCase(BaseTestCase): close_fds=False, pass_fds=(fd, ))) self.assertIn('overriding close_fds', str(context.warning)) + def test_pass_fds_inheritable(self): + script = support.findfile("fd_status.py", subdir="subprocessdata") + + inheritable, non_inheritable = os.pipe() + self.addCleanup(os.close, inheritable) + self.addCleanup(os.close, non_inheritable) + os.set_inheritable(inheritable, True) + os.set_inheritable(non_inheritable, False) + pass_fds = (inheritable, non_inheritable) + args = [sys.executable, script] + args += list(map(str, pass_fds)) + + p = subprocess.Popen(args, + stdout=subprocess.PIPE, close_fds=True, + pass_fds=pass_fds) + output, ignored = p.communicate() + fds = set(map(int, output.split(b','))) + + # the inheritable file descriptor must be inherited, so its inheritable + # flag must be set in the child process after fork() and before exec() + self.assertEqual(fds, set(pass_fds), "output=%a" % output) + + # inheritable flag must not be changed in the parent process + self.assertEqual(os.get_inheritable(inheritable), True) + self.assertEqual(os.get_inheritable(non_inheritable), False) + def test_stdout_stdin_are_single_inout_fd(self): with io.open(os.devnull, "r+") as inout: p = subprocess.Popen([sys.executable, "-c", "import sys; sys.exit(0)"], @@ -1990,7 +2195,7 @@ class POSIXProcessTestCase(BaseTestCase): # let some time for the process to exit, and create a new Popen: this # should trigger the wait() of p time.sleep(0.2) - with self.assertRaises(EnvironmentError) as c: + with self.assertRaises(OSError) as c: with subprocess.Popen(['nonexisting_i_hope'], stdout=subprocess.PIPE, stderr=subprocess.PIPE) as proc: @@ -2016,6 +2221,39 @@ class POSIXProcessTestCase(BaseTestCase): self.assertNotIn(fd, remaining_fds) + @support.cpython_only + def test_fork_exec(self): + # Issue #22290: fork_exec() must not crash on memory allocation failure + # or other errors + import _posixsubprocess + gc_enabled = gc.isenabled() + try: + # Use a preexec function and enable the garbage collector + # to force fork_exec() to re-enable the garbage collector + # on error. + func = lambda: None + gc.enable() + + executable_list = "exec" # error: must be a sequence + + for args, exe_list, cwd, env_list in ( + (123, [b"exe"], None, [b"env"]), + ([b"arg"], 123, None, [b"env"]), + ([b"arg"], [b"exe"], 123, [b"env"]), + ([b"arg"], [b"exe"], None, 123), + ): + with self.assertRaises(TypeError): + _posixsubprocess.fork_exec( + args, exe_list, + True, [], cwd, env_list, + -1, -1, -1, -1, + 1, 2, 3, 4, + True, True, func) + finally: + if not gc_enabled: + gc.disable() + + @unittest.skipUnless(mswindows, "Windows specific tests") class Win32ProcessTestCase(BaseTestCase): @@ -2175,15 +2413,16 @@ class CommandTests(unittest.TestCase): os.rmdir(dir) -@unittest.skipUnless(getattr(subprocess, '_has_poll', False), - "poll system call not supported") +@unittest.skipUnless(hasattr(selectors, 'PollSelector'), + "Test needs selectors.PollSelector") class ProcessTestCaseNoPoll(ProcessTestCase): def setUp(self): - subprocess._has_poll = False + self.orig_selector = subprocess._PopenSelector + subprocess._PopenSelector = selectors.SelectSelector ProcessTestCase.setUp(self) def tearDown(self): - subprocess._has_poll = True + subprocess._PopenSelector = self.orig_selector ProcessTestCase.tearDown(self) @@ -2211,7 +2450,7 @@ class CommandsWithSpaces (BaseTestCase): def setUp(self): super().setUp() - f, fname = mkstemp(".py", "te st") + f, fname = tempfile.mkstemp(".py", "te st") self.fname = fname.lower () os.write(f, b"import sys;" b"sys.stdout.write('%d %s' % (len(sys.argv), [a.lower () for a in sys.argv]))" @@ -2287,6 +2526,22 @@ class ContextManagerTests(BaseTestCase): stderr=subprocess.PIPE) as proc: pass + def test_broken_pipe_cleanup(self): + """Broken pipe error should not prevent wait() (Issue 21619)""" + proc = subprocess.Popen([sys.executable, '-c', 'pass'], + stdin=subprocess.PIPE, + bufsize=support.PIPE_MAX_SIZE*2) + proc = proc.__enter__() + # Prepare to send enough data to overflow any OS pipe buffering and + # guarantee a broken pipe error. Data is held in BufferedWriter + # buffer until closed. + proc.stdin.write(b'x' * support.PIPE_MAX_SIZE) + self.assertIsNone(proc.returncode) + # EPIPE expected under POSIX; EINVAL under Windows + self.assertRaises(OSError, proc.__exit__, None, None, None) + self.assertEqual(proc.returncode, 0) + self.assertTrue(proc.stdin.closed) + def test_main(): unit_tests = (ProcessTestCase, |
