diff options
author | Gregory P. Smith <greg@krypto.org> | 2012-02-16 08:35:43 (GMT) |
---|---|---|
committer | Gregory P. Smith <greg@krypto.org> | 2012-02-16 08:35:43 (GMT) |
commit | 5b791fb548ac5ec6f17cf652d6d96f99d8e0f4ef (patch) | |
tree | f7aa47786a1f9ba1f4f2b34b3bd5611352f06a2d /Lib/test/test_pty.py | |
parent | da57819efa3283a2bb86ad2629286bc5477ab0e0 (diff) | |
parent | 58e7c1dc024345bbd1a37dc9acbaca6af929d527 (diff) | |
download | cpython-5b791fb548ac5ec6f17cf652d6d96f99d8e0f4ef.zip cpython-5b791fb548ac5ec6f17cf652d6d96f99d8e0f4ef.tar.gz cpython-5b791fb548ac5ec6f17cf652d6d96f99d8e0f4ef.tar.bz2 |
Issue #2489: Fix bug in _copy loop that could consume 100% cpu on EOF.
Diffstat (limited to 'Lib/test/test_pty.py')
-rw-r--r-- | Lib/test/test_pty.py | 91 |
1 files changed, 90 insertions, 1 deletions
diff --git a/Lib/test/test_pty.py b/Lib/test/test_pty.py index c6fc5e7..4f1251c 100644 --- a/Lib/test/test_pty.py +++ b/Lib/test/test_pty.py @@ -8,7 +8,9 @@ import errno import pty import os import sys +import select import signal +import socket import unittest TEST_STRING_1 = b"I wish to buy a fish license.\n" @@ -194,9 +196,96 @@ class PtyTest(unittest.TestCase): # pty.fork() passed. + +class SmallPtyTests(unittest.TestCase): + """These tests don't spawn children or hang.""" + + def setUp(self): + self.orig_stdin_fileno = pty.STDIN_FILENO + self.orig_stdout_fileno = pty.STDOUT_FILENO + self.orig_pty_select = pty.select + self.fds = [] # A list of file descriptors to close. + self.select_rfds_lengths = [] + self.select_rfds_results = [] + + def tearDown(self): + pty.STDIN_FILENO = self.orig_stdin_fileno + pty.STDOUT_FILENO = self.orig_stdout_fileno + pty.select = self.orig_pty_select + for fd in self.fds: + try: + os.close(fd) + except: + pass + + def _pipe(self): + pipe_fds = os.pipe() + self.fds.extend(pipe_fds) + return pipe_fds + + def _mock_select(self, rfds, wfds, xfds): + # This will raise IndexError when no more expected calls exist. + self.assertEqual(self.select_rfds_lengths.pop(0), len(rfds)) + return self.select_rfds_results.pop(0), [], [] + + def test__copy_to_each(self): + """Test the normal data case on both master_fd and stdin.""" + read_from_stdout_fd, mock_stdout_fd = self._pipe() + pty.STDOUT_FILENO = mock_stdout_fd + mock_stdin_fd, write_to_stdin_fd = self._pipe() + pty.STDIN_FILENO = mock_stdin_fd + socketpair = socket.socketpair() + masters = [s.fileno() for s in socketpair] + self.fds.extend(masters) + + # Feed data. Smaller than PIPEBUF. These writes will not block. + os.write(masters[1], b'from master') + os.write(write_to_stdin_fd, b'from stdin') + + # Expect two select calls, the last one will cause IndexError + pty.select = self._mock_select + self.select_rfds_lengths.append(2) + self.select_rfds_results.append([mock_stdin_fd, masters[0]]) + self.select_rfds_lengths.append(2) + + with self.assertRaises(IndexError): + pty._copy(masters[0]) + + # Test that the right data went to the right places. + rfds = select.select([read_from_stdout_fd, masters[1]], [], [], 0)[0] + self.assertEqual([read_from_stdout_fd, masters[1]], rfds) + self.assertEqual(os.read(read_from_stdout_fd, 20), b'from master') + self.assertEqual(os.read(masters[1], 20), b'from stdin') + + def test__copy_eof_on_all(self): + """Test the empty read EOF case on both master_fd and stdin.""" + read_from_stdout_fd, mock_stdout_fd = self._pipe() + pty.STDOUT_FILENO = mock_stdout_fd + mock_stdin_fd, write_to_stdin_fd = self._pipe() + pty.STDIN_FILENO = mock_stdin_fd + socketpair = socket.socketpair() + masters = [s.fileno() for s in socketpair] + self.fds.extend(masters) + + os.close(masters[1]) + socketpair[1].close() + os.close(write_to_stdin_fd) + + # Expect two select calls, the last one will cause IndexError + pty.select = self._mock_select + self.select_rfds_lengths.append(2) + self.select_rfds_results.append([mock_stdin_fd, masters[0]]) + # We expect that both fds were removed from the fds list as they + # both encountered an EOF before the second select call. + self.select_rfds_lengths.append(0) + + with self.assertRaises(IndexError): + pty._copy(masters[0]) + + def test_main(verbose=None): try: - run_unittest(PtyTest) + run_unittest(SmallPtyTests, PtyTest) finally: reap_children() |