summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--Lib/subprocess.py131
-rw-r--r--Lib/test/test_subprocess.py18
2 files changed, 113 insertions, 36 deletions
diff --git a/Lib/subprocess.py b/Lib/subprocess.py
index 6daeb04..c8dcb56 100644
--- a/Lib/subprocess.py
+++ b/Lib/subprocess.py
@@ -413,6 +413,7 @@ if mswindows:
error = IOError
else:
import select
+ _has_poll = hasattr(select, 'poll')
import errno
import fcntl
import pickle
@@ -425,12 +426,10 @@ try:
except:
MAXFD = 256
-# True/False does not exist on 2.2.0
-#try:
-# False
-#except NameError:
-# False = 0
-# True = 1
+# When select or poll has indicated that the file is writable,
+# we can write up to _PIPE_BUF bytes without risk of blocking.
+# POSIX defines PIPE_BUF as >= 512.
+_PIPE_BUF = getattr(select, 'PIPE_BUF', 512)
_active = []
@@ -1191,19 +1190,100 @@ class Popen(object):
def _communicate(self, input):
- read_set = []
- write_set = []
- stdout = None # Return
- stderr = None # Return
-
if self.stdin:
# Flush stdio buffer. This might block, if the user has
# been writing to .stdin in an uncontrolled fashion.
self.stdin.flush()
- if input:
- write_set.append(self.stdin)
- else:
+ if not input:
self.stdin.close()
+
+ if _has_poll:
+ stdout, stderr = self._communicate_with_poll(input)
+ else:
+ stdout, stderr = self._communicate_with_select(input)
+
+ # All data exchanged. Translate lists into strings.
+ if stdout is not None:
+ stdout = ''.join(stdout)
+ if stderr is not None:
+ stderr = ''.join(stderr)
+
+ # Translate newlines, if requested. We cannot let the file
+ # object do the translation: It is based on stdio, which is
+ # impossible to combine with select (unless forcing no
+ # buffering).
+ if self.universal_newlines and hasattr(file, 'newlines'):
+ if stdout:
+ stdout = self._translate_newlines(stdout)
+ if stderr:
+ stderr = self._translate_newlines(stderr)
+
+ self.wait()
+ return (stdout, stderr)
+
+
+ def _communicate_with_poll(self, input):
+ stdout = None # Return
+ stderr = None # Return
+ fd2file = {}
+ fd2output = {}
+
+ poller = select.poll()
+ def register_and_append(file_obj, eventmask):
+ poller.register(file_obj.fileno(), eventmask)
+ fd2file[file_obj.fileno()] = file_obj
+
+ def close_unregister_and_remove(fd):
+ poller.unregister(fd)
+ fd2file[fd].close()
+ fd2file.pop(fd)
+
+ if self.stdin and input:
+ register_and_append(self.stdin, select.POLLOUT)
+
+ select_POLLIN_POLLPRI = select.POLLIN | select.POLLPRI
+ if self.stdout:
+ register_and_append(self.stdout, select_POLLIN_POLLPRI)
+ fd2output[self.stdout.fileno()] = stdout = []
+ if self.stderr:
+ register_and_append(self.stderr, select_POLLIN_POLLPRI)
+ fd2output[self.stderr.fileno()] = stderr = []
+
+ input_offset = 0
+ while fd2file:
+ try:
+ ready = poller.poll()
+ except select.error, e:
+ if e.args[0] == errno.EINTR:
+ continue
+ raise
+
+ for fd, mode in ready:
+ if mode & select.POLLOUT:
+ chunk = input[input_offset : input_offset + _PIPE_BUF]
+ input_offset += os.write(fd, chunk)
+ if input_offset >= len(input):
+ close_unregister_and_remove(fd)
+ elif mode & select_POLLIN_POLLPRI:
+ data = os.read(fd, 4096)
+ if not data:
+ close_unregister_and_remove(fd)
+ fd2output[fd].append(data)
+ else:
+ # Ignore hang up or errors.
+ close_unregister_and_remove(fd)
+
+ return (stdout, stderr)
+
+
+ def _communicate_with_select(self, input):
+ read_set = []
+ write_set = []
+ stdout = None # Return
+ stderr = None # Return
+
+ if self.stdin and input:
+ write_set.append(self.stdin)
if self.stdout:
read_set.append(self.stdout)
stdout = []
@@ -1221,10 +1301,7 @@ class Popen(object):
raise
if self.stdin in wlist:
- # When select has indicated that the file is writable,
- # we can write up to PIPE_BUF bytes without risk
- # blocking. POSIX defines PIPE_BUF >= 512
- chunk = input[input_offset : input_offset + 512]
+ chunk = input[input_offset : input_offset + _PIPE_BUF]
bytes_written = os.write(self.stdin.fileno(), chunk)
input_offset += bytes_written
if input_offset >= len(input):
@@ -1245,25 +1322,9 @@ class Popen(object):
read_set.remove(self.stderr)
stderr.append(data)
- # All data exchanged. Translate lists into strings.
- if stdout is not None:
- stdout = ''.join(stdout)
- if stderr is not None:
- stderr = ''.join(stderr)
-
- # Translate newlines, if requested. We cannot let the file
- # object do the translation: It is based on stdio, which is
- # impossible to combine with select (unless forcing no
- # buffering).
- if self.universal_newlines and hasattr(file, 'newlines'):
- if stdout:
- stdout = self._translate_newlines(stdout)
- if stderr:
- stderr = self._translate_newlines(stderr)
-
- self.wait()
return (stdout, stderr)
+
def send_signal(self, sig):
"""Send a signal to the process
"""
diff --git a/Lib/test/test_subprocess.py b/Lib/test/test_subprocess.py
index c441cdc..5add023 100644
--- a/Lib/test/test_subprocess.py
+++ b/Lib/test/test_subprocess.py
@@ -766,8 +766,24 @@ class ProcessTestCase(unittest.TestCase):
p.terminate()
self.assertNotEqual(p.wait(), 0)
+
+unit_tests = [ProcessTestCase]
+
+if subprocess._has_poll:
+ class ProcessTestCaseNoPoll(ProcessTestCase):
+ def setUp(self):
+ subprocess._has_poll = False
+ ProcessTestCase.setUp(self)
+
+ def tearDown(self):
+ subprocess._has_poll = True
+ ProcessTestCase.tearDown(self)
+
+ unit_tests.append(ProcessTestCaseNoPoll)
+
+
def test_main():
- test_support.run_unittest(ProcessTestCase)
+ test_support.run_unittest(*unit_tests)
if hasattr(test_support, "reap_children"):
test_support.reap_children()