summaryrefslogtreecommitdiffstats
path: root/Lib/asyncio/subprocess.py
diff options
context:
space:
mode:
authorVictor Stinner <victor.stinner@gmail.com>2014-02-01 21:49:59 (GMT)
committerVictor Stinner <victor.stinner@gmail.com>2014-02-01 21:49:59 (GMT)
commit915bcb01110c7db65f8be9139bf887c749fbde75 (patch)
treefa24b947b19c1479ed581dc817c2e696386f3fb0 /Lib/asyncio/subprocess.py
parent153d97b24e7253f344860094eb2c98ed93657720 (diff)
downloadcpython-915bcb01110c7db65f8be9139bf887c749fbde75.zip
cpython-915bcb01110c7db65f8be9139bf887c749fbde75.tar.gz
cpython-915bcb01110c7db65f8be9139bf887c749fbde75.tar.bz2
Issue #20400: Merge Tulip into Python: add the new asyncio.subprocess module
* Add a new asyncio.subprocess module * Add new create_subprocess_exec() and create_subprocess_shell() functions * The new asyncio.subprocess.SubprocessStreamProtocol creates stream readers for stdout and stderr and a stream writer for stdin. * The new asyncio.subprocess.Process class offers an API close to the subprocess.Popen class: - pid, returncode, stdin, stdout and stderr attributes - communicate(), wait(), send_signal(), terminate() and kill() methods * Remove STDIN (0), STDOUT (1) and STDERR (2) constants from base_subprocess and unix_events, to not be confused with the symbols with the same name of subprocess and asyncio.subprocess modules * _ProactorBasePipeTransport.get_write_buffer_size() now counts also the size of the pending write * _ProactorBaseWritePipeTransport._loop_writing() may now pause the protocol if the write buffer size is greater than the high water mark (64 KB by default)
Diffstat (limited to 'Lib/asyncio/subprocess.py')
-rw-r--r--Lib/asyncio/subprocess.py197
1 files changed, 197 insertions, 0 deletions
diff --git a/Lib/asyncio/subprocess.py b/Lib/asyncio/subprocess.py
new file mode 100644
index 0000000..4312d44
--- /dev/null
+++ b/Lib/asyncio/subprocess.py
@@ -0,0 +1,197 @@
+__all__ = ['create_subprocess_exec', 'create_subprocess_shell']
+
+import collections
+import subprocess
+
+from . import events
+from . import futures
+from . import protocols
+from . import streams
+from . import tasks
+
+
+PIPE = subprocess.PIPE
+STDOUT = subprocess.STDOUT
+DEVNULL = subprocess.DEVNULL
+
+
+class SubprocessStreamProtocol(streams.FlowControlMixin,
+ protocols.SubprocessProtocol):
+ """Like StreamReaderProtocol, but for a subprocess."""
+
+ def __init__(self, limit, loop):
+ super().__init__(loop=loop)
+ self._limit = limit
+ self.stdin = self.stdout = self.stderr = None
+ self.waiter = futures.Future(loop=loop)
+ self._waiters = collections.deque()
+ self._transport = None
+
+ def connection_made(self, transport):
+ self._transport = transport
+ if transport.get_pipe_transport(1):
+ self.stdout = streams.StreamReader(limit=self._limit,
+ loop=self._loop)
+ if transport.get_pipe_transport(2):
+ self.stderr = streams.StreamReader(limit=self._limit,
+ loop=self._loop)
+ stdin = transport.get_pipe_transport(0)
+ if stdin is not None:
+ self.stdin = streams.StreamWriter(stdin,
+ protocol=self,
+ reader=None,
+ loop=self._loop)
+ self.waiter.set_result(None)
+
+ def pipe_data_received(self, fd, data):
+ if fd == 1:
+ reader = self.stdout
+ elif fd == 2:
+ reader = self.stderr
+ else:
+ reader = None
+ if reader is not None:
+ reader.feed_data(data)
+
+ def pipe_connection_lost(self, fd, exc):
+ if fd == 0:
+ pipe = self.stdin
+ if pipe is not None:
+ pipe.close()
+ self.connection_lost(exc)
+ return
+ if fd == 1:
+ reader = self.stdout
+ elif fd == 2:
+ reader = self.stderr
+ else:
+ reader = None
+ if reader != None:
+ if exc is None:
+ reader.feed_eof()
+ else:
+ reader.set_exception(exc)
+
+ def process_exited(self):
+ # wake up futures waiting for wait()
+ returncode = self._transport.get_returncode()
+ while self._waiters:
+ waiter = self._waiters.popleft()
+ waiter.set_result(returncode)
+
+
+class Process:
+ def __init__(self, transport, protocol, loop):
+ self._transport = transport
+ self._protocol = protocol
+ self._loop = loop
+ self.stdin = protocol.stdin
+ self.stdout = protocol.stdout
+ self.stderr = protocol.stderr
+ self.pid = transport.get_pid()
+
+ @property
+ def returncode(self):
+ return self._transport.get_returncode()
+
+ @tasks.coroutine
+ def wait(self):
+ """Wait until the process exit and return the process return code."""
+ returncode = self._transport.get_returncode()
+ if returncode is not None:
+ return returncode
+
+ waiter = futures.Future(loop=self._loop)
+ self._protocol._waiters.append(waiter)
+ yield from waiter
+ return waiter.result()
+
+ def get_subprocess(self):
+ return self._transport.get_extra_info('subprocess')
+
+ def _check_alive(self):
+ if self._transport.get_returncode() is not None:
+ raise ProcessLookupError()
+
+ def send_signal(self, signal):
+ self._check_alive()
+ self._transport.send_signal(signal)
+
+ def terminate(self):
+ self._check_alive()
+ self._transport.terminate()
+
+ def kill(self):
+ self._check_alive()
+ self._transport.kill()
+
+ @tasks.coroutine
+ def _feed_stdin(self, input):
+ self.stdin.write(input)
+ yield from self.stdin.drain()
+ self.stdin.close()
+
+ @tasks.coroutine
+ def _noop(self):
+ return None
+
+ @tasks.coroutine
+ def _read_stream(self, fd):
+ transport = self._transport.get_pipe_transport(fd)
+ if fd == 2:
+ stream = self.stderr
+ else:
+ assert fd == 1
+ stream = self.stdout
+ output = yield from stream.read()
+ transport.close()
+ return output
+
+ @tasks.coroutine
+ def communicate(self, input=None):
+ loop = self._transport._loop
+ if input:
+ stdin = self._feed_stdin(input)
+ else:
+ stdin = self._noop()
+ if self.stdout is not None:
+ stdout = self._read_stream(1)
+ else:
+ stdout = self._noop()
+ if self.stderr is not None:
+ stderr = self._read_stream(2)
+ else:
+ stderr = self._noop()
+ stdin, stdout, stderr = yield from tasks.gather(stdin, stdout, stderr,
+ loop=loop)
+ yield from self.wait()
+ return (stdout, stderr)
+
+
+@tasks.coroutine
+def create_subprocess_shell(cmd, stdin=None, stdout=None, stderr=None,
+ loop=None, limit=streams._DEFAULT_LIMIT, **kwds):
+ if loop is None:
+ loop = events.get_event_loop()
+ protocol_factory = lambda: SubprocessStreamProtocol(limit=limit,
+ loop=loop)
+ transport, protocol = yield from loop.subprocess_shell(
+ protocol_factory,
+ cmd, stdin=stdin, stdout=stdout,
+ stderr=stderr, **kwds)
+ yield from protocol.waiter
+ return Process(transport, protocol, loop)
+
+@tasks.coroutine
+def create_subprocess_exec(*args, stdin=None, stdout=None, stderr=None,
+ loop=None, limit=streams._DEFAULT_LIMIT, **kwds):
+ if loop is None:
+ loop = events.get_event_loop()
+ protocol_factory = lambda: SubprocessStreamProtocol(limit=limit,
+ loop=loop)
+ transport, protocol = yield from loop.subprocess_exec(
+ protocol_factory,
+ *args, stdin=stdin, stdout=stdout,
+ stderr=stderr, **kwds)
+ yield from protocol.waiter
+ return Process(transport, protocol, loop)