summaryrefslogtreecommitdiffstats
path: root/Lib/test/test_socketserver.py
diff options
context:
space:
mode:
Diffstat (limited to 'Lib/test/test_socketserver.py')
-rw-r--r--Lib/test/test_socketserver.py94
1 files changed, 41 insertions, 53 deletions
diff --git a/Lib/test/test_socketserver.py b/Lib/test/test_socketserver.py
index 325d485..0d0f86f 100644
--- a/Lib/test/test_socketserver.py
+++ b/Lib/test/test_socketserver.py
@@ -160,6 +160,8 @@ class SocketServerTest(unittest.TestCase):
def dgram_examine(self, proto, addr):
s = socket.socket(proto, socket.SOCK_DGRAM)
+ if HAVE_UNIX_SOCKETS and proto == socket.AF_UNIX:
+ s.bind(self.pickaddr(proto))
s.sendto(TEST_STR, addr)
buf = data = receive(s, 100)
while data and b'\n' not in buf:
@@ -222,59 +224,24 @@ class SocketServerTest(unittest.TestCase):
socketserver.DatagramRequestHandler,
self.dgram_examine)
- @contextlib.contextmanager
- def mocked_select_module(self):
- """Mocks the select.select() call to raise EINTR for first call"""
- old_select = select.select
-
- class MockSelect:
- def __init__(self):
- self.called = 0
-
- def __call__(self, *args):
- self.called += 1
- if self.called == 1:
- # raise the exception on first call
- raise OSError(errno.EINTR, os.strerror(errno.EINTR))
- else:
- # Return real select value for consecutive calls
- return old_select(*args)
-
- select.select = MockSelect()
- try:
- yield select.select
- finally:
- select.select = old_select
-
- def test_InterruptServerSelectCall(self):
- with self.mocked_select_module() as mock_select:
- pid = self.run_server(socketserver.TCPServer,
- socketserver.StreamRequestHandler,
- self.stream_examine)
- # Make sure select was called again:
- self.assertGreater(mock_select.called, 1)
-
- # Alas, on Linux (at least) recvfrom() doesn't return a meaningful
- # client address so this cannot work:
-
- # @requires_unix_sockets
- # def test_UnixDatagramServer(self):
- # self.run_server(socketserver.UnixDatagramServer,
- # socketserver.DatagramRequestHandler,
- # self.dgram_examine)
- #
- # @requires_unix_sockets
- # def test_ThreadingUnixDatagramServer(self):
- # self.run_server(socketserver.ThreadingUnixDatagramServer,
- # socketserver.DatagramRequestHandler,
- # self.dgram_examine)
- #
- # @requires_unix_sockets
- # @requires_forking
- # def test_ForkingUnixDatagramServer(self):
- # self.run_server(socketserver.ForkingUnixDatagramServer,
- # socketserver.DatagramRequestHandler,
- # self.dgram_examine)
+ @requires_unix_sockets
+ def test_UnixDatagramServer(self):
+ self.run_server(socketserver.UnixDatagramServer,
+ socketserver.DatagramRequestHandler,
+ self.dgram_examine)
+
+ @requires_unix_sockets
+ def test_ThreadingUnixDatagramServer(self):
+ self.run_server(socketserver.ThreadingUnixDatagramServer,
+ socketserver.DatagramRequestHandler,
+ self.dgram_examine)
+
+ @requires_unix_sockets
+ @requires_forking
+ def test_ForkingUnixDatagramServer(self):
+ self.run_server(ForkingUnixDatagramServer,
+ socketserver.DatagramRequestHandler,
+ self.dgram_examine)
@reap_threads
def test_shutdown(self):
@@ -325,6 +292,27 @@ class MiscTestCase(unittest.TestCase):
expected.append(name)
self.assertCountEqual(socketserver.__all__, expected)
+ def test_shutdown_request_called_if_verify_request_false(self):
+ # Issue #26309: BaseServer should call shutdown_request even if
+ # verify_request is False
+
+ class MyServer(socketserver.TCPServer):
+ def verify_request(self, request, client_address):
+ return False
+
+ shutdown_called = 0
+ def shutdown_request(self, request):
+ self.shutdown_called += 1
+ socketserver.TCPServer.shutdown_request(self, request)
+
+ server = MyServer((HOST, 0), socketserver.StreamRequestHandler)
+ s = socket.socket(server.address_family, socket.SOCK_STREAM)
+ s.connect(server.server_address)
+ s.close()
+ server.handle_request()
+ self.assertEqual(server.shutdown_called, 1)
+ server.server_close()
+
if __name__ == "__main__":
unittest.main()