diff options
Diffstat (limited to 'Lib/test/test_asyncio/test_subprocess.py')
-rw-r--r-- | Lib/test/test_asyncio/test_subprocess.py | 743 |
1 files changed, 0 insertions, 743 deletions
diff --git a/Lib/test/test_asyncio/test_subprocess.py b/Lib/test/test_asyncio/test_subprocess.py deleted file mode 100644 index a6c3acc..0000000 --- a/Lib/test/test_asyncio/test_subprocess.py +++ /dev/null @@ -1,743 +0,0 @@ -import os -import signal -import sys -import unittest -import warnings -from unittest import mock - -import asyncio -from asyncio import base_subprocess -from asyncio import subprocess -from test.test_asyncio import utils as test_utils -from test import support - -if sys.platform != 'win32': - from asyncio import unix_events - -# Program blocking -PROGRAM_BLOCKED = [sys.executable, '-c', 'import time; time.sleep(3600)'] - -# Program copying input to output -PROGRAM_CAT = [ - sys.executable, '-c', - ';'.join(('import sys', - 'data = sys.stdin.buffer.read()', - 'sys.stdout.buffer.write(data)'))] - - -def tearDownModule(): - asyncio.set_event_loop_policy(None) - - -class TestSubprocessTransport(base_subprocess.BaseSubprocessTransport): - def _start(self, *args, **kwargs): - self._proc = mock.Mock() - self._proc.stdin = None - self._proc.stdout = None - self._proc.stderr = None - self._proc.pid = -1 - - -class SubprocessTransportTests(test_utils.TestCase): - def setUp(self): - super().setUp() - self.loop = self.new_test_loop() - self.set_event_loop(self.loop) - - def create_transport(self, waiter=None): - protocol = mock.Mock() - protocol.connection_made._is_coroutine = False - protocol.process_exited._is_coroutine = False - transport = TestSubprocessTransport( - self.loop, protocol, ['test'], False, - None, None, None, 0, waiter=waiter) - return (transport, protocol) - - def test_proc_exited(self): - waiter = self.loop.create_future() - transport, protocol = self.create_transport(waiter) - transport._process_exited(6) - self.loop.run_until_complete(waiter) - - self.assertEqual(transport.get_returncode(), 6) - - self.assertTrue(protocol.connection_made.called) - self.assertTrue(protocol.process_exited.called) - self.assertTrue(protocol.connection_lost.called) - self.assertEqual(protocol.connection_lost.call_args[0], (None,)) - - self.assertFalse(transport.is_closing()) - self.assertIsNone(transport._loop) - self.assertIsNone(transport._proc) - self.assertIsNone(transport._protocol) - - # methods must raise ProcessLookupError if the process exited - self.assertRaises(ProcessLookupError, - transport.send_signal, signal.SIGTERM) - self.assertRaises(ProcessLookupError, transport.terminate) - self.assertRaises(ProcessLookupError, transport.kill) - - transport.close() - - def test_subprocess_repr(self): - waiter = self.loop.create_future() - transport, protocol = self.create_transport(waiter) - transport._process_exited(6) - self.loop.run_until_complete(waiter) - - self.assertEqual( - repr(transport), - "<TestSubprocessTransport pid=-1 returncode=6>" - ) - transport._returncode = None - self.assertEqual( - repr(transport), - "<TestSubprocessTransport pid=-1 running>" - ) - transport._pid = None - transport._returncode = None - self.assertEqual( - repr(transport), - "<TestSubprocessTransport not started>" - ) - transport.close() - - -class SubprocessMixin: - - def test_stdin_stdout(self): - args = PROGRAM_CAT - - async def run(data): - proc = await asyncio.create_subprocess_exec( - *args, - stdin=subprocess.PIPE, - stdout=subprocess.PIPE, - ) - - # feed data - proc.stdin.write(data) - await proc.stdin.drain() - proc.stdin.close() - - # get output and exitcode - data = await proc.stdout.read() - exitcode = await proc.wait() - return (exitcode, data) - - task = run(b'some data') - task = asyncio.wait_for(task, 60.0) - exitcode, stdout = self.loop.run_until_complete(task) - self.assertEqual(exitcode, 0) - self.assertEqual(stdout, b'some data') - - def test_communicate(self): - args = PROGRAM_CAT - - async def run(data): - proc = await asyncio.create_subprocess_exec( - *args, - stdin=subprocess.PIPE, - stdout=subprocess.PIPE, - ) - stdout, stderr = await proc.communicate(data) - return proc.returncode, stdout - - task = run(b'some data') - task = asyncio.wait_for(task, support.LONG_TIMEOUT) - exitcode, stdout = self.loop.run_until_complete(task) - self.assertEqual(exitcode, 0) - self.assertEqual(stdout, b'some data') - - def test_shell(self): - proc = self.loop.run_until_complete( - asyncio.create_subprocess_shell('exit 7') - ) - exitcode = self.loop.run_until_complete(proc.wait()) - self.assertEqual(exitcode, 7) - - def test_start_new_session(self): - # start the new process in a new session - proc = self.loop.run_until_complete( - asyncio.create_subprocess_shell( - 'exit 8', - start_new_session=True, - ) - ) - exitcode = self.loop.run_until_complete(proc.wait()) - self.assertEqual(exitcode, 8) - - def test_kill(self): - args = PROGRAM_BLOCKED - proc = self.loop.run_until_complete( - asyncio.create_subprocess_exec(*args) - ) - proc.kill() - returncode = self.loop.run_until_complete(proc.wait()) - if sys.platform == 'win32': - self.assertIsInstance(returncode, int) - # expect 1 but sometimes get 0 - else: - self.assertEqual(-signal.SIGKILL, returncode) - - def test_terminate(self): - args = PROGRAM_BLOCKED - proc = self.loop.run_until_complete( - asyncio.create_subprocess_exec(*args) - ) - proc.terminate() - returncode = self.loop.run_until_complete(proc.wait()) - if sys.platform == 'win32': - self.assertIsInstance(returncode, int) - # expect 1 but sometimes get 0 - else: - self.assertEqual(-signal.SIGTERM, returncode) - - @unittest.skipIf(sys.platform == 'win32', "Don't have SIGHUP") - def test_send_signal(self): - # bpo-31034: Make sure that we get the default signal handler (killing - # the process). The parent process may have decided to ignore SIGHUP, - # and signal handlers are inherited. - old_handler = signal.signal(signal.SIGHUP, signal.SIG_DFL) - try: - code = 'import time; print("sleeping", flush=True); time.sleep(3600)' - args = [sys.executable, '-c', code] - proc = self.loop.run_until_complete( - asyncio.create_subprocess_exec( - *args, - stdout=subprocess.PIPE, - ) - ) - - async def send_signal(proc): - # basic synchronization to wait until the program is sleeping - line = await proc.stdout.readline() - self.assertEqual(line, b'sleeping\n') - - proc.send_signal(signal.SIGHUP) - returncode = await proc.wait() - return returncode - - returncode = self.loop.run_until_complete(send_signal(proc)) - self.assertEqual(-signal.SIGHUP, returncode) - finally: - signal.signal(signal.SIGHUP, old_handler) - - def prepare_broken_pipe_test(self): - # buffer large enough to feed the whole pipe buffer - large_data = b'x' * support.PIPE_MAX_SIZE - - # the program ends before the stdin can be feeded - proc = self.loop.run_until_complete( - asyncio.create_subprocess_exec( - sys.executable, '-c', 'pass', - stdin=subprocess.PIPE, - ) - ) - - return (proc, large_data) - - def test_stdin_broken_pipe(self): - proc, large_data = self.prepare_broken_pipe_test() - - async def write_stdin(proc, data): - await asyncio.sleep(0.5) - proc.stdin.write(data) - await proc.stdin.drain() - - coro = write_stdin(proc, large_data) - # drain() must raise BrokenPipeError or ConnectionResetError - with test_utils.disable_logger(): - self.assertRaises((BrokenPipeError, ConnectionResetError), - self.loop.run_until_complete, coro) - self.loop.run_until_complete(proc.wait()) - - def test_communicate_ignore_broken_pipe(self): - proc, large_data = self.prepare_broken_pipe_test() - - # communicate() must ignore BrokenPipeError when feeding stdin - self.loop.set_exception_handler(lambda loop, msg: None) - self.loop.run_until_complete(proc.communicate(large_data)) - self.loop.run_until_complete(proc.wait()) - - def test_pause_reading(self): - limit = 10 - size = (limit * 2 + 1) - - async def test_pause_reading(): - code = '\n'.join(( - 'import sys', - 'sys.stdout.write("x" * %s)' % size, - 'sys.stdout.flush()', - )) - - connect_read_pipe = self.loop.connect_read_pipe - - async def connect_read_pipe_mock(*args, **kw): - transport, protocol = await connect_read_pipe(*args, **kw) - transport.pause_reading = mock.Mock() - transport.resume_reading = mock.Mock() - return (transport, protocol) - - self.loop.connect_read_pipe = connect_read_pipe_mock - - proc = await asyncio.create_subprocess_exec( - sys.executable, '-c', code, - stdin=asyncio.subprocess.PIPE, - stdout=asyncio.subprocess.PIPE, - limit=limit, - ) - stdout_transport = proc._transport.get_pipe_transport(1) - - stdout, stderr = await proc.communicate() - - # The child process produced more than limit bytes of output, - # the stream reader transport should pause the protocol to not - # allocate too much memory. - return (stdout, stdout_transport) - - # Issue #22685: Ensure that the stream reader pauses the protocol - # when the child process produces too much data - stdout, transport = self.loop.run_until_complete(test_pause_reading()) - - self.assertEqual(stdout, b'x' * size) - self.assertTrue(transport.pause_reading.called) - self.assertTrue(transport.resume_reading.called) - - def test_stdin_not_inheritable(self): - # asyncio issue #209: stdin must not be inheritable, otherwise - # the Process.communicate() hangs - async def len_message(message): - code = 'import sys; data = sys.stdin.read(); print(len(data))' - proc = await asyncio.create_subprocess_exec( - sys.executable, '-c', code, - stdin=asyncio.subprocess.PIPE, - stdout=asyncio.subprocess.PIPE, - stderr=asyncio.subprocess.PIPE, - close_fds=False, - ) - stdout, stderr = await proc.communicate(message) - exitcode = await proc.wait() - return (stdout, exitcode) - - output, exitcode = self.loop.run_until_complete(len_message(b'abc')) - self.assertEqual(output.rstrip(), b'3') - self.assertEqual(exitcode, 0) - - def test_empty_input(self): - - async def empty_input(): - code = 'import sys; data = sys.stdin.read(); print(len(data))' - proc = await asyncio.create_subprocess_exec( - sys.executable, '-c', code, - stdin=asyncio.subprocess.PIPE, - stdout=asyncio.subprocess.PIPE, - stderr=asyncio.subprocess.PIPE, - close_fds=False, - ) - stdout, stderr = await proc.communicate(b'') - exitcode = await proc.wait() - return (stdout, exitcode) - - output, exitcode = self.loop.run_until_complete(empty_input()) - self.assertEqual(output.rstrip(), b'0') - self.assertEqual(exitcode, 0) - - def test_devnull_input(self): - - async def empty_input(): - code = 'import sys; data = sys.stdin.read(); print(len(data))' - proc = await asyncio.create_subprocess_exec( - sys.executable, '-c', code, - stdin=asyncio.subprocess.DEVNULL, - stdout=asyncio.subprocess.PIPE, - stderr=asyncio.subprocess.PIPE, - close_fds=False, - ) - stdout, stderr = await proc.communicate() - exitcode = await proc.wait() - return (stdout, exitcode) - - output, exitcode = self.loop.run_until_complete(empty_input()) - self.assertEqual(output.rstrip(), b'0') - self.assertEqual(exitcode, 0) - - def test_devnull_output(self): - - async def empty_output(): - code = 'import sys; data = sys.stdin.read(); print(len(data))' - proc = await asyncio.create_subprocess_exec( - sys.executable, '-c', code, - stdin=asyncio.subprocess.PIPE, - stdout=asyncio.subprocess.DEVNULL, - stderr=asyncio.subprocess.PIPE, - close_fds=False, - ) - stdout, stderr = await proc.communicate(b"abc") - exitcode = await proc.wait() - return (stdout, exitcode) - - output, exitcode = self.loop.run_until_complete(empty_output()) - self.assertEqual(output, None) - self.assertEqual(exitcode, 0) - - def test_devnull_error(self): - - async def empty_error(): - code = 'import sys; data = sys.stdin.read(); print(len(data))' - proc = await asyncio.create_subprocess_exec( - sys.executable, '-c', code, - stdin=asyncio.subprocess.PIPE, - stdout=asyncio.subprocess.PIPE, - stderr=asyncio.subprocess.DEVNULL, - close_fds=False, - ) - stdout, stderr = await proc.communicate(b"abc") - exitcode = await proc.wait() - return (stderr, exitcode) - - output, exitcode = self.loop.run_until_complete(empty_error()) - self.assertEqual(output, None) - self.assertEqual(exitcode, 0) - - def test_cancel_process_wait(self): - # Issue #23140: cancel Process.wait() - - async def cancel_wait(): - proc = await asyncio.create_subprocess_exec(*PROGRAM_BLOCKED) - - # Create an internal future waiting on the process exit - task = self.loop.create_task(proc.wait()) - self.loop.call_soon(task.cancel) - try: - await task - except asyncio.CancelledError: - pass - - # Cancel the future - task.cancel() - - # Kill the process and wait until it is done - proc.kill() - await proc.wait() - - self.loop.run_until_complete(cancel_wait()) - - def test_cancel_make_subprocess_transport_exec(self): - - async def cancel_make_transport(): - coro = asyncio.create_subprocess_exec(*PROGRAM_BLOCKED) - task = self.loop.create_task(coro) - - self.loop.call_soon(task.cancel) - try: - await task - except asyncio.CancelledError: - pass - - # ignore the log: - # "Exception during subprocess creation, kill the subprocess" - with test_utils.disable_logger(): - self.loop.run_until_complete(cancel_make_transport()) - - def test_cancel_post_init(self): - - async def cancel_make_transport(): - coro = self.loop.subprocess_exec(asyncio.SubprocessProtocol, - *PROGRAM_BLOCKED) - task = self.loop.create_task(coro) - - self.loop.call_soon(task.cancel) - try: - await task - except asyncio.CancelledError: - pass - - # ignore the log: - # "Exception during subprocess creation, kill the subprocess" - with test_utils.disable_logger(): - self.loop.run_until_complete(cancel_make_transport()) - test_utils.run_briefly(self.loop) - - def test_close_kill_running(self): - - async def kill_running(): - create = self.loop.subprocess_exec(asyncio.SubprocessProtocol, - *PROGRAM_BLOCKED) - transport, protocol = await create - - kill_called = False - def kill(): - nonlocal kill_called - kill_called = True - orig_kill() - - proc = transport.get_extra_info('subprocess') - orig_kill = proc.kill - proc.kill = kill - returncode = transport.get_returncode() - transport.close() - await transport._wait() - return (returncode, kill_called) - - # Ignore "Close running child process: kill ..." log - with test_utils.disable_logger(): - returncode, killed = self.loop.run_until_complete(kill_running()) - self.assertIsNone(returncode) - - # transport.close() must kill the process if it is still running - self.assertTrue(killed) - test_utils.run_briefly(self.loop) - - def test_close_dont_kill_finished(self): - - async def kill_running(): - create = self.loop.subprocess_exec(asyncio.SubprocessProtocol, - *PROGRAM_BLOCKED) - transport, protocol = await create - proc = transport.get_extra_info('subprocess') - - # kill the process (but asyncio is not notified immediately) - proc.kill() - proc.wait() - - proc.kill = mock.Mock() - proc_returncode = proc.poll() - transport_returncode = transport.get_returncode() - transport.close() - return (proc_returncode, transport_returncode, proc.kill.called) - - # Ignore "Unknown child process pid ..." log of SafeChildWatcher, - # emitted because the test already consumes the exit status: - # proc.wait() - with test_utils.disable_logger(): - result = self.loop.run_until_complete(kill_running()) - test_utils.run_briefly(self.loop) - - proc_returncode, transport_return_code, killed = result - - self.assertIsNotNone(proc_returncode) - self.assertIsNone(transport_return_code) - - # transport.close() must not kill the process if it finished, even if - # the transport was not notified yet - self.assertFalse(killed) - - # Unlike SafeChildWatcher, FastChildWatcher does not pop the - # callbacks if waitpid() is called elsewhere. Let's clear them - # manually to avoid a warning when the watcher is detached. - if (sys.platform != 'win32' and - isinstance(self, SubprocessFastWatcherTests)): - asyncio.get_child_watcher()._callbacks.clear() - - async def _test_popen_error(self, stdin): - if sys.platform == 'win32': - target = 'asyncio.windows_utils.Popen' - else: - target = 'subprocess.Popen' - with mock.patch(target) as popen: - exc = ZeroDivisionError - popen.side_effect = exc - - with warnings.catch_warnings(record=True) as warns: - with self.assertRaises(exc): - await asyncio.create_subprocess_exec( - sys.executable, - '-c', - 'pass', - stdin=stdin - ) - self.assertEqual(warns, []) - - def test_popen_error(self): - # Issue #24763: check that the subprocess transport is closed - # when BaseSubprocessTransport fails - self.loop.run_until_complete(self._test_popen_error(stdin=None)) - - def test_popen_error_with_stdin_pipe(self): - # Issue #35721: check that newly created socket pair is closed when - # Popen fails - self.loop.run_until_complete( - self._test_popen_error(stdin=subprocess.PIPE)) - - def test_read_stdout_after_process_exit(self): - - async def execute(): - code = '\n'.join(['import sys', - 'for _ in range(64):', - ' sys.stdout.write("x" * 4096)', - 'sys.stdout.flush()', - 'sys.exit(1)']) - - process = await asyncio.create_subprocess_exec( - sys.executable, '-c', code, - stdout=asyncio.subprocess.PIPE, - ) - - while True: - data = await process.stdout.read(65536) - if data: - await asyncio.sleep(0.3) - else: - break - - self.loop.run_until_complete(execute()) - - def test_create_subprocess_exec_text_mode_fails(self): - async def execute(): - with self.assertRaises(ValueError): - await subprocess.create_subprocess_exec(sys.executable, - text=True) - - with self.assertRaises(ValueError): - await subprocess.create_subprocess_exec(sys.executable, - encoding="utf-8") - - with self.assertRaises(ValueError): - await subprocess.create_subprocess_exec(sys.executable, - errors="strict") - - self.loop.run_until_complete(execute()) - - def test_create_subprocess_shell_text_mode_fails(self): - - async def execute(): - with self.assertRaises(ValueError): - await subprocess.create_subprocess_shell(sys.executable, - text=True) - - with self.assertRaises(ValueError): - await subprocess.create_subprocess_shell(sys.executable, - encoding="utf-8") - - with self.assertRaises(ValueError): - await subprocess.create_subprocess_shell(sys.executable, - errors="strict") - - self.loop.run_until_complete(execute()) - - def test_create_subprocess_exec_with_path(self): - async def execute(): - p = await subprocess.create_subprocess_exec( - support.FakePath(sys.executable), '-c', 'pass') - await p.wait() - p = await subprocess.create_subprocess_exec( - sys.executable, '-c', 'pass', support.FakePath('.')) - await p.wait() - - self.assertIsNone(self.loop.run_until_complete(execute())) - - def test_exec_loop_deprecated(self): - async def go(): - with self.assertWarns(DeprecationWarning): - proc = await asyncio.create_subprocess_exec( - sys.executable, '-c', 'pass', - loop=self.loop, - ) - await proc.wait() - self.loop.run_until_complete(go()) - - def test_shell_loop_deprecated(self): - async def go(): - with self.assertWarns(DeprecationWarning): - proc = await asyncio.create_subprocess_shell( - "exit 0", - loop=self.loop, - ) - await proc.wait() - self.loop.run_until_complete(go()) - - -if sys.platform != 'win32': - # Unix - class SubprocessWatcherMixin(SubprocessMixin): - - Watcher = None - - def setUp(self): - super().setUp() - policy = asyncio.get_event_loop_policy() - self.loop = policy.new_event_loop() - self.set_event_loop(self.loop) - - watcher = self.Watcher() - watcher.attach_loop(self.loop) - policy.set_child_watcher(watcher) - - def tearDown(self): - super().tearDown() - policy = asyncio.get_event_loop_policy() - watcher = policy.get_child_watcher() - policy.set_child_watcher(None) - watcher.attach_loop(None) - watcher.close() - - class SubprocessThreadedWatcherTests(SubprocessWatcherMixin, - test_utils.TestCase): - - Watcher = unix_events.ThreadedChildWatcher - - class SubprocessMultiLoopWatcherTests(SubprocessWatcherMixin, - test_utils.TestCase): - - Watcher = unix_events.MultiLoopChildWatcher - - class SubprocessSafeWatcherTests(SubprocessWatcherMixin, - test_utils.TestCase): - - Watcher = unix_events.SafeChildWatcher - - class SubprocessFastWatcherTests(SubprocessWatcherMixin, - test_utils.TestCase): - - Watcher = unix_events.FastChildWatcher - - def has_pidfd_support(): - if not hasattr(os, 'pidfd_open'): - return False - try: - os.close(os.pidfd_open(os.getpid())) - except OSError: - return False - return True - - @unittest.skipUnless( - has_pidfd_support(), - "operating system does not support pidfds", - ) - class SubprocessPidfdWatcherTests(SubprocessWatcherMixin, - test_utils.TestCase): - Watcher = unix_events.PidfdChildWatcher - -else: - # Windows - class SubprocessProactorTests(SubprocessMixin, test_utils.TestCase): - - def setUp(self): - super().setUp() - self.loop = asyncio.ProactorEventLoop() - self.set_event_loop(self.loop) - - -class GenericWatcherTests: - - def test_create_subprocess_fails_with_inactive_watcher(self): - - async def execute(): - watcher = mock.create_authspec(asyncio.AbstractChildWatcher) - watcher.is_active.return_value = False - asyncio.set_child_watcher(watcher) - - with self.assertRaises(RuntimeError): - await subprocess.create_subprocess_exec( - support.FakePath(sys.executable), '-c', 'pass') - - watcher.add_child_handler.assert_not_called() - - self.assertIsNone(self.loop.run_until_complete(execute())) - - - - -if __name__ == '__main__': - unittest.main() |