From dd7ca24eb5c68ae72df13b8b987bff7b90235044 Mon Sep 17 00:00:00 2001 From: "Gregory P. Smith" Date: Sat, 4 Jul 2009 01:49:29 +0000 Subject: Use select.poll() in subprocess, when available, rather than select() so that it does not fail when file descriptors are large. Fixes issue3392. Patch largely contributed by Frank Chu (fpmc) with some improvements by me. See http://bugs.python.org/issue3392. Candidate for backporting to release26-maint as it is a bug fix and changes no public API. --- Lib/subprocess.py | 131 ++++++++++++++++++++++++++++++++------------ Lib/test/test_subprocess.py | 18 +++++- 2 files changed, 113 insertions(+), 36 deletions(-) diff --git a/Lib/subprocess.py b/Lib/subprocess.py index 6daeb04..c8dcb56 100644 --- a/Lib/subprocess.py +++ b/Lib/subprocess.py @@ -413,6 +413,7 @@ if mswindows: error = IOError else: import select + _has_poll = hasattr(select, 'poll') import errno import fcntl import pickle @@ -425,12 +426,10 @@ try: except: MAXFD = 256 -# True/False does not exist on 2.2.0 -#try: -# False -#except NameError: -# False = 0 -# True = 1 +# When select or poll has indicated that the file is writable, +# we can write up to _PIPE_BUF bytes without risk of blocking. +# POSIX defines PIPE_BUF as >= 512. +_PIPE_BUF = getattr(select, 'PIPE_BUF', 512) _active = [] @@ -1191,19 +1190,100 @@ class Popen(object): def _communicate(self, input): - read_set = [] - write_set = [] - stdout = None # Return - stderr = None # Return - if self.stdin: # Flush stdio buffer. This might block, if the user has # been writing to .stdin in an uncontrolled fashion. self.stdin.flush() - if input: - write_set.append(self.stdin) - else: + if not input: self.stdin.close() + + if _has_poll: + stdout, stderr = self._communicate_with_poll(input) + else: + stdout, stderr = self._communicate_with_select(input) + + # All data exchanged. Translate lists into strings. + if stdout is not None: + stdout = ''.join(stdout) + if stderr is not None: + stderr = ''.join(stderr) + + # Translate newlines, if requested. We cannot let the file + # object do the translation: It is based on stdio, which is + # impossible to combine with select (unless forcing no + # buffering). + if self.universal_newlines and hasattr(file, 'newlines'): + if stdout: + stdout = self._translate_newlines(stdout) + if stderr: + stderr = self._translate_newlines(stderr) + + self.wait() + return (stdout, stderr) + + + def _communicate_with_poll(self, input): + stdout = None # Return + stderr = None # Return + fd2file = {} + fd2output = {} + + poller = select.poll() + def register_and_append(file_obj, eventmask): + poller.register(file_obj.fileno(), eventmask) + fd2file[file_obj.fileno()] = file_obj + + def close_unregister_and_remove(fd): + poller.unregister(fd) + fd2file[fd].close() + fd2file.pop(fd) + + if self.stdin and input: + register_and_append(self.stdin, select.POLLOUT) + + select_POLLIN_POLLPRI = select.POLLIN | select.POLLPRI + if self.stdout: + register_and_append(self.stdout, select_POLLIN_POLLPRI) + fd2output[self.stdout.fileno()] = stdout = [] + if self.stderr: + register_and_append(self.stderr, select_POLLIN_POLLPRI) + fd2output[self.stderr.fileno()] = stderr = [] + + input_offset = 0 + while fd2file: + try: + ready = poller.poll() + except select.error, e: + if e.args[0] == errno.EINTR: + continue + raise + + for fd, mode in ready: + if mode & select.POLLOUT: + chunk = input[input_offset : input_offset + _PIPE_BUF] + input_offset += os.write(fd, chunk) + if input_offset >= len(input): + close_unregister_and_remove(fd) + elif mode & select_POLLIN_POLLPRI: + data = os.read(fd, 4096) + if not data: + close_unregister_and_remove(fd) + fd2output[fd].append(data) + else: + # Ignore hang up or errors. + close_unregister_and_remove(fd) + + return (stdout, stderr) + + + def _communicate_with_select(self, input): + read_set = [] + write_set = [] + stdout = None # Return + stderr = None # Return + + if self.stdin and input: + write_set.append(self.stdin) if self.stdout: read_set.append(self.stdout) stdout = [] @@ -1221,10 +1301,7 @@ class Popen(object): raise if self.stdin in wlist: - # When select has indicated that the file is writable, - # we can write up to PIPE_BUF bytes without risk - # blocking. POSIX defines PIPE_BUF >= 512 - chunk = input[input_offset : input_offset + 512] + chunk = input[input_offset : input_offset + _PIPE_BUF] bytes_written = os.write(self.stdin.fileno(), chunk) input_offset += bytes_written if input_offset >= len(input): @@ -1245,25 +1322,9 @@ class Popen(object): read_set.remove(self.stderr) stderr.append(data) - # All data exchanged. Translate lists into strings. - if stdout is not None: - stdout = ''.join(stdout) - if stderr is not None: - stderr = ''.join(stderr) - - # Translate newlines, if requested. We cannot let the file - # object do the translation: It is based on stdio, which is - # impossible to combine with select (unless forcing no - # buffering). - if self.universal_newlines and hasattr(file, 'newlines'): - if stdout: - stdout = self._translate_newlines(stdout) - if stderr: - stderr = self._translate_newlines(stderr) - - self.wait() return (stdout, stderr) + def send_signal(self, sig): """Send a signal to the process """ diff --git a/Lib/test/test_subprocess.py b/Lib/test/test_subprocess.py index c441cdc..5add023 100644 --- a/Lib/test/test_subprocess.py +++ b/Lib/test/test_subprocess.py @@ -766,8 +766,24 @@ class ProcessTestCase(unittest.TestCase): p.terminate() self.assertNotEqual(p.wait(), 0) + +unit_tests = [ProcessTestCase] + +if subprocess._has_poll: + class ProcessTestCaseNoPoll(ProcessTestCase): + def setUp(self): + subprocess._has_poll = False + ProcessTestCase.setUp(self) + + def tearDown(self): + subprocess._has_poll = True + ProcessTestCase.tearDown(self) + + unit_tests.append(ProcessTestCaseNoPoll) + + def test_main(): - test_support.run_unittest(ProcessTestCase) + test_support.run_unittest(*unit_tests) if hasattr(test_support, "reap_children"): test_support.reap_children() -- cgit v0.12