summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--Lib/test/test_socket.py81
1 files changed, 50 insertions, 31 deletions
diff --git a/Lib/test/test_socket.py b/Lib/test/test_socket.py
index dec63e2..98d9e78 100644
--- a/Lib/test/test_socket.py
+++ b/Lib/test/test_socket.py
@@ -191,7 +191,7 @@ class SocketConnectedTest(ThreadedTCPSocketTest):
class GeneralModuleTests(unittest.TestCase):
def testSocketError(self):
- """Testing that socket module exceptions."""
+ # Testing socket module exceptions
def raise_error(*args, **kwargs):
raise socket.error
def raise_herror(*args, **kwargs):
@@ -206,7 +206,7 @@ class GeneralModuleTests(unittest.TestCase):
"Error raising socket exception.")
def testCrucialConstants(self):
- """Testing for mission critical constants."""
+ # Testing for mission critical constants
socket.AF_INET
socket.SOCK_STREAM
socket.SOCK_DGRAM
@@ -217,7 +217,7 @@ class GeneralModuleTests(unittest.TestCase):
socket.SO_REUSEADDR
def testHostnameRes(self):
- """Testing hostname resolution mechanisms."""
+ # Testing hostname resolution mechanisms
hostname = socket.gethostname()
ip = socket.gethostbyname(hostname)
self.assert_(ip.find('.') >= 0, "Error resolving host to ip.")
@@ -228,7 +228,7 @@ class GeneralModuleTests(unittest.TestCase):
self.fail("Error testing host resolution mechanisms.")
def testRefCountGetNameInfo(self):
- """Testing reference count for getnameinfo."""
+ # Testing reference count for getnameinfo
import sys
if hasattr(sys, "getrefcount"):
try:
@@ -240,7 +240,7 @@ class GeneralModuleTests(unittest.TestCase):
self.fail("socket.getnameinfo loses a reference")
def testInterpreterCrash(self):
- """Making sure getnameinfo doesn't crash the interpreter."""
+ # Making sure getnameinfo doesn't crash the interpreter
try:
# On some versions, this crashes the interpreter.
socket.getnameinfo(('x', 0, 0, 0), 0)
@@ -258,7 +258,7 @@ class GeneralModuleTests(unittest.TestCase):
self.assertRaises(OverflowError, func, 2L**34)
def testGetServByName(self):
- """Testing getservbyname()."""
+ # Testing getservbyname()
# try a few protocols - not everyone has telnet enabled
found = 0
for proto in ("telnet", "ssh", "www", "ftp"):
@@ -278,7 +278,7 @@ class GeneralModuleTests(unittest.TestCase):
raise socket.error
def testDefaultTimeout(self):
- """Testing default timeout."""
+ # Testing default timeout
# The default timeout should initially be None
self.assertEqual(socket.getdefaulttimeout(), None)
s = socket.socket()
@@ -308,28 +308,28 @@ class GeneralModuleTests(unittest.TestCase):
# XXX The following don't test module-level functionality...
def testSockName(self):
- """Testing getsockname()."""
+ # Testing getsockname()
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.bind(("0.0.0.0", PORT+1))
name = sock.getsockname()
self.assertEqual(name, ("0.0.0.0", PORT+1))
def testGetSockOpt(self):
- """Testing getsockopt()."""
+ # Testing getsockopt()
# We know a socket should start without reuse==0
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
reuse = sock.getsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR)
self.failIf(reuse != 0, "initial mode is reuse")
def testSetSockOpt(self):
- """Testing setsockopt()."""
+ # Testing setsockopt()
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
reuse = sock.getsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR)
self.failIf(reuse == 0, "failed to set reuse mode")
def testSendAfterClose(self):
- """testing send() after close() with timeout."""
+ # testing send() after close() with timeout
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.settimeout(1)
sock.close()
@@ -341,7 +341,7 @@ class BasicTCPTest(SocketConnectedTest):
SocketConnectedTest.__init__(self, methodName=methodName)
def testRecv(self):
- """Testing large receive over TCP."""
+ # Testing large receive over TCP
msg = self.cli_conn.recv(1024)
self.assertEqual(msg, MSG)
@@ -349,7 +349,7 @@ class BasicTCPTest(SocketConnectedTest):
self.serv_conn.send(MSG)
def testOverFlowRecv(self):
- """Testing receive in chunks over TCP."""
+ # Testing receive in chunks over TCP
seg1 = self.cli_conn.recv(len(MSG) - 3)
seg2 = self.cli_conn.recv(1024)
msg = seg1 + seg2
@@ -359,7 +359,7 @@ class BasicTCPTest(SocketConnectedTest):
self.serv_conn.send(MSG)
def testRecvFrom(self):
- """Testing large recvfrom() over TCP."""
+ # Testing large recvfrom() over TCP
msg, addr = self.cli_conn.recvfrom(1024)
self.assertEqual(msg, MSG)
@@ -367,7 +367,7 @@ class BasicTCPTest(SocketConnectedTest):
self.serv_conn.send(MSG)
def testOverFlowRecvFrom(self):
- """Testing recvfrom() in chunks over TCP."""
+ # Testing recvfrom() in chunks over TCP
seg1, addr = self.cli_conn.recvfrom(len(MSG)-3)
seg2, addr = self.cli_conn.recvfrom(1024)
msg = seg1 + seg2
@@ -377,7 +377,7 @@ class BasicTCPTest(SocketConnectedTest):
self.serv_conn.send(MSG)
def testSendAll(self):
- """Testing sendall() with a 2048 byte string over TCP."""
+ # Testing sendall() with a 2048 byte string over TCP
while 1:
read = self.cli_conn.recv(1024)
if not read:
@@ -391,7 +391,7 @@ class BasicTCPTest(SocketConnectedTest):
self.serv_conn.sendall(big_chunk)
def testFromFd(self):
- """Testing fromfd()."""
+ # Testing fromfd()
if not hasattr(socket, "fromfd"):
return # On Windows, this doesn't exist
fd = self.cli_conn.fileno()
@@ -403,7 +403,7 @@ class BasicTCPTest(SocketConnectedTest):
self.serv_conn.send(MSG)
def testShutdown(self):
- """Testing shutdown()."""
+ # Testing shutdown()
msg = self.cli_conn.recv(1024)
self.assertEqual(msg, MSG)
@@ -417,7 +417,7 @@ class BasicUDPTest(ThreadedUDPSocketTest):
ThreadedUDPSocketTest.__init__(self, methodName=methodName)
def testSendtoAndRecv(self):
- """Testing sendto() and Recv() over UDP."""
+ # Testing sendto() and Recv() over UDP
msg = self.serv.recv(len(MSG))
self.assertEqual(msg, MSG)
@@ -425,7 +425,7 @@ class BasicUDPTest(ThreadedUDPSocketTest):
self.cli.sendto(MSG, 0, (HOST, PORT))
def testRecvFrom(self):
- """Testing recvfrom() over UDP."""
+ # Testing recvfrom() over UDP
msg, addr = self.serv.recvfrom(len(MSG))
self.assertEqual(msg, MSG)
@@ -438,7 +438,7 @@ class NonBlockingTCPTests(ThreadedTCPSocketTest):
ThreadedTCPSocketTest.__init__(self, methodName=methodName)
def testSetBlocking(self):
- """Testing whether set blocking works."""
+ # Testing whether set blocking works
self.serv.setblocking(0)
start = time.time()
try:
@@ -452,7 +452,7 @@ class NonBlockingTCPTests(ThreadedTCPSocketTest):
pass
def testAccept(self):
- """Testing non-blocking accept."""
+ # Testing non-blocking accept
self.serv.setblocking(0)
try:
conn, addr = self.serv.accept()
@@ -471,7 +471,7 @@ class NonBlockingTCPTests(ThreadedTCPSocketTest):
self.cli.connect((HOST, PORT))
def testConnect(self):
- """Testing non-blocking connect."""
+ # Testing non-blocking connect
conn, addr = self.serv.accept()
def _testConnect(self):
@@ -479,7 +479,7 @@ class NonBlockingTCPTests(ThreadedTCPSocketTest):
self.cli.connect((HOST, PORT))
def testRecv(self):
- """Testing non-blocking recv."""
+ # Testing non-blocking recv
conn, addr = self.serv.accept()
conn.setblocking(0)
try:
@@ -526,7 +526,7 @@ class FileObjectClassTestCase(SocketConnectedTest):
SocketConnectedTest.clientTearDown(self)
def testSmallRead(self):
- """Performing small file read test."""
+ # Performing small file read test
first_seg = self.serv_file.read(len(MSG)-3)
second_seg = self.serv_file.read(3)
msg = first_seg + second_seg
@@ -536,22 +536,31 @@ class FileObjectClassTestCase(SocketConnectedTest):
self.cli_file.write(MSG)
self.cli_file.flush()
+ def testFullRead(self):
+ # read until EOF
+ msg = self.serv_file.read()
+ self.assertEqual(msg, MSG)
+
+ def _testFullRead(self):
+ self.cli_file.write(MSG)
+ self.cli_file.close()
+
def testUnbufferedRead(self):
- """Performing unbuffered file read test."""
+ # Performing unbuffered file read test
buf = ''
while 1:
char = self.serv_file.read(1)
- self.failIf(not char)
- buf += char
- if buf == MSG:
+ if not char:
break
+ buf += char
+ self.assertEqual(buf, MSG)
def _testUnbufferedRead(self):
self.cli_file.write(MSG)
self.cli_file.flush()
def testReadline(self):
- """Performing file readline test."""
+ # Performing file readline test
line = self.serv_file.readline()
self.assertEqual(line, MSG)
@@ -572,7 +581,7 @@ class UnbufferedFileObjectClassTestCase(FileObjectClassTestCase):
bufsize = 0 # Use unbuffered mode
def testUnbufferedReadline(self):
- """Read a line, create a new file object, read another line with it."""
+ # Read a line, create a new file object, read another line with it
line = self.serv_file.readline() # first line
self.assertEqual(line, "A. " + MSG) # first line
self.serv_file = self.cli_conn.makefile('rb', 0)
@@ -584,6 +593,14 @@ class UnbufferedFileObjectClassTestCase(FileObjectClassTestCase):
self.cli_file.write("B. " + MSG)
self.cli_file.flush()
+class LineBufferedFileObjectClassTestCase(FileObjectClassTestCase):
+
+ bufsize = 1 # Default-buffered for reading; line-buffered for writing
+
+
+class SmallBufferedFileObjectClassTestCase(FileObjectClassTestCase):
+
+ bufsize = 2 # Exercise the buffering code
def test_main():
suite = unittest.TestSuite()
@@ -593,6 +610,8 @@ def test_main():
suite.addTest(unittest.makeSuite(NonBlockingTCPTests))
suite.addTest(unittest.makeSuite(FileObjectClassTestCase))
suite.addTest(unittest.makeSuite(UnbufferedFileObjectClassTestCase))
+ suite.addTest(unittest.makeSuite(LineBufferedFileObjectClassTestCase))
+ suite.addTest(unittest.makeSuite(SmallBufferedFileObjectClassTestCase))
test_support.run_suite(suite)
if __name__ == "__main__":