summaryrefslogtreecommitdiffstats
path: root/Lib/test/test_socketserver.py
diff options
context:
space:
mode:
Diffstat (limited to 'Lib/test/test_socketserver.py')
-rw-r--r--Lib/test/test_socketserver.py290
1 files changed, 174 insertions, 116 deletions
diff --git a/Lib/test/test_socketserver.py b/Lib/test/test_socketserver.py
index 3e0c7a4..5ae8e6d 100644
--- a/Lib/test/test_socketserver.py
+++ b/Lib/test/test_socketserver.py
@@ -1,20 +1,32 @@
-# Test suite for SocketServer.py
+"""
+Test suite for SocketServer.py.
+"""
-from test import test_support
-from test.test_support import (verbose, verify, TESTFN, TestSkipped,
- reap_children)
-test_support.requires('network')
-
-from SocketServer import *
+import os
import socket
import errno
+import imp
import select
import time
import threading
-import os
+from functools import wraps
+import unittest
+import SocketServer
+
+import test.test_support
+from test.test_support import reap_children, verbose, TestSkipped
+from test.test_support import TESTFN as TEST_FILE
+
+test.test_support.requires("network")
NREQ = 3
DELAY = 0.5
+TEST_STR = b"hello world\n"
+HOST = "localhost"
+
+HAVE_UNIX_SOCKETS = hasattr(socket, "AF_UNIX")
+HAVE_FORKING = hasattr(os, "fork") and os.name != "os2"
+
class MyMixinHandler:
def handle(self):
@@ -23,23 +35,41 @@ class MyMixinHandler:
time.sleep(DELAY)
self.wfile.write(line)
-class MyStreamHandler(MyMixinHandler, StreamRequestHandler):
+
+def receive(sock, n, timeout=20):
+ r, w, x = select.select([sock], [], [], timeout)
+ if sock in r:
+ return sock.recv(n)
+ else:
+ raise RuntimeError("timed out on %r" % (sock,))
+
+
+class MyStreamHandler(MyMixinHandler, SocketServer.StreamRequestHandler):
pass
-class MyDatagramHandler(MyMixinHandler, DatagramRequestHandler):
+class MyDatagramHandler(MyMixinHandler,
+ SocketServer.DatagramRequestHandler):
pass
+class ForkingUnixStreamServer(SocketServer.ForkingMixIn,
+ SocketServer.UnixStreamServer):
+ pass
+
+class ForkingUnixDatagramServer(SocketServer.ForkingMixIn,
+ SocketServer.UnixDatagramServer):
+ pass
+
+
class MyMixinServer:
def serve_a_few(self):
for i in range(NREQ):
self.handle_request()
+
def handle_error(self, request, client_address):
self.close_request(request)
self.server_close()
raise
-teststring = b"hello world\n"
-
def receive(sock, n, timeout=20):
r, w, x = select.select([sock], [], [], timeout)
if sock in r:
@@ -75,6 +105,7 @@ class ServerThread(threading.Thread):
self.__svrcls = svrcls
self.__hdlrcls = hdlrcls
self.ready = threading.Event()
+
def run(self):
class svrcls(MyMixinServer, self.__svrcls):
pass
@@ -93,64 +124,8 @@ class ServerThread(threading.Thread):
svr.serve_a_few()
if verbose: print("thread: done")
-seed = 0
-def pickport():
- global seed
- seed += 1
- return 10000 + (os.getpid() % 1000)*10 + seed
-
-host = "localhost"
-testfiles = []
-def pickaddr(proto):
- if proto == socket.AF_INET:
- return (host, pickport())
- else:
- fn = TESTFN + str(pickport())
- if os.name == 'os2':
- # AF_UNIX socket names on OS/2 require a specific prefix
- # which can't include a drive letter and must also use
- # backslashes as directory separators
- if fn[1] == ':':
- fn = fn[2:]
- if fn[0] in (os.sep, os.altsep):
- fn = fn[1:]
- fn = os.path.join('\socket', fn)
- if os.sep == '/':
- fn = fn.replace(os.sep, os.altsep)
- else:
- fn = fn.replace(os.altsep, os.sep)
- testfiles.append(fn)
- return fn
-
-def cleanup():
- for fn in testfiles:
- try:
- os.remove(fn)
- except os.error:
- pass
- testfiles[:] = []
-
-def testloop(proto, servers, hdlrcls, testfunc):
- for svrcls in servers:
- addr = pickaddr(proto)
- if verbose:
- print("ADDR =", addr)
- print("CLASS =", svrcls)
- t = ServerThread(addr, svrcls, hdlrcls)
- if verbose: print("server created")
- t.start()
- if verbose: print("server running")
- for i in range(NREQ):
- t.ready.wait(10*DELAY)
- if not t.ready.isSet():
- raise RuntimeError("Server not ready within a reasonable time")
- if verbose: print("test client", i)
- testfunc(proto, addr)
- if verbose: print("waiting for server")
- t.join()
- if verbose: print("done")
-
-class ForgivingTCPServer(TCPServer):
+
+class ForgivingTCPServer(SocketServer.TCPServer):
# prevent errors if another process is using the port we want
def server_bind(self):
host, default_port = self.server_address
@@ -160,7 +135,7 @@ class ForgivingTCPServer(TCPServer):
for port in [default_port, 3434, 8798, 23833]:
try:
self.server_address = host, port
- TCPServer.server_bind(self)
+ SocketServer.TCPServer.server_bind(self)
break
except socket.error as e:
(err, msg) = e
@@ -168,56 +143,139 @@ class ForgivingTCPServer(TCPServer):
raise
print(' WARNING: failed to listen on port %d, trying another' % port, file=sys.__stderr__)
-tcpservers = [ForgivingTCPServer, ThreadingTCPServer]
-if hasattr(os, 'fork') and os.name not in ('os2',):
- tcpservers.append(ForkingTCPServer)
-udpservers = [UDPServer, ThreadingUDPServer]
-if hasattr(os, 'fork') and os.name not in ('os2',):
- udpservers.append(ForkingUDPServer)
-
-if not hasattr(socket, 'AF_UNIX'):
- streamservers = []
- dgramservers = []
-else:
- class ForkingUnixStreamServer(ForkingMixIn, UnixStreamServer): pass
- streamservers = [UnixStreamServer, ThreadingUnixStreamServer]
- if hasattr(os, 'fork') and os.name not in ('os2',):
- streamservers.append(ForkingUnixStreamServer)
- class ForkingUnixDatagramServer(ForkingMixIn, UnixDatagramServer): pass
- dgramservers = [UnixDatagramServer, ThreadingUnixDatagramServer]
- if hasattr(os, 'fork') and os.name not in ('os2',):
- dgramservers.append(ForkingUnixDatagramServer)
-
-def sloppy_cleanup():
- # See http://python.org/sf/1540386
- # We need to reap children here otherwise a child from one server
- # can be left running for the next server and cause a test failure.
- time.sleep(DELAY)
- reap_children()
-
-def testall():
- testloop(socket.AF_INET, tcpservers, MyStreamHandler, teststream)
- sloppy_cleanup()
- testloop(socket.AF_INET, udpservers, MyDatagramHandler, testdgram)
- if hasattr(socket, 'AF_UNIX'):
- sloppy_cleanup()
- testloop(socket.AF_UNIX, streamservers, MyStreamHandler, teststream)
- # Alas, on Linux (at least) recvfrom() doesn't return a meaningful
- # client address so this cannot work:
- ##testloop(socket.AF_UNIX, dgramservers, MyDatagramHandler, testdgram)
+class SocketServerTest(unittest.TestCase):
+ """Test all socket servers."""
+
+ def setUp(self):
+ self.port_seed = 0
+ self.test_files = []
+
+ def tearDown(self):
+ time.sleep(DELAY)
+ reap_children()
+
+ for fn in self.test_files:
+ try:
+ os.remove(fn)
+ except os.error:
+ pass
+ self.test_files[:] = []
+
+ def pickport(self):
+ self.port_seed += 1
+ return 10000 + (os.getpid() % 1000)*10 + self.port_seed
+
+ def pickaddr(self, proto):
+ if proto == socket.AF_INET:
+ return (HOST, self.pickport())
+ else:
+ fn = TEST_FILE + str(self.pickport())
+ if os.name == 'os2':
+ # AF_UNIX socket names on OS/2 require a specific prefix
+ # which can't include a drive letter and must also use
+ # backslashes as directory separators
+ if fn[1] == ':':
+ fn = fn[2:]
+ if fn[0] in (os.sep, os.altsep):
+ fn = fn[1:]
+ fn = os.path.join('\socket', fn)
+ if os.sep == '/':
+ fn = fn.replace(os.sep, os.altsep)
+ else:
+ fn = fn.replace(os.altsep, os.sep)
+ self.test_files.append(fn)
+ return fn
+
+ def run_servers(self, proto, servers, hdlrcls, testfunc):
+ for svrcls in servers:
+ addr = self.pickaddr(proto)
+ if verbose:
+ print "ADDR =", addr
+ print "CLASS =", svrcls
+ t = ServerThread(addr, svrcls, hdlrcls)
+ if verbose: print "server created"
+ t.start()
+ if verbose: print "server running"
+ for i in range(NREQ):
+ t.ready.wait(10*DELAY)
+ self.assert_(t.ready.isSet(),
+ "Server not ready within a reasonable time")
+ if verbose: print "test client", i
+ testfunc(proto, addr)
+ if verbose: print "waiting for server"
+ t.join()
+ if verbose: print "done"
+
+ def stream_examine(self, proto, addr):
+ s = socket.socket(proto, socket.SOCK_STREAM)
+ s.connect(addr)
+ s.sendall(TEST_STR)
+ buf = data = receive(s, 100)
+ while data and '\n' not in buf:
+ data = receive(s, 100)
+ buf += data
+ self.assertEquals(buf, TEST_STR)
+ s.close()
+
+ def dgram_examine(self, proto, addr):
+ s = socket.socket(proto, socket.SOCK_DGRAM)
+ s.sendto(TEST_STR, addr)
+ buf = data = receive(s, 100)
+ while data and '\n' not in buf:
+ data = receive(s, 100)
+ buf += data
+ self.assertEquals(buf, TEST_STR)
+ s.close()
+
+ def test_TCPServers(self):
+ # Test SocketServer.TCPServer
+ servers = [ForgivingTCPServer, SocketServer.ThreadingTCPServer]
+ if HAVE_FORKING:
+ servers.append(SocketServer.ForkingTCPServer)
+ self.run_servers(socket.AF_INET, servers,
+ MyStreamHandler, self.stream_examine)
+
+ def test_UDPServers(self):
+ # Test SocketServer.UDPServer
+ servers = [SocketServer.UDPServer,
+ SocketServer.ThreadingUDPServer]
+ if HAVE_FORKING:
+ servers.append(SocketServer.ForkingUDPServer)
+ self.run_servers(socket.AF_INET, servers, MyDatagramHandler,
+ self.dgram_examine)
+
+ def test_stream_servers(self):
+ # Test SocketServer's stream servers
+ if not HAVE_UNIX_SOCKETS:
+ return
+ servers = [SocketServer.UnixStreamServer,
+ SocketServer.ThreadingUnixStreamServer]
+ if HAVE_FORKING:
+ servers.append(ForkingUnixStreamServer)
+ self.run_servers(socket.AF_UNIX, servers, MyStreamHandler,
+ self.stream_examine)
+
+ # Alas, on Linux (at least) recvfrom() doesn't return a meaningful
+ # client address so this cannot work:
+
+ # def test_dgram_servers(self):
+ # # Test SocketServer.UnixDatagramServer
+ # if not HAVE_UNIX_SOCKETS:
+ # return
+ # servers = [SocketServer.UnixDatagramServer,
+ # SocketServer.ThreadingUnixDatagramServer]
+ # if HAVE_FORKING:
+ # servers.append(ForkingUnixDatagramServer)
+ # self.run_servers(socket.AF_UNIX, servers, MyDatagramHandler,
+ # self.dgram_examine)
+
def test_main():
- import imp
if imp.lock_held():
- # If the import lock is held, the threads will hang.
+ # If the import lock is held, the threads will hang
raise TestSkipped("can't run when import lock is held")
- reap_children()
- try:
- testall()
- finally:
- cleanup()
- reap_children()
+ test.test_support.run_unittest(SocketServerTest)
if __name__ == "__main__":
test_main()