diff options
Diffstat (limited to 'Lib/asyncio/subprocess.py')
-rw-r--r-- | Lib/asyncio/subprocess.py | 64 |
1 files changed, 21 insertions, 43 deletions
diff --git a/Lib/asyncio/subprocess.py b/Lib/asyncio/subprocess.py index bddfb01..c9506b1 100644 --- a/Lib/asyncio/subprocess.py +++ b/Lib/asyncio/subprocess.py @@ -19,16 +19,14 @@ class SubprocessStreamProtocol(streams.FlowControlMixin, protocols.SubprocessProtocol): """Like StreamReaderProtocol, but for a subprocess.""" - def __init__(self, limit, loop, *, _asyncio_internal=False): - super().__init__(loop=loop, _asyncio_internal=_asyncio_internal) + def __init__(self, limit, loop): + super().__init__(loop=loop) self._limit = limit self.stdin = self.stdout = self.stderr = None self._transport = None self._process_exited = False self._pipe_fds = [] self._stdin_closed = self._loop.create_future() - self._stdout_closed = self._loop.create_future() - self._stderr_closed = self._loop.create_future() def __repr__(self): info = [self.__class__.__name__] @@ -42,35 +40,27 @@ class SubprocessStreamProtocol(streams.FlowControlMixin, def connection_made(self, transport): self._transport = transport + stdout_transport = transport.get_pipe_transport(1) if stdout_transport is not None: - self.stdout = streams.Stream(mode=streams.StreamMode.READ, - transport=stdout_transport, - protocol=self, - limit=self._limit, - loop=self._loop, - _asyncio_internal=True) - self.stdout._set_transport(stdout_transport) + self.stdout = streams.StreamReader(limit=self._limit, + loop=self._loop) + self.stdout.set_transport(stdout_transport) self._pipe_fds.append(1) stderr_transport = transport.get_pipe_transport(2) if stderr_transport is not None: - self.stderr = streams.Stream(mode=streams.StreamMode.READ, - transport=stderr_transport, - protocol=self, - limit=self._limit, - loop=self._loop, - _asyncio_internal=True) - self.stderr._set_transport(stderr_transport) + self.stderr = streams.StreamReader(limit=self._limit, + loop=self._loop) + self.stderr.set_transport(stderr_transport) self._pipe_fds.append(2) stdin_transport = transport.get_pipe_transport(0) if stdin_transport is not None: - self.stdin = streams.Stream(mode=streams.StreamMode.WRITE, - transport=stdin_transport, - protocol=self, - loop=self._loop, - _asyncio_internal=True) + self.stdin = streams.StreamWriter(stdin_transport, + protocol=self, + reader=None, + loop=self._loop) def pipe_data_received(self, fd, data): if fd == 1: @@ -80,7 +70,7 @@ class SubprocessStreamProtocol(streams.FlowControlMixin, else: reader = None if reader is not None: - reader._feed_data(data) + reader.feed_data(data) def pipe_connection_lost(self, fd, exc): if fd == 0: @@ -101,9 +91,9 @@ class SubprocessStreamProtocol(streams.FlowControlMixin, reader = None if reader is not None: if exc is None: - reader._feed_eof() + reader.feed_eof() else: - reader._set_exception(exc) + reader.set_exception(exc) if fd in self._pipe_fds: self._pipe_fds.remove(fd) @@ -121,20 +111,10 @@ class SubprocessStreamProtocol(streams.FlowControlMixin, def _get_close_waiter(self, stream): if stream is self.stdin: return self._stdin_closed - elif stream is self.stdout: - return self._stdout_closed - elif stream is self.stderr: - return self._stderr_closed class Process: - def __init__(self, transport, protocol, loop, *, _asyncio_internal=False): - if not _asyncio_internal: - warnings.warn(f"{self.__class__} should be instantiated " - "by asyncio internals only, " - "please avoid its creation from user code", - DeprecationWarning) - + def __init__(self, transport, protocol, loop): self._transport = transport self._protocol = protocol self._loop = loop @@ -232,13 +212,12 @@ async def create_subprocess_shell(cmd, stdin=None, stdout=None, stderr=None, ) protocol_factory = lambda: SubprocessStreamProtocol(limit=limit, - loop=loop, - _asyncio_internal=True) + loop=loop) transport, protocol = await loop.subprocess_shell( protocol_factory, cmd, stdin=stdin, stdout=stdout, stderr=stderr, **kwds) - return Process(transport, protocol, loop, _asyncio_internal=True) + return Process(transport, protocol, loop) async def create_subprocess_exec(program, *args, stdin=None, stdout=None, @@ -253,11 +232,10 @@ async def create_subprocess_exec(program, *args, stdin=None, stdout=None, stacklevel=2 ) protocol_factory = lambda: SubprocessStreamProtocol(limit=limit, - loop=loop, - _asyncio_internal=True) + loop=loop) transport, protocol = await loop.subprocess_exec( protocol_factory, program, *args, stdin=stdin, stdout=stdout, stderr=stderr, **kwds) - return Process(transport, protocol, loop, _asyncio_internal=True) + return Process(transport, protocol, loop) |