diff options
author | Victor Stinner <victor.stinner@gmail.com> | 2014-02-01 21:49:59 (GMT) |
---|---|---|
committer | Victor Stinner <victor.stinner@gmail.com> | 2014-02-01 21:49:59 (GMT) |
commit | 915bcb01110c7db65f8be9139bf887c749fbde75 (patch) | |
tree | fa24b947b19c1479ed581dc817c2e696386f3fb0 /Lib/test/test_asyncio/test_subprocess.py | |
parent | 153d97b24e7253f344860094eb2c98ed93657720 (diff) | |
download | cpython-915bcb01110c7db65f8be9139bf887c749fbde75.zip cpython-915bcb01110c7db65f8be9139bf887c749fbde75.tar.gz cpython-915bcb01110c7db65f8be9139bf887c749fbde75.tar.bz2 |
Issue #20400: Merge Tulip into Python: add the new asyncio.subprocess module
* Add a new asyncio.subprocess module
* Add new create_subprocess_exec() and create_subprocess_shell() functions
* The new asyncio.subprocess.SubprocessStreamProtocol creates stream readers
for stdout and stderr and a stream writer for stdin.
* The new asyncio.subprocess.Process class offers an API close to the
subprocess.Popen class:
- pid, returncode, stdin, stdout and stderr attributes
- communicate(), wait(), send_signal(), terminate() and kill() methods
* Remove STDIN (0), STDOUT (1) and STDERR (2) constants from base_subprocess
and unix_events, to not be confused with the symbols with the same name of
subprocess and asyncio.subprocess modules
* _ProactorBasePipeTransport.get_write_buffer_size() now counts also the size
of the pending write
* _ProactorBaseWritePipeTransport._loop_writing() may now pause the protocol if
the write buffer size is greater than the high water mark (64 KB by default)
Diffstat (limited to 'Lib/test/test_asyncio/test_subprocess.py')
-rw-r--r-- | Lib/test/test_asyncio/test_subprocess.py | 196 |
1 files changed, 196 insertions, 0 deletions
diff --git a/Lib/test/test_asyncio/test_subprocess.py b/Lib/test/test_asyncio/test_subprocess.py new file mode 100644 index 0000000..4f38cc7 --- /dev/null +++ b/Lib/test/test_asyncio/test_subprocess.py @@ -0,0 +1,196 @@ +from asyncio import subprocess +import asyncio +import signal +import sys +import unittest +from test import support +if sys.platform != 'win32': + from asyncio import unix_events + +# Program exiting quickly +PROGRAM_EXIT_FAST = [sys.executable, '-c', 'pass'] + +# Program blocking +PROGRAM_BLOCKED = [sys.executable, '-c', 'import time; time.sleep(3600)'] + +# Program sleeping during 1 second +PROGRAM_SLEEP_1SEC = [sys.executable, '-c', 'import time; time.sleep(1)'] + +# Program copying input to output +PROGRAM_CAT = [ + sys.executable, '-c', + ';'.join(('import sys', + 'data = sys.stdin.buffer.read()', + 'sys.stdout.buffer.write(data)'))] + +class SubprocessMixin: + def test_stdin_stdout(self): + args = PROGRAM_CAT + + @asyncio.coroutine + def run(data): + proc = yield from asyncio.create_subprocess_exec( + *args, + stdin=subprocess.PIPE, + stdout=subprocess.PIPE, + loop=self.loop) + + # feed data + proc.stdin.write(data) + yield from proc.stdin.drain() + proc.stdin.close() + + # get output and exitcode + data = yield from proc.stdout.read() + exitcode = yield from proc.wait() + return (exitcode, data) + + task = run(b'some data') + task = asyncio.wait_for(task, 10.0, loop=self.loop) + exitcode, stdout = self.loop.run_until_complete(task) + self.assertEqual(exitcode, 0) + self.assertEqual(stdout, b'some data') + + def test_communicate(self): + args = PROGRAM_CAT + + @asyncio.coroutine + def run(data): + proc = yield from asyncio.create_subprocess_exec( + *args, + stdin=subprocess.PIPE, + stdout=subprocess.PIPE, + loop=self.loop) + stdout, stderr = yield from proc.communicate(data) + return proc.returncode, stdout + + task = run(b'some data') + task = asyncio.wait_for(task, 10.0, loop=self.loop) + exitcode, stdout = self.loop.run_until_complete(task) + self.assertEqual(exitcode, 0) + self.assertEqual(stdout, b'some data') + + def test_shell(self): + create = asyncio.create_subprocess_shell('exit 7', + loop=self.loop) + proc = self.loop.run_until_complete(create) + exitcode = self.loop.run_until_complete(proc.wait()) + self.assertEqual(exitcode, 7) + + def test_start_new_session(self): + # start the new process in a new session + create = asyncio.create_subprocess_shell('exit 8', + start_new_session=True, + loop=self.loop) + proc = self.loop.run_until_complete(create) + exitcode = self.loop.run_until_complete(proc.wait()) + self.assertEqual(exitcode, 8) + + def test_kill(self): + args = PROGRAM_BLOCKED + create = asyncio.create_subprocess_exec(*args, loop=self.loop) + proc = self.loop.run_until_complete(create) + proc.kill() + returncode = self.loop.run_until_complete(proc.wait()) + if sys.platform == 'win32': + self.assertIsInstance(returncode, int) + # expect 1 but sometimes get 0 + else: + self.assertEqual(-signal.SIGKILL, returncode) + + def test_terminate(self): + args = PROGRAM_BLOCKED + create = asyncio.create_subprocess_exec(*args, loop=self.loop) + proc = self.loop.run_until_complete(create) + proc.terminate() + returncode = self.loop.run_until_complete(proc.wait()) + if sys.platform == 'win32': + self.assertIsInstance(returncode, int) + # expect 1 but sometimes get 0 + else: + self.assertEqual(-signal.SIGTERM, returncode) + + @unittest.skipIf(sys.platform == 'win32', "Don't have SIGHUP") + def test_send_signal(self): + args = PROGRAM_BLOCKED + create = asyncio.create_subprocess_exec(*args, loop=self.loop) + proc = self.loop.run_until_complete(create) + proc.send_signal(signal.SIGHUP) + returncode = self.loop.run_until_complete(proc.wait()) + self.assertEqual(-signal.SIGHUP, returncode) + + def test_get_subprocess(self): + args = PROGRAM_EXIT_FAST + + @asyncio.coroutine + def run(): + proc = yield from asyncio.create_subprocess_exec(*args, + loop=self.loop) + yield from proc.wait() + + popen = proc.get_subprocess() + popen.wait() + return (proc, popen) + + proc, popen = self.loop.run_until_complete(run()) + self.assertEqual(popen.returncode, proc.returncode) + self.assertEqual(popen.pid, proc.pid) + + def test_broken_pipe(self): + large_data = b'x' * support.PIPE_MAX_SIZE + + create = asyncio.create_subprocess_exec( + *PROGRAM_SLEEP_1SEC, + stdin=subprocess.PIPE, + loop=self.loop) + proc = self.loop.run_until_complete(create) + with self.assertRaises(BrokenPipeError): + self.loop.run_until_complete(proc.communicate(large_data)) + self.loop.run_until_complete(proc.wait()) + + +if sys.platform != 'win32': + # Unix + class SubprocessWatcherMixin(SubprocessMixin): + Watcher = None + + def setUp(self): + policy = asyncio.get_event_loop_policy() + self.loop = policy.new_event_loop() + + # ensure that the event loop is passed explicitly in the code + policy.set_event_loop(None) + + watcher = self.Watcher() + watcher.attach_loop(self.loop) + policy.set_child_watcher(watcher) + + def tearDown(self): + policy = asyncio.get_event_loop_policy() + policy.set_child_watcher(None) + self.loop.close() + policy.set_event_loop(None) + + class SubprocessSafeWatcherTests(SubprocessWatcherMixin, unittest.TestCase): + Watcher = unix_events.SafeChildWatcher + + class SubprocessFastWatcherTests(SubprocessWatcherMixin, unittest.TestCase): + Watcher = unix_events.FastChildWatcher +else: + # Windows + class SubprocessProactorTests(SubprocessMixin, unittest.TestCase): + def setUp(self): + policy = asyncio.get_event_loop_policy() + self.loop = asyncio.ProactorEventLoop() + + # ensure that the event loop is passed explicitly in the code + policy.set_event_loop(None) + + def tearDown(self): + policy = asyncio.get_event_loop_policy() + self.loop.close() + policy.set_event_loop(None) + + +if __name__ == '__main__': + unittest.main() |