diff options
author | Victor Stinner <vstinner@python.org> | 2023-09-20 13:54:19 (GMT) |
---|---|---|
committer | GitHub <noreply@github.com> | 2023-09-20 13:54:19 (GMT) |
commit | ced6924630037f1e5b3d1dbef2b600152fb07fbb (patch) | |
tree | 9dd1a37eb08179229e59319f3feba36161325486 | |
parent | 850cc8d0b1db0a912a6e458720e265e6a6e5c1ba (diff) | |
download | cpython-ced6924630037f1e5b3d1dbef2b600152fb07fbb.zip cpython-ced6924630037f1e5b3d1dbef2b600152fb07fbb.tar.gz cpython-ced6924630037f1e5b3d1dbef2b600152fb07fbb.tar.bz2 |
gh-108973: Fix asyncio test_subprocess_consistent_callbacks() (#109431)
SubprocessProtocol process_exited() method can be called before
pipe_data_received() and pipe_connection_lost() methods. Document it
and adapt the test for that.
Revert commit 282edd7b2a74c4dfe1bfe3c5b1d30f9c21d554d6.
_child_watcher_callback() calls immediately _process_exited(): don't
add an additional delay with call_soon(). The reverted change didn't
make _process_exited() more determistic: it can still be called
before pipe_connection_lost() for example.
Co-authored-by: Davide Rizzo <sorcio@gmail.com>
-rw-r--r-- | Doc/library/asyncio-llapi-index.rst | 10 | ||||
-rw-r--r-- | Doc/library/asyncio-protocol.rst | 19 | ||||
-rw-r--r-- | Lib/asyncio/unix_events.py | 3 | ||||
-rw-r--r-- | Lib/test/test_asyncio/test_subprocess.py | 54 |
4 files changed, 67 insertions, 19 deletions
diff --git a/Doc/library/asyncio-llapi-index.rst b/Doc/library/asyncio-llapi-index.rst index 9ce48a2..67136ba 100644 --- a/Doc/library/asyncio-llapi-index.rst +++ b/Doc/library/asyncio-llapi-index.rst @@ -484,19 +484,19 @@ Protocol classes can implement the following **callback methods**: :widths: 50 50 :class: full-width-table - * - ``callback`` :meth:`pipe_data_received() - <SubprocessProtocol.pipe_data_received>` + * - ``callback`` :meth:`~SubprocessProtocol.pipe_data_received` - Called when the child process writes data into its *stdout* or *stderr* pipe. - * - ``callback`` :meth:`pipe_connection_lost() - <SubprocessProtocol.pipe_connection_lost>` + * - ``callback`` :meth:`~SubprocessProtocol.pipe_connection_lost` - Called when one of the pipes communicating with the child process is closed. * - ``callback`` :meth:`process_exited() <SubprocessProtocol.process_exited>` - - Called when the child process has exited. + - Called when the child process has exited. It can be called before + :meth:`~SubprocessProtocol.pipe_data_received` and + :meth:`~SubprocessProtocol.pipe_connection_lost` methods. Event Loop Policies diff --git a/Doc/library/asyncio-protocol.rst b/Doc/library/asyncio-protocol.rst index 9781bda..3f734f5 100644 --- a/Doc/library/asyncio-protocol.rst +++ b/Doc/library/asyncio-protocol.rst @@ -708,6 +708,9 @@ factories passed to the :meth:`loop.subprocess_exec` and Called when the child process has exited. + It can be called before :meth:`~SubprocessProtocol.pipe_data_received` and + :meth:`~SubprocessProtocol.pipe_connection_lost` methods. + Examples ======== @@ -1003,12 +1006,26 @@ The subprocess is created by the :meth:`loop.subprocess_exec` method:: def __init__(self, exit_future): self.exit_future = exit_future self.output = bytearray() + self.pipe_closed = False + self.exited = False + + def pipe_connection_lost(self, fd, exc): + self.pipe_closed = True + self.check_for_exit() def pipe_data_received(self, fd, data): self.output.extend(data) def process_exited(self): - self.exit_future.set_result(True) + self.exited = True + # process_exited() method can be called before + # pipe_connection_lost() method: wait until both methods are + # called. + self.check_for_exit() + + def check_for_exit(self): + if self.pipe_closed and self.exited: + self.exit_future.set_result(True) async def get_date(): # Get a reference to the event loop as we plan to use diff --git a/Lib/asyncio/unix_events.py b/Lib/asyncio/unix_events.py index a268086..28cef96 100644 --- a/Lib/asyncio/unix_events.py +++ b/Lib/asyncio/unix_events.py @@ -226,8 +226,7 @@ class _UnixSelectorEventLoop(selector_events.BaseSelectorEventLoop): return transp def _child_watcher_callback(self, pid, returncode, transp): - # Skip one iteration for callbacks to be executed - self.call_soon_threadsafe(self.call_soon, transp._process_exited, returncode) + self.call_soon_threadsafe(transp._process_exited, returncode) async def create_unix_connection( self, protocol_factory, path=None, *, diff --git a/Lib/test/test_asyncio/test_subprocess.py b/Lib/test/test_asyncio/test_subprocess.py index eeeca40..429ef16 100644 --- a/Lib/test/test_asyncio/test_subprocess.py +++ b/Lib/test/test_asyncio/test_subprocess.py @@ -753,21 +753,44 @@ class SubprocessMixin: self.loop.run_until_complete(main()) - def test_subprocess_consistent_callbacks(self): + def test_subprocess_protocol_events(self): + # gh-108973: Test that all subprocess protocol methods are called. + # The protocol methods are not called in a determistic order. + # The order depends on the event loop and the operating system. events = [] + fds = [1, 2] + expected = [ + ('pipe_data_received', 1, b'stdout'), + ('pipe_data_received', 2, b'stderr'), + ('pipe_connection_lost', 1), + ('pipe_connection_lost', 2), + 'process_exited', + ] + per_fd_expected = [ + 'pipe_data_received', + 'pipe_connection_lost', + ] + class MyProtocol(asyncio.SubprocessProtocol): def __init__(self, exit_future: asyncio.Future) -> None: self.exit_future = exit_future def pipe_data_received(self, fd, data) -> None: events.append(('pipe_data_received', fd, data)) + self.exit_maybe() def pipe_connection_lost(self, fd, exc) -> None: - events.append('pipe_connection_lost') + events.append(('pipe_connection_lost', fd)) + self.exit_maybe() def process_exited(self) -> None: events.append('process_exited') - self.exit_future.set_result(True) + self.exit_maybe() + + def exit_maybe(self): + # Only exit when we got all expected events + if len(events) >= len(expected): + self.exit_future.set_result(True) async def main() -> None: loop = asyncio.get_running_loop() @@ -777,15 +800,24 @@ class SubprocessMixin: sys.executable, '-c', code, stdin=None) await exit_future transport.close() - self.assertEqual(events, [ - ('pipe_data_received', 1, b'stdout'), - ('pipe_data_received', 2, b'stderr'), - 'pipe_connection_lost', - 'pipe_connection_lost', - 'process_exited', - ]) - self.loop.run_until_complete(main()) + return events + + events = self.loop.run_until_complete(main()) + + # First, make sure that we received all events + self.assertSetEqual(set(events), set(expected)) + + # Second, check order of pipe events per file descriptor + per_fd_events = {fd: [] for fd in fds} + for event in events: + if event == 'process_exited': + continue + name, fd = event[:2] + per_fd_events[fd].append(name) + + for fd in fds: + self.assertEqual(per_fd_events[fd], per_fd_expected, (fd, events)) def test_subprocess_communicate_stdout(self): # See https://github.com/python/cpython/issues/100133 |