summaryrefslogtreecommitdiffstats
path: root/Lib
diff options
context:
space:
mode:
Diffstat (limited to 'Lib')
-rw-r--r--Lib/asyncio/base_subprocess.py108
-rw-r--r--Lib/asyncio/subprocess.py40
-rw-r--r--Lib/asyncio/unix_events.py15
-rw-r--r--Lib/asyncio/windows_events.py7
-rw-r--r--Lib/test/test_asyncio/test_events.py35
-rw-r--r--Lib/test/test_asyncio/test_subprocess.py65
6 files changed, 166 insertions, 104 deletions
diff --git a/Lib/asyncio/base_subprocess.py b/Lib/asyncio/base_subprocess.py
index 651a9a2..001f9b8 100644
--- a/Lib/asyncio/base_subprocess.py
+++ b/Lib/asyncio/base_subprocess.py
@@ -3,6 +3,7 @@ import subprocess
import sys
import warnings
+from . import futures
from . import protocols
from . import transports
from .coroutines import coroutine
@@ -13,27 +14,32 @@ class BaseSubprocessTransport(transports.SubprocessTransport):
def __init__(self, loop, protocol, args, shell,
stdin, stdout, stderr, bufsize,
- extra=None, **kwargs):
+ waiter=None, extra=None, **kwargs):
super().__init__(extra)
self._closed = False
self._protocol = protocol
self._loop = loop
+ self._proc = None
self._pid = None
-
+ self._returncode = None
+ self._exit_waiters = []
+ self._pending_calls = collections.deque()
self._pipes = {}
+ self._finished = False
+
if stdin == subprocess.PIPE:
self._pipes[0] = None
if stdout == subprocess.PIPE:
self._pipes[1] = None
if stderr == subprocess.PIPE:
self._pipes[2] = None
- self._pending_calls = collections.deque()
- self._finished = False
- self._returncode = None
+
+ # Create the child process: set the _proc attribute
self._start(args=args, shell=shell, stdin=stdin, stdout=stdout,
stderr=stderr, bufsize=bufsize, **kwargs)
self._pid = self._proc.pid
self._extra['subprocess'] = self._proc
+
if self._loop.get_debug():
if isinstance(args, (bytes, str)):
program = args
@@ -42,6 +48,8 @@ class BaseSubprocessTransport(transports.SubprocessTransport):
logger.debug('process %r created: pid %s',
program, self._pid)
+ self._loop.create_task(self._connect_pipes(waiter))
+
def __repr__(self):
info = [self.__class__.__name__]
if self._closed:
@@ -77,12 +85,23 @@ class BaseSubprocessTransport(transports.SubprocessTransport):
def close(self):
self._closed = True
+
for proto in self._pipes.values():
if proto is None:
continue
proto.pipe.close()
- if self._returncode is None:
- self.terminate()
+
+ if self._proc is not None and self._returncode is None:
+ if self._loop.get_debug():
+ logger.warning('Close running child process: kill %r', self)
+
+ try:
+ self._proc.kill()
+ except ProcessLookupError:
+ pass
+
+ # Don't clear the _proc reference yet because _post_init() may
+ # still run
# On Python 3.3 and older, objects with a destructor part of a reference
# cycle are never destroyed. It's not more the case on Python 3.4 thanks
@@ -105,59 +124,42 @@ class BaseSubprocessTransport(transports.SubprocessTransport):
else:
return None
+ def _check_proc(self):
+ if self._closed:
+ raise ValueError("operation on closed transport")
+ if self._proc is None:
+ raise ProcessLookupError()
+
def send_signal(self, signal):
+ self._check_proc()
self._proc.send_signal(signal)
def terminate(self):
+ self._check_proc()
self._proc.terminate()
def kill(self):
+ self._check_proc()
self._proc.kill()
- def _kill_wait(self):
- """Close pipes, kill the subprocess and read its return status.
-
- Function called when an exception is raised during the creation
- of a subprocess.
- """
- self._closed = True
- if self._loop.get_debug():
- logger.warning('Exception during subprocess creation, '
- 'kill the subprocess %r',
- self,
- exc_info=True)
-
- proc = self._proc
- if proc.stdout:
- proc.stdout.close()
- if proc.stderr:
- proc.stderr.close()
- if proc.stdin:
- proc.stdin.close()
-
- try:
- proc.kill()
- except ProcessLookupError:
- pass
- self._returncode = proc.wait()
-
- self.close()
-
@coroutine
- def _post_init(self):
+ def _connect_pipes(self, waiter):
try:
proc = self._proc
loop = self._loop
+
if proc.stdin is not None:
_, pipe = yield from loop.connect_write_pipe(
lambda: WriteSubprocessPipeProto(self, 0),
proc.stdin)
self._pipes[0] = pipe
+
if proc.stdout is not None:
_, pipe = yield from loop.connect_read_pipe(
lambda: ReadSubprocessPipeProto(self, 1),
proc.stdout)
self._pipes[1] = pipe
+
if proc.stderr is not None:
_, pipe = yield from loop.connect_read_pipe(
lambda: ReadSubprocessPipeProto(self, 2),
@@ -166,13 +168,16 @@ class BaseSubprocessTransport(transports.SubprocessTransport):
assert self._pending_calls is not None
- self._loop.call_soon(self._protocol.connection_made, self)
+ loop.call_soon(self._protocol.connection_made, self)
for callback, data in self._pending_calls:
- self._loop.call_soon(callback, *data)
+ loop.call_soon(callback, *data)
self._pending_calls = None
- except:
- self._kill_wait()
- raise
+ except Exception as exc:
+ if waiter is not None and not waiter.cancelled():
+ waiter.set_exception(exc)
+ else:
+ if waiter is not None and not waiter.cancelled():
+ waiter.set_result(None)
def _call(self, cb, *data):
if self._pending_calls is not None:
@@ -197,6 +202,23 @@ class BaseSubprocessTransport(transports.SubprocessTransport):
self._call(self._protocol.process_exited)
self._try_finish()
+ # wake up futures waiting for wait()
+ for waiter in self._exit_waiters:
+ if not waiter.cancelled():
+ waiter.set_result(returncode)
+ self._exit_waiters = None
+
+ def wait(self):
+ """Wait until the process exit and return the process return code.
+
+ This method is a coroutine."""
+ if self._returncode is not None:
+ return self._returncode
+
+ waiter = futures.Future(loop=self._loop)
+ self._exit_waiters.append(waiter)
+ return (yield from waiter)
+
def _try_finish(self):
assert not self._finished
if self._returncode is None:
@@ -210,9 +232,9 @@ class BaseSubprocessTransport(transports.SubprocessTransport):
try:
self._protocol.connection_lost(exc)
finally:
+ self._loop = None
self._proc = None
self._protocol = None
- self._loop = None
class WriteSubprocessPipeProto(protocols.BaseProtocol):
diff --git a/Lib/asyncio/subprocess.py b/Lib/asyncio/subprocess.py
index c848a21..d0c9779 100644
--- a/Lib/asyncio/subprocess.py
+++ b/Lib/asyncio/subprocess.py
@@ -25,8 +25,6 @@ class SubprocessStreamProtocol(streams.FlowControlMixin,
super().__init__(loop=loop)
self._limit = limit
self.stdin = self.stdout = self.stderr = None
- self.waiter = futures.Future(loop=loop)
- self._waiters = collections.deque()
self._transport = None
def __repr__(self):
@@ -61,9 +59,6 @@ class SubprocessStreamProtocol(streams.FlowControlMixin,
reader=None,
loop=self._loop)
- if not self.waiter.cancelled():
- self.waiter.set_result(None)
-
def pipe_data_received(self, fd, data):
if fd == 1:
reader = self.stdout
@@ -94,16 +89,9 @@ class SubprocessStreamProtocol(streams.FlowControlMixin,
reader.set_exception(exc)
def process_exited(self):
- returncode = self._transport.get_returncode()
self._transport.close()
self._transport = None
- # wake up futures waiting for wait()
- while self._waiters:
- waiter = self._waiters.popleft()
- if not waiter.cancelled():
- waiter.set_result(returncode)
-
class Process:
def __init__(self, transport, protocol, loop):
@@ -124,30 +112,18 @@ class Process:
@coroutine
def wait(self):
- """Wait until the process exit and return the process return code."""
- returncode = self._transport.get_returncode()
- if returncode is not None:
- return returncode
-
- waiter = futures.Future(loop=self._loop)
- self._protocol._waiters.append(waiter)
- yield from waiter
- return waiter.result()
+ """Wait until the process exit and return the process return code.
- def _check_alive(self):
- if self._transport.get_returncode() is not None:
- raise ProcessLookupError()
+ This method is a coroutine."""
+ return (yield from self._transport.wait())
def send_signal(self, signal):
- self._check_alive()
self._transport.send_signal(signal)
def terminate(self):
- self._check_alive()
self._transport.terminate()
def kill(self):
- self._check_alive()
self._transport.kill()
@coroutine
@@ -221,11 +197,6 @@ def create_subprocess_shell(cmd, stdin=None, stdout=None, stderr=None,
protocol_factory,
cmd, stdin=stdin, stdout=stdout,
stderr=stderr, **kwds)
- try:
- yield from protocol.waiter
- except:
- transport._kill_wait()
- raise
return Process(transport, protocol, loop)
@coroutine
@@ -241,9 +212,4 @@ def create_subprocess_exec(program, *args, stdin=None, stdout=None,
program, *args,
stdin=stdin, stdout=stdout,
stderr=stderr, **kwds)
- try:
- yield from protocol.waiter
- except:
- transport._kill_wait()
- raise
return Process(transport, protocol, loop)
diff --git a/Lib/asyncio/unix_events.py b/Lib/asyncio/unix_events.py
index b06f1b2..3ecdfd2 100644
--- a/Lib/asyncio/unix_events.py
+++ b/Lib/asyncio/unix_events.py
@@ -16,6 +16,7 @@ from . import base_subprocess
from . import constants
from . import coroutines
from . import events
+from . import futures
from . import selector_events
from . import selectors
from . import transports
@@ -175,16 +176,20 @@ class _UnixSelectorEventLoop(selector_events.BaseSelectorEventLoop):
stdin, stdout, stderr, bufsize,
extra=None, **kwargs):
with events.get_child_watcher() as watcher:
+ waiter = futures.Future(loop=self)
transp = _UnixSubprocessTransport(self, protocol, args, shell,
stdin, stdout, stderr, bufsize,
- extra=extra, **kwargs)
+ waiter=waiter, extra=extra,
+ **kwargs)
+
+ watcher.add_child_handler(transp.get_pid(),
+ self._child_watcher_callback, transp)
try:
- yield from transp._post_init()
+ yield from waiter
except:
transp.close()
+ yield from transp.wait()
raise
- watcher.add_child_handler(transp.get_pid(),
- self._child_watcher_callback, transp)
return transp
@@ -774,7 +779,7 @@ class SafeChildWatcher(BaseChildWatcher):
pass
def add_child_handler(self, pid, callback, *args):
- self._callbacks[pid] = callback, args
+ self._callbacks[pid] = (callback, args)
# Prevent a race condition in case the child is already terminated.
self._do_waitpid(pid)
diff --git a/Lib/asyncio/windows_events.py b/Lib/asyncio/windows_events.py
index 94aafb6..437eb0a 100644
--- a/Lib/asyncio/windows_events.py
+++ b/Lib/asyncio/windows_events.py
@@ -366,13 +366,16 @@ class ProactorEventLoop(proactor_events.BaseProactorEventLoop):
def _make_subprocess_transport(self, protocol, args, shell,
stdin, stdout, stderr, bufsize,
extra=None, **kwargs):
+ waiter = futures.Future(loop=self)
transp = _WindowsSubprocessTransport(self, protocol, args, shell,
stdin, stdout, stderr, bufsize,
- extra=extra, **kwargs)
+ waiter=waiter, extra=extra,
+ **kwargs)
try:
- yield from transp._post_init()
+ yield from waiter
except:
transp.close()
+ yield from transp.wait()
raise
return transp
diff --git a/Lib/test/test_asyncio/test_events.py b/Lib/test/test_asyncio/test_events.py
index 12af62b..4b957d8 100644
--- a/Lib/test/test_asyncio/test_events.py
+++ b/Lib/test/test_asyncio/test_events.py
@@ -1551,9 +1551,10 @@ class SubprocessTestsMixin:
stdin = transp.get_pipe_transport(0)
stdin.write(b'Python The Winner')
self.loop.run_until_complete(proto.got_data[1].wait())
- transp.close()
+ with test_utils.disable_logger():
+ transp.close()
self.loop.run_until_complete(proto.completed)
- self.check_terminated(proto.returncode)
+ self.check_killed(proto.returncode)
self.assertEqual(b'Python The Winner', proto.data[1])
def test_subprocess_interactive(self):
@@ -1567,21 +1568,20 @@ class SubprocessTestsMixin:
self.loop.run_until_complete(proto.connected)
self.assertEqual('CONNECTED', proto.state)
- try:
- stdin = transp.get_pipe_transport(0)
- stdin.write(b'Python ')
- self.loop.run_until_complete(proto.got_data[1].wait())
- proto.got_data[1].clear()
- self.assertEqual(b'Python ', proto.data[1])
-
- stdin.write(b'The Winner')
- self.loop.run_until_complete(proto.got_data[1].wait())
- self.assertEqual(b'Python The Winner', proto.data[1])
- finally:
- transp.close()
+ stdin = transp.get_pipe_transport(0)
+ stdin.write(b'Python ')
+ self.loop.run_until_complete(proto.got_data[1].wait())
+ proto.got_data[1].clear()
+ self.assertEqual(b'Python ', proto.data[1])
+ stdin.write(b'The Winner')
+ self.loop.run_until_complete(proto.got_data[1].wait())
+ self.assertEqual(b'Python The Winner', proto.data[1])
+
+ with test_utils.disable_logger():
+ transp.close()
self.loop.run_until_complete(proto.completed)
- self.check_terminated(proto.returncode)
+ self.check_killed(proto.returncode)
def test_subprocess_shell(self):
connect = self.loop.subprocess_shell(
@@ -1739,9 +1739,10 @@ class SubprocessTestsMixin:
# GetLastError()==ERROR_INVALID_NAME on Windows!?! (Using
# WriteFile() we get ERROR_BROKEN_PIPE as expected.)
self.assertEqual(b'ERR:OSError', proto.data[2])
- transp.close()
+ with test_utils.disable_logger():
+ transp.close()
self.loop.run_until_complete(proto.completed)
- self.check_terminated(proto.returncode)
+ self.check_killed(proto.returncode)
def test_subprocess_wait_no_same_group(self):
# start the new process in a new session
diff --git a/Lib/test/test_asyncio/test_subprocess.py b/Lib/test/test_asyncio/test_subprocess.py
index ecc2c9d..4f197f3 100644
--- a/Lib/test/test_asyncio/test_subprocess.py
+++ b/Lib/test/test_asyncio/test_subprocess.py
@@ -4,6 +4,7 @@ import unittest
from unittest import mock
import asyncio
+from asyncio import base_subprocess
from asyncio import subprocess
from asyncio import test_utils
try:
@@ -23,6 +24,70 @@ PROGRAM_CAT = [
'data = sys.stdin.buffer.read()',
'sys.stdout.buffer.write(data)'))]
+class TestSubprocessTransport(base_subprocess.BaseSubprocessTransport):
+ def _start(self, *args, **kwargs):
+ self._proc = mock.Mock()
+ self._proc.stdin = None
+ self._proc.stdout = None
+ self._proc.stderr = None
+
+
+class SubprocessTransportTests(test_utils.TestCase):
+ def setUp(self):
+ self.loop = self.new_test_loop()
+ self.set_event_loop(self.loop)
+
+
+ def create_transport(self, waiter=None):
+ protocol = mock.Mock()
+ protocol.connection_made._is_coroutine = False
+ protocol.process_exited._is_coroutine = False
+ transport = TestSubprocessTransport(
+ self.loop, protocol, ['test'], False,
+ None, None, None, 0, waiter=waiter)
+ return (transport, protocol)
+
+ def test_close(self):
+ waiter = asyncio.Future(loop=self.loop)
+ transport, protocol = self.create_transport(waiter)
+ transport._process_exited(0)
+ transport.close()
+
+ # The loop didn't run yet
+ self.assertFalse(protocol.connection_made.called)
+
+ # methods must raise ProcessLookupError if the transport was closed
+ self.assertRaises(ValueError, transport.send_signal, signal.SIGTERM)
+ self.assertRaises(ValueError, transport.terminate)
+ self.assertRaises(ValueError, transport.kill)
+
+ self.loop.run_until_complete(waiter)
+
+ def test_proc_exited(self):
+ waiter = asyncio.Future(loop=self.loop)
+ transport, protocol = self.create_transport(waiter)
+ transport._process_exited(6)
+ self.loop.run_until_complete(waiter)
+
+ self.assertEqual(transport.get_returncode(), 6)
+
+ self.assertTrue(protocol.connection_made.called)
+ self.assertTrue(protocol.process_exited.called)
+ self.assertTrue(protocol.connection_lost.called)
+ self.assertEqual(protocol.connection_lost.call_args[0], (None,))
+
+ self.assertFalse(transport._closed)
+ self.assertIsNone(transport._loop)
+ self.assertIsNone(transport._proc)
+ self.assertIsNone(transport._protocol)
+
+ # methods must raise ProcessLookupError if the process exited
+ self.assertRaises(ProcessLookupError,
+ transport.send_signal, signal.SIGTERM)
+ self.assertRaises(ProcessLookupError, transport.terminate)
+ self.assertRaises(ProcessLookupError, transport.kill)
+
+
class SubprocessMixin:
def test_stdin_stdout(self):