diff options
-rw-r--r-- | Lib/asyncio/base_events.py | 131 | ||||
-rw-r--r-- | Lib/asyncio/base_subprocess.py | 40 | ||||
-rw-r--r-- | Lib/asyncio/streams.py | 12 | ||||
-rw-r--r-- | Lib/asyncio/subprocess.py | 26 | ||||
-rw-r--r-- | Lib/asyncio/unix_events.py | 13 | ||||
-rw-r--r-- | Lib/test/test_asyncio/test_base_events.py | 2 |
6 files changed, 181 insertions, 43 deletions
diff --git a/Lib/asyncio/base_events.py b/Lib/asyncio/base_events.py index e5683fd..0aeaae4 100644 --- a/Lib/asyncio/base_events.py +++ b/Lib/asyncio/base_events.py @@ -1,7 +1,7 @@ """Base implementation of event loop. The event loop can be broken up into a multiplexer (the part -responsible for notifying us of IO events) and the event loop proper, +responsible for notifying us of I/O events) and the event loop proper, which wraps a multiplexer with functionality for scheduling callbacks, immediately or at a given time in the future. @@ -50,6 +50,15 @@ def _format_handle(handle): return str(handle) +def _format_pipe(fd): + if fd == subprocess.PIPE: + return '<pipe>' + elif fd == subprocess.STDOUT: + return '<stdout>' + else: + return repr(fd) + + class _StopError(BaseException): """Raised to stop the event loop.""" @@ -70,7 +79,7 @@ def _check_resolved_address(sock, address): type_mask |= socket.SOCK_NONBLOCK if hasattr(socket, 'SOCK_CLOEXEC'): type_mask |= socket.SOCK_CLOEXEC - # Use getaddrinfo(AI_NUMERICHOST) to ensure that the address is + # Use getaddrinfo(flags=AI_NUMERICHOST) to ensure that the address is # already resolved. try: socket.getaddrinfo(host, port, @@ -158,7 +167,8 @@ class BaseEventLoop(events.AbstractEventLoop): def create_task(self, coro): """Schedule a coroutine object. - Return a task object.""" + Return a task object. + """ task = tasks.Task(coro, loop=self) if task._source_traceback: del task._source_traceback[-1] @@ -197,12 +207,13 @@ class BaseEventLoop(events.AbstractEventLoop): """Create subprocess transport.""" raise NotImplementedError - def _read_from_self(self): - """XXX""" - raise NotImplementedError - def _write_to_self(self): - """XXX""" + """Write a byte to self-pipe, to wake up the event loop. + + This may be called from a different thread. + + The subclass is responsible for implementing the self-pipe. + """ raise NotImplementedError def _process_events(self, event_list): @@ -233,7 +244,7 @@ class BaseEventLoop(events.AbstractEventLoop): If the argument is a coroutine, it is wrapped in a Task. - XXX TBD: It would be disastrous to call run_until_complete() + WARNING: It would be disastrous to call run_until_complete() with the same coroutine twice -- it would wrap it in two different Tasks and that can't be good. @@ -261,7 +272,7 @@ class BaseEventLoop(events.AbstractEventLoop): Every callback scheduled before stop() is called will run. Callback scheduled after stop() is called won't. However, - those callbacks will run if run() is called again later. + those callbacks will run if run_*() is called again later. """ self.call_soon(_raise_stop_error) @@ -274,7 +285,7 @@ class BaseEventLoop(events.AbstractEventLoop): The event loop must not be running. """ if self._running: - raise RuntimeError("cannot close a running event loop") + raise RuntimeError("Cannot close a running event loop") if self._closed: return if self._debug: @@ -292,11 +303,16 @@ class BaseEventLoop(events.AbstractEventLoop): return self._closed def is_running(self): - """Returns running status of event loop.""" + """Returns True if the event loop is running.""" return self._running def time(self): - """Return the time according to the event loop's clock.""" + """Return the time according to the event loop's clock. + + This is a float expressed in seconds since an epoch, but the + epoch, precision, accuracy and drift are unspecified and may + differ per event loop. + """ return time.monotonic() def call_later(self, delay, callback, *args): @@ -306,7 +322,7 @@ class BaseEventLoop(events.AbstractEventLoop): can be used to cancel the call. The delay can be an int or float, expressed in seconds. It is - always a relative time. + always relative to the current time. Each callback will be called exactly once. If two callbacks are scheduled for exactly the same time, it undefined which @@ -321,7 +337,10 @@ class BaseEventLoop(events.AbstractEventLoop): return timer def call_at(self, when, callback, *args): - """Like call_later(), but uses an absolute time.""" + """Like call_later(), but uses an absolute time. + + Absolute time corresponds to the event loop's time() method. + """ if coroutines.iscoroutinefunction(callback): raise TypeError("coroutines cannot be used with call_at()") if self._debug: @@ -335,7 +354,7 @@ class BaseEventLoop(events.AbstractEventLoop): def call_soon(self, callback, *args): """Arrange for a callback to be called as soon as possible. - This operates as a FIFO queue, callbacks are called in the + This operates as a FIFO queue: callbacks are called in the order in which they are registered. Each callback will be called exactly once. @@ -361,10 +380,10 @@ class BaseEventLoop(events.AbstractEventLoop): def _assert_is_current_event_loop(self): """Asserts that this event loop is the current event loop. - Non-threadsafe methods of this class make this assumption and will + Non-thread-safe methods of this class make this assumption and will likely behave incorrectly when the assumption is violated. - Should only be called when (self._debug == True). The caller is + Should only be called when (self._debug == True). The caller is responsible for checking this condition for performance reasons. """ try: @@ -373,11 +392,11 @@ class BaseEventLoop(events.AbstractEventLoop): return if current is not self: raise RuntimeError( - "non-threadsafe operation invoked on an event loop other " + "Non-thread-safe operation invoked on an event loop other " "than the current one") def call_soon_threadsafe(self, callback, *args): - """Like call_soon(), but thread safe.""" + """Like call_soon(), but thread-safe.""" handle = self._call_soon(callback, args, check_loop=False) if handle._source_traceback: del handle._source_traceback[-1] @@ -386,7 +405,7 @@ class BaseEventLoop(events.AbstractEventLoop): def run_in_executor(self, executor, callback, *args): if coroutines.iscoroutinefunction(callback): - raise TypeError("coroutines cannot be used with run_in_executor()") + raise TypeError("Coroutines cannot be used with run_in_executor()") if isinstance(callback, events.Handle): assert not args assert not isinstance(callback, events.TimerHandle) @@ -416,13 +435,13 @@ class BaseEventLoop(events.AbstractEventLoop): if flags: msg.append('flags=%r' % flags) msg = ', '.join(msg) - logger.debug('Get addresss info %s', msg) + logger.debug('Get address info %s', msg) t0 = self.time() addrinfo = socket.getaddrinfo(host, port, family, type, proto, flags) dt = self.time() - t0 - msg = ('Getting addresss info %s took %.3f ms: %r' + msg = ('Getting address info %s took %.3f ms: %r' % (msg, dt * 1e3, addrinfo)) if dt >= self.slow_callback_duration: logger.info(msg) @@ -559,8 +578,8 @@ class BaseEventLoop(events.AbstractEventLoop): transport, protocol = yield from self._create_connection_transport( sock, protocol_factory, ssl, server_hostname) if self._debug: - logger.debug("connected to %s:%r: (%r, %r)", - host, port, transport, protocol) + logger.debug("%r connected to %s:%r: (%r, %r)", + sock, host, port, transport, protocol) return transport, protocol @coroutine @@ -589,7 +608,7 @@ class BaseEventLoop(events.AbstractEventLoop): raise ValueError('unexpected address family') addr_pairs_info = (((family, proto), (None, None)),) else: - # join addresss by (family, protocol) + # join address by (family, protocol) addr_infos = collections.OrderedDict() for idx, addr in ((0, local_addr), (1, remote_addr)): if addr is not None: @@ -674,7 +693,7 @@ class BaseEventLoop(events.AbstractEventLoop): reuse_address=None): """Create a TCP server bound to host and port. - Return an Server object which can be used to stop the service. + Return a Server object which can be used to stop the service. This method is a coroutine. """ @@ -731,8 +750,7 @@ class BaseEventLoop(events.AbstractEventLoop): sock.close() else: if sock is None: - raise ValueError( - 'host and port was not specified and no sock specified') + raise ValueError('Neither host/port nor sock were specified') sockets = [sock] server = Server(self, sockets) @@ -750,6 +768,9 @@ class BaseEventLoop(events.AbstractEventLoop): waiter = futures.Future(loop=self) transport = self._make_read_pipe_transport(pipe, protocol, waiter) yield from waiter + if self._debug: + logger.debug('Read pipe %r connected: (%r, %r)', + pipe.fileno(), transport, protocol) return transport, protocol @coroutine @@ -758,8 +779,24 @@ class BaseEventLoop(events.AbstractEventLoop): waiter = futures.Future(loop=self) transport = self._make_write_pipe_transport(pipe, protocol, waiter) yield from waiter + if self._debug: + logger.debug('Write pipe %r connected: (%r, %r)', + pipe.fileno(), transport, protocol) return transport, protocol + def _log_subprocess(self, msg, stdin, stdout, stderr): + info = [msg] + if stdin is not None: + info.append('stdin=%s' % _format_pipe(stdin)) + if stdout is not None and stderr == subprocess.STDOUT: + info.append('stdout=stderr=%s' % _format_pipe(stdout)) + else: + if stdout is not None: + info.append('stdout=%s' % _format_pipe(stdout)) + if stderr is not None: + info.append('stderr=%s' % _format_pipe(stderr)) + logger.debug(' '.join(info)) + @coroutine def subprocess_shell(self, protocol_factory, cmd, *, stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE, @@ -774,8 +811,15 @@ class BaseEventLoop(events.AbstractEventLoop): if bufsize != 0: raise ValueError("bufsize must be 0") protocol = protocol_factory() + if self._debug: + # don't log parameters: they may contain sensitive information + # (password) and may be too long + debug_log = 'run shell command %r' % cmd + self._log_subprocess(debug_log, stdin, stdout, stderr) transport = yield from self._make_subprocess_transport( protocol, cmd, True, stdin, stdout, stderr, bufsize, **kwargs) + if self._debug: + logger.info('%s: %r' % (debug_log, transport)) return transport, protocol @coroutine @@ -796,9 +840,16 @@ class BaseEventLoop(events.AbstractEventLoop): "a bytes or text string, not %s" % type(arg).__name__) protocol = protocol_factory() + if self._debug: + # don't log parameters: they may contain sensitive information + # (password) and may be too long + debug_log = 'execute program %r' % program + self._log_subprocess(debug_log, stdin, stdout, stderr) transport = yield from self._make_subprocess_transport( protocol, popen_args, False, stdin, stdout, stderr, bufsize, **kwargs) + if self._debug: + logger.info('%s: %r' % (debug_log, transport)) return transport, protocol def set_exception_handler(self, handler): @@ -808,7 +859,7 @@ class BaseEventLoop(events.AbstractEventLoop): be set. If handler is a callable object, it should have a - matching signature to '(loop, context)', where 'loop' + signature matching '(loop, context)', where 'loop' will be a reference to the active event loop, 'context' will be a dict object (see `call_exception_handler()` documentation for details about context). @@ -825,7 +876,7 @@ class BaseEventLoop(events.AbstractEventLoop): handler is set, and can be called by a custom exception handler that wants to defer to the default behavior. - context parameter has the same meaning as in + The context parameter has the same meaning as in `call_exception_handler()`. """ message = context.get('message') @@ -854,10 +905,10 @@ class BaseEventLoop(events.AbstractEventLoop): logger.error('\n'.join(log_lines), exc_info=exc_info) def call_exception_handler(self, context): - """Call the current event loop exception handler. + """Call the current event loop's exception handler. + + The context argument is a dict containing the following keys: - context is a dict object containing the following keys - (new keys maybe introduced later): - 'message': Error message; - 'exception' (optional): Exception object; - 'future' (optional): Future instance; @@ -866,8 +917,10 @@ class BaseEventLoop(events.AbstractEventLoop): - 'transport' (optional): Transport instance; - 'socket' (optional): Socket instance. - Note: this method should not be overloaded in subclassed - event loops. For any custom exception handling, use + New keys maybe introduced in the future. + + Note: do not overload this method in an event loop subclass. + For custom exception handling, use the `set_exception_handler()` method. """ if self._exception_handler is None: @@ -892,7 +945,7 @@ class BaseEventLoop(events.AbstractEventLoop): 'context': context, }) except Exception: - # Guard 'default_exception_handler' in case it's + # Guard 'default_exception_handler' in case it is # overloaded. logger.error('Exception in default exception handler ' 'while handling an unexpected error ' @@ -900,7 +953,7 @@ class BaseEventLoop(events.AbstractEventLoop): exc_info=True) def _add_callback(self, handle): - """Add a Handle to ready or scheduled.""" + """Add a Handle to _scheduled (TimerHandle) or _ready.""" assert isinstance(handle, events.Handle), 'A Handle is required here' if handle._cancelled: return @@ -971,7 +1024,7 @@ class BaseEventLoop(events.AbstractEventLoop): # Note: We run all currently scheduled callbacks, but not any # callbacks scheduled by callbacks run this time around -- # they will be run the next time (after another I/O poll). - # Use an idiom that is threadsafe without using locks. + # Use an idiom that is thread-safe without using locks. ntodo = len(self._ready) for i in range(ntodo): handle = self._ready.popleft() diff --git a/Lib/asyncio/base_subprocess.py b/Lib/asyncio/base_subprocess.py index 2f933c5..d008779 100644 --- a/Lib/asyncio/base_subprocess.py +++ b/Lib/asyncio/base_subprocess.py @@ -4,6 +4,7 @@ import subprocess from . import protocols from . import transports from .coroutines import coroutine +from .log import logger class BaseSubprocessTransport(transports.SubprocessTransport): @@ -14,6 +15,7 @@ class BaseSubprocessTransport(transports.SubprocessTransport): super().__init__(extra) self._protocol = protocol self._loop = loop + self._pid = None self._pipes = {} if stdin == subprocess.PIPE: @@ -27,7 +29,36 @@ class BaseSubprocessTransport(transports.SubprocessTransport): self._returncode = None self._start(args=args, shell=shell, stdin=stdin, stdout=stdout, stderr=stderr, bufsize=bufsize, **kwargs) + self._pid = self._proc.pid self._extra['subprocess'] = self._proc + if self._loop.get_debug(): + if isinstance(args, (bytes, str)): + program = args + else: + program = args[0] + logger.debug('process %r created: pid %s', + program, self._pid) + + def __repr__(self): + info = [self.__class__.__name__, 'pid=%s' % self._pid] + if self._returncode is not None: + info.append('returncode=%s' % self._returncode) + + stdin = self._pipes.get(0) + if stdin is not None: + info.append('stdin=%s' % stdin.pipe) + + stdout = self._pipes.get(1) + stderr = self._pipes.get(2) + if stdout is not None and stderr is stdout: + info.append('stdout=stderr=%s' % stdout.pipe) + else: + if stdout is not None: + info.append('stdout=%s' % stdout.pipe) + if stderr is not None: + info.append('stderr=%s' % stderr.pipe) + + return '<%s>' % ' '.join(info) def _start(self, args, shell, stdin, stdout, stderr, bufsize, **kwargs): raise NotImplementedError @@ -45,7 +76,7 @@ class BaseSubprocessTransport(transports.SubprocessTransport): self.terminate() def get_pid(self): - return self._proc.pid + return self._pid def get_returncode(self): return self._returncode @@ -108,6 +139,9 @@ class BaseSubprocessTransport(transports.SubprocessTransport): def _process_exited(self, returncode): assert returncode is not None, returncode assert self._returncode is None, self._returncode + if self._loop.get_debug(): + logger.info('%r exited with return code %r', + self, returncode) self._returncode = returncode self._call(self._protocol.process_exited) self._try_finish() @@ -141,6 +175,10 @@ class WriteSubprocessPipeProto(protocols.BaseProtocol): def connection_made(self, transport): self.pipe = transport + def __repr__(self): + return ('<%s fd=%s pipe=%r>' + % (self.__class__.__name__, self.fd, self.pipe)) + def connection_lost(self, exc): self.disconnected = True self.proc._pipe_connection_lost(self.fd, exc) diff --git a/Lib/asyncio/streams.py b/Lib/asyncio/streams.py index 9b654cd..d18db77 100644 --- a/Lib/asyncio/streams.py +++ b/Lib/asyncio/streams.py @@ -15,6 +15,7 @@ from . import events from . import futures from . import protocols from .coroutines import coroutine +from .log import logger _DEFAULT_LIMIT = 2**16 @@ -153,10 +154,15 @@ class FlowControlMixin(protocols.Protocol): def pause_writing(self): assert not self._paused self._paused = True + if self._loop.get_debug(): + logger.debug("%r pauses writing", self) def resume_writing(self): assert self._paused self._paused = False + if self._loop.get_debug(): + logger.debug("%r resumes writing", self) + waiter = self._drain_waiter if waiter is not None: self._drain_waiter = None @@ -244,6 +250,12 @@ class StreamWriter: self._reader = reader self._loop = loop + def __repr__(self): + info = [self.__class__.__name__, 'transport=%r' % self._transport] + if self._reader is not None: + info.append('reader=%r' % self._reader) + return '<%s>' % ' '.join(info) + @property def transport(self): return self._transport diff --git a/Lib/asyncio/subprocess.py b/Lib/asyncio/subprocess.py index 2cd6de6..12902f1 100644 --- a/Lib/asyncio/subprocess.py +++ b/Lib/asyncio/subprocess.py @@ -9,6 +9,7 @@ from . import protocols from . import streams from . import tasks from .coroutines import coroutine +from .log import logger PIPE = subprocess.PIPE @@ -28,6 +29,16 @@ class SubprocessStreamProtocol(streams.FlowControlMixin, self._waiters = collections.deque() self._transport = None + def __repr__(self): + info = [self.__class__.__name__] + if self.stdin is not None: + info.append('stdin=%r' % self.stdin) + if self.stdout is not None: + info.append('stdout=%r' % self.stdout) + if self.stderr is not None: + info.append('stderr=%r' % self.stderr) + return '<%s>' % ' '.join(info) + def connection_made(self, transport): self._transport = transport if transport.get_pipe_transport(1): @@ -91,6 +102,9 @@ class Process: self.stderr = protocol.stderr self.pid = transport.get_pid() + def __repr__(self): + return '<%s %s>' % (self.__class__.__name__, self.pid) + @property def returncode(self): return self._transport.get_returncode() @@ -126,7 +140,13 @@ class Process: @coroutine def _feed_stdin(self, input): self.stdin.write(input) + if self._loop.get_debug(): + logger.debug('%r communicate: feed stdin (%s bytes)', + self, len(input)) yield from self.stdin.drain() + + if self._loop.get_debug(): + logger.debug('%r communicate: close stdin', self) self.stdin.close() @coroutine @@ -141,7 +161,13 @@ class Process: else: assert fd == 1 stream = self.stdout + if self._loop.get_debug(): + name = 'stdout' if fd == 1 else 'stderr' + logger.debug('%r communicate: read %s', self, name) output = yield from stream.read() + if self._loop.get_debug(): + name = 'stdout' if fd == 1 else 'stderr' + logger.debug('%r communicate: close %s', self, name) transport.close() return output diff --git a/Lib/asyncio/unix_events.py b/Lib/asyncio/unix_events.py index 09b875c..4ba4f49 100644 --- a/Lib/asyncio/unix_events.py +++ b/Lib/asyncio/unix_events.py @@ -565,7 +565,7 @@ class AbstractChildWatcher: process 'pid' terminates. Specifying another callback for the same process replaces the previous handler. - Note: callback() must be thread-safe + Note: callback() must be thread-safe. """ raise NotImplementedError() @@ -721,6 +721,9 @@ class SafeChildWatcher(BaseChildWatcher): return returncode = self._compute_returncode(status) + if self._loop.get_debug(): + logger.debug('process %s exited with returncode %s', + expected_pid, returncode) try: callback, args = self._callbacks.pop(pid) @@ -818,8 +821,16 @@ class FastChildWatcher(BaseChildWatcher): if self._forks: # It may not be registered yet. self._zombies[pid] = returncode + if self._loop.get_debug(): + logger.debug('unknown process %s exited ' + 'with returncode %s', + pid, returncode) continue callback = None + else: + if self._loop.get_debug(): + logger.debug('process %s exited with returncode %s', + pid, returncode) if callback is None: logger.warning( diff --git a/Lib/test/test_asyncio/test_base_events.py b/Lib/test/test_asyncio/test_base_events.py index 27610f0..7bf07ed 100644 --- a/Lib/test/test_asyncio/test_base_events.py +++ b/Lib/test/test_asyncio/test_base_events.py @@ -44,8 +44,6 @@ class BaseEventLoopTests(test_utils.TestCase): self.assertRaises( NotImplementedError, self.loop._write_to_self) self.assertRaises( - NotImplementedError, self.loop._read_from_self) - self.assertRaises( NotImplementedError, self.loop._make_read_pipe_transport, m, m) self.assertRaises( |