diff options
author | Jack Diederich <jackdied@gmail.com> | 2009-04-07 20:22:59 (GMT) |
---|---|---|
committer | Jack Diederich <jackdied@gmail.com> | 2009-04-07 20:22:59 (GMT) |
commit | 3b2312ee5c7e5fd737298e0bc5a9e259fd0e2c5a (patch) | |
tree | e4277bd04ec93ed45004a0193208f5f9fe0b800d /Lib | |
parent | 14bf0a0a379003bae2c997a98a40c6c9f9553157 (diff) | |
download | cpython-3b2312ee5c7e5fd737298e0bc5a9e259fd0e2c5a.zip cpython-3b2312ee5c7e5fd737298e0bc5a9e259fd0e2c5a.tar.gz cpython-3b2312ee5c7e5fd737298e0bc5a9e259fd0e2c5a.tar.bz2 |
eliminate more race conditions in telnetlib tests
Diffstat (limited to 'Lib')
-rw-r--r-- | Lib/test/test_telnetlib.py | 109 |
1 files changed, 58 insertions, 51 deletions
diff --git a/Lib/test/test_telnetlib.py b/Lib/test/test_telnetlib.py index 16f7f93..d6214be 100644 --- a/Lib/test/test_telnetlib.py +++ b/Lib/test/test_telnetlib.py @@ -13,8 +13,8 @@ EOF_sigil = object() def server(evt, serv, dataq=None): """ Open a tcp server in three steps 1) set evt to true to let the parent know we are ready - 2) [optional] write all the data in dataq to the socket - terminate when dataq.get() returns EOF_sigil + 2) [optional] if is not False, write the list of data from dataq.get() + to the socket. 3) set evt to true to let the parent know we're done """ serv.listen(5) @@ -23,15 +23,19 @@ def server(evt, serv, dataq=None): conn, addr = serv.accept() if dataq: data = '' - new_data = dataq.get(True, 0.5) - while new_data is not EOF_sigil: - if type(new_data) == str: - data += new_data - elif type(new_data) in [int, float]: + done = False + for new_data in dataq.get(True, 0.5): + if not done: + dataq.task_done() + done = True + if new_data == EOF_sigil: + break + if type(new_data) in [int, float]: time.sleep(new_data) + else: + data += new_data written = conn.send(data) data = data[written:] - new_data = dataq.get(True, 0.5) except socket.timeout: pass finally: @@ -98,7 +102,7 @@ class GeneralTests(TestCase): def _read_setUp(self): # the blocking constant should be tuned! - self.blocking_timeout = 0.0 + self.blocking_timeout = 0.3 self.evt = threading.Event() self.dataq = Queue.Queue() self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) @@ -114,19 +118,10 @@ def _read_tearDown(self): self.evt.wait() self.thread.join() - class ReadTests(TestCase): setUp = _read_setUp tearDown = _read_tearDown - def _test_blocking(self, func): - start = time.time() - self.dataq.put(self.blocking_timeout) - self.dataq.put(EOF_sigil) - data = func() - low, high = wibble_float(self.blocking_timeout) - self.assertTrue(time.time() - start >= low) - def test_read_until_A(self): """ read_until(expected, [timeout]) @@ -134,23 +129,19 @@ class ReadTests(TestCase): hit (default is no timeout); may block. """ want = ['x' * 10, 'match', 'y' * 10, EOF_sigil] - for item in want: - self.dataq.put(item) + self.dataq.put(want) telnet = telnetlib.Telnet(HOST, self.port) + self.dataq.join() data = telnet.read_until('match') self.assertEqual(data, ''.join(want[:-2])) def test_read_until_B(self): # test the timeout - it does NOT raise socket.timeout want = ['hello', self.blocking_timeout, EOF_sigil] - for item in want: - self.dataq.put(item) + self.dataq.put(want) telnet = telnetlib.Telnet(HOST, self.port) - start = time.time() - timeout = self.blocking_timeout / 2 - data = telnet.read_until('not seen', timeout) - low, high = wibble_float(timeout) - self.assertTrue(low <= time.time() - high) + self.dataq.join() + data = telnet.read_until('not seen') self.assertEqual(data, want[0]) def test_read_all_A(self): @@ -159,16 +150,31 @@ class ReadTests(TestCase): Read all data until EOF; may block. """ want = ['x' * 500, 'y' * 500, 'z' * 500, EOF_sigil] - for item in want: - self.dataq.put(item) + self.dataq.put(want) telnet = telnetlib.Telnet(HOST, self.port) + self.dataq.join() data = telnet.read_all() self.assertEqual(data, ''.join(want[:-1])) return + def _test_blocking(self, func): + self.dataq.put([self.blocking_timeout, EOF_sigil]) + self.dataq.join() + start = time.time() + data = func() + low, high = wibble_float(self.blocking_timeout) + self.assertTrue(low <= time.time() - start) + def test_read_all_B(self): self._test_blocking(telnetlib.Telnet(HOST, self.port).read_all) + def test_read_all_C(self): + self.dataq.put([EOF_sigil]) + telnet = telnetlib.Telnet(HOST, self.port) + self.dataq.join() + telnet.read_all() + telnet.read_all() # shouldn't raise + def test_read_some_A(self): """ read_some() @@ -176,19 +182,20 @@ class ReadTests(TestCase): """ # test 'at least one byte' want = ['x' * 500, EOF_sigil] - for item in want: - self.dataq.put(item) + self.dataq.put(want) telnet = telnetlib.Telnet(HOST, self.port) + self.dataq.join() data = telnet.read_all() self.assertTrue(len(data) >= 1) def test_read_some_B(self): # test EOF - self.dataq.put(EOF_sigil) + self.dataq.put([EOF_sigil]) telnet = telnetlib.Telnet(HOST, self.port) + self.dataq.join() self.assertEqual('', telnet.read_some()) - def test_read_all_C(self): + def test_read_some_C(self): self._test_blocking(telnetlib.Telnet(HOST, self.port).read_some) def _test_read_any_eager_A(self, func_name): @@ -200,26 +207,24 @@ class ReadTests(TestCase): # this never blocks so it should return eat part in turn want = ['x' * 100, self.blocking_timeout/2, 'y' * 100, EOF_sigil] expects = want[0] + want[2] - for item in want: - self.dataq.put(item) + self.dataq.put(want) telnet = telnetlib.Telnet(HOST, self.port) + self.dataq.join() func = getattr(telnet, func_name) - time.sleep(self.blocking_timeout/10) data = '' while True: try: data += func() self.assertTrue(expects.startswith(data)) - time.sleep(self.blocking_timeout) except EOFError: break self.assertEqual(expects, data) def _test_read_any_eager_B(self, func_name): # test EOF - self.dataq.put(EOF_sigil) - time.sleep(self.blocking_timeout / 10) + self.dataq.put([EOF_sigil]) telnet = telnetlib.Telnet(HOST, self.port) + self.dataq.join() func = getattr(telnet, func_name) self.assertRaises(EOFError, func) @@ -238,14 +243,14 @@ class ReadTests(TestCase): def _test_read_any_lazy_A(self, func_name): want = [self.blocking_timeout/2, 'x' * 100, EOF_sigil] - for item in want: - self.dataq.put(item) + self.dataq.put(want) telnet = telnetlib.Telnet(HOST, self.port) + self.dataq.join() func = getattr(telnet, func_name) self.assertEqual('', func()) data = '' while True: - time.sleep(self.blocking_timeout) + time.sleep(0.0) try: telnet.fill_rawq() data += func() @@ -257,10 +262,11 @@ class ReadTests(TestCase): return data, want[1] def _test_read_any_lazy_B(self, func_name): - self.dataq.put(EOF_sigil) + self.dataq.put([EOF_sigil]) telnet = telnetlib.Telnet(HOST, self.port) + self.dataq.join() func = getattr(telnet, func_name) - time.sleep(self.blocking_timeout/10) + time.sleep(0.0) telnet.fill_rawq() self.assertRaises(EOFError, func) @@ -298,12 +304,11 @@ class OptionTests(TestCase): def _test_command(self, data): """ helper for testing IAC + cmd """ self.setUp() - for item in data: - self.dataq.put(item) + self.dataq.put(data) telnet = telnetlib.Telnet(HOST, self.port) + self.dataq.join() nego = nego_collector() telnet.set_option_negotiation_callback(nego.do_nego) - time.sleep(self.blocking_timeout/10) txt = telnet.read_all() cmd = nego.seen self.assertTrue(len(cmd) > 0) # we expect at least one command @@ -314,8 +319,9 @@ class OptionTests(TestCase): def test_IAC_commands(self): # reset our setup - self.dataq.put(EOF_sigil) + self.dataq.put([EOF_sigil]) telnet = telnetlib.Telnet(HOST, self.port) + self.dataq.join() self.tearDown() for cmd in self.cmds: @@ -324,6 +330,7 @@ class OptionTests(TestCase): self._test_command([tl.IAC + cmd, EOF_sigil]) # all at once self._test_command([tl.IAC + cmd for (cmd) in self.cmds] + [EOF_sigil]) + self.assertEqual('', telnet.read_sb_data()) def test_SB_commands(self): # RFC 855, subnegotiations portion @@ -334,16 +341,16 @@ class OptionTests(TestCase): tl.IAC + tl.SB + 'cc' + tl.IAC + tl.IAC + 'dd' + tl.IAC + tl.SE, EOF_sigil, ] - for item in send: - self.dataq.put(item) + self.dataq.put(send) telnet = telnetlib.Telnet(HOST, self.port) + self.dataq.join() nego = nego_collector(telnet.read_sb_data) telnet.set_option_negotiation_callback(nego.do_nego) - time.sleep(self.blocking_timeout/10) txt = telnet.read_all() self.assertEqual(txt, '') want_sb_data = tl.IAC + tl.IAC + 'aabb' + tl.IAC + 'cc' + tl.IAC + 'dd' self.assertEqual(nego.sb_seen, want_sb_data) + self.assertEqual('', telnet.read_sb_data()) def test_main(verbose=None): test_support.run_unittest(GeneralTests, ReadTests, OptionTests) |