summaryrefslogtreecommitdiffstats
path: root/Lib/test/test_asyncio/test_subprocess.py
diff options
context:
space:
mode:
authorVictor Stinner <victor.stinner@gmail.com>2014-02-01 21:49:59 (GMT)
committerVictor Stinner <victor.stinner@gmail.com>2014-02-01 21:49:59 (GMT)
commit915bcb01110c7db65f8be9139bf887c749fbde75 (patch)
treefa24b947b19c1479ed581dc817c2e696386f3fb0 /Lib/test/test_asyncio/test_subprocess.py
parent153d97b24e7253f344860094eb2c98ed93657720 (diff)
downloadcpython-915bcb01110c7db65f8be9139bf887c749fbde75.zip
cpython-915bcb01110c7db65f8be9139bf887c749fbde75.tar.gz
cpython-915bcb01110c7db65f8be9139bf887c749fbde75.tar.bz2
Issue #20400: Merge Tulip into Python: add the new asyncio.subprocess module
* Add a new asyncio.subprocess module * Add new create_subprocess_exec() and create_subprocess_shell() functions * The new asyncio.subprocess.SubprocessStreamProtocol creates stream readers for stdout and stderr and a stream writer for stdin. * The new asyncio.subprocess.Process class offers an API close to the subprocess.Popen class: - pid, returncode, stdin, stdout and stderr attributes - communicate(), wait(), send_signal(), terminate() and kill() methods * Remove STDIN (0), STDOUT (1) and STDERR (2) constants from base_subprocess and unix_events, to not be confused with the symbols with the same name of subprocess and asyncio.subprocess modules * _ProactorBasePipeTransport.get_write_buffer_size() now counts also the size of the pending write * _ProactorBaseWritePipeTransport._loop_writing() may now pause the protocol if the write buffer size is greater than the high water mark (64 KB by default)
Diffstat (limited to 'Lib/test/test_asyncio/test_subprocess.py')
-rw-r--r--Lib/test/test_asyncio/test_subprocess.py196
1 files changed, 196 insertions, 0 deletions
diff --git a/Lib/test/test_asyncio/test_subprocess.py b/Lib/test/test_asyncio/test_subprocess.py
new file mode 100644
index 0000000..4f38cc7
--- /dev/null
+++ b/Lib/test/test_asyncio/test_subprocess.py
@@ -0,0 +1,196 @@
+from asyncio import subprocess
+import asyncio
+import signal
+import sys
+import unittest
+from test import support
+if sys.platform != 'win32':
+ from asyncio import unix_events
+
+# Program exiting quickly
+PROGRAM_EXIT_FAST = [sys.executable, '-c', 'pass']
+
+# Program blocking
+PROGRAM_BLOCKED = [sys.executable, '-c', 'import time; time.sleep(3600)']
+
+# Program sleeping during 1 second
+PROGRAM_SLEEP_1SEC = [sys.executable, '-c', 'import time; time.sleep(1)']
+
+# Program copying input to output
+PROGRAM_CAT = [
+ sys.executable, '-c',
+ ';'.join(('import sys',
+ 'data = sys.stdin.buffer.read()',
+ 'sys.stdout.buffer.write(data)'))]
+
+class SubprocessMixin:
+ def test_stdin_stdout(self):
+ args = PROGRAM_CAT
+
+ @asyncio.coroutine
+ def run(data):
+ proc = yield from asyncio.create_subprocess_exec(
+ *args,
+ stdin=subprocess.PIPE,
+ stdout=subprocess.PIPE,
+ loop=self.loop)
+
+ # feed data
+ proc.stdin.write(data)
+ yield from proc.stdin.drain()
+ proc.stdin.close()
+
+ # get output and exitcode
+ data = yield from proc.stdout.read()
+ exitcode = yield from proc.wait()
+ return (exitcode, data)
+
+ task = run(b'some data')
+ task = asyncio.wait_for(task, 10.0, loop=self.loop)
+ exitcode, stdout = self.loop.run_until_complete(task)
+ self.assertEqual(exitcode, 0)
+ self.assertEqual(stdout, b'some data')
+
+ def test_communicate(self):
+ args = PROGRAM_CAT
+
+ @asyncio.coroutine
+ def run(data):
+ proc = yield from asyncio.create_subprocess_exec(
+ *args,
+ stdin=subprocess.PIPE,
+ stdout=subprocess.PIPE,
+ loop=self.loop)
+ stdout, stderr = yield from proc.communicate(data)
+ return proc.returncode, stdout
+
+ task = run(b'some data')
+ task = asyncio.wait_for(task, 10.0, loop=self.loop)
+ exitcode, stdout = self.loop.run_until_complete(task)
+ self.assertEqual(exitcode, 0)
+ self.assertEqual(stdout, b'some data')
+
+ def test_shell(self):
+ create = asyncio.create_subprocess_shell('exit 7',
+ loop=self.loop)
+ proc = self.loop.run_until_complete(create)
+ exitcode = self.loop.run_until_complete(proc.wait())
+ self.assertEqual(exitcode, 7)
+
+ def test_start_new_session(self):
+ # start the new process in a new session
+ create = asyncio.create_subprocess_shell('exit 8',
+ start_new_session=True,
+ loop=self.loop)
+ proc = self.loop.run_until_complete(create)
+ exitcode = self.loop.run_until_complete(proc.wait())
+ self.assertEqual(exitcode, 8)
+
+ def test_kill(self):
+ args = PROGRAM_BLOCKED
+ create = asyncio.create_subprocess_exec(*args, loop=self.loop)
+ proc = self.loop.run_until_complete(create)
+ proc.kill()
+ returncode = self.loop.run_until_complete(proc.wait())
+ if sys.platform == 'win32':
+ self.assertIsInstance(returncode, int)
+ # expect 1 but sometimes get 0
+ else:
+ self.assertEqual(-signal.SIGKILL, returncode)
+
+ def test_terminate(self):
+ args = PROGRAM_BLOCKED
+ create = asyncio.create_subprocess_exec(*args, loop=self.loop)
+ proc = self.loop.run_until_complete(create)
+ proc.terminate()
+ returncode = self.loop.run_until_complete(proc.wait())
+ if sys.platform == 'win32':
+ self.assertIsInstance(returncode, int)
+ # expect 1 but sometimes get 0
+ else:
+ self.assertEqual(-signal.SIGTERM, returncode)
+
+ @unittest.skipIf(sys.platform == 'win32', "Don't have SIGHUP")
+ def test_send_signal(self):
+ args = PROGRAM_BLOCKED
+ create = asyncio.create_subprocess_exec(*args, loop=self.loop)
+ proc = self.loop.run_until_complete(create)
+ proc.send_signal(signal.SIGHUP)
+ returncode = self.loop.run_until_complete(proc.wait())
+ self.assertEqual(-signal.SIGHUP, returncode)
+
+ def test_get_subprocess(self):
+ args = PROGRAM_EXIT_FAST
+
+ @asyncio.coroutine
+ def run():
+ proc = yield from asyncio.create_subprocess_exec(*args,
+ loop=self.loop)
+ yield from proc.wait()
+
+ popen = proc.get_subprocess()
+ popen.wait()
+ return (proc, popen)
+
+ proc, popen = self.loop.run_until_complete(run())
+ self.assertEqual(popen.returncode, proc.returncode)
+ self.assertEqual(popen.pid, proc.pid)
+
+ def test_broken_pipe(self):
+ large_data = b'x' * support.PIPE_MAX_SIZE
+
+ create = asyncio.create_subprocess_exec(
+ *PROGRAM_SLEEP_1SEC,
+ stdin=subprocess.PIPE,
+ loop=self.loop)
+ proc = self.loop.run_until_complete(create)
+ with self.assertRaises(BrokenPipeError):
+ self.loop.run_until_complete(proc.communicate(large_data))
+ self.loop.run_until_complete(proc.wait())
+
+
+if sys.platform != 'win32':
+ # Unix
+ class SubprocessWatcherMixin(SubprocessMixin):
+ Watcher = None
+
+ def setUp(self):
+ policy = asyncio.get_event_loop_policy()
+ self.loop = policy.new_event_loop()
+
+ # ensure that the event loop is passed explicitly in the code
+ policy.set_event_loop(None)
+
+ watcher = self.Watcher()
+ watcher.attach_loop(self.loop)
+ policy.set_child_watcher(watcher)
+
+ def tearDown(self):
+ policy = asyncio.get_event_loop_policy()
+ policy.set_child_watcher(None)
+ self.loop.close()
+ policy.set_event_loop(None)
+
+ class SubprocessSafeWatcherTests(SubprocessWatcherMixin, unittest.TestCase):
+ Watcher = unix_events.SafeChildWatcher
+
+ class SubprocessFastWatcherTests(SubprocessWatcherMixin, unittest.TestCase):
+ Watcher = unix_events.FastChildWatcher
+else:
+ # Windows
+ class SubprocessProactorTests(SubprocessMixin, unittest.TestCase):
+ def setUp(self):
+ policy = asyncio.get_event_loop_policy()
+ self.loop = asyncio.ProactorEventLoop()
+
+ # ensure that the event loop is passed explicitly in the code
+ policy.set_event_loop(None)
+
+ def tearDown(self):
+ policy = asyncio.get_event_loop_policy()
+ self.loop.close()
+ policy.set_event_loop(None)
+
+
+if __name__ == '__main__':
+ unittest.main()