diff options
Diffstat (limited to 'Lib/test/test_asyncio/test_subprocess.py')
| -rw-r--r-- | Lib/test/test_asyncio/test_subprocess.py | 54 |
1 files changed, 44 insertions, 10 deletions
diff --git a/Lib/test/test_asyncio/test_subprocess.py b/Lib/test/test_asyncio/test_subprocess.py index 9bc60b9..6ba8894 100644 --- a/Lib/test/test_asyncio/test_subprocess.py +++ b/Lib/test/test_asyncio/test_subprocess.py @@ -4,6 +4,7 @@ import signal import sys import unittest import warnings +import functools from unittest import mock import asyncio @@ -30,6 +31,19 @@ PROGRAM_CAT = [ 'sys.stdout.buffer.write(data)'))] +@functools.cache +def _has_pidfd_support(): + if not hasattr(os, 'pidfd_open'): + return False + + try: + os.close(os.pidfd_open(os.getpid())) + except OSError: + return False + + return True + + def tearDownModule(): asyncio.set_event_loop_policy(None) @@ -708,17 +722,8 @@ if sys.platform != 'win32': Watcher = unix_events.FastChildWatcher - def has_pidfd_support(): - if not hasattr(os, 'pidfd_open'): - return False - try: - os.close(os.pidfd_open(os.getpid())) - except OSError: - return False - return True - @unittest.skipUnless( - has_pidfd_support(), + _has_pidfd_support(), "operating system does not support pidfds", ) class SubprocessPidfdWatcherTests(SubprocessWatcherMixin, @@ -751,6 +756,35 @@ if sys.platform != 'win32': mock.call.__exit__(RuntimeError, mock.ANY, mock.ANY), ]) + + @unittest.skipUnless( + _has_pidfd_support(), + "operating system does not support pidfds", + ) + def test_create_subprocess_with_pidfd(self): + async def in_thread(): + proc = await asyncio.create_subprocess_exec( + *PROGRAM_CAT, + stdin=subprocess.PIPE, + stdout=subprocess.PIPE, + ) + stdout, stderr = await proc.communicate(b"some data") + return proc.returncode, stdout + + async def main(): + # asyncio.Runner did not call asyncio.set_event_loop() + with self.assertRaises(RuntimeError): + asyncio.get_event_loop_policy().get_event_loop() + return await asyncio.to_thread(asyncio.run, in_thread()) + + asyncio.set_child_watcher(asyncio.PidfdChildWatcher()) + try: + with asyncio.Runner(loop_factory=asyncio.new_event_loop) as runner: + returncode, stdout = runner.run(main()) + self.assertEqual(returncode, 0) + self.assertEqual(stdout, b'some data') + finally: + asyncio.set_child_watcher(None) else: # Windows class SubprocessProactorTests(SubprocessMixin, test_utils.TestCase): |
