diff options
author | Victor Stinner <victor.stinner@gmail.com> | 2015-01-29 23:05:36 (GMT) |
---|---|---|
committer | Victor Stinner <victor.stinner@gmail.com> | 2015-01-29 23:05:36 (GMT) |
commit | a9373ec96657982c1c1673f6817f81d4a7aec833 (patch) | |
tree | 77d8f894a9e54640b3f1178b540dec7358e416ff | |
parent | 2647375cc98ac47dffdf495fc4b0fe5973b0375d (diff) | |
parent | 47cd10d7a903773f574fc93220dbca850067fa0c (diff) | |
download | cpython-a9373ec96657982c1c1673f6817f81d4a7aec833.zip cpython-a9373ec96657982c1c1673f6817f81d4a7aec833.tar.gz cpython-a9373ec96657982c1c1673f6817f81d4a7aec833.tar.bz2 |
Merge 3.4 (asyncio)
-rw-r--r-- | Lib/asyncio/base_subprocess.py | 108 | ||||
-rw-r--r-- | Lib/asyncio/subprocess.py | 40 | ||||
-rw-r--r-- | Lib/asyncio/unix_events.py | 15 | ||||
-rw-r--r-- | Lib/asyncio/windows_events.py | 7 | ||||
-rw-r--r-- | Lib/test/test_asyncio/test_events.py | 35 | ||||
-rw-r--r-- | Lib/test/test_asyncio/test_subprocess.py | 65 |
6 files changed, 166 insertions, 104 deletions
diff --git a/Lib/asyncio/base_subprocess.py b/Lib/asyncio/base_subprocess.py index 651a9a2..001f9b8 100644 --- a/Lib/asyncio/base_subprocess.py +++ b/Lib/asyncio/base_subprocess.py @@ -3,6 +3,7 @@ import subprocess import sys import warnings +from . import futures from . import protocols from . import transports from .coroutines import coroutine @@ -13,27 +14,32 @@ class BaseSubprocessTransport(transports.SubprocessTransport): def __init__(self, loop, protocol, args, shell, stdin, stdout, stderr, bufsize, - extra=None, **kwargs): + waiter=None, extra=None, **kwargs): super().__init__(extra) self._closed = False self._protocol = protocol self._loop = loop + self._proc = None self._pid = None - + self._returncode = None + self._exit_waiters = [] + self._pending_calls = collections.deque() self._pipes = {} + self._finished = False + if stdin == subprocess.PIPE: self._pipes[0] = None if stdout == subprocess.PIPE: self._pipes[1] = None if stderr == subprocess.PIPE: self._pipes[2] = None - self._pending_calls = collections.deque() - self._finished = False - self._returncode = None + + # Create the child process: set the _proc attribute self._start(args=args, shell=shell, stdin=stdin, stdout=stdout, stderr=stderr, bufsize=bufsize, **kwargs) self._pid = self._proc.pid self._extra['subprocess'] = self._proc + if self._loop.get_debug(): if isinstance(args, (bytes, str)): program = args @@ -42,6 +48,8 @@ class BaseSubprocessTransport(transports.SubprocessTransport): logger.debug('process %r created: pid %s', program, self._pid) + self._loop.create_task(self._connect_pipes(waiter)) + def __repr__(self): info = [self.__class__.__name__] if self._closed: @@ -77,12 +85,23 @@ class BaseSubprocessTransport(transports.SubprocessTransport): def close(self): self._closed = True + for proto in self._pipes.values(): if proto is None: continue proto.pipe.close() - if self._returncode is None: - self.terminate() + + if self._proc is not None and self._returncode is None: + if self._loop.get_debug(): + logger.warning('Close running child process: kill %r', self) + + try: + self._proc.kill() + except ProcessLookupError: + pass + + # Don't clear the _proc reference yet because _post_init() may + # still run # On Python 3.3 and older, objects with a destructor part of a reference # cycle are never destroyed. It's not more the case on Python 3.4 thanks @@ -105,59 +124,42 @@ class BaseSubprocessTransport(transports.SubprocessTransport): else: return None + def _check_proc(self): + if self._closed: + raise ValueError("operation on closed transport") + if self._proc is None: + raise ProcessLookupError() + def send_signal(self, signal): + self._check_proc() self._proc.send_signal(signal) def terminate(self): + self._check_proc() self._proc.terminate() def kill(self): + self._check_proc() self._proc.kill() - def _kill_wait(self): - """Close pipes, kill the subprocess and read its return status. - - Function called when an exception is raised during the creation - of a subprocess. - """ - self._closed = True - if self._loop.get_debug(): - logger.warning('Exception during subprocess creation, ' - 'kill the subprocess %r', - self, - exc_info=True) - - proc = self._proc - if proc.stdout: - proc.stdout.close() - if proc.stderr: - proc.stderr.close() - if proc.stdin: - proc.stdin.close() - - try: - proc.kill() - except ProcessLookupError: - pass - self._returncode = proc.wait() - - self.close() - @coroutine - def _post_init(self): + def _connect_pipes(self, waiter): try: proc = self._proc loop = self._loop + if proc.stdin is not None: _, pipe = yield from loop.connect_write_pipe( lambda: WriteSubprocessPipeProto(self, 0), proc.stdin) self._pipes[0] = pipe + if proc.stdout is not None: _, pipe = yield from loop.connect_read_pipe( lambda: ReadSubprocessPipeProto(self, 1), proc.stdout) self._pipes[1] = pipe + if proc.stderr is not None: _, pipe = yield from loop.connect_read_pipe( lambda: ReadSubprocessPipeProto(self, 2), @@ -166,13 +168,16 @@ class BaseSubprocessTransport(transports.SubprocessTransport): assert self._pending_calls is not None - self._loop.call_soon(self._protocol.connection_made, self) + loop.call_soon(self._protocol.connection_made, self) for callback, data in self._pending_calls: - self._loop.call_soon(callback, *data) + loop.call_soon(callback, *data) self._pending_calls = None - except: - self._kill_wait() - raise + except Exception as exc: + if waiter is not None and not waiter.cancelled(): + waiter.set_exception(exc) + else: + if waiter is not None and not waiter.cancelled(): + waiter.set_result(None) def _call(self, cb, *data): if self._pending_calls is not None: @@ -197,6 +202,23 @@ class BaseSubprocessTransport(transports.SubprocessTransport): self._call(self._protocol.process_exited) self._try_finish() + # wake up futures waiting for wait() + for waiter in self._exit_waiters: + if not waiter.cancelled(): + waiter.set_result(returncode) + self._exit_waiters = None + + def wait(self): + """Wait until the process exit and return the process return code. + + This method is a coroutine.""" + if self._returncode is not None: + return self._returncode + + waiter = futures.Future(loop=self._loop) + self._exit_waiters.append(waiter) + return (yield from waiter) + def _try_finish(self): assert not self._finished if self._returncode is None: @@ -210,9 +232,9 @@ class BaseSubprocessTransport(transports.SubprocessTransport): try: self._protocol.connection_lost(exc) finally: + self._loop = None self._proc = None self._protocol = None - self._loop = None class WriteSubprocessPipeProto(protocols.BaseProtocol): diff --git a/Lib/asyncio/subprocess.py b/Lib/asyncio/subprocess.py index c848a21..d0c9779 100644 --- a/Lib/asyncio/subprocess.py +++ b/Lib/asyncio/subprocess.py @@ -25,8 +25,6 @@ class SubprocessStreamProtocol(streams.FlowControlMixin, super().__init__(loop=loop) self._limit = limit self.stdin = self.stdout = self.stderr = None - self.waiter = futures.Future(loop=loop) - self._waiters = collections.deque() self._transport = None def __repr__(self): @@ -61,9 +59,6 @@ class SubprocessStreamProtocol(streams.FlowControlMixin, reader=None, loop=self._loop) - if not self.waiter.cancelled(): - self.waiter.set_result(None) - def pipe_data_received(self, fd, data): if fd == 1: reader = self.stdout @@ -94,16 +89,9 @@ class SubprocessStreamProtocol(streams.FlowControlMixin, reader.set_exception(exc) def process_exited(self): - returncode = self._transport.get_returncode() self._transport.close() self._transport = None - # wake up futures waiting for wait() - while self._waiters: - waiter = self._waiters.popleft() - if not waiter.cancelled(): - waiter.set_result(returncode) - class Process: def __init__(self, transport, protocol, loop): @@ -124,30 +112,18 @@ class Process: @coroutine def wait(self): - """Wait until the process exit and return the process return code.""" - returncode = self._transport.get_returncode() - if returncode is not None: - return returncode - - waiter = futures.Future(loop=self._loop) - self._protocol._waiters.append(waiter) - yield from waiter - return waiter.result() + """Wait until the process exit and return the process return code. - def _check_alive(self): - if self._transport.get_returncode() is not None: - raise ProcessLookupError() + This method is a coroutine.""" + return (yield from self._transport.wait()) def send_signal(self, signal): - self._check_alive() self._transport.send_signal(signal) def terminate(self): - self._check_alive() self._transport.terminate() def kill(self): - self._check_alive() self._transport.kill() @coroutine @@ -221,11 +197,6 @@ def create_subprocess_shell(cmd, stdin=None, stdout=None, stderr=None, protocol_factory, cmd, stdin=stdin, stdout=stdout, stderr=stderr, **kwds) - try: - yield from protocol.waiter - except: - transport._kill_wait() - raise return Process(transport, protocol, loop) @coroutine @@ -241,9 +212,4 @@ def create_subprocess_exec(program, *args, stdin=None, stdout=None, program, *args, stdin=stdin, stdout=stdout, stderr=stderr, **kwds) - try: - yield from protocol.waiter - except: - transport._kill_wait() - raise return Process(transport, protocol, loop) diff --git a/Lib/asyncio/unix_events.py b/Lib/asyncio/unix_events.py index b06f1b2..3ecdfd2 100644 --- a/Lib/asyncio/unix_events.py +++ b/Lib/asyncio/unix_events.py @@ -16,6 +16,7 @@ from . import base_subprocess from . import constants from . import coroutines from . import events +from . import futures from . import selector_events from . import selectors from . import transports @@ -175,16 +176,20 @@ class _UnixSelectorEventLoop(selector_events.BaseSelectorEventLoop): stdin, stdout, stderr, bufsize, extra=None, **kwargs): with events.get_child_watcher() as watcher: + waiter = futures.Future(loop=self) transp = _UnixSubprocessTransport(self, protocol, args, shell, stdin, stdout, stderr, bufsize, - extra=extra, **kwargs) + waiter=waiter, extra=extra, + **kwargs) + + watcher.add_child_handler(transp.get_pid(), + self._child_watcher_callback, transp) try: - yield from transp._post_init() + yield from waiter except: transp.close() + yield from transp.wait() raise - watcher.add_child_handler(transp.get_pid(), - self._child_watcher_callback, transp) return transp @@ -774,7 +779,7 @@ class SafeChildWatcher(BaseChildWatcher): pass def add_child_handler(self, pid, callback, *args): - self._callbacks[pid] = callback, args + self._callbacks[pid] = (callback, args) # Prevent a race condition in case the child is already terminated. self._do_waitpid(pid) diff --git a/Lib/asyncio/windows_events.py b/Lib/asyncio/windows_events.py index 94aafb6..437eb0a 100644 --- a/Lib/asyncio/windows_events.py +++ b/Lib/asyncio/windows_events.py @@ -366,13 +366,16 @@ class ProactorEventLoop(proactor_events.BaseProactorEventLoop): def _make_subprocess_transport(self, protocol, args, shell, stdin, stdout, stderr, bufsize, extra=None, **kwargs): + waiter = futures.Future(loop=self) transp = _WindowsSubprocessTransport(self, protocol, args, shell, stdin, stdout, stderr, bufsize, - extra=extra, **kwargs) + waiter=waiter, extra=extra, + **kwargs) try: - yield from transp._post_init() + yield from waiter except: transp.close() + yield from transp.wait() raise return transp diff --git a/Lib/test/test_asyncio/test_events.py b/Lib/test/test_asyncio/test_events.py index 12af62b..4b957d8 100644 --- a/Lib/test/test_asyncio/test_events.py +++ b/Lib/test/test_asyncio/test_events.py @@ -1551,9 +1551,10 @@ class SubprocessTestsMixin: stdin = transp.get_pipe_transport(0) stdin.write(b'Python The Winner') self.loop.run_until_complete(proto.got_data[1].wait()) - transp.close() + with test_utils.disable_logger(): + transp.close() self.loop.run_until_complete(proto.completed) - self.check_terminated(proto.returncode) + self.check_killed(proto.returncode) self.assertEqual(b'Python The Winner', proto.data[1]) def test_subprocess_interactive(self): @@ -1567,21 +1568,20 @@ class SubprocessTestsMixin: self.loop.run_until_complete(proto.connected) self.assertEqual('CONNECTED', proto.state) - try: - stdin = transp.get_pipe_transport(0) - stdin.write(b'Python ') - self.loop.run_until_complete(proto.got_data[1].wait()) - proto.got_data[1].clear() - self.assertEqual(b'Python ', proto.data[1]) - - stdin.write(b'The Winner') - self.loop.run_until_complete(proto.got_data[1].wait()) - self.assertEqual(b'Python The Winner', proto.data[1]) - finally: - transp.close() + stdin = transp.get_pipe_transport(0) + stdin.write(b'Python ') + self.loop.run_until_complete(proto.got_data[1].wait()) + proto.got_data[1].clear() + self.assertEqual(b'Python ', proto.data[1]) + stdin.write(b'The Winner') + self.loop.run_until_complete(proto.got_data[1].wait()) + self.assertEqual(b'Python The Winner', proto.data[1]) + + with test_utils.disable_logger(): + transp.close() self.loop.run_until_complete(proto.completed) - self.check_terminated(proto.returncode) + self.check_killed(proto.returncode) def test_subprocess_shell(self): connect = self.loop.subprocess_shell( @@ -1739,9 +1739,10 @@ class SubprocessTestsMixin: # GetLastError()==ERROR_INVALID_NAME on Windows!?! (Using # WriteFile() we get ERROR_BROKEN_PIPE as expected.) self.assertEqual(b'ERR:OSError', proto.data[2]) - transp.close() + with test_utils.disable_logger(): + transp.close() self.loop.run_until_complete(proto.completed) - self.check_terminated(proto.returncode) + self.check_killed(proto.returncode) def test_subprocess_wait_no_same_group(self): # start the new process in a new session diff --git a/Lib/test/test_asyncio/test_subprocess.py b/Lib/test/test_asyncio/test_subprocess.py index ecc2c9d..4f197f3 100644 --- a/Lib/test/test_asyncio/test_subprocess.py +++ b/Lib/test/test_asyncio/test_subprocess.py @@ -4,6 +4,7 @@ import unittest from unittest import mock import asyncio +from asyncio import base_subprocess from asyncio import subprocess from asyncio import test_utils try: @@ -23,6 +24,70 @@ PROGRAM_CAT = [ 'data = sys.stdin.buffer.read()', 'sys.stdout.buffer.write(data)'))] +class TestSubprocessTransport(base_subprocess.BaseSubprocessTransport): + def _start(self, *args, **kwargs): + self._proc = mock.Mock() + self._proc.stdin = None + self._proc.stdout = None + self._proc.stderr = None + + +class SubprocessTransportTests(test_utils.TestCase): + def setUp(self): + self.loop = self.new_test_loop() + self.set_event_loop(self.loop) + + + def create_transport(self, waiter=None): + protocol = mock.Mock() + protocol.connection_made._is_coroutine = False + protocol.process_exited._is_coroutine = False + transport = TestSubprocessTransport( + self.loop, protocol, ['test'], False, + None, None, None, 0, waiter=waiter) + return (transport, protocol) + + def test_close(self): + waiter = asyncio.Future(loop=self.loop) + transport, protocol = self.create_transport(waiter) + transport._process_exited(0) + transport.close() + + # The loop didn't run yet + self.assertFalse(protocol.connection_made.called) + + # methods must raise ProcessLookupError if the transport was closed + self.assertRaises(ValueError, transport.send_signal, signal.SIGTERM) + self.assertRaises(ValueError, transport.terminate) + self.assertRaises(ValueError, transport.kill) + + self.loop.run_until_complete(waiter) + + def test_proc_exited(self): + waiter = asyncio.Future(loop=self.loop) + transport, protocol = self.create_transport(waiter) + transport._process_exited(6) + self.loop.run_until_complete(waiter) + + self.assertEqual(transport.get_returncode(), 6) + + self.assertTrue(protocol.connection_made.called) + self.assertTrue(protocol.process_exited.called) + self.assertTrue(protocol.connection_lost.called) + self.assertEqual(protocol.connection_lost.call_args[0], (None,)) + + self.assertFalse(transport._closed) + self.assertIsNone(transport._loop) + self.assertIsNone(transport._proc) + self.assertIsNone(transport._protocol) + + # methods must raise ProcessLookupError if the process exited + self.assertRaises(ProcessLookupError, + transport.send_signal, signal.SIGTERM) + self.assertRaises(ProcessLookupError, transport.terminate) + self.assertRaises(ProcessLookupError, transport.kill) + + class SubprocessMixin: def test_stdin_stdout(self): |