summaryrefslogtreecommitdiffstats
path: root/Lib
diff options
context:
space:
mode:
authorVictor Stinner <victor.stinner@gmail.com>2014-07-14 16:33:40 (GMT)
committerVictor Stinner <victor.stinner@gmail.com>2014-07-14 16:33:40 (GMT)
commitacdb782a83d72a823030335e8f190890ae4df9cf (patch)
tree4a4a894ea9296027473031806b1b3dcbade6c1ac /Lib
parentb1ebfdddb356d5ad63bacb10589a402c6407a86c (diff)
downloadcpython-acdb782a83d72a823030335e8f190890ae4df9cf.zip
cpython-acdb782a83d72a823030335e8f190890ae4df9cf.tar.gz
cpython-acdb782a83d72a823030335e8f190890ae4df9cf.tar.bz2
asyncio: sync with Tulip
* Tulip issue #184: Log subprocess events in debug mode - Log stdin, stdout and stderr transports and protocols - Log process identifier (pid) - Log connection of pipes - Log process exit - Log Process.communicate() tasks: feed stdin, read stdout and stderr - Add __repr__() method to many classes related to subprocesses * Add BaseSubprocessTransport._pid attribute. Store the pid so it is still accessible after the process exited. It's more convinient for debug. * create_connection(): add the socket in the "connected to" debug log * Clean up some docstrings and comments. Remove unused unimplemented _read_from_self().
Diffstat (limited to 'Lib')
-rw-r--r--Lib/asyncio/base_events.py131
-rw-r--r--Lib/asyncio/base_subprocess.py40
-rw-r--r--Lib/asyncio/streams.py12
-rw-r--r--Lib/asyncio/subprocess.py26
-rw-r--r--Lib/asyncio/unix_events.py13
-rw-r--r--Lib/test/test_asyncio/test_base_events.py2
6 files changed, 181 insertions, 43 deletions
diff --git a/Lib/asyncio/base_events.py b/Lib/asyncio/base_events.py
index e5683fd..0aeaae4 100644
--- a/Lib/asyncio/base_events.py
+++ b/Lib/asyncio/base_events.py
@@ -1,7 +1,7 @@
"""Base implementation of event loop.
The event loop can be broken up into a multiplexer (the part
-responsible for notifying us of IO events) and the event loop proper,
+responsible for notifying us of I/O events) and the event loop proper,
which wraps a multiplexer with functionality for scheduling callbacks,
immediately or at a given time in the future.
@@ -50,6 +50,15 @@ def _format_handle(handle):
return str(handle)
+def _format_pipe(fd):
+ if fd == subprocess.PIPE:
+ return '<pipe>'
+ elif fd == subprocess.STDOUT:
+ return '<stdout>'
+ else:
+ return repr(fd)
+
+
class _StopError(BaseException):
"""Raised to stop the event loop."""
@@ -70,7 +79,7 @@ def _check_resolved_address(sock, address):
type_mask |= socket.SOCK_NONBLOCK
if hasattr(socket, 'SOCK_CLOEXEC'):
type_mask |= socket.SOCK_CLOEXEC
- # Use getaddrinfo(AI_NUMERICHOST) to ensure that the address is
+ # Use getaddrinfo(flags=AI_NUMERICHOST) to ensure that the address is
# already resolved.
try:
socket.getaddrinfo(host, port,
@@ -158,7 +167,8 @@ class BaseEventLoop(events.AbstractEventLoop):
def create_task(self, coro):
"""Schedule a coroutine object.
- Return a task object."""
+ Return a task object.
+ """
task = tasks.Task(coro, loop=self)
if task._source_traceback:
del task._source_traceback[-1]
@@ -197,12 +207,13 @@ class BaseEventLoop(events.AbstractEventLoop):
"""Create subprocess transport."""
raise NotImplementedError
- def _read_from_self(self):
- """XXX"""
- raise NotImplementedError
-
def _write_to_self(self):
- """XXX"""
+ """Write a byte to self-pipe, to wake up the event loop.
+
+ This may be called from a different thread.
+
+ The subclass is responsible for implementing the self-pipe.
+ """
raise NotImplementedError
def _process_events(self, event_list):
@@ -233,7 +244,7 @@ class BaseEventLoop(events.AbstractEventLoop):
If the argument is a coroutine, it is wrapped in a Task.
- XXX TBD: It would be disastrous to call run_until_complete()
+ WARNING: It would be disastrous to call run_until_complete()
with the same coroutine twice -- it would wrap it in two
different Tasks and that can't be good.
@@ -261,7 +272,7 @@ class BaseEventLoop(events.AbstractEventLoop):
Every callback scheduled before stop() is called will run.
Callback scheduled after stop() is called won't. However,
- those callbacks will run if run() is called again later.
+ those callbacks will run if run_*() is called again later.
"""
self.call_soon(_raise_stop_error)
@@ -274,7 +285,7 @@ class BaseEventLoop(events.AbstractEventLoop):
The event loop must not be running.
"""
if self._running:
- raise RuntimeError("cannot close a running event loop")
+ raise RuntimeError("Cannot close a running event loop")
if self._closed:
return
if self._debug:
@@ -292,11 +303,16 @@ class BaseEventLoop(events.AbstractEventLoop):
return self._closed
def is_running(self):
- """Returns running status of event loop."""
+ """Returns True if the event loop is running."""
return self._running
def time(self):
- """Return the time according to the event loop's clock."""
+ """Return the time according to the event loop's clock.
+
+ This is a float expressed in seconds since an epoch, but the
+ epoch, precision, accuracy and drift are unspecified and may
+ differ per event loop.
+ """
return time.monotonic()
def call_later(self, delay, callback, *args):
@@ -306,7 +322,7 @@ class BaseEventLoop(events.AbstractEventLoop):
can be used to cancel the call.
The delay can be an int or float, expressed in seconds. It is
- always a relative time.
+ always relative to the current time.
Each callback will be called exactly once. If two callbacks
are scheduled for exactly the same time, it undefined which
@@ -321,7 +337,10 @@ class BaseEventLoop(events.AbstractEventLoop):
return timer
def call_at(self, when, callback, *args):
- """Like call_later(), but uses an absolute time."""
+ """Like call_later(), but uses an absolute time.
+
+ Absolute time corresponds to the event loop's time() method.
+ """
if coroutines.iscoroutinefunction(callback):
raise TypeError("coroutines cannot be used with call_at()")
if self._debug:
@@ -335,7 +354,7 @@ class BaseEventLoop(events.AbstractEventLoop):
def call_soon(self, callback, *args):
"""Arrange for a callback to be called as soon as possible.
- This operates as a FIFO queue, callbacks are called in the
+ This operates as a FIFO queue: callbacks are called in the
order in which they are registered. Each callback will be
called exactly once.
@@ -361,10 +380,10 @@ class BaseEventLoop(events.AbstractEventLoop):
def _assert_is_current_event_loop(self):
"""Asserts that this event loop is the current event loop.
- Non-threadsafe methods of this class make this assumption and will
+ Non-thread-safe methods of this class make this assumption and will
likely behave incorrectly when the assumption is violated.
- Should only be called when (self._debug == True). The caller is
+ Should only be called when (self._debug == True). The caller is
responsible for checking this condition for performance reasons.
"""
try:
@@ -373,11 +392,11 @@ class BaseEventLoop(events.AbstractEventLoop):
return
if current is not self:
raise RuntimeError(
- "non-threadsafe operation invoked on an event loop other "
+ "Non-thread-safe operation invoked on an event loop other "
"than the current one")
def call_soon_threadsafe(self, callback, *args):
- """Like call_soon(), but thread safe."""
+ """Like call_soon(), but thread-safe."""
handle = self._call_soon(callback, args, check_loop=False)
if handle._source_traceback:
del handle._source_traceback[-1]
@@ -386,7 +405,7 @@ class BaseEventLoop(events.AbstractEventLoop):
def run_in_executor(self, executor, callback, *args):
if coroutines.iscoroutinefunction(callback):
- raise TypeError("coroutines cannot be used with run_in_executor()")
+ raise TypeError("Coroutines cannot be used with run_in_executor()")
if isinstance(callback, events.Handle):
assert not args
assert not isinstance(callback, events.TimerHandle)
@@ -416,13 +435,13 @@ class BaseEventLoop(events.AbstractEventLoop):
if flags:
msg.append('flags=%r' % flags)
msg = ', '.join(msg)
- logger.debug('Get addresss info %s', msg)
+ logger.debug('Get address info %s', msg)
t0 = self.time()
addrinfo = socket.getaddrinfo(host, port, family, type, proto, flags)
dt = self.time() - t0
- msg = ('Getting addresss info %s took %.3f ms: %r'
+ msg = ('Getting address info %s took %.3f ms: %r'
% (msg, dt * 1e3, addrinfo))
if dt >= self.slow_callback_duration:
logger.info(msg)
@@ -559,8 +578,8 @@ class BaseEventLoop(events.AbstractEventLoop):
transport, protocol = yield from self._create_connection_transport(
sock, protocol_factory, ssl, server_hostname)
if self._debug:
- logger.debug("connected to %s:%r: (%r, %r)",
- host, port, transport, protocol)
+ logger.debug("%r connected to %s:%r: (%r, %r)",
+ sock, host, port, transport, protocol)
return transport, protocol
@coroutine
@@ -589,7 +608,7 @@ class BaseEventLoop(events.AbstractEventLoop):
raise ValueError('unexpected address family')
addr_pairs_info = (((family, proto), (None, None)),)
else:
- # join addresss by (family, protocol)
+ # join address by (family, protocol)
addr_infos = collections.OrderedDict()
for idx, addr in ((0, local_addr), (1, remote_addr)):
if addr is not None:
@@ -674,7 +693,7 @@ class BaseEventLoop(events.AbstractEventLoop):
reuse_address=None):
"""Create a TCP server bound to host and port.
- Return an Server object which can be used to stop the service.
+ Return a Server object which can be used to stop the service.
This method is a coroutine.
"""
@@ -731,8 +750,7 @@ class BaseEventLoop(events.AbstractEventLoop):
sock.close()
else:
if sock is None:
- raise ValueError(
- 'host and port was not specified and no sock specified')
+ raise ValueError('Neither host/port nor sock were specified')
sockets = [sock]
server = Server(self, sockets)
@@ -750,6 +768,9 @@ class BaseEventLoop(events.AbstractEventLoop):
waiter = futures.Future(loop=self)
transport = self._make_read_pipe_transport(pipe, protocol, waiter)
yield from waiter
+ if self._debug:
+ logger.debug('Read pipe %r connected: (%r, %r)',
+ pipe.fileno(), transport, protocol)
return transport, protocol
@coroutine
@@ -758,8 +779,24 @@ class BaseEventLoop(events.AbstractEventLoop):
waiter = futures.Future(loop=self)
transport = self._make_write_pipe_transport(pipe, protocol, waiter)
yield from waiter
+ if self._debug:
+ logger.debug('Write pipe %r connected: (%r, %r)',
+ pipe.fileno(), transport, protocol)
return transport, protocol
+ def _log_subprocess(self, msg, stdin, stdout, stderr):
+ info = [msg]
+ if stdin is not None:
+ info.append('stdin=%s' % _format_pipe(stdin))
+ if stdout is not None and stderr == subprocess.STDOUT:
+ info.append('stdout=stderr=%s' % _format_pipe(stdout))
+ else:
+ if stdout is not None:
+ info.append('stdout=%s' % _format_pipe(stdout))
+ if stderr is not None:
+ info.append('stderr=%s' % _format_pipe(stderr))
+ logger.debug(' '.join(info))
+
@coroutine
def subprocess_shell(self, protocol_factory, cmd, *, stdin=subprocess.PIPE,
stdout=subprocess.PIPE, stderr=subprocess.PIPE,
@@ -774,8 +811,15 @@ class BaseEventLoop(events.AbstractEventLoop):
if bufsize != 0:
raise ValueError("bufsize must be 0")
protocol = protocol_factory()
+ if self._debug:
+ # don't log parameters: they may contain sensitive information
+ # (password) and may be too long
+ debug_log = 'run shell command %r' % cmd
+ self._log_subprocess(debug_log, stdin, stdout, stderr)
transport = yield from self._make_subprocess_transport(
protocol, cmd, True, stdin, stdout, stderr, bufsize, **kwargs)
+ if self._debug:
+ logger.info('%s: %r' % (debug_log, transport))
return transport, protocol
@coroutine
@@ -796,9 +840,16 @@ class BaseEventLoop(events.AbstractEventLoop):
"a bytes or text string, not %s"
% type(arg).__name__)
protocol = protocol_factory()
+ if self._debug:
+ # don't log parameters: they may contain sensitive information
+ # (password) and may be too long
+ debug_log = 'execute program %r' % program
+ self._log_subprocess(debug_log, stdin, stdout, stderr)
transport = yield from self._make_subprocess_transport(
protocol, popen_args, False, stdin, stdout, stderr,
bufsize, **kwargs)
+ if self._debug:
+ logger.info('%s: %r' % (debug_log, transport))
return transport, protocol
def set_exception_handler(self, handler):
@@ -808,7 +859,7 @@ class BaseEventLoop(events.AbstractEventLoop):
be set.
If handler is a callable object, it should have a
- matching signature to '(loop, context)', where 'loop'
+ signature matching '(loop, context)', where 'loop'
will be a reference to the active event loop, 'context'
will be a dict object (see `call_exception_handler()`
documentation for details about context).
@@ -825,7 +876,7 @@ class BaseEventLoop(events.AbstractEventLoop):
handler is set, and can be called by a custom exception
handler that wants to defer to the default behavior.
- context parameter has the same meaning as in
+ The context parameter has the same meaning as in
`call_exception_handler()`.
"""
message = context.get('message')
@@ -854,10 +905,10 @@ class BaseEventLoop(events.AbstractEventLoop):
logger.error('\n'.join(log_lines), exc_info=exc_info)
def call_exception_handler(self, context):
- """Call the current event loop exception handler.
+ """Call the current event loop's exception handler.
+
+ The context argument is a dict containing the following keys:
- context is a dict object containing the following keys
- (new keys maybe introduced later):
- 'message': Error message;
- 'exception' (optional): Exception object;
- 'future' (optional): Future instance;
@@ -866,8 +917,10 @@ class BaseEventLoop(events.AbstractEventLoop):
- 'transport' (optional): Transport instance;
- 'socket' (optional): Socket instance.
- Note: this method should not be overloaded in subclassed
- event loops. For any custom exception handling, use
+ New keys maybe introduced in the future.
+
+ Note: do not overload this method in an event loop subclass.
+ For custom exception handling, use the
`set_exception_handler()` method.
"""
if self._exception_handler is None:
@@ -892,7 +945,7 @@ class BaseEventLoop(events.AbstractEventLoop):
'context': context,
})
except Exception:
- # Guard 'default_exception_handler' in case it's
+ # Guard 'default_exception_handler' in case it is
# overloaded.
logger.error('Exception in default exception handler '
'while handling an unexpected error '
@@ -900,7 +953,7 @@ class BaseEventLoop(events.AbstractEventLoop):
exc_info=True)
def _add_callback(self, handle):
- """Add a Handle to ready or scheduled."""
+ """Add a Handle to _scheduled (TimerHandle) or _ready."""
assert isinstance(handle, events.Handle), 'A Handle is required here'
if handle._cancelled:
return
@@ -971,7 +1024,7 @@ class BaseEventLoop(events.AbstractEventLoop):
# Note: We run all currently scheduled callbacks, but not any
# callbacks scheduled by callbacks run this time around --
# they will be run the next time (after another I/O poll).
- # Use an idiom that is threadsafe without using locks.
+ # Use an idiom that is thread-safe without using locks.
ntodo = len(self._ready)
for i in range(ntodo):
handle = self._ready.popleft()
diff --git a/Lib/asyncio/base_subprocess.py b/Lib/asyncio/base_subprocess.py
index 2f933c5..d008779 100644
--- a/Lib/asyncio/base_subprocess.py
+++ b/Lib/asyncio/base_subprocess.py
@@ -4,6 +4,7 @@ import subprocess
from . import protocols
from . import transports
from .coroutines import coroutine
+from .log import logger
class BaseSubprocessTransport(transports.SubprocessTransport):
@@ -14,6 +15,7 @@ class BaseSubprocessTransport(transports.SubprocessTransport):
super().__init__(extra)
self._protocol = protocol
self._loop = loop
+ self._pid = None
self._pipes = {}
if stdin == subprocess.PIPE:
@@ -27,7 +29,36 @@ class BaseSubprocessTransport(transports.SubprocessTransport):
self._returncode = None
self._start(args=args, shell=shell, stdin=stdin, stdout=stdout,
stderr=stderr, bufsize=bufsize, **kwargs)
+ self._pid = self._proc.pid
self._extra['subprocess'] = self._proc
+ if self._loop.get_debug():
+ if isinstance(args, (bytes, str)):
+ program = args
+ else:
+ program = args[0]
+ logger.debug('process %r created: pid %s',
+ program, self._pid)
+
+ def __repr__(self):
+ info = [self.__class__.__name__, 'pid=%s' % self._pid]
+ if self._returncode is not None:
+ info.append('returncode=%s' % self._returncode)
+
+ stdin = self._pipes.get(0)
+ if stdin is not None:
+ info.append('stdin=%s' % stdin.pipe)
+
+ stdout = self._pipes.get(1)
+ stderr = self._pipes.get(2)
+ if stdout is not None and stderr is stdout:
+ info.append('stdout=stderr=%s' % stdout.pipe)
+ else:
+ if stdout is not None:
+ info.append('stdout=%s' % stdout.pipe)
+ if stderr is not None:
+ info.append('stderr=%s' % stderr.pipe)
+
+ return '<%s>' % ' '.join(info)
def _start(self, args, shell, stdin, stdout, stderr, bufsize, **kwargs):
raise NotImplementedError
@@ -45,7 +76,7 @@ class BaseSubprocessTransport(transports.SubprocessTransport):
self.terminate()
def get_pid(self):
- return self._proc.pid
+ return self._pid
def get_returncode(self):
return self._returncode
@@ -108,6 +139,9 @@ class BaseSubprocessTransport(transports.SubprocessTransport):
def _process_exited(self, returncode):
assert returncode is not None, returncode
assert self._returncode is None, self._returncode
+ if self._loop.get_debug():
+ logger.info('%r exited with return code %r',
+ self, returncode)
self._returncode = returncode
self._call(self._protocol.process_exited)
self._try_finish()
@@ -141,6 +175,10 @@ class WriteSubprocessPipeProto(protocols.BaseProtocol):
def connection_made(self, transport):
self.pipe = transport
+ def __repr__(self):
+ return ('<%s fd=%s pipe=%r>'
+ % (self.__class__.__name__, self.fd, self.pipe))
+
def connection_lost(self, exc):
self.disconnected = True
self.proc._pipe_connection_lost(self.fd, exc)
diff --git a/Lib/asyncio/streams.py b/Lib/asyncio/streams.py
index 9b654cd..d18db77 100644
--- a/Lib/asyncio/streams.py
+++ b/Lib/asyncio/streams.py
@@ -15,6 +15,7 @@ from . import events
from . import futures
from . import protocols
from .coroutines import coroutine
+from .log import logger
_DEFAULT_LIMIT = 2**16
@@ -153,10 +154,15 @@ class FlowControlMixin(protocols.Protocol):
def pause_writing(self):
assert not self._paused
self._paused = True
+ if self._loop.get_debug():
+ logger.debug("%r pauses writing", self)
def resume_writing(self):
assert self._paused
self._paused = False
+ if self._loop.get_debug():
+ logger.debug("%r resumes writing", self)
+
waiter = self._drain_waiter
if waiter is not None:
self._drain_waiter = None
@@ -244,6 +250,12 @@ class StreamWriter:
self._reader = reader
self._loop = loop
+ def __repr__(self):
+ info = [self.__class__.__name__, 'transport=%r' % self._transport]
+ if self._reader is not None:
+ info.append('reader=%r' % self._reader)
+ return '<%s>' % ' '.join(info)
+
@property
def transport(self):
return self._transport
diff --git a/Lib/asyncio/subprocess.py b/Lib/asyncio/subprocess.py
index 2cd6de6..12902f1 100644
--- a/Lib/asyncio/subprocess.py
+++ b/Lib/asyncio/subprocess.py
@@ -9,6 +9,7 @@ from . import protocols
from . import streams
from . import tasks
from .coroutines import coroutine
+from .log import logger
PIPE = subprocess.PIPE
@@ -28,6 +29,16 @@ class SubprocessStreamProtocol(streams.FlowControlMixin,
self._waiters = collections.deque()
self._transport = None
+ def __repr__(self):
+ info = [self.__class__.__name__]
+ if self.stdin is not None:
+ info.append('stdin=%r' % self.stdin)
+ if self.stdout is not None:
+ info.append('stdout=%r' % self.stdout)
+ if self.stderr is not None:
+ info.append('stderr=%r' % self.stderr)
+ return '<%s>' % ' '.join(info)
+
def connection_made(self, transport):
self._transport = transport
if transport.get_pipe_transport(1):
@@ -91,6 +102,9 @@ class Process:
self.stderr = protocol.stderr
self.pid = transport.get_pid()
+ def __repr__(self):
+ return '<%s %s>' % (self.__class__.__name__, self.pid)
+
@property
def returncode(self):
return self._transport.get_returncode()
@@ -126,7 +140,13 @@ class Process:
@coroutine
def _feed_stdin(self, input):
self.stdin.write(input)
+ if self._loop.get_debug():
+ logger.debug('%r communicate: feed stdin (%s bytes)',
+ self, len(input))
yield from self.stdin.drain()
+
+ if self._loop.get_debug():
+ logger.debug('%r communicate: close stdin', self)
self.stdin.close()
@coroutine
@@ -141,7 +161,13 @@ class Process:
else:
assert fd == 1
stream = self.stdout
+ if self._loop.get_debug():
+ name = 'stdout' if fd == 1 else 'stderr'
+ logger.debug('%r communicate: read %s', self, name)
output = yield from stream.read()
+ if self._loop.get_debug():
+ name = 'stdout' if fd == 1 else 'stderr'
+ logger.debug('%r communicate: close %s', self, name)
transport.close()
return output
diff --git a/Lib/asyncio/unix_events.py b/Lib/asyncio/unix_events.py
index 09b875c..4ba4f49 100644
--- a/Lib/asyncio/unix_events.py
+++ b/Lib/asyncio/unix_events.py
@@ -565,7 +565,7 @@ class AbstractChildWatcher:
process 'pid' terminates. Specifying another callback for the same
process replaces the previous handler.
- Note: callback() must be thread-safe
+ Note: callback() must be thread-safe.
"""
raise NotImplementedError()
@@ -721,6 +721,9 @@ class SafeChildWatcher(BaseChildWatcher):
return
returncode = self._compute_returncode(status)
+ if self._loop.get_debug():
+ logger.debug('process %s exited with returncode %s',
+ expected_pid, returncode)
try:
callback, args = self._callbacks.pop(pid)
@@ -818,8 +821,16 @@ class FastChildWatcher(BaseChildWatcher):
if self._forks:
# It may not be registered yet.
self._zombies[pid] = returncode
+ if self._loop.get_debug():
+ logger.debug('unknown process %s exited '
+ 'with returncode %s',
+ pid, returncode)
continue
callback = None
+ else:
+ if self._loop.get_debug():
+ logger.debug('process %s exited with returncode %s',
+ pid, returncode)
if callback is None:
logger.warning(
diff --git a/Lib/test/test_asyncio/test_base_events.py b/Lib/test/test_asyncio/test_base_events.py
index 27610f0..7bf07ed 100644
--- a/Lib/test/test_asyncio/test_base_events.py
+++ b/Lib/test/test_asyncio/test_base_events.py
@@ -44,8 +44,6 @@ class BaseEventLoopTests(test_utils.TestCase):
self.assertRaises(
NotImplementedError, self.loop._write_to_self)
self.assertRaises(
- NotImplementedError, self.loop._read_from_self)
- self.assertRaises(
NotImplementedError,
self.loop._make_read_pipe_transport, m, m)
self.assertRaises(