diff options
Diffstat (limited to 'Lib/test/test_telnetlib.py')
| -rw-r--r-- | Lib/test/test_telnetlib.py | 127 |
1 files changed, 106 insertions, 21 deletions
diff --git a/Lib/test/test_telnetlib.py b/Lib/test/test_telnetlib.py index c1eea06..95eebf3 100644 --- a/Lib/test/test_telnetlib.py +++ b/Lib/test/test_telnetlib.py @@ -1,12 +1,13 @@ import socket import select -import threading import telnetlib import time import contextlib +import unittest from unittest import TestCase from test import support +threading = support.import_module('threading') HOST = support.HOST @@ -15,28 +16,25 @@ def server(evt, serv): evt.set() try: conn, addr = serv.accept() + conn.close() except socket.timeout: pass finally: serv.close() - conn.close() - evt.set() class GeneralTests(TestCase): def setUp(self): self.evt = threading.Event() self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) - self.sock.settimeout(3) + self.sock.settimeout(60) # Safety net. Look issue 11812 self.port = support.bind_port(self.sock) self.thread = threading.Thread(target=server, args=(self.evt,self.sock)) + self.thread.setDaemon(True) self.thread.start() self.evt.wait() - self.evt.clear() - time.sleep(.1) def tearDown(self): - self.evt.wait() self.thread.join() def testBasic(self): @@ -48,7 +46,7 @@ class GeneralTests(TestCase): self.assertTrue(socket.getdefaulttimeout() is None) socket.setdefaulttimeout(30) try: - telnet = telnetlib.Telnet("localhost", self.port) + telnet = telnetlib.Telnet(HOST, self.port) finally: socket.setdefaulttimeout(None) self.assertEqual(telnet.sock.gettimeout(), 30) @@ -66,20 +64,20 @@ class GeneralTests(TestCase): telnet.sock.close() def testTimeoutValue(self): - telnet = telnetlib.Telnet("localhost", self.port, timeout=30) + telnet = telnetlib.Telnet(HOST, self.port, timeout=30) self.assertEqual(telnet.sock.gettimeout(), 30) telnet.sock.close() def testTimeoutOpen(self): telnet = telnetlib.Telnet() - telnet.open("localhost", self.port, timeout=30) + telnet.open(HOST, self.port, timeout=30) self.assertEqual(telnet.sock.gettimeout(), 30) telnet.sock.close() class SocketStub(object): ''' a socket proxy that re-defines sendall() ''' - def __init__(self, reads=[]): - self.reads = reads + def __init__(self, reads=()): + self.reads = list(reads) # Intentionally make a copy. self.writes = [] self.block = False def sendall(self, data): @@ -105,7 +103,7 @@ class TelnetAlike(telnetlib.Telnet): self._messages += out.getvalue() return -def new_select(*s_args): +def mock_select(*s_args): block = False for l in s_args: for fob in l: @@ -116,6 +114,30 @@ def new_select(*s_args): else: return s_args +class MockPoller(object): + test_case = None # Set during TestCase setUp. + + def __init__(self): + self._file_objs = [] + + def register(self, fd, eventmask): + self.test_case.assertTrue(hasattr(fd, 'fileno'), fd) + self.test_case.assertEqual(eventmask, select.POLLIN|select.POLLPRI) + self._file_objs.append(fd) + + def poll(self, timeout=None): + block = False + for fob in self._file_objs: + if isinstance(fob, TelnetAlike): + block = fob.sock.block + if block: + return [] + else: + return zip(self._file_objs, [select.POLLIN]*len(self._file_objs)) + + def unregister(self, fd): + self._file_objs.remove(fd) + @contextlib.contextmanager def test_socket(reads): def new_conn(*ignored): @@ -128,7 +150,7 @@ def test_socket(reads): socket.create_connection = old_conn return -def test_telnet(reads=[], cls=TelnetAlike): +def test_telnet(reads=(), cls=TelnetAlike, use_poll=None): ''' return a telnetlib.Telnet object that uses a SocketStub with reads queued up to be read ''' for x in reads: @@ -136,15 +158,31 @@ def test_telnet(reads=[], cls=TelnetAlike): with test_socket(reads): telnet = cls('dummy', 0) telnet._messages = '' # debuglevel output + if use_poll is not None: + if use_poll and not telnet._has_poll: + raise unittest.SkipTest('select.poll() required.') + telnet._has_poll = use_poll return telnet -class ReadTests(TestCase): + +class ExpectAndReadTestCase(TestCase): def setUp(self): self.old_select = select.select - select.select = new_select + select.select = mock_select + self.old_poll = False + if hasattr(select, 'poll'): + self.old_poll = select.poll + select.poll = MockPoller + MockPoller.test_case = self + def tearDown(self): + if self.old_poll: + MockPoller.test_case = None + select.poll = self.old_poll select.select = self.old_select + +class ReadTests(ExpectAndReadTestCase): def test_read_until(self): """ read_until(expected, timeout=None) @@ -161,6 +199,22 @@ class ReadTests(TestCase): data = telnet.read_until(b'match') self.assertEqual(data, expect) + def test_read_until_with_poll(self): + """Use select.poll() to implement telnet.read_until().""" + want = [b'x' * 10, b'match', b'y' * 10] + telnet = test_telnet(want, use_poll=True) + select.select = lambda *_: self.fail('unexpected select() call.') + data = telnet.read_until(b'match') + self.assertEqual(data, b''.join(want[:-1])) + + def test_read_until_with_select(self): + """Use select.select() to implement telnet.read_until().""" + want = [b'x' * 10, b'match', b'y' * 10] + telnet = test_telnet(want, use_poll=False) + if self.old_poll: + select.poll = lambda *_: self.fail('unexpected poll() call.') + data = telnet.read_until(b'match') + self.assertEqual(data, b''.join(want[:-1])) def test_read_all(self): """ @@ -285,7 +339,7 @@ class OptionTests(TestCase): txt = telnet.read_all() cmd = nego.seen self.assertTrue(len(cmd) > 0) # we expect at least one command - self.assertTrue(cmd[:1] in self.cmds) + self.assertIn(cmd[:1], self.cmds) self.assertEqual(cmd[1:2], tl.NOOPT) self.assertEqual(data_len, len(txt + cmd)) nego.sb_getter = None # break the nego => telnet cycle @@ -332,7 +386,7 @@ class OptionTests(TestCase): telnet = test_telnet([a]) telnet.set_debuglevel(1) txt = telnet.read_all() - self.assertTrue(b in telnet._messages) + self.assertIn(b, telnet._messages) return def test_debuglevel_write(self): @@ -340,7 +394,7 @@ class OptionTests(TestCase): telnet.set_debuglevel(1) telnet.write(b'xxx') expected = "send b'xxx'\n" - self.assertTrue(expected in telnet._messages) + self.assertIn(expected, telnet._messages) def test_debug_accepts_str_port(self): # Issue 10695 @@ -349,11 +403,42 @@ class OptionTests(TestCase): telnet._messages = '' telnet.set_debuglevel(1) telnet.msg('test') - self.assertRegexpMatches(telnet._messages, r'0.*test') + self.assertRegex(telnet._messages, r'0.*test') + + +class ExpectTests(ExpectAndReadTestCase): + def test_expect(self): + """ + expect(expected, [timeout]) + Read until the expected string has been seen, or a timeout is + hit (default is no timeout); may block. + """ + want = [b'x' * 10, b'match', b'y' * 10] + telnet = test_telnet(want) + (_,_,data) = telnet.expect([b'match']) + self.assertEqual(data, b''.join(want[:-1])) + + def test_expect_with_poll(self): + """Use select.poll() to implement telnet.expect().""" + want = [b'x' * 10, b'match', b'y' * 10] + telnet = test_telnet(want, use_poll=True) + select.select = lambda *_: self.fail('unexpected select() call.') + (_,_,data) = telnet.expect([b'match']) + self.assertEqual(data, b''.join(want[:-1])) + + def test_expect_with_select(self): + """Use select.select() to implement telnet.expect().""" + want = [b'x' * 10, b'match', b'y' * 10] + telnet = test_telnet(want, use_poll=False) + if self.old_poll: + select.poll = lambda *_: self.fail('unexpected poll() call.') + (_,_,data) = telnet.expect([b'match']) + self.assertEqual(data, b''.join(want[:-1])) def test_main(verbose=None): - support.run_unittest(GeneralTests, ReadTests, WriteTests, OptionTests) + support.run_unittest(GeneralTests, ReadTests, WriteTests, OptionTests, + ExpectTests) if __name__ == '__main__': test_main() |
