diff options
author | Facundo Batista <facundobatista@gmail.com> | 2007-07-29 14:23:08 (GMT) |
---|---|---|
committer | Facundo Batista <facundobatista@gmail.com> | 2007-07-29 14:23:08 (GMT) |
commit | ec62423be4a84a056a9a0a1203875b8a29279365 (patch) | |
tree | dea0d3e7007b85361ab4c9b98874e5a72e94c65a /Lib | |
parent | 1ae6856522ef7ab87d96d75aad49ef982aea6e91 (diff) | |
download | cpython-ec62423be4a84a056a9a0a1203875b8a29279365.zip cpython-ec62423be4a84a056a9a0a1203875b8a29279365.tar.gz cpython-ec62423be4a84a056a9a0a1203875b8a29279365.tar.bz2 |
Added tests for asynchat classes simple_producer & fifo, and the
find_prefix_at_end function. Check behavior of a string given as a
producer. Added tests for behavior of asynchat.async_chat when given
int, long, and None terminator arguments. Added usepoll attribute to
TestAsynchat to allow running the asynchat tests with poll support
chosen whether it's available or not (improves coverage of asyncore
code). [GSoC - Alan McIntyre]
Diffstat (limited to 'Lib')
-rw-r--r-- | Lib/test/test_asynchat.py | 205 |
1 files changed, 179 insertions, 26 deletions
diff --git a/Lib/test/test_asynchat.py b/Lib/test/test_asynchat.py index 9926167..5fcc529 100644 --- a/Lib/test/test_asynchat.py +++ b/Lib/test/test_asynchat.py @@ -7,8 +7,12 @@ from test import test_support HOST = "127.0.0.1" PORT = 54322 +SERVER_QUIT = 'QUIT\n' class echo_server(threading.Thread): + # parameter to determine the number of bytes passed back to the + # client each send + chunk_size = 1 def run(self): sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) @@ -17,15 +21,28 @@ class echo_server(threading.Thread): PORT = test_support.bind_port(sock, HOST, PORT) sock.listen(1) conn, client = sock.accept() - buffer = "" - while "\n" not in buffer: + self.buffer = "" + # collect data until quit message is seen + while SERVER_QUIT not in self.buffer: data = conn.recv(1) if not data: break - buffer = buffer + data - while buffer: - n = conn.send(buffer) - buffer = buffer[n:] + self.buffer = self.buffer + data + + # remove the SERVER_QUIT message + self.buffer = self.buffer.replace(SERVER_QUIT, '') + + # re-send entire set of collected data + try: + # this may fail on some tests, such as test_close_when_done, since + # the client closes the channel when it's done sending + while self.buffer: + n = conn.send(self.buffer[:self.chunk_size]) + time.sleep(0.001) + self.buffer = self.buffer[n:] + except: + pass + conn.close() sock.close() @@ -33,61 +50,197 @@ class echo_client(asynchat.async_chat): def __init__(self, terminator): asynchat.async_chat.__init__(self) - self.contents = None + self.contents = [] self.create_socket(socket.AF_INET, socket.SOCK_STREAM) self.connect((HOST, PORT)) self.set_terminator(terminator) - self.buffer = "" + self.buffer = '' def handle_connect(self): pass ##print "Connected" def collect_incoming_data(self, data): - self.buffer = self.buffer + data + self.buffer += data def found_terminator(self): - #print "Received:", repr(self.buffer) - self.contents = self.buffer + self.contents.append(self.buffer) self.buffer = "" - self.close() class TestAsynchat(unittest.TestCase): + usepoll = False + def setUp (self): pass def tearDown (self): pass - def test_line_terminator(self): + def line_terminator_check(self, term, server_chunk): s = echo_server() + s.chunk_size = server_chunk s.start() - time.sleep(1) # Give server time to initialize - c = echo_client('\n') + time.sleep(0.5) # Give server time to initialize + c = echo_client(term) c.push("hello ") - c.push("world\n") - asyncore.loop() + c.push("world%s" % term) + c.push("I'm not dead yet!%s" % term) + c.push(SERVER_QUIT) + asyncore.loop(use_poll=self.usepoll)#, count=5, timeout=5) s.join() - self.assertEqual(c.contents, 'hello world') + self.assertEqual(c.contents, ["hello world", "I'm not dead yet!"]) + + # the line terminator tests below check receiving variously-sized + # chunks back from the server in order to exercise all branches of + # async_chat.handle_read - def test_numeric_terminator(self): + def test_line_terminator1(self): + # test one-character terminator + for l in (1,2,3): + self.line_terminator_check('\n', l) + + def test_line_terminator2(self): + # test two-character terminator + for l in (1,2,3): + self.line_terminator_check('\r\n', l) + + def test_line_terminator3(self): + # test three-character terminator + for l in (1,2,3): + self.line_terminator_check('qqq', l) + + def numeric_terminator_check(self, termlen): # Try reading a fixed number of bytes s = echo_server() s.start() - time.sleep(1) # Give server time to initialize - c = echo_client(6L) - c.push("hello ") - c.push("world\n") - asyncore.loop() + time.sleep(0.5) # Give server time to initialize + c = echo_client(termlen) + data = "hello world, I'm not dead yet!\n" + c.push(data) + c.push(SERVER_QUIT) + asyncore.loop(use_poll=self.usepoll)#, count=5, timeout=5) + s.join() + + self.assertEqual(c.contents, [data[:termlen]]) + + def test_numeric_terminator1(self): + # check that ints & longs both work (since type is + # explicitly checked in async_chat.handle_read) + self.numeric_terminator_check(1) + self.numeric_terminator_check(1L) + + def test_numeric_terminator2(self): + self.numeric_terminator_check(6L) + + def test_none_terminator(self): + # Try reading a fixed number of bytes + s = echo_server() + s.start() + time.sleep(0.5) # Give server time to initialize + c = echo_client(None) + data = "hello world, I'm not dead yet!\n" + c.push(data) + c.push(SERVER_QUIT) + asyncore.loop(use_poll=self.usepoll)#, count=5, timeout=5) + s.join() + + self.assertEqual(c.contents, []) + self.assertEqual(c.buffer, data) + + def test_simple_producer(self): + s = echo_server() + s.start() + time.sleep(0.5) # Give server time to initialize + c = echo_client('\n') + data = "hello world\nI'm not dead yet!\n" + p = asynchat.simple_producer(data+SERVER_QUIT, buffer_size=8) + c.push_with_producer(p) + asyncore.loop(use_poll=self.usepoll)#, count=5, timeout=5) + s.join() + + self.assertEqual(c.contents, ["hello world", "I'm not dead yet!"]) + + def test_string_producer(self): + s = echo_server() + s.start() + time.sleep(0.5) # Give server time to initialize + c = echo_client('\n') + data = "hello world\nI'm not dead yet!\n" + c.push_with_producer(data+SERVER_QUIT) + asyncore.loop(use_poll=self.usepoll)#, count=5, timeout=5) + s.join() + + self.assertEqual(c.contents, ["hello world", "I'm not dead yet!"]) + + def test_empty_line(self): + # checks that empty lines are handled correctly + s = echo_server() + s.start() + time.sleep(0.5) # Give server time to initialize + c = echo_client('\n') + c.push("hello world\n\nI'm not dead yet!\n") + c.push(SERVER_QUIT) + asyncore.loop(use_poll=self.usepoll) + s.join() + + self.assertEqual(c.contents, ["hello world", "", "I'm not dead yet!"]) + + def test_close_when_done(self): + s = echo_server() + s.start() + time.sleep(0.5) # Give server time to initialize + c = echo_client('\n') + c.push("hello world\nI'm not dead yet!\n") + c.push(SERVER_QUIT) + c.close_when_done() + asyncore.loop(use_poll=self.usepoll)#, count=5, timeout=5) s.join() - self.assertEqual(c.contents, 'hello ') + self.assertEqual(c.contents, []) + # the server might have been able to send a byte or two back, but this + # at least checks that it received something and didn't just fail + # (which could still result in the client not having received anything) + self.assertTrue(len(s.buffer) > 0) + + +class TestAsynchat_WithPoll(TestAsynchat): + usepoll = True + +class TestHelperFunctions(unittest.TestCase): + def test_find_prefix_at_end(self): + self.assertEqual(asynchat.find_prefix_at_end("qwerty\r", "\r\n"), 1) + self.assertEqual(asynchat.find_prefix_at_end("qwertydkjf", "\r\n"), 0) + +class TestFifo(unittest.TestCase): + def test_basic(self): + f = asynchat.fifo() + f.push(7) + f.push('a') + self.assertEqual(len(f), 2) + self.assertEqual(f.first(), 7) + self.assertEqual(f.pop(), (1, 7)) + self.assertEqual(len(f), 1) + self.assertEqual(f.first(), 'a') + self.assertEqual(f.is_empty(), False) + self.assertEqual(f.pop(), (1, 'a')) + self.assertEqual(len(f), 0) + self.assertEqual(f.is_empty(), True) + self.assertEqual(f.pop(), (0, None)) + + def test_given_list(self): + f = asynchat.fifo(['x', 17, 3]) + self.assertEqual(len(f), 3) + self.assertEqual(f.pop(), (1, 'x')) + self.assertEqual(f.pop(), (1, 17)) + self.assertEqual(f.pop(), (1, 3)) + self.assertEqual(f.pop(), (0, None)) def test_main(verbose=None): - test_support.run_unittest(TestAsynchat) + test_support.run_unittest(TestAsynchat, TestAsynchat_WithPoll, + TestHelperFunctions, TestFifo) if __name__ == "__main__": test_main(verbose=True) |