summaryrefslogtreecommitdiffstats
path: root/Lib/test/test_telnetlib.py
diff options
context:
space:
mode:
Diffstat (limited to 'Lib/test/test_telnetlib.py')
-rw-r--r--Lib/test/test_telnetlib.py127
1 files changed, 106 insertions, 21 deletions
diff --git a/Lib/test/test_telnetlib.py b/Lib/test/test_telnetlib.py
index c1eea06..95eebf3 100644
--- a/Lib/test/test_telnetlib.py
+++ b/Lib/test/test_telnetlib.py
@@ -1,12 +1,13 @@
import socket
import select
-import threading
import telnetlib
import time
import contextlib
+import unittest
from unittest import TestCase
from test import support
+threading = support.import_module('threading')
HOST = support.HOST
@@ -15,28 +16,25 @@ def server(evt, serv):
evt.set()
try:
conn, addr = serv.accept()
+ conn.close()
except socket.timeout:
pass
finally:
serv.close()
- conn.close()
- evt.set()
class GeneralTests(TestCase):
def setUp(self):
self.evt = threading.Event()
self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
- self.sock.settimeout(3)
+ self.sock.settimeout(60) # Safety net. Look issue 11812
self.port = support.bind_port(self.sock)
self.thread = threading.Thread(target=server, args=(self.evt,self.sock))
+ self.thread.setDaemon(True)
self.thread.start()
self.evt.wait()
- self.evt.clear()
- time.sleep(.1)
def tearDown(self):
- self.evt.wait()
self.thread.join()
def testBasic(self):
@@ -48,7 +46,7 @@ class GeneralTests(TestCase):
self.assertTrue(socket.getdefaulttimeout() is None)
socket.setdefaulttimeout(30)
try:
- telnet = telnetlib.Telnet("localhost", self.port)
+ telnet = telnetlib.Telnet(HOST, self.port)
finally:
socket.setdefaulttimeout(None)
self.assertEqual(telnet.sock.gettimeout(), 30)
@@ -66,20 +64,20 @@ class GeneralTests(TestCase):
telnet.sock.close()
def testTimeoutValue(self):
- telnet = telnetlib.Telnet("localhost", self.port, timeout=30)
+ telnet = telnetlib.Telnet(HOST, self.port, timeout=30)
self.assertEqual(telnet.sock.gettimeout(), 30)
telnet.sock.close()
def testTimeoutOpen(self):
telnet = telnetlib.Telnet()
- telnet.open("localhost", self.port, timeout=30)
+ telnet.open(HOST, self.port, timeout=30)
self.assertEqual(telnet.sock.gettimeout(), 30)
telnet.sock.close()
class SocketStub(object):
''' a socket proxy that re-defines sendall() '''
- def __init__(self, reads=[]):
- self.reads = reads
+ def __init__(self, reads=()):
+ self.reads = list(reads) # Intentionally make a copy.
self.writes = []
self.block = False
def sendall(self, data):
@@ -105,7 +103,7 @@ class TelnetAlike(telnetlib.Telnet):
self._messages += out.getvalue()
return
-def new_select(*s_args):
+def mock_select(*s_args):
block = False
for l in s_args:
for fob in l:
@@ -116,6 +114,30 @@ def new_select(*s_args):
else:
return s_args
+class MockPoller(object):
+ test_case = None # Set during TestCase setUp.
+
+ def __init__(self):
+ self._file_objs = []
+
+ def register(self, fd, eventmask):
+ self.test_case.assertTrue(hasattr(fd, 'fileno'), fd)
+ self.test_case.assertEqual(eventmask, select.POLLIN|select.POLLPRI)
+ self._file_objs.append(fd)
+
+ def poll(self, timeout=None):
+ block = False
+ for fob in self._file_objs:
+ if isinstance(fob, TelnetAlike):
+ block = fob.sock.block
+ if block:
+ return []
+ else:
+ return zip(self._file_objs, [select.POLLIN]*len(self._file_objs))
+
+ def unregister(self, fd):
+ self._file_objs.remove(fd)
+
@contextlib.contextmanager
def test_socket(reads):
def new_conn(*ignored):
@@ -128,7 +150,7 @@ def test_socket(reads):
socket.create_connection = old_conn
return
-def test_telnet(reads=[], cls=TelnetAlike):
+def test_telnet(reads=(), cls=TelnetAlike, use_poll=None):
''' return a telnetlib.Telnet object that uses a SocketStub with
reads queued up to be read '''
for x in reads:
@@ -136,15 +158,31 @@ def test_telnet(reads=[], cls=TelnetAlike):
with test_socket(reads):
telnet = cls('dummy', 0)
telnet._messages = '' # debuglevel output
+ if use_poll is not None:
+ if use_poll and not telnet._has_poll:
+ raise unittest.SkipTest('select.poll() required.')
+ telnet._has_poll = use_poll
return telnet
-class ReadTests(TestCase):
+
+class ExpectAndReadTestCase(TestCase):
def setUp(self):
self.old_select = select.select
- select.select = new_select
+ select.select = mock_select
+ self.old_poll = False
+ if hasattr(select, 'poll'):
+ self.old_poll = select.poll
+ select.poll = MockPoller
+ MockPoller.test_case = self
+
def tearDown(self):
+ if self.old_poll:
+ MockPoller.test_case = None
+ select.poll = self.old_poll
select.select = self.old_select
+
+class ReadTests(ExpectAndReadTestCase):
def test_read_until(self):
"""
read_until(expected, timeout=None)
@@ -161,6 +199,22 @@ class ReadTests(TestCase):
data = telnet.read_until(b'match')
self.assertEqual(data, expect)
+ def test_read_until_with_poll(self):
+ """Use select.poll() to implement telnet.read_until()."""
+ want = [b'x' * 10, b'match', b'y' * 10]
+ telnet = test_telnet(want, use_poll=True)
+ select.select = lambda *_: self.fail('unexpected select() call.')
+ data = telnet.read_until(b'match')
+ self.assertEqual(data, b''.join(want[:-1]))
+
+ def test_read_until_with_select(self):
+ """Use select.select() to implement telnet.read_until()."""
+ want = [b'x' * 10, b'match', b'y' * 10]
+ telnet = test_telnet(want, use_poll=False)
+ if self.old_poll:
+ select.poll = lambda *_: self.fail('unexpected poll() call.')
+ data = telnet.read_until(b'match')
+ self.assertEqual(data, b''.join(want[:-1]))
def test_read_all(self):
"""
@@ -285,7 +339,7 @@ class OptionTests(TestCase):
txt = telnet.read_all()
cmd = nego.seen
self.assertTrue(len(cmd) > 0) # we expect at least one command
- self.assertTrue(cmd[:1] in self.cmds)
+ self.assertIn(cmd[:1], self.cmds)
self.assertEqual(cmd[1:2], tl.NOOPT)
self.assertEqual(data_len, len(txt + cmd))
nego.sb_getter = None # break the nego => telnet cycle
@@ -332,7 +386,7 @@ class OptionTests(TestCase):
telnet = test_telnet([a])
telnet.set_debuglevel(1)
txt = telnet.read_all()
- self.assertTrue(b in telnet._messages)
+ self.assertIn(b, telnet._messages)
return
def test_debuglevel_write(self):
@@ -340,7 +394,7 @@ class OptionTests(TestCase):
telnet.set_debuglevel(1)
telnet.write(b'xxx')
expected = "send b'xxx'\n"
- self.assertTrue(expected in telnet._messages)
+ self.assertIn(expected, telnet._messages)
def test_debug_accepts_str_port(self):
# Issue 10695
@@ -349,11 +403,42 @@ class OptionTests(TestCase):
telnet._messages = ''
telnet.set_debuglevel(1)
telnet.msg('test')
- self.assertRegexpMatches(telnet._messages, r'0.*test')
+ self.assertRegex(telnet._messages, r'0.*test')
+
+
+class ExpectTests(ExpectAndReadTestCase):
+ def test_expect(self):
+ """
+ expect(expected, [timeout])
+ Read until the expected string has been seen, or a timeout is
+ hit (default is no timeout); may block.
+ """
+ want = [b'x' * 10, b'match', b'y' * 10]
+ telnet = test_telnet(want)
+ (_,_,data) = telnet.expect([b'match'])
+ self.assertEqual(data, b''.join(want[:-1]))
+
+ def test_expect_with_poll(self):
+ """Use select.poll() to implement telnet.expect()."""
+ want = [b'x' * 10, b'match', b'y' * 10]
+ telnet = test_telnet(want, use_poll=True)
+ select.select = lambda *_: self.fail('unexpected select() call.')
+ (_,_,data) = telnet.expect([b'match'])
+ self.assertEqual(data, b''.join(want[:-1]))
+
+ def test_expect_with_select(self):
+ """Use select.select() to implement telnet.expect()."""
+ want = [b'x' * 10, b'match', b'y' * 10]
+ telnet = test_telnet(want, use_poll=False)
+ if self.old_poll:
+ select.poll = lambda *_: self.fail('unexpected poll() call.')
+ (_,_,data) = telnet.expect([b'match'])
+ self.assertEqual(data, b''.join(want[:-1]))
def test_main(verbose=None):
- support.run_unittest(GeneralTests, ReadTests, WriteTests, OptionTests)
+ support.run_unittest(GeneralTests, ReadTests, WriteTests, OptionTests,
+ ExpectTests)
if __name__ == '__main__':
test_main()