summaryrefslogtreecommitdiffstats
path: root/Lib/test/test_asynchat.py
diff options
context:
space:
mode:
Diffstat (limited to 'Lib/test/test_asynchat.py')
-rw-r--r--Lib/test/test_asynchat.py213
1 files changed, 186 insertions, 27 deletions
diff --git a/Lib/test/test_asynchat.py b/Lib/test/test_asynchat.py
index 7629296..be8e116 100644
--- a/Lib/test/test_asynchat.py
+++ b/Lib/test/test_asynchat.py
@@ -3,12 +3,17 @@
import thread # If this fails, we can't test this module
import asyncore, asynchat, socket, threading, time
import unittest
+import sys
from test import test_support
HOST = "127.0.0.1"
PORT = 54322
+SERVER_QUIT = b'QUIT\n'
class echo_server(threading.Thread):
+ # parameter to determine the number of bytes passed back to the
+ # client each send
+ chunk_size = 1
def run(self):
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
@@ -17,15 +22,28 @@ class echo_server(threading.Thread):
PORT = test_support.bind_port(sock, HOST, PORT)
sock.listen(1)
conn, client = sock.accept()
- buffer = b""
- while b"\n" not in buffer:
+ self.buffer = b""
+ # collect data until quit message is seen
+ while SERVER_QUIT not in self.buffer:
data = conn.recv(1)
if not data:
break
- buffer = buffer + data
- while buffer:
- n = conn.send(buffer)
- buffer = buffer[n:]
+ self.buffer = self.buffer + data
+
+ # remove the SERVER_QUIT message
+ self.buffer = self.buffer.replace(SERVER_QUIT, b'')
+
+ # re-send entire set of collected data
+ try:
+ # this may fail on some tests, such as test_close_when_done, since
+ # the client closes the channel when it's done sending
+ while self.buffer:
+ n = conn.send(self.buffer[:self.chunk_size])
+ time.sleep(0.001)
+ self.buffer = self.buffer[n:]
+ except:
+ pass
+
conn.close()
sock.close()
@@ -33,7 +51,7 @@ class echo_client(asynchat.async_chat):
def __init__(self, terminator):
asynchat.async_chat.__init__(self)
- self.contents = None
+ self.contents = []
self.create_socket(socket.AF_INET, socket.SOCK_STREAM)
self.connect((HOST, PORT))
self.set_terminator(terminator)
@@ -41,53 +59,194 @@ class echo_client(asynchat.async_chat):
def handle_connect(self):
pass
- ##print "Connected"
+
+ if sys.platform == 'darwin':
+ # select.poll returns a select.POLLHUP at the end of the tests
+ # on darwin, so just ignore it
+ def handle_expt(self):
+ pass
def collect_incoming_data(self, data):
- self.buffer = self.buffer + data
+ self.buffer += data
def found_terminator(self):
- #print "Received:", repr(self.buffer)
- self.contents = self.buffer
+ self.contents.append(self.buffer)
self.buffer = b""
- self.close()
class TestAsynchat(unittest.TestCase):
+ usepoll = False
+
def setUp (self):
pass
def tearDown (self):
pass
- def test_line_terminator(self):
+ def line_terminator_check(self, term, server_chunk):
+ s = echo_server()
+ s.chunk_size = server_chunk
+ s.start()
+ time.sleep(0.5) # Give server time to initialize
+ c = echo_client(term)
+ c.push(b"hello ")
+ c.push(bytes("world%s" % term))
+ c.push(bytes("I'm not dead yet!%s" % term))
+ c.push(SERVER_QUIT)
+ asyncore.loop(use_poll=self.usepoll, count=300, timeout=.01)
+ s.join()
+
+ self.assertEqual(c.contents, [b"hello world", b"I'm not dead yet!"])
+
+ # the line terminator tests below check receiving variously-sized
+ # chunks back from the server in order to exercise all branches of
+ # async_chat.handle_read
+
+ def test_line_terminator1(self):
+ # test one-character terminator
+ for l in (1,2,3):
+ self.line_terminator_check(b'\n', l)
+
+ def test_line_terminator2(self):
+ # test two-character terminator
+ for l in (1,2,3):
+ self.line_terminator_check(b'\r\n', l)
+
+ def test_line_terminator3(self):
+ # test three-character terminator
+ for l in (1,2,3):
+ self.line_terminator_check(b'qqq', l)
+
+ def numeric_terminator_check(self, termlen):
+ # Try reading a fixed number of bytes
s = echo_server()
s.start()
- time.sleep(1) # Give server time to initialize
- c = echo_client('\n')
- c.push("hello ")
- c.push("world\n")
- asyncore.loop()
+ time.sleep(0.5) # Give server time to initialize
+ c = echo_client(termlen)
+ data = b"hello world, I'm not dead yet!\n"
+ c.push(data)
+ c.push(SERVER_QUIT)
+ asyncore.loop(use_poll=self.usepoll, count=300, timeout=.01)
s.join()
- self.assertEqual(c.contents, b'hello world')
+ self.assertEqual(c.contents, [data[:termlen]])
- def test_numeric_terminator(self):
+ def test_numeric_terminator1(self):
+ # check that ints & longs both work (since type is
+ # explicitly checked in async_chat.handle_read)
+ self.numeric_terminator_check(1)
+
+ def test_numeric_terminator2(self):
+ self.numeric_terminator_check(6)
+
+ def test_none_terminator(self):
# Try reading a fixed number of bytes
s = echo_server()
s.start()
- time.sleep(1) # Give server time to initialize
- c = echo_client(6)
- c.push("hello ")
- c.push("world\n")
- asyncore.loop()
+ time.sleep(0.5) # Give server time to initialize
+ c = echo_client(None)
+ data = b"hello world, I'm not dead yet!\n"
+ c.push(data)
+ c.push(SERVER_QUIT)
+ asyncore.loop(use_poll=self.usepoll, count=300, timeout=.01)
+ s.join()
+
+ self.assertEqual(c.contents, [])
+ self.assertEqual(c.buffer, data)
+
+ def test_simple_producer(self):
+ s = echo_server()
+ s.start()
+ time.sleep(0.5) # Give server time to initialize
+ c = echo_client(b'\n')
+ data = b"hello world\nI'm not dead yet!\n"
+ p = asynchat.simple_producer(data+SERVER_QUIT, buffer_size=8)
+ c.push_with_producer(p)
+ asyncore.loop(use_poll=self.usepoll, count=300, timeout=.01)
+ s.join()
+
+ self.assertEqual(c.contents, [b"hello world", b"I'm not dead yet!"])
+
+ def test_string_producer(self):
+ s = echo_server()
+ s.start()
+ time.sleep(0.5) # Give server time to initialize
+ c = echo_client(b'\n')
+ data = b"hello world\nI'm not dead yet!\n"
+ c.push_with_producer(data+SERVER_QUIT)
+ asyncore.loop(use_poll=self.usepoll, count=300, timeout=.01)
+ s.join()
+
+ self.assertEqual(c.contents, [b"hello world", b"I'm not dead yet!"])
+
+ def test_empty_line(self):
+ # checks that empty lines are handled correctly
+ s = echo_server()
+ s.start()
+ time.sleep(0.5) # Give server time to initialize
+ c = echo_client(b'\n')
+ c.push("hello world\n\nI'm not dead yet!\n")
+ c.push(SERVER_QUIT)
+ asyncore.loop(use_poll=self.usepoll, count=300, timeout=.01)
+ s.join()
+
+ self.assertEqual(c.contents,
+ [b"hello world", b"", b"I'm not dead yet!"])
+
+ def test_close_when_done(self):
+ s = echo_server()
+ s.start()
+ time.sleep(0.5) # Give server time to initialize
+ c = echo_client(b'\n')
+ c.push("hello world\nI'm not dead yet!\n")
+ c.push(SERVER_QUIT)
+ c.close_when_done()
+ asyncore.loop(use_poll=self.usepoll, count=300, timeout=.01)
s.join()
- self.assertEqual(c.contents, b'hello ')
+ self.assertEqual(c.contents, [])
+ # the server might have been able to send a byte or two back, but this
+ # at least checks that it received something and didn't just fail
+ # (which could still result in the client not having received anything)
+ self.assertTrue(len(s.buffer) > 0)
+
+
+class TestAsynchat_WithPoll(TestAsynchat):
+ usepoll = True
+
+class TestHelperFunctions(unittest.TestCase):
+ def test_find_prefix_at_end(self):
+ self.assertEqual(asynchat.find_prefix_at_end("qwerty\r", "\r\n"), 1)
+ self.assertEqual(asynchat.find_prefix_at_end("qwertydkjf", "\r\n"), 0)
+
+class TestFifo(unittest.TestCase):
+ def test_basic(self):
+ f = asynchat.fifo()
+ f.push(7)
+ f.push(b'a')
+ self.assertEqual(len(f), 2)
+ self.assertEqual(f.first(), 7)
+ self.assertEqual(f.pop(), (1, 7))
+ self.assertEqual(len(f), 1)
+ self.assertEqual(f.first(), b'a')
+ self.assertEqual(f.is_empty(), False)
+ self.assertEqual(f.pop(), (1, b'a'))
+ self.assertEqual(len(f), 0)
+ self.assertEqual(f.is_empty(), True)
+ self.assertEqual(f.pop(), (0, None))
+
+ def test_given_list(self):
+ f = asynchat.fifo([b'x', 17, 3])
+ self.assertEqual(len(f), 3)
+ self.assertEqual(f.pop(), (1, b'x'))
+ self.assertEqual(f.pop(), (1, 17))
+ self.assertEqual(f.pop(), (1, 3))
+ self.assertEqual(f.pop(), (0, None))
def test_main(verbose=None):
- test_support.run_unittest(TestAsynchat)
+ test_support.run_unittest(TestAsynchat, TestAsynchat_WithPoll,
+ TestHelperFunctions, TestFifo)
if __name__ == "__main__":
test_main(verbose=True)