diff options
Diffstat (limited to 'Lib/asyncio/subprocess.py')
-rw-r--r-- | Lib/asyncio/subprocess.py | 241 |
1 files changed, 0 insertions, 241 deletions
diff --git a/Lib/asyncio/subprocess.py b/Lib/asyncio/subprocess.py deleted file mode 100644 index c9506b1..0000000 --- a/Lib/asyncio/subprocess.py +++ /dev/null @@ -1,241 +0,0 @@ -__all__ = 'create_subprocess_exec', 'create_subprocess_shell' - -import subprocess -import warnings - -from . import events -from . import protocols -from . import streams -from . import tasks -from .log import logger - - -PIPE = subprocess.PIPE -STDOUT = subprocess.STDOUT -DEVNULL = subprocess.DEVNULL - - -class SubprocessStreamProtocol(streams.FlowControlMixin, - protocols.SubprocessProtocol): - """Like StreamReaderProtocol, but for a subprocess.""" - - def __init__(self, limit, loop): - super().__init__(loop=loop) - self._limit = limit - self.stdin = self.stdout = self.stderr = None - self._transport = None - self._process_exited = False - self._pipe_fds = [] - self._stdin_closed = self._loop.create_future() - - def __repr__(self): - info = [self.__class__.__name__] - if self.stdin is not None: - info.append(f'stdin={self.stdin!r}') - if self.stdout is not None: - info.append(f'stdout={self.stdout!r}') - if self.stderr is not None: - info.append(f'stderr={self.stderr!r}') - return '<{}>'.format(' '.join(info)) - - def connection_made(self, transport): - self._transport = transport - - stdout_transport = transport.get_pipe_transport(1) - if stdout_transport is not None: - self.stdout = streams.StreamReader(limit=self._limit, - loop=self._loop) - self.stdout.set_transport(stdout_transport) - self._pipe_fds.append(1) - - stderr_transport = transport.get_pipe_transport(2) - if stderr_transport is not None: - self.stderr = streams.StreamReader(limit=self._limit, - loop=self._loop) - self.stderr.set_transport(stderr_transport) - self._pipe_fds.append(2) - - stdin_transport = transport.get_pipe_transport(0) - if stdin_transport is not None: - self.stdin = streams.StreamWriter(stdin_transport, - protocol=self, - reader=None, - loop=self._loop) - - def pipe_data_received(self, fd, data): - if fd == 1: - reader = self.stdout - elif fd == 2: - reader = self.stderr - else: - reader = None - if reader is not None: - reader.feed_data(data) - - def pipe_connection_lost(self, fd, exc): - if fd == 0: - pipe = self.stdin - if pipe is not None: - pipe.close() - self.connection_lost(exc) - if exc is None: - self._stdin_closed.set_result(None) - else: - self._stdin_closed.set_exception(exc) - return - if fd == 1: - reader = self.stdout - elif fd == 2: - reader = self.stderr - else: - reader = None - if reader is not None: - if exc is None: - reader.feed_eof() - else: - reader.set_exception(exc) - - if fd in self._pipe_fds: - self._pipe_fds.remove(fd) - self._maybe_close_transport() - - def process_exited(self): - self._process_exited = True - self._maybe_close_transport() - - def _maybe_close_transport(self): - if len(self._pipe_fds) == 0 and self._process_exited: - self._transport.close() - self._transport = None - - def _get_close_waiter(self, stream): - if stream is self.stdin: - return self._stdin_closed - - -class Process: - def __init__(self, transport, protocol, loop): - self._transport = transport - self._protocol = protocol - self._loop = loop - self.stdin = protocol.stdin - self.stdout = protocol.stdout - self.stderr = protocol.stderr - self.pid = transport.get_pid() - - def __repr__(self): - return f'<{self.__class__.__name__} {self.pid}>' - - @property - def returncode(self): - return self._transport.get_returncode() - - async def wait(self): - """Wait until the process exit and return the process return code.""" - return await self._transport._wait() - - def send_signal(self, signal): - self._transport.send_signal(signal) - - def terminate(self): - self._transport.terminate() - - def kill(self): - self._transport.kill() - - async def _feed_stdin(self, input): - debug = self._loop.get_debug() - self.stdin.write(input) - if debug: - logger.debug( - '%r communicate: feed stdin (%s bytes)', self, len(input)) - try: - await self.stdin.drain() - except (BrokenPipeError, ConnectionResetError) as exc: - # communicate() ignores BrokenPipeError and ConnectionResetError - if debug: - logger.debug('%r communicate: stdin got %r', self, exc) - - if debug: - logger.debug('%r communicate: close stdin', self) - self.stdin.close() - - async def _noop(self): - return None - - async def _read_stream(self, fd): - transport = self._transport.get_pipe_transport(fd) - if fd == 2: - stream = self.stderr - else: - assert fd == 1 - stream = self.stdout - if self._loop.get_debug(): - name = 'stdout' if fd == 1 else 'stderr' - logger.debug('%r communicate: read %s', self, name) - output = await stream.read() - if self._loop.get_debug(): - name = 'stdout' if fd == 1 else 'stderr' - logger.debug('%r communicate: close %s', self, name) - transport.close() - return output - - async def communicate(self, input=None): - if input is not None: - stdin = self._feed_stdin(input) - else: - stdin = self._noop() - if self.stdout is not None: - stdout = self._read_stream(1) - else: - stdout = self._noop() - if self.stderr is not None: - stderr = self._read_stream(2) - else: - stderr = self._noop() - stdin, stdout, stderr = await tasks.gather(stdin, stdout, stderr, - loop=self._loop) - await self.wait() - return (stdout, stderr) - - -async def create_subprocess_shell(cmd, stdin=None, stdout=None, stderr=None, - loop=None, limit=streams._DEFAULT_LIMIT, - **kwds): - if loop is None: - loop = events.get_event_loop() - else: - warnings.warn("The loop argument is deprecated since Python 3.8 " - "and scheduled for removal in Python 3.10.", - DeprecationWarning, - stacklevel=2 - ) - - protocol_factory = lambda: SubprocessStreamProtocol(limit=limit, - loop=loop) - transport, protocol = await loop.subprocess_shell( - protocol_factory, - cmd, stdin=stdin, stdout=stdout, - stderr=stderr, **kwds) - return Process(transport, protocol, loop) - - -async def create_subprocess_exec(program, *args, stdin=None, stdout=None, - stderr=None, loop=None, - limit=streams._DEFAULT_LIMIT, **kwds): - if loop is None: - loop = events.get_event_loop() - else: - warnings.warn("The loop argument is deprecated since Python 3.8 " - "and scheduled for removal in Python 3.10.", - DeprecationWarning, - stacklevel=2 - ) - protocol_factory = lambda: SubprocessStreamProtocol(limit=limit, - loop=loop) - transport, protocol = await loop.subprocess_exec( - protocol_factory, - program, *args, - stdin=stdin, stdout=stdout, - stderr=stderr, **kwds) - return Process(transport, protocol, loop) |