summaryrefslogtreecommitdiffstats
path: root/Lib/test/test_pty.py
diff options
context:
space:
mode:
authorSoumendra Ganguly <67527439+8vasu@users.noreply.github.com>2020-11-25 13:41:25 (GMT)
committerGitHub <noreply@github.com>2020-11-25 13:41:25 (GMT)
commitc13d89955d9a2942c6355d6839d7096323244136 (patch)
tree1a3cc01a9a111caa9eaa550963f9319e37564256 /Lib/test/test_pty.py
parentbe319c0c108e308fb7ed6ec9522e969fdffd1253 (diff)
downloadcpython-c13d89955d9a2942c6355d6839d7096323244136.zip
cpython-c13d89955d9a2942c6355d6839d7096323244136.tar.gz
cpython-c13d89955d9a2942c6355d6839d7096323244136.tar.bz2
bpo-41818: Updated tests for the standard pty library (GH-22962)
Diffstat (limited to 'Lib/test/test_pty.py')
-rw-r--r--Lib/test/test_pty.py122
1 files changed, 111 insertions, 11 deletions
diff --git a/Lib/test/test_pty.py b/Lib/test/test_pty.py
index 7ca0557..7de5688 100644
--- a/Lib/test/test_pty.py
+++ b/Lib/test/test_pty.py
@@ -14,9 +14,22 @@ import socket
import io # readline
import unittest
+import struct
+import tty
+import fcntl
+import platform
+import warnings
+
TEST_STRING_1 = b"I wish to buy a fish license.\n"
TEST_STRING_2 = b"For my pet fish, Eric.\n"
+try:
+ _TIOCGWINSZ = tty.TIOCGWINSZ
+ _TIOCSWINSZ = tty.TIOCSWINSZ
+ _HAVE_WINSZ = True
+except AttributeError:
+ _HAVE_WINSZ = False
+
if verbose:
def debug(msg):
print(msg)
@@ -60,6 +73,27 @@ def _readline(fd):
reader = io.FileIO(fd, mode='rb', closefd=False)
return reader.readline()
+def expectedFailureIfStdinIsTTY(fun):
+ # avoid isatty() for now
+ try:
+ tty.tcgetattr(pty.STDIN_FILENO)
+ return unittest.expectedFailure(fun)
+ except tty.error:
+ pass
+ return fun
+
+def expectedFailureOnBSD(fun):
+ PLATFORM = platform.system()
+ if PLATFORM.endswith("BSD") or PLATFORM == "Darwin":
+ return unittest.expectedFailure(fun)
+ return fun
+
+def _get_term_winsz(fd):
+ s = struct.pack("HHHH", 0, 0, 0, 0)
+ return fcntl.ioctl(fd, _TIOCGWINSZ, s)
+
+def _set_term_winsz(fd, winsz):
+ fcntl.ioctl(fd, _TIOCSWINSZ, winsz)
# Marginal testing of pty suite. Cannot do extensive 'do or fail' testing
@@ -78,6 +112,20 @@ class PtyTest(unittest.TestCase):
self.addCleanup(signal.alarm, 0)
signal.alarm(10)
+ # Save original stdin window size
+ self.stdin_rows = None
+ self.stdin_cols = None
+ if _HAVE_WINSZ:
+ try:
+ stdin_dim = os.get_terminal_size(pty.STDIN_FILENO)
+ self.stdin_rows = stdin_dim.lines
+ self.stdin_cols = stdin_dim.columns
+ old_stdin_winsz = struct.pack("HHHH", self.stdin_rows,
+ self.stdin_cols, 0, 0)
+ self.addCleanup(_set_term_winsz, pty.STDIN_FILENO, old_stdin_winsz)
+ except OSError:
+ pass
+
def handle_sig(self, sig, frame):
self.fail("isatty hung")
@@ -86,26 +134,65 @@ class PtyTest(unittest.TestCase):
# bpo-38547: if the process is the session leader, os.close(master_fd)
# of "master_fd, slave_name = pty.master_open()" raises SIGHUP
# signal: just ignore the signal.
+ #
+ # NOTE: the above comment is from an older version of the test;
+ # master_open() is not being used anymore.
pass
- def test_basic(self):
+ @expectedFailureIfStdinIsTTY
+ def test_openpty(self):
try:
- debug("Calling master_open()")
- master_fd, slave_name = pty.master_open()
- debug("Got master_fd '%d', slave_name '%s'" %
- (master_fd, slave_name))
- debug("Calling slave_open(%r)" % (slave_name,))
- slave_fd = pty.slave_open(slave_name)
- debug("Got slave_fd '%d'" % slave_fd)
+ mode = tty.tcgetattr(pty.STDIN_FILENO)
+ except tty.error:
+ # not a tty or bad/closed fd
+ debug("tty.tcgetattr(pty.STDIN_FILENO) failed")
+ mode = None
+
+ new_stdin_winsz = None
+ if self.stdin_rows != None and self.stdin_cols != None:
+ try:
+ debug("Setting pty.STDIN_FILENO window size")
+ # Set number of columns and rows to be the
+ # floors of 1/5 of respective original values
+ target_stdin_winsz = struct.pack("HHHH", self.stdin_rows//5,
+ self.stdin_cols//5, 0, 0)
+ _set_term_winsz(pty.STDIN_FILENO, target_stdin_winsz)
+
+ # Were we able to set the window size
+ # of pty.STDIN_FILENO successfully?
+ new_stdin_winsz = _get_term_winsz(pty.STDIN_FILENO)
+ self.assertEqual(new_stdin_winsz, target_stdin_winsz,
+ "pty.STDIN_FILENO window size unchanged")
+ except OSError:
+ warnings.warn("Failed to set pty.STDIN_FILENO window size")
+ pass
+
+ try:
+ debug("Calling pty.openpty()")
+ try:
+ master_fd, slave_fd = pty.openpty(mode, new_stdin_winsz)
+ except TypeError:
+ master_fd, slave_fd = pty.openpty()
+ debug(f"Got master_fd '{master_fd}', slave_fd '{slave_fd}'")
except OSError:
# " An optional feature could not be imported " ... ?
raise unittest.SkipTest("Pseudo-terminals (seemingly) not functional.")
- self.assertTrue(os.isatty(slave_fd), 'slave_fd is not a tty')
+ self.assertTrue(os.isatty(slave_fd), "slave_fd is not a tty")
+
+ if mode:
+ self.assertEqual(tty.tcgetattr(slave_fd), mode,
+ "openpty() failed to set slave termios")
+ if new_stdin_winsz:
+ self.assertEqual(_get_term_winsz(slave_fd), new_stdin_winsz,
+ "openpty() failed to set slave window size")
# Solaris requires reading the fd before anything is returned.
# My guess is that since we open and close the slave fd
# in master_open(), we need to read the EOF.
+ #
+ # NOTE: the above comment is from an older version of the test;
+ # master_open() is not being used anymore.
# Ensure the fd is non-blocking in case there's nothing to read.
blocking = os.get_blocking(master_fd)
@@ -222,8 +309,20 @@ class PtyTest(unittest.TestCase):
os.close(master_fd)
- # pty.fork() passed.
+ @expectedFailureOnBSD
+ def test_master_read(self):
+ debug("Calling pty.openpty()")
+ master_fd, slave_fd = pty.openpty()
+ debug(f"Got master_fd '{master_fd}', slave_fd '{slave_fd}'")
+
+ debug("Closing slave_fd")
+ os.close(slave_fd)
+ debug("Reading from master_fd")
+ with self.assertRaises(OSError):
+ os.read(master_fd, 1)
+
+ os.close(master_fd)
class SmallPtyTests(unittest.TestCase):
"""These tests don't spawn children or hang."""
@@ -262,8 +361,9 @@ class SmallPtyTests(unittest.TestCase):
self.files.extend(socketpair)
return socketpair
- def _mock_select(self, rfds, wfds, xfds):
+ def _mock_select(self, rfds, wfds, xfds, timeout=0):
# This will raise IndexError when no more expected calls exist.
+ # This ignores the timeout
self.assertEqual(self.select_rfds_lengths.pop(0), len(rfds))
return self.select_rfds_results.pop(0), [], []