summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorVictor Stinner <vstinner@python.org>2023-09-20 13:54:19 (GMT)
committerGitHub <noreply@github.com>2023-09-20 13:54:19 (GMT)
commitced6924630037f1e5b3d1dbef2b600152fb07fbb (patch)
tree9dd1a37eb08179229e59319f3feba36161325486
parent850cc8d0b1db0a912a6e458720e265e6a6e5c1ba (diff)
downloadcpython-ced6924630037f1e5b3d1dbef2b600152fb07fbb.zip
cpython-ced6924630037f1e5b3d1dbef2b600152fb07fbb.tar.gz
cpython-ced6924630037f1e5b3d1dbef2b600152fb07fbb.tar.bz2
gh-108973: Fix asyncio test_subprocess_consistent_callbacks() (#109431)
SubprocessProtocol process_exited() method can be called before pipe_data_received() and pipe_connection_lost() methods. Document it and adapt the test for that. Revert commit 282edd7b2a74c4dfe1bfe3c5b1d30f9c21d554d6. _child_watcher_callback() calls immediately _process_exited(): don't add an additional delay with call_soon(). The reverted change didn't make _process_exited() more determistic: it can still be called before pipe_connection_lost() for example. Co-authored-by: Davide Rizzo <sorcio@gmail.com>
-rw-r--r--Doc/library/asyncio-llapi-index.rst10
-rw-r--r--Doc/library/asyncio-protocol.rst19
-rw-r--r--Lib/asyncio/unix_events.py3
-rw-r--r--Lib/test/test_asyncio/test_subprocess.py54
4 files changed, 67 insertions, 19 deletions
diff --git a/Doc/library/asyncio-llapi-index.rst b/Doc/library/asyncio-llapi-index.rst
index 9ce48a2..67136ba 100644
--- a/Doc/library/asyncio-llapi-index.rst
+++ b/Doc/library/asyncio-llapi-index.rst
@@ -484,19 +484,19 @@ Protocol classes can implement the following **callback methods**:
:widths: 50 50
:class: full-width-table
- * - ``callback`` :meth:`pipe_data_received()
- <SubprocessProtocol.pipe_data_received>`
+ * - ``callback`` :meth:`~SubprocessProtocol.pipe_data_received`
- Called when the child process writes data into its
*stdout* or *stderr* pipe.
- * - ``callback`` :meth:`pipe_connection_lost()
- <SubprocessProtocol.pipe_connection_lost>`
+ * - ``callback`` :meth:`~SubprocessProtocol.pipe_connection_lost`
- Called when one of the pipes communicating with
the child process is closed.
* - ``callback`` :meth:`process_exited()
<SubprocessProtocol.process_exited>`
- - Called when the child process has exited.
+ - Called when the child process has exited. It can be called before
+ :meth:`~SubprocessProtocol.pipe_data_received` and
+ :meth:`~SubprocessProtocol.pipe_connection_lost` methods.
Event Loop Policies
diff --git a/Doc/library/asyncio-protocol.rst b/Doc/library/asyncio-protocol.rst
index 9781bda..3f734f5 100644
--- a/Doc/library/asyncio-protocol.rst
+++ b/Doc/library/asyncio-protocol.rst
@@ -708,6 +708,9 @@ factories passed to the :meth:`loop.subprocess_exec` and
Called when the child process has exited.
+ It can be called before :meth:`~SubprocessProtocol.pipe_data_received` and
+ :meth:`~SubprocessProtocol.pipe_connection_lost` methods.
+
Examples
========
@@ -1003,12 +1006,26 @@ The subprocess is created by the :meth:`loop.subprocess_exec` method::
def __init__(self, exit_future):
self.exit_future = exit_future
self.output = bytearray()
+ self.pipe_closed = False
+ self.exited = False
+
+ def pipe_connection_lost(self, fd, exc):
+ self.pipe_closed = True
+ self.check_for_exit()
def pipe_data_received(self, fd, data):
self.output.extend(data)
def process_exited(self):
- self.exit_future.set_result(True)
+ self.exited = True
+ # process_exited() method can be called before
+ # pipe_connection_lost() method: wait until both methods are
+ # called.
+ self.check_for_exit()
+
+ def check_for_exit(self):
+ if self.pipe_closed and self.exited:
+ self.exit_future.set_result(True)
async def get_date():
# Get a reference to the event loop as we plan to use
diff --git a/Lib/asyncio/unix_events.py b/Lib/asyncio/unix_events.py
index a268086..28cef96 100644
--- a/Lib/asyncio/unix_events.py
+++ b/Lib/asyncio/unix_events.py
@@ -226,8 +226,7 @@ class _UnixSelectorEventLoop(selector_events.BaseSelectorEventLoop):
return transp
def _child_watcher_callback(self, pid, returncode, transp):
- # Skip one iteration for callbacks to be executed
- self.call_soon_threadsafe(self.call_soon, transp._process_exited, returncode)
+ self.call_soon_threadsafe(transp._process_exited, returncode)
async def create_unix_connection(
self, protocol_factory, path=None, *,
diff --git a/Lib/test/test_asyncio/test_subprocess.py b/Lib/test/test_asyncio/test_subprocess.py
index eeeca40..429ef16 100644
--- a/Lib/test/test_asyncio/test_subprocess.py
+++ b/Lib/test/test_asyncio/test_subprocess.py
@@ -753,21 +753,44 @@ class SubprocessMixin:
self.loop.run_until_complete(main())
- def test_subprocess_consistent_callbacks(self):
+ def test_subprocess_protocol_events(self):
+ # gh-108973: Test that all subprocess protocol methods are called.
+ # The protocol methods are not called in a determistic order.
+ # The order depends on the event loop and the operating system.
events = []
+ fds = [1, 2]
+ expected = [
+ ('pipe_data_received', 1, b'stdout'),
+ ('pipe_data_received', 2, b'stderr'),
+ ('pipe_connection_lost', 1),
+ ('pipe_connection_lost', 2),
+ 'process_exited',
+ ]
+ per_fd_expected = [
+ 'pipe_data_received',
+ 'pipe_connection_lost',
+ ]
+
class MyProtocol(asyncio.SubprocessProtocol):
def __init__(self, exit_future: asyncio.Future) -> None:
self.exit_future = exit_future
def pipe_data_received(self, fd, data) -> None:
events.append(('pipe_data_received', fd, data))
+ self.exit_maybe()
def pipe_connection_lost(self, fd, exc) -> None:
- events.append('pipe_connection_lost')
+ events.append(('pipe_connection_lost', fd))
+ self.exit_maybe()
def process_exited(self) -> None:
events.append('process_exited')
- self.exit_future.set_result(True)
+ self.exit_maybe()
+
+ def exit_maybe(self):
+ # Only exit when we got all expected events
+ if len(events) >= len(expected):
+ self.exit_future.set_result(True)
async def main() -> None:
loop = asyncio.get_running_loop()
@@ -777,15 +800,24 @@ class SubprocessMixin:
sys.executable, '-c', code, stdin=None)
await exit_future
transport.close()
- self.assertEqual(events, [
- ('pipe_data_received', 1, b'stdout'),
- ('pipe_data_received', 2, b'stderr'),
- 'pipe_connection_lost',
- 'pipe_connection_lost',
- 'process_exited',
- ])
- self.loop.run_until_complete(main())
+ return events
+
+ events = self.loop.run_until_complete(main())
+
+ # First, make sure that we received all events
+ self.assertSetEqual(set(events), set(expected))
+
+ # Second, check order of pipe events per file descriptor
+ per_fd_events = {fd: [] for fd in fds}
+ for event in events:
+ if event == 'process_exited':
+ continue
+ name, fd = event[:2]
+ per_fd_events[fd].append(name)
+
+ for fd in fds:
+ self.assertEqual(per_fd_events[fd], per_fd_expected, (fd, events))
def test_subprocess_communicate_stdout(self):
# See https://github.com/python/cpython/issues/100133