diff options
author | Victor Stinner <victor.stinner@gmail.com> | 2014-02-01 21:49:59 (GMT) |
---|---|---|
committer | Victor Stinner <victor.stinner@gmail.com> | 2014-02-01 21:49:59 (GMT) |
commit | 915bcb01110c7db65f8be9139bf887c749fbde75 (patch) | |
tree | fa24b947b19c1479ed581dc817c2e696386f3fb0 /Lib/asyncio/subprocess.py | |
parent | 153d97b24e7253f344860094eb2c98ed93657720 (diff) | |
download | cpython-915bcb01110c7db65f8be9139bf887c749fbde75.zip cpython-915bcb01110c7db65f8be9139bf887c749fbde75.tar.gz cpython-915bcb01110c7db65f8be9139bf887c749fbde75.tar.bz2 |
Issue #20400: Merge Tulip into Python: add the new asyncio.subprocess module
* Add a new asyncio.subprocess module
* Add new create_subprocess_exec() and create_subprocess_shell() functions
* The new asyncio.subprocess.SubprocessStreamProtocol creates stream readers
for stdout and stderr and a stream writer for stdin.
* The new asyncio.subprocess.Process class offers an API close to the
subprocess.Popen class:
- pid, returncode, stdin, stdout and stderr attributes
- communicate(), wait(), send_signal(), terminate() and kill() methods
* Remove STDIN (0), STDOUT (1) and STDERR (2) constants from base_subprocess
and unix_events, to not be confused with the symbols with the same name of
subprocess and asyncio.subprocess modules
* _ProactorBasePipeTransport.get_write_buffer_size() now counts also the size
of the pending write
* _ProactorBaseWritePipeTransport._loop_writing() may now pause the protocol if
the write buffer size is greater than the high water mark (64 KB by default)
Diffstat (limited to 'Lib/asyncio/subprocess.py')
-rw-r--r-- | Lib/asyncio/subprocess.py | 197 |
1 files changed, 197 insertions, 0 deletions
diff --git a/Lib/asyncio/subprocess.py b/Lib/asyncio/subprocess.py new file mode 100644 index 0000000..4312d44 --- /dev/null +++ b/Lib/asyncio/subprocess.py @@ -0,0 +1,197 @@ +__all__ = ['create_subprocess_exec', 'create_subprocess_shell'] + +import collections +import subprocess + +from . import events +from . import futures +from . import protocols +from . import streams +from . import tasks + + +PIPE = subprocess.PIPE +STDOUT = subprocess.STDOUT +DEVNULL = subprocess.DEVNULL + + +class SubprocessStreamProtocol(streams.FlowControlMixin, + protocols.SubprocessProtocol): + """Like StreamReaderProtocol, but for a subprocess.""" + + def __init__(self, limit, loop): + super().__init__(loop=loop) + self._limit = limit + self.stdin = self.stdout = self.stderr = None + self.waiter = futures.Future(loop=loop) + self._waiters = collections.deque() + self._transport = None + + def connection_made(self, transport): + self._transport = transport + if transport.get_pipe_transport(1): + self.stdout = streams.StreamReader(limit=self._limit, + loop=self._loop) + if transport.get_pipe_transport(2): + self.stderr = streams.StreamReader(limit=self._limit, + loop=self._loop) + stdin = transport.get_pipe_transport(0) + if stdin is not None: + self.stdin = streams.StreamWriter(stdin, + protocol=self, + reader=None, + loop=self._loop) + self.waiter.set_result(None) + + def pipe_data_received(self, fd, data): + if fd == 1: + reader = self.stdout + elif fd == 2: + reader = self.stderr + else: + reader = None + if reader is not None: + reader.feed_data(data) + + def pipe_connection_lost(self, fd, exc): + if fd == 0: + pipe = self.stdin + if pipe is not None: + pipe.close() + self.connection_lost(exc) + return + if fd == 1: + reader = self.stdout + elif fd == 2: + reader = self.stderr + else: + reader = None + if reader != None: + if exc is None: + reader.feed_eof() + else: + reader.set_exception(exc) + + def process_exited(self): + # wake up futures waiting for wait() + returncode = self._transport.get_returncode() + while self._waiters: + waiter = self._waiters.popleft() + waiter.set_result(returncode) + + +class Process: + def __init__(self, transport, protocol, loop): + self._transport = transport + self._protocol = protocol + self._loop = loop + self.stdin = protocol.stdin + self.stdout = protocol.stdout + self.stderr = protocol.stderr + self.pid = transport.get_pid() + + @property + def returncode(self): + return self._transport.get_returncode() + + @tasks.coroutine + def wait(self): + """Wait until the process exit and return the process return code.""" + returncode = self._transport.get_returncode() + if returncode is not None: + return returncode + + waiter = futures.Future(loop=self._loop) + self._protocol._waiters.append(waiter) + yield from waiter + return waiter.result() + + def get_subprocess(self): + return self._transport.get_extra_info('subprocess') + + def _check_alive(self): + if self._transport.get_returncode() is not None: + raise ProcessLookupError() + + def send_signal(self, signal): + self._check_alive() + self._transport.send_signal(signal) + + def terminate(self): + self._check_alive() + self._transport.terminate() + + def kill(self): + self._check_alive() + self._transport.kill() + + @tasks.coroutine + def _feed_stdin(self, input): + self.stdin.write(input) + yield from self.stdin.drain() + self.stdin.close() + + @tasks.coroutine + def _noop(self): + return None + + @tasks.coroutine + def _read_stream(self, fd): + transport = self._transport.get_pipe_transport(fd) + if fd == 2: + stream = self.stderr + else: + assert fd == 1 + stream = self.stdout + output = yield from stream.read() + transport.close() + return output + + @tasks.coroutine + def communicate(self, input=None): + loop = self._transport._loop + if input: + stdin = self._feed_stdin(input) + else: + stdin = self._noop() + if self.stdout is not None: + stdout = self._read_stream(1) + else: + stdout = self._noop() + if self.stderr is not None: + stderr = self._read_stream(2) + else: + stderr = self._noop() + stdin, stdout, stderr = yield from tasks.gather(stdin, stdout, stderr, + loop=loop) + yield from self.wait() + return (stdout, stderr) + + +@tasks.coroutine +def create_subprocess_shell(cmd, stdin=None, stdout=None, stderr=None, + loop=None, limit=streams._DEFAULT_LIMIT, **kwds): + if loop is None: + loop = events.get_event_loop() + protocol_factory = lambda: SubprocessStreamProtocol(limit=limit, + loop=loop) + transport, protocol = yield from loop.subprocess_shell( + protocol_factory, + cmd, stdin=stdin, stdout=stdout, + stderr=stderr, **kwds) + yield from protocol.waiter + return Process(transport, protocol, loop) + +@tasks.coroutine +def create_subprocess_exec(*args, stdin=None, stdout=None, stderr=None, + loop=None, limit=streams._DEFAULT_LIMIT, **kwds): + if loop is None: + loop = events.get_event_loop() + protocol_factory = lambda: SubprocessStreamProtocol(limit=limit, + loop=loop) + transport, protocol = yield from loop.subprocess_exec( + protocol_factory, + *args, stdin=stdin, stdout=stdout, + stderr=stderr, **kwds) + yield from protocol.waiter + return Process(transport, protocol, loop) |