import asyncio import os import socket import time import threading import unittest from test.support import socket_helper from test.test_asyncio import utils as test_utils from test.test_asyncio import functional as func_tests def tearDownModule(): asyncio.set_event_loop_policy(None) class BaseStartServer(func_tests.FunctionalTestCaseMixin): def new_loop(self): raise NotImplementedError def test_start_server_1(self): HELLO_MSG = b'1' * 1024 * 5 + b'\n' def client(sock, addr): for i in range(10): time.sleep(0.2) if srv.is_serving(): break else: raise RuntimeError sock.settimeout(2) sock.connect(addr) sock.send(HELLO_MSG) sock.recv_all(1) sock.close() async def serve(reader, writer): await reader.readline() main_task.cancel() writer.write(b'1') writer.close() await writer.wait_closed() async def main(srv): async with srv: await srv.serve_forever() srv = self.loop.run_until_complete(asyncio.start_server( serve, socket_helper.HOSTv4, 0, start_serving=False)) self.assertFalse(srv.is_serving()) main_task = self.loop.create_task(main(srv)) addr = srv.sockets[0].getsockname() with self.assertRaises(asyncio.CancelledError): with self.tcp_client(lambda sock: client(sock, addr)): self.loop.run_until_complete(main_task) self.assertEqual(srv.sockets, ()) self.assertIsNone(srv._sockets) self.assertIsNone(srv._waiters) self.assertFalse(srv.is_serving()) with self.assertRaisesRegex(RuntimeError, r'is closed'): self.loop.run_until_complete(srv.serve_forever()) class SelectorStartServerTests(BaseStartServer, unittest.TestCase): def new_loop(self): return asyncio.SelectorEventLoop() @socket_helper.skip_unless_bind_unix_socket def test_start_unix_server_1(self): HELLO_MSG = b'1' * 1024 * 5 + b'\n' started = threading.Event() def client(sock, addr): sock.settimeout(2) started.wait(5) sock.connect(addr) sock.send(HELLO_MSG) sock.recv_all(1) sock.close() async def serve(reader, writer): await reader.readline() main_task.cancel() writer.write(b'1') writer.close() await writer.wait_closed() async def main(srv): async with srv: self.assertFalse(srv.is_serving()) await srv.start_serving() self.assertTrue(srv.is_serving()) started.set() await srv.serve_forever() with test_utils.unix_socket_path() as addr: srv = self.loop.run_until_complete(asyncio.start_unix_server( serve, addr, start_serving=False)) main_task = self.loop.create_task(main(srv)) with self.assertRaises(asyncio.CancelledError): with self.unix_client(lambda sock: client(sock, addr)): self.loop.run_until_complete(main_task) self.assertEqual(srv.sockets, ()) self.assertIsNone(srv._sockets) self.assertIsNone(srv._waiters) self.assertFalse(srv.is_serving()) with self.assertRaisesRegex(RuntimeError, r'is closed'): self.loop.run_until_complete(srv.serve_forever()) class TestServer2(unittest.IsolatedAsyncioTestCase): async def test_wait_closed_basic(self): async def serve(*args): pass srv = await asyncio.start_server(serve, socket_helper.HOSTv4, 0) self.addCleanup(srv.close) # active count = 0, not closed: should block task1 = asyncio.create_task(srv.wait_closed()) await asyncio.sleep(0) self.assertFalse(task1.done()) # active count != 0, not closed: should block srv._attach() task2 = asyncio.create_task(srv.wait_closed()) await asyncio.sleep(0) self.assertFalse(task1.done()) self.assertFalse(task2.done()) srv.close() await asyncio.sleep(0) # active count != 0, closed: should block task3 = asyncio.create_task(srv.wait_closed()) await asyncio.sleep(0) self.assertFalse(task1.done()) self.assertFalse(task2.done()) self.assertFalse(task3.done()) srv._detach() # active count == 0, closed: should unblock await task1 await task2 await task3 await srv.wait_closed() # Return immediately async def test_wait_closed_race(self): # Test a regression in 3.12.0, should be fixed in 3.12.1 async def serve(*args): pass srv = await asyncio.start_server(serve, socket_helper.HOSTv4, 0) self.addCleanup(srv.close) task = asyncio.create_task(srv.wait_closed()) await asyncio.sleep(0) self.assertFalse(task.done()) srv._attach() loop = asyncio.get_running_loop() loop.call_soon(srv.close) loop.call_soon(srv._detach) await srv.wait_closed() # Test the various corner cases of Unix server socket removal class UnixServerCleanupTests(unittest.IsolatedAsyncioTestCase): @socket_helper.skip_unless_bind_unix_socket async def test_unix_server_addr_cleanup(self): # Default scenario with test_utils.unix_socket_path() as addr: async def serve(*args): pass srv = await asyncio.start_unix_server(serve, addr) srv.close() self.assertFalse(os.path.exists(addr)) @socket_helper.skip_unless_bind_unix_socket async def test_unix_server_sock_cleanup(self): # Using already bound socket with test_utils.unix_socket_path() as addr: async def serve(*args): pass with socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) as sock: sock.bind(addr) srv = await asyncio.start_unix_server(serve, sock=sock) srv.close() self.assertFalse(os.path.exists(addr)) @socket_helper.skip_unless_bind_unix_socket async def test_unix_server_cleanup_gone(self): # Someone else has already cleaned up the socket with test_utils.unix_socket_path() as addr: async def serve(*args): pass with socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) as sock: sock.bind(addr) srv = await asyncio.start_unix_server(serve, sock=sock) os.unlink(addr) srv.close() @socket_helper.skip_unless_bind_unix_socket async def test_unix_server_cleanup_replaced(self): # Someone else has replaced the socket with their own with test_utils.unix_socket_path() as addr: async def serve(*args): pass srv = await asyncio.start_unix_server(serve, addr) os.unlink(addr) with socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) as sock: sock.bind(addr) srv.close() self.assertTrue(os.path.exists(addr)) @socket_helper.skip_unless_bind_unix_socket async def test_unix_server_cleanup_prevented(self): # Automatic cleanup explicitly disabled with test_utils.unix_socket_path() as addr: async def serve(*args): pass srv = await asyncio.start_unix_server(serve, addr, cleanup_socket=False) srv.close() self.assertTrue(os.path.exists(addr)) @unittest.skipUnless(hasattr(asyncio, 'ProactorEventLoop'), 'Windows only') class ProactorStartServerTests(BaseStartServer, unittest.TestCase): def new_loop(self): return asyncio.ProactorEventLoop() if __name__ == '__main__': unittest.main()