summaryrefslogtreecommitdiffstats
path: root/Lib/test/test_socketserver.py
diff options
context:
space:
mode:
Diffstat (limited to 'Lib/test/test_socketserver.py')
-rw-r--r--Lib/test/test_socketserver.py99
1 files changed, 98 insertions, 1 deletions
diff --git a/Lib/test/test_socketserver.py b/Lib/test/test_socketserver.py
index 0d0f86f..554c106 100644
--- a/Lib/test/test_socketserver.py
+++ b/Lib/test/test_socketserver.py
@@ -58,6 +58,7 @@ if HAVE_UNIX_SOCKETS:
@contextlib.contextmanager
def simple_subprocess(testcase):
+ """Tests that a custom child process is not waited on (Issue 1540386)"""
pid = os.fork()
if pid == 0:
# Don't raise an exception; it would be caught by the test harness.
@@ -103,7 +104,6 @@ class SocketServerTest(unittest.TestCase):
class MyServer(svrcls):
def handle_error(self, request, client_address):
self.close_request(request)
- self.server_close()
raise
class MyHandler(hdlrbase):
@@ -279,6 +279,103 @@ class SocketServerTest(unittest.TestCase):
socketserver.TCPServer((HOST, -1),
socketserver.StreamRequestHandler)
+ def test_context_manager(self):
+ with socketserver.TCPServer((HOST, 0),
+ socketserver.StreamRequestHandler) as server:
+ pass
+ self.assertEqual(-1, server.socket.fileno())
+
+
+class ErrorHandlerTest(unittest.TestCase):
+ """Test that the servers pass normal exceptions from the handler to
+ handle_error(), and that exiting exceptions like SystemExit and
+ KeyboardInterrupt are not passed."""
+
+ def tearDown(self):
+ test.support.unlink(test.support.TESTFN)
+
+ def test_sync_handled(self):
+ BaseErrorTestServer(ValueError)
+ self.check_result(handled=True)
+
+ def test_sync_not_handled(self):
+ with self.assertRaises(SystemExit):
+ BaseErrorTestServer(SystemExit)
+ self.check_result(handled=False)
+
+ @unittest.skipUnless(threading, 'Threading required for this test.')
+ def test_threading_handled(self):
+ ThreadingErrorTestServer(ValueError)
+ self.check_result(handled=True)
+
+ @unittest.skipUnless(threading, 'Threading required for this test.')
+ def test_threading_not_handled(self):
+ ThreadingErrorTestServer(SystemExit)
+ self.check_result(handled=False)
+
+ @requires_forking
+ def test_forking_handled(self):
+ ForkingErrorTestServer(ValueError)
+ self.check_result(handled=True)
+
+ @requires_forking
+ def test_forking_not_handled(self):
+ ForkingErrorTestServer(SystemExit)
+ self.check_result(handled=False)
+
+ def check_result(self, handled):
+ with open(test.support.TESTFN) as log:
+ expected = 'Handler called\n' + 'Error handled\n' * handled
+ self.assertEqual(log.read(), expected)
+
+
+class BaseErrorTestServer(socketserver.TCPServer):
+ def __init__(self, exception):
+ self.exception = exception
+ super().__init__((HOST, 0), BadHandler)
+ with socket.create_connection(self.server_address):
+ pass
+ try:
+ self.handle_request()
+ finally:
+ self.server_close()
+ self.wait_done()
+
+ def handle_error(self, request, client_address):
+ with open(test.support.TESTFN, 'a') as log:
+ log.write('Error handled\n')
+
+ def wait_done(self):
+ pass
+
+
+class BadHandler(socketserver.BaseRequestHandler):
+ def handle(self):
+ with open(test.support.TESTFN, 'a') as log:
+ log.write('Handler called\n')
+ raise self.server.exception('Test error')
+
+
+class ThreadingErrorTestServer(socketserver.ThreadingMixIn,
+ BaseErrorTestServer):
+ def __init__(self, *pos, **kw):
+ self.done = threading.Event()
+ super().__init__(*pos, **kw)
+
+ def shutdown_request(self, *pos, **kw):
+ super().shutdown_request(*pos, **kw)
+ self.done.set()
+
+ def wait_done(self):
+ self.done.wait()
+
+
+class ForkingErrorTestServer(socketserver.ForkingMixIn, BaseErrorTestServer):
+ def wait_done(self):
+ [child] = self.active_children
+ os.waitpid(child, 0)
+ self.active_children.clear()
+
class MiscTestCase(unittest.TestCase):