diff options
Diffstat (limited to 'Lib/asyncio/base_events.py')
-rw-r--r-- | Lib/asyncio/base_events.py | 1909 |
1 files changed, 0 insertions, 1909 deletions
diff --git a/Lib/asyncio/base_events.py b/Lib/asyncio/base_events.py deleted file mode 100644 index adcdec1..0000000 --- a/Lib/asyncio/base_events.py +++ /dev/null @@ -1,1909 +0,0 @@ -"""Base implementation of event loop. - -The event loop can be broken up into a multiplexer (the part -responsible for notifying us of I/O events) and the event loop proper, -which wraps a multiplexer with functionality for scheduling callbacks, -immediately or at a given time in the future. - -Whenever a public API takes a callback, subsequent positional -arguments will be passed to the callback if/when it is called. This -avoids the proliferation of trivial lambdas implementing closures. -Keyword arguments for the callback are not supported; this is a -conscious design decision, leaving the door open for keyword arguments -to modify the meaning of the API call itself. -""" - -import collections -import collections.abc -import concurrent.futures -import functools -import heapq -import itertools -import os -import socket -import stat -import subprocess -import threading -import time -import traceback -import sys -import warnings -import weakref - -try: - import ssl -except ImportError: # pragma: no cover - ssl = None - -from . import constants -from . import coroutines -from . import events -from . import exceptions -from . import futures -from . import protocols -from . import sslproto -from . import staggered -from . import tasks -from . import transports -from . import trsock -from .log import logger - - -__all__ = 'BaseEventLoop', - - -# Minimum number of _scheduled timer handles before cleanup of -# cancelled handles is performed. -_MIN_SCHEDULED_TIMER_HANDLES = 100 - -# Minimum fraction of _scheduled timer handles that are cancelled -# before cleanup of cancelled handles is performed. -_MIN_CANCELLED_TIMER_HANDLES_FRACTION = 0.5 - - -_HAS_IPv6 = hasattr(socket, 'AF_INET6') - -# Maximum timeout passed to select to avoid OS limitations -MAXIMUM_SELECT_TIMEOUT = 24 * 3600 - -# Used for deprecation and removal of `loop.create_datagram_endpoint()`'s -# *reuse_address* parameter -_unset = object() - - -def _format_handle(handle): - cb = handle._callback - if isinstance(getattr(cb, '__self__', None), tasks.Task): - # format the task - return repr(cb.__self__) - else: - return str(handle) - - -def _format_pipe(fd): - if fd == subprocess.PIPE: - return '<pipe>' - elif fd == subprocess.STDOUT: - return '<stdout>' - else: - return repr(fd) - - -def _set_reuseport(sock): - if not hasattr(socket, 'SO_REUSEPORT'): - raise ValueError('reuse_port not supported by socket module') - else: - try: - sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT, 1) - except OSError: - raise ValueError('reuse_port not supported by socket module, ' - 'SO_REUSEPORT defined but not implemented.') - - -def _ipaddr_info(host, port, family, type, proto, flowinfo=0, scopeid=0): - # Try to skip getaddrinfo if "host" is already an IP. Users might have - # handled name resolution in their own code and pass in resolved IPs. - if not hasattr(socket, 'inet_pton'): - return - - if proto not in {0, socket.IPPROTO_TCP, socket.IPPROTO_UDP} or \ - host is None: - return None - - if type == socket.SOCK_STREAM: - proto = socket.IPPROTO_TCP - elif type == socket.SOCK_DGRAM: - proto = socket.IPPROTO_UDP - else: - return None - - if port is None: - port = 0 - elif isinstance(port, bytes) and port == b'': - port = 0 - elif isinstance(port, str) and port == '': - port = 0 - else: - # If port's a service name like "http", don't skip getaddrinfo. - try: - port = int(port) - except (TypeError, ValueError): - return None - - if family == socket.AF_UNSPEC: - afs = [socket.AF_INET] - if _HAS_IPv6: - afs.append(socket.AF_INET6) - else: - afs = [family] - - if isinstance(host, bytes): - host = host.decode('idna') - if '%' in host: - # Linux's inet_pton doesn't accept an IPv6 zone index after host, - # like '::1%lo0'. - return None - - for af in afs: - try: - socket.inet_pton(af, host) - # The host has already been resolved. - if _HAS_IPv6 and af == socket.AF_INET6: - return af, type, proto, '', (host, port, flowinfo, scopeid) - else: - return af, type, proto, '', (host, port) - except OSError: - pass - - # "host" is not an IP address. - return None - - -def _interleave_addrinfos(addrinfos, first_address_family_count=1): - """Interleave list of addrinfo tuples by family.""" - # Group addresses by family - addrinfos_by_family = collections.OrderedDict() - for addr in addrinfos: - family = addr[0] - if family not in addrinfos_by_family: - addrinfos_by_family[family] = [] - addrinfos_by_family[family].append(addr) - addrinfos_lists = list(addrinfos_by_family.values()) - - reordered = [] - if first_address_family_count > 1: - reordered.extend(addrinfos_lists[0][:first_address_family_count - 1]) - del addrinfos_lists[0][:first_address_family_count - 1] - reordered.extend( - a for a in itertools.chain.from_iterable( - itertools.zip_longest(*addrinfos_lists) - ) if a is not None) - return reordered - - -def _run_until_complete_cb(fut): - if not fut.cancelled(): - exc = fut.exception() - if isinstance(exc, (SystemExit, KeyboardInterrupt)): - # Issue #22429: run_forever() already finished, no need to - # stop it. - return - futures._get_loop(fut).stop() - - -if hasattr(socket, 'TCP_NODELAY'): - def _set_nodelay(sock): - if (sock.family in {socket.AF_INET, socket.AF_INET6} and - sock.type == socket.SOCK_STREAM and - sock.proto == socket.IPPROTO_TCP): - sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1) -else: - def _set_nodelay(sock): - pass - - -class _SendfileFallbackProtocol(protocols.Protocol): - def __init__(self, transp): - if not isinstance(transp, transports._FlowControlMixin): - raise TypeError("transport should be _FlowControlMixin instance") - self._transport = transp - self._proto = transp.get_protocol() - self._should_resume_reading = transp.is_reading() - self._should_resume_writing = transp._protocol_paused - transp.pause_reading() - transp.set_protocol(self) - if self._should_resume_writing: - self._write_ready_fut = self._transport._loop.create_future() - else: - self._write_ready_fut = None - - async def drain(self): - if self._transport.is_closing(): - raise ConnectionError("Connection closed by peer") - fut = self._write_ready_fut - if fut is None: - return - await fut - - def connection_made(self, transport): - raise RuntimeError("Invalid state: " - "connection should have been established already.") - - def connection_lost(self, exc): - if self._write_ready_fut is not None: - # Never happens if peer disconnects after sending the whole content - # Thus disconnection is always an exception from user perspective - if exc is None: - self._write_ready_fut.set_exception( - ConnectionError("Connection is closed by peer")) - else: - self._write_ready_fut.set_exception(exc) - self._proto.connection_lost(exc) - - def pause_writing(self): - if self._write_ready_fut is not None: - return - self._write_ready_fut = self._transport._loop.create_future() - - def resume_writing(self): - if self._write_ready_fut is None: - return - self._write_ready_fut.set_result(False) - self._write_ready_fut = None - - def data_received(self, data): - raise RuntimeError("Invalid state: reading should be paused") - - def eof_received(self): - raise RuntimeError("Invalid state: reading should be paused") - - async def restore(self): - self._transport.set_protocol(self._proto) - if self._should_resume_reading: - self._transport.resume_reading() - if self._write_ready_fut is not None: - # Cancel the future. - # Basically it has no effect because protocol is switched back, - # no code should wait for it anymore. - self._write_ready_fut.cancel() - if self._should_resume_writing: - self._proto.resume_writing() - - -class Server(events.AbstractServer): - - def __init__(self, loop, sockets, protocol_factory, ssl_context, backlog, - ssl_handshake_timeout): - self._loop = loop - self._sockets = sockets - self._active_count = 0 - self._waiters = [] - self._protocol_factory = protocol_factory - self._backlog = backlog - self._ssl_context = ssl_context - self._ssl_handshake_timeout = ssl_handshake_timeout - self._serving = False - self._serving_forever_fut = None - - def __repr__(self): - return f'<{self.__class__.__name__} sockets={self.sockets!r}>' - - def _attach(self): - assert self._sockets is not None - self._active_count += 1 - - def _detach(self): - assert self._active_count > 0 - self._active_count -= 1 - if self._active_count == 0 and self._sockets is None: - self._wakeup() - - def _wakeup(self): - waiters = self._waiters - self._waiters = None - for waiter in waiters: - if not waiter.done(): - waiter.set_result(waiter) - - def _start_serving(self): - if self._serving: - return - self._serving = True - for sock in self._sockets: - sock.listen(self._backlog) - self._loop._start_serving( - self._protocol_factory, sock, self._ssl_context, - self, self._backlog, self._ssl_handshake_timeout) - - def get_loop(self): - return self._loop - - def is_serving(self): - return self._serving - - @property - def sockets(self): - if self._sockets is None: - return () - return tuple(trsock.TransportSocket(s) for s in self._sockets) - - def close(self): - sockets = self._sockets - if sockets is None: - return - self._sockets = None - - for sock in sockets: - self._loop._stop_serving(sock) - - self._serving = False - - if (self._serving_forever_fut is not None and - not self._serving_forever_fut.done()): - self._serving_forever_fut.cancel() - self._serving_forever_fut = None - - if self._active_count == 0: - self._wakeup() - - async def start_serving(self): - self._start_serving() - # Skip one loop iteration so that all 'loop.add_reader' - # go through. - await tasks.sleep(0, loop=self._loop) - - async def serve_forever(self): - if self._serving_forever_fut is not None: - raise RuntimeError( - f'server {self!r} is already being awaited on serve_forever()') - if self._sockets is None: - raise RuntimeError(f'server {self!r} is closed') - - self._start_serving() - self._serving_forever_fut = self._loop.create_future() - - try: - await self._serving_forever_fut - except exceptions.CancelledError: - try: - self.close() - await self.wait_closed() - finally: - raise - finally: - self._serving_forever_fut = None - - async def wait_closed(self): - if self._sockets is None or self._waiters is None: - return - waiter = self._loop.create_future() - self._waiters.append(waiter) - await waiter - - -class BaseEventLoop(events.AbstractEventLoop): - - def __init__(self): - self._timer_cancelled_count = 0 - self._closed = False - self._stopping = False - self._ready = collections.deque() - self._scheduled = [] - self._default_executor = None - self._internal_fds = 0 - # Identifier of the thread running the event loop, or None if the - # event loop is not running - self._thread_id = None - self._clock_resolution = time.get_clock_info('monotonic').resolution - self._exception_handler = None - self.set_debug(coroutines._is_debug_mode()) - # In debug mode, if the execution of a callback or a step of a task - # exceed this duration in seconds, the slow callback/task is logged. - self.slow_callback_duration = 0.1 - self._current_handle = None - self._task_factory = None - self._coroutine_origin_tracking_enabled = False - self._coroutine_origin_tracking_saved_depth = None - - # A weak set of all asynchronous generators that are - # being iterated by the loop. - self._asyncgens = weakref.WeakSet() - # Set to True when `loop.shutdown_asyncgens` is called. - self._asyncgens_shutdown_called = False - # Set to True when `loop.shutdown_default_executor` is called. - self._executor_shutdown_called = False - - def __repr__(self): - return ( - f'<{self.__class__.__name__} running={self.is_running()} ' - f'closed={self.is_closed()} debug={self.get_debug()}>' - ) - - def create_future(self): - """Create a Future object attached to the loop.""" - return futures.Future(loop=self) - - def create_task(self, coro, *, name=None): - """Schedule a coroutine object. - - Return a task object. - """ - self._check_closed() - if self._task_factory is None: - task = tasks.Task(coro, loop=self, name=name) - if task._source_traceback: - del task._source_traceback[-1] - else: - task = self._task_factory(self, coro) - tasks._set_task_name(task, name) - - return task - - def set_task_factory(self, factory): - """Set a task factory that will be used by loop.create_task(). - - If factory is None the default task factory will be set. - - If factory is a callable, it should have a signature matching - '(loop, coro)', where 'loop' will be a reference to the active - event loop, 'coro' will be a coroutine object. The callable - must return a Future. - """ - if factory is not None and not callable(factory): - raise TypeError('task factory must be a callable or None') - self._task_factory = factory - - def get_task_factory(self): - """Return a task factory, or None if the default one is in use.""" - return self._task_factory - - def _make_socket_transport(self, sock, protocol, waiter=None, *, - extra=None, server=None): - """Create socket transport.""" - raise NotImplementedError - - def _make_ssl_transport( - self, rawsock, protocol, sslcontext, waiter=None, - *, server_side=False, server_hostname=None, - extra=None, server=None, - ssl_handshake_timeout=None, - call_connection_made=True): - """Create SSL transport.""" - raise NotImplementedError - - def _make_datagram_transport(self, sock, protocol, - address=None, waiter=None, extra=None): - """Create datagram transport.""" - raise NotImplementedError - - def _make_read_pipe_transport(self, pipe, protocol, waiter=None, - extra=None): - """Create read pipe transport.""" - raise NotImplementedError - - def _make_write_pipe_transport(self, pipe, protocol, waiter=None, - extra=None): - """Create write pipe transport.""" - raise NotImplementedError - - async def _make_subprocess_transport(self, protocol, args, shell, - stdin, stdout, stderr, bufsize, - extra=None, **kwargs): - """Create subprocess transport.""" - raise NotImplementedError - - def _write_to_self(self): - """Write a byte to self-pipe, to wake up the event loop. - - This may be called from a different thread. - - The subclass is responsible for implementing the self-pipe. - """ - raise NotImplementedError - - def _process_events(self, event_list): - """Process selector events.""" - raise NotImplementedError - - def _check_closed(self): - if self._closed: - raise RuntimeError('Event loop is closed') - - def _check_default_executor(self): - if self._executor_shutdown_called: - raise RuntimeError('Executor shutdown has been called') - - def _asyncgen_finalizer_hook(self, agen): - self._asyncgens.discard(agen) - if not self.is_closed(): - self.call_soon_threadsafe(self.create_task, agen.aclose()) - - def _asyncgen_firstiter_hook(self, agen): - if self._asyncgens_shutdown_called: - warnings.warn( - f"asynchronous generator {agen!r} was scheduled after " - f"loop.shutdown_asyncgens() call", - ResourceWarning, source=self) - - self._asyncgens.add(agen) - - async def shutdown_asyncgens(self): - """Shutdown all active asynchronous generators.""" - self._asyncgens_shutdown_called = True - - if not len(self._asyncgens): - # If Python version is <3.6 or we don't have any asynchronous - # generators alive. - return - - closing_agens = list(self._asyncgens) - self._asyncgens.clear() - - results = await tasks.gather( - *[ag.aclose() for ag in closing_agens], - return_exceptions=True, - loop=self) - - for result, agen in zip(results, closing_agens): - if isinstance(result, Exception): - self.call_exception_handler({ - 'message': f'an error occurred during closing of ' - f'asynchronous generator {agen!r}', - 'exception': result, - 'asyncgen': agen - }) - - async def shutdown_default_executor(self): - """Schedule the shutdown of the default executor.""" - self._executor_shutdown_called = True - if self._default_executor is None: - return - future = self.create_future() - thread = threading.Thread(target=self._do_shutdown, args=(future,)) - thread.start() - try: - await future - finally: - thread.join() - - def _do_shutdown(self, future): - try: - self._default_executor.shutdown(wait=True) - self.call_soon_threadsafe(future.set_result, None) - except Exception as ex: - self.call_soon_threadsafe(future.set_exception, ex) - - def run_forever(self): - """Run until stop() is called.""" - self._check_closed() - if self.is_running(): - raise RuntimeError('This event loop is already running') - if events._get_running_loop() is not None: - raise RuntimeError( - 'Cannot run the event loop while another loop is running') - self._set_coroutine_origin_tracking(self._debug) - self._thread_id = threading.get_ident() - - old_agen_hooks = sys.get_asyncgen_hooks() - sys.set_asyncgen_hooks(firstiter=self._asyncgen_firstiter_hook, - finalizer=self._asyncgen_finalizer_hook) - try: - events._set_running_loop(self) - while True: - self._run_once() - if self._stopping: - break - finally: - self._stopping = False - self._thread_id = None - events._set_running_loop(None) - self._set_coroutine_origin_tracking(False) - sys.set_asyncgen_hooks(*old_agen_hooks) - - def run_until_complete(self, future): - """Run until the Future is done. - - If the argument is a coroutine, it is wrapped in a Task. - - WARNING: It would be disastrous to call run_until_complete() - with the same coroutine twice -- it would wrap it in two - different Tasks and that can't be good. - - Return the Future's result, or raise its exception. - """ - self._check_closed() - - new_task = not futures.isfuture(future) - future = tasks.ensure_future(future, loop=self) - if new_task: - # An exception is raised if the future didn't complete, so there - # is no need to log the "destroy pending task" message - future._log_destroy_pending = False - - future.add_done_callback(_run_until_complete_cb) - try: - self.run_forever() - except: - if new_task and future.done() and not future.cancelled(): - # The coroutine raised a BaseException. Consume the exception - # to not log a warning, the caller doesn't have access to the - # local task. - future.exception() - raise - finally: - future.remove_done_callback(_run_until_complete_cb) - if not future.done(): - raise RuntimeError('Event loop stopped before Future completed.') - - return future.result() - - def stop(self): - """Stop running the event loop. - - Every callback already scheduled will still run. This simply informs - run_forever to stop looping after a complete iteration. - """ - self._stopping = True - - def close(self): - """Close the event loop. - - This clears the queues and shuts down the executor, - but does not wait for the executor to finish. - - The event loop must not be running. - """ - if self.is_running(): - raise RuntimeError("Cannot close a running event loop") - if self._closed: - return - if self._debug: - logger.debug("Close %r", self) - self._closed = True - self._ready.clear() - self._scheduled.clear() - self._executor_shutdown_called = True - executor = self._default_executor - if executor is not None: - self._default_executor = None - executor.shutdown(wait=False) - - def is_closed(self): - """Returns True if the event loop was closed.""" - return self._closed - - def __del__(self, _warn=warnings.warn): - if not self.is_closed(): - _warn(f"unclosed event loop {self!r}", ResourceWarning, source=self) - if not self.is_running(): - self.close() - - def is_running(self): - """Returns True if the event loop is running.""" - return (self._thread_id is not None) - - def time(self): - """Return the time according to the event loop's clock. - - This is a float expressed in seconds since an epoch, but the - epoch, precision, accuracy and drift are unspecified and may - differ per event loop. - """ - return time.monotonic() - - def call_later(self, delay, callback, *args, context=None): - """Arrange for a callback to be called at a given time. - - Return a Handle: an opaque object with a cancel() method that - can be used to cancel the call. - - The delay can be an int or float, expressed in seconds. It is - always relative to the current time. - - Each callback will be called exactly once. If two callbacks - are scheduled for exactly the same time, it undefined which - will be called first. - - Any positional arguments after the callback will be passed to - the callback when it is called. - """ - timer = self.call_at(self.time() + delay, callback, *args, - context=context) - if timer._source_traceback: - del timer._source_traceback[-1] - return timer - - def call_at(self, when, callback, *args, context=None): - """Like call_later(), but uses an absolute time. - - Absolute time corresponds to the event loop's time() method. - """ - self._check_closed() - if self._debug: - self._check_thread() - self._check_callback(callback, 'call_at') - timer = events.TimerHandle(when, callback, args, self, context) - if timer._source_traceback: - del timer._source_traceback[-1] - heapq.heappush(self._scheduled, timer) - timer._scheduled = True - return timer - - def call_soon(self, callback, *args, context=None): - """Arrange for a callback to be called as soon as possible. - - This operates as a FIFO queue: callbacks are called in the - order in which they are registered. Each callback will be - called exactly once. - - Any positional arguments after the callback will be passed to - the callback when it is called. - """ - self._check_closed() - if self._debug: - self._check_thread() - self._check_callback(callback, 'call_soon') - handle = self._call_soon(callback, args, context) - if handle._source_traceback: - del handle._source_traceback[-1] - return handle - - def _check_callback(self, callback, method): - if (coroutines.iscoroutine(callback) or - coroutines.iscoroutinefunction(callback)): - raise TypeError( - f"coroutines cannot be used with {method}()") - if not callable(callback): - raise TypeError( - f'a callable object was expected by {method}(), ' - f'got {callback!r}') - - def _call_soon(self, callback, args, context): - handle = events.Handle(callback, args, self, context) - if handle._source_traceback: - del handle._source_traceback[-1] - self._ready.append(handle) - return handle - - def _check_thread(self): - """Check that the current thread is the thread running the event loop. - - Non-thread-safe methods of this class make this assumption and will - likely behave incorrectly when the assumption is violated. - - Should only be called when (self._debug == True). The caller is - responsible for checking this condition for performance reasons. - """ - if self._thread_id is None: - return - thread_id = threading.get_ident() - if thread_id != self._thread_id: - raise RuntimeError( - "Non-thread-safe operation invoked on an event loop other " - "than the current one") - - def call_soon_threadsafe(self, callback, *args, context=None): - """Like call_soon(), but thread-safe.""" - self._check_closed() - if self._debug: - self._check_callback(callback, 'call_soon_threadsafe') - handle = self._call_soon(callback, args, context) - if handle._source_traceback: - del handle._source_traceback[-1] - self._write_to_self() - return handle - - def run_in_executor(self, executor, func, *args): - self._check_closed() - if self._debug: - self._check_callback(func, 'run_in_executor') - if executor is None: - executor = self._default_executor - # Only check when the default executor is being used - self._check_default_executor() - if executor is None: - executor = concurrent.futures.ThreadPoolExecutor() - self._default_executor = executor - return futures.wrap_future( - executor.submit(func, *args), loop=self) - - def set_default_executor(self, executor): - if not isinstance(executor, concurrent.futures.ThreadPoolExecutor): - warnings.warn( - 'Using the default executor that is not an instance of ' - 'ThreadPoolExecutor is deprecated and will be prohibited ' - 'in Python 3.9', - DeprecationWarning, 2) - self._default_executor = executor - - def _getaddrinfo_debug(self, host, port, family, type, proto, flags): - msg = [f"{host}:{port!r}"] - if family: - msg.append(f'family={family!r}') - if type: - msg.append(f'type={type!r}') - if proto: - msg.append(f'proto={proto!r}') - if flags: - msg.append(f'flags={flags!r}') - msg = ', '.join(msg) - logger.debug('Get address info %s', msg) - - t0 = self.time() - addrinfo = socket.getaddrinfo(host, port, family, type, proto, flags) - dt = self.time() - t0 - - msg = f'Getting address info {msg} took {dt * 1e3:.3f}ms: {addrinfo!r}' - if dt >= self.slow_callback_duration: - logger.info(msg) - else: - logger.debug(msg) - return addrinfo - - async def getaddrinfo(self, host, port, *, - family=0, type=0, proto=0, flags=0): - if self._debug: - getaddr_func = self._getaddrinfo_debug - else: - getaddr_func = socket.getaddrinfo - - return await self.run_in_executor( - None, getaddr_func, host, port, family, type, proto, flags) - - async def getnameinfo(self, sockaddr, flags=0): - return await self.run_in_executor( - None, socket.getnameinfo, sockaddr, flags) - - async def sock_sendfile(self, sock, file, offset=0, count=None, - *, fallback=True): - if self._debug and sock.gettimeout() != 0: - raise ValueError("the socket must be non-blocking") - self._check_sendfile_params(sock, file, offset, count) - try: - return await self._sock_sendfile_native(sock, file, - offset, count) - except exceptions.SendfileNotAvailableError as exc: - if not fallback: - raise - return await self._sock_sendfile_fallback(sock, file, - offset, count) - - async def _sock_sendfile_native(self, sock, file, offset, count): - # NB: sendfile syscall is not supported for SSL sockets and - # non-mmap files even if sendfile is supported by OS - raise exceptions.SendfileNotAvailableError( - f"syscall sendfile is not available for socket {sock!r} " - "and file {file!r} combination") - - async def _sock_sendfile_fallback(self, sock, file, offset, count): - if offset: - file.seek(offset) - blocksize = ( - min(count, constants.SENDFILE_FALLBACK_READBUFFER_SIZE) - if count else constants.SENDFILE_FALLBACK_READBUFFER_SIZE - ) - buf = bytearray(blocksize) - total_sent = 0 - try: - while True: - if count: - blocksize = min(count - total_sent, blocksize) - if blocksize <= 0: - break - view = memoryview(buf)[:blocksize] - read = await self.run_in_executor(None, file.readinto, view) - if not read: - break # EOF - await self.sock_sendall(sock, view[:read]) - total_sent += read - return total_sent - finally: - if total_sent > 0 and hasattr(file, 'seek'): - file.seek(offset + total_sent) - - def _check_sendfile_params(self, sock, file, offset, count): - if 'b' not in getattr(file, 'mode', 'b'): - raise ValueError("file should be opened in binary mode") - if not sock.type == socket.SOCK_STREAM: - raise ValueError("only SOCK_STREAM type sockets are supported") - if count is not None: - if not isinstance(count, int): - raise TypeError( - "count must be a positive integer (got {!r})".format(count)) - if count <= 0: - raise ValueError( - "count must be a positive integer (got {!r})".format(count)) - if not isinstance(offset, int): - raise TypeError( - "offset must be a non-negative integer (got {!r})".format( - offset)) - if offset < 0: - raise ValueError( - "offset must be a non-negative integer (got {!r})".format( - offset)) - - async def _connect_sock(self, exceptions, addr_info, local_addr_infos=None): - """Create, bind and connect one socket.""" - my_exceptions = [] - exceptions.append(my_exceptions) - family, type_, proto, _, address = addr_info - sock = None - try: - sock = socket.socket(family=family, type=type_, proto=proto) - sock.setblocking(False) - if local_addr_infos is not None: - for _, _, _, _, laddr in local_addr_infos: - try: - sock.bind(laddr) - break - except OSError as exc: - msg = ( - f'error while attempting to bind on ' - f'address {laddr!r}: ' - f'{exc.strerror.lower()}' - ) - exc = OSError(exc.errno, msg) - my_exceptions.append(exc) - else: # all bind attempts failed - raise my_exceptions.pop() - await self.sock_connect(sock, address) - return sock - except OSError as exc: - my_exceptions.append(exc) - if sock is not None: - sock.close() - raise - except: - if sock is not None: - sock.close() - raise - - async def create_connection( - self, protocol_factory, host=None, port=None, - *, ssl=None, family=0, - proto=0, flags=0, sock=None, - local_addr=None, server_hostname=None, - ssl_handshake_timeout=None, - happy_eyeballs_delay=None, interleave=None): - """Connect to a TCP server. - - Create a streaming transport connection to a given Internet host and - port: socket family AF_INET or socket.AF_INET6 depending on host (or - family if specified), socket type SOCK_STREAM. protocol_factory must be - a callable returning a protocol instance. - - This method is a coroutine which will try to establish the connection - in the background. When successful, the coroutine returns a - (transport, protocol) pair. - """ - if server_hostname is not None and not ssl: - raise ValueError('server_hostname is only meaningful with ssl') - - if server_hostname is None and ssl: - # Use host as default for server_hostname. It is an error - # if host is empty or not set, e.g. when an - # already-connected socket was passed or when only a port - # is given. To avoid this error, you can pass - # server_hostname='' -- this will bypass the hostname - # check. (This also means that if host is a numeric - # IP/IPv6 address, we will attempt to verify that exact - # address; this will probably fail, but it is possible to - # create a certificate for a specific IP address, so we - # don't judge it here.) - if not host: - raise ValueError('You must set server_hostname ' - 'when using ssl without a host') - server_hostname = host - - if ssl_handshake_timeout is not None and not ssl: - raise ValueError( - 'ssl_handshake_timeout is only meaningful with ssl') - - if happy_eyeballs_delay is not None and interleave is None: - # If using happy eyeballs, default to interleave addresses by family - interleave = 1 - - if host is not None or port is not None: - if sock is not None: - raise ValueError( - 'host/port and sock can not be specified at the same time') - - infos = await self._ensure_resolved( - (host, port), family=family, - type=socket.SOCK_STREAM, proto=proto, flags=flags, loop=self) - if not infos: - raise OSError('getaddrinfo() returned empty list') - - if local_addr is not None: - laddr_infos = await self._ensure_resolved( - local_addr, family=family, - type=socket.SOCK_STREAM, proto=proto, - flags=flags, loop=self) - if not laddr_infos: - raise OSError('getaddrinfo() returned empty list') - else: - laddr_infos = None - - if interleave: - infos = _interleave_addrinfos(infos, interleave) - - exceptions = [] - if happy_eyeballs_delay is None: - # not using happy eyeballs - for addrinfo in infos: - try: - sock = await self._connect_sock( - exceptions, addrinfo, laddr_infos) - break - except OSError: - continue - else: # using happy eyeballs - sock, _, _ = await staggered.staggered_race( - (functools.partial(self._connect_sock, - exceptions, addrinfo, laddr_infos) - for addrinfo in infos), - happy_eyeballs_delay, loop=self) - - if sock is None: - exceptions = [exc for sub in exceptions for exc in sub] - if len(exceptions) == 1: - raise exceptions[0] - else: - # If they all have the same str(), raise one. - model = str(exceptions[0]) - if all(str(exc) == model for exc in exceptions): - raise exceptions[0] - # Raise a combined exception so the user can see all - # the various error messages. - raise OSError('Multiple exceptions: {}'.format( - ', '.join(str(exc) for exc in exceptions))) - - else: - if sock is None: - raise ValueError( - 'host and port was not specified and no sock specified') - if sock.type != socket.SOCK_STREAM: - # We allow AF_INET, AF_INET6, AF_UNIX as long as they - # are SOCK_STREAM. - # We support passing AF_UNIX sockets even though we have - # a dedicated API for that: create_unix_connection. - # Disallowing AF_UNIX in this method, breaks backwards - # compatibility. - raise ValueError( - f'A Stream Socket was expected, got {sock!r}') - - transport, protocol = await self._create_connection_transport( - sock, protocol_factory, ssl, server_hostname, - ssl_handshake_timeout=ssl_handshake_timeout) - if self._debug: - # Get the socket from the transport because SSL transport closes - # the old socket and creates a new SSL socket - sock = transport.get_extra_info('socket') - logger.debug("%r connected to %s:%r: (%r, %r)", - sock, host, port, transport, protocol) - return transport, protocol - - async def _create_connection_transport( - self, sock, protocol_factory, ssl, - server_hostname, server_side=False, - ssl_handshake_timeout=None): - - sock.setblocking(False) - - protocol = protocol_factory() - waiter = self.create_future() - if ssl: - sslcontext = None if isinstance(ssl, bool) else ssl - transport = self._make_ssl_transport( - sock, protocol, sslcontext, waiter, - server_side=server_side, server_hostname=server_hostname, - ssl_handshake_timeout=ssl_handshake_timeout) - else: - transport = self._make_socket_transport(sock, protocol, waiter) - - try: - await waiter - except: - transport.close() - raise - - return transport, protocol - - async def sendfile(self, transport, file, offset=0, count=None, - *, fallback=True): - """Send a file to transport. - - Return the total number of bytes which were sent. - - The method uses high-performance os.sendfile if available. - - file must be a regular file object opened in binary mode. - - offset tells from where to start reading the file. If specified, - count is the total number of bytes to transmit as opposed to - sending the file until EOF is reached. File position is updated on - return or also in case of error in which case file.tell() - can be used to figure out the number of bytes - which were sent. - - fallback set to True makes asyncio to manually read and send - the file when the platform does not support the sendfile syscall - (e.g. Windows or SSL socket on Unix). - - Raise SendfileNotAvailableError if the system does not support - sendfile syscall and fallback is False. - """ - if transport.is_closing(): - raise RuntimeError("Transport is closing") - mode = getattr(transport, '_sendfile_compatible', - constants._SendfileMode.UNSUPPORTED) - if mode is constants._SendfileMode.UNSUPPORTED: - raise RuntimeError( - f"sendfile is not supported for transport {transport!r}") - if mode is constants._SendfileMode.TRY_NATIVE: - try: - return await self._sendfile_native(transport, file, - offset, count) - except exceptions.SendfileNotAvailableError as exc: - if not fallback: - raise - - if not fallback: - raise RuntimeError( - f"fallback is disabled and native sendfile is not " - f"supported for transport {transport!r}") - - return await self._sendfile_fallback(transport, file, - offset, count) - - async def _sendfile_native(self, transp, file, offset, count): - raise exceptions.SendfileNotAvailableError( - "sendfile syscall is not supported") - - async def _sendfile_fallback(self, transp, file, offset, count): - if offset: - file.seek(offset) - blocksize = min(count, 16384) if count else 16384 - buf = bytearray(blocksize) - total_sent = 0 - proto = _SendfileFallbackProtocol(transp) - try: - while True: - if count: - blocksize = min(count - total_sent, blocksize) - if blocksize <= 0: - return total_sent - view = memoryview(buf)[:blocksize] - read = await self.run_in_executor(None, file.readinto, view) - if not read: - return total_sent # EOF - await proto.drain() - transp.write(view[:read]) - total_sent += read - finally: - if total_sent > 0 and hasattr(file, 'seek'): - file.seek(offset + total_sent) - await proto.restore() - - async def start_tls(self, transport, protocol, sslcontext, *, - server_side=False, - server_hostname=None, - ssl_handshake_timeout=None): - """Upgrade transport to TLS. - - Return a new transport that *protocol* should start using - immediately. - """ - if ssl is None: - raise RuntimeError('Python ssl module is not available') - - if not isinstance(sslcontext, ssl.SSLContext): - raise TypeError( - f'sslcontext is expected to be an instance of ssl.SSLContext, ' - f'got {sslcontext!r}') - - if not getattr(transport, '_start_tls_compatible', False): - raise TypeError( - f'transport {transport!r} is not supported by start_tls()') - - waiter = self.create_future() - ssl_protocol = sslproto.SSLProtocol( - self, protocol, sslcontext, waiter, - server_side, server_hostname, - ssl_handshake_timeout=ssl_handshake_timeout, - call_connection_made=False) - - # Pause early so that "ssl_protocol.data_received()" doesn't - # have a chance to get called before "ssl_protocol.connection_made()". - transport.pause_reading() - - transport.set_protocol(ssl_protocol) - conmade_cb = self.call_soon(ssl_protocol.connection_made, transport) - resume_cb = self.call_soon(transport.resume_reading) - - try: - await waiter - except BaseException: - transport.close() - conmade_cb.cancel() - resume_cb.cancel() - raise - - return ssl_protocol._app_transport - - async def create_datagram_endpoint(self, protocol_factory, - local_addr=None, remote_addr=None, *, - family=0, proto=0, flags=0, - reuse_address=_unset, reuse_port=None, - allow_broadcast=None, sock=None): - """Create datagram connection.""" - if sock is not None: - if sock.type != socket.SOCK_DGRAM: - raise ValueError( - f'A UDP Socket was expected, got {sock!r}') - if (local_addr or remote_addr or - family or proto or flags or - reuse_port or allow_broadcast): - # show the problematic kwargs in exception msg - opts = dict(local_addr=local_addr, remote_addr=remote_addr, - family=family, proto=proto, flags=flags, - reuse_address=reuse_address, reuse_port=reuse_port, - allow_broadcast=allow_broadcast) - problems = ', '.join(f'{k}={v}' for k, v in opts.items() if v) - raise ValueError( - f'socket modifier keyword arguments can not be used ' - f'when sock is specified. ({problems})') - sock.setblocking(False) - r_addr = None - else: - if not (local_addr or remote_addr): - if family == 0: - raise ValueError('unexpected address family') - addr_pairs_info = (((family, proto), (None, None)),) - elif hasattr(socket, 'AF_UNIX') and family == socket.AF_UNIX: - for addr in (local_addr, remote_addr): - if addr is not None and not isinstance(addr, str): - raise TypeError('string is expected') - - if local_addr and local_addr[0] not in (0, '\x00'): - try: - if stat.S_ISSOCK(os.stat(local_addr).st_mode): - os.remove(local_addr) - except FileNotFoundError: - pass - except OSError as err: - # Directory may have permissions only to create socket. - logger.error('Unable to check or remove stale UNIX ' - 'socket %r: %r', - local_addr, err) - - addr_pairs_info = (((family, proto), - (local_addr, remote_addr)), ) - else: - # join address by (family, protocol) - addr_infos = {} # Using order preserving dict - for idx, addr in ((0, local_addr), (1, remote_addr)): - if addr is not None: - assert isinstance(addr, tuple) and len(addr) == 2, ( - '2-tuple is expected') - - infos = await self._ensure_resolved( - addr, family=family, type=socket.SOCK_DGRAM, - proto=proto, flags=flags, loop=self) - if not infos: - raise OSError('getaddrinfo() returned empty list') - - for fam, _, pro, _, address in infos: - key = (fam, pro) - if key not in addr_infos: - addr_infos[key] = [None, None] - addr_infos[key][idx] = address - - # each addr has to have info for each (family, proto) pair - addr_pairs_info = [ - (key, addr_pair) for key, addr_pair in addr_infos.items() - if not ((local_addr and addr_pair[0] is None) or - (remote_addr and addr_pair[1] is None))] - - if not addr_pairs_info: - raise ValueError('can not get address information') - - exceptions = [] - - # bpo-37228 - if reuse_address is not _unset: - if reuse_address: - raise ValueError("Passing `reuse_address=True` is no " - "longer supported, as the usage of " - "SO_REUSEPORT in UDP poses a significant " - "security concern.") - else: - warnings.warn("The *reuse_address* parameter has been " - "deprecated as of 3.5.10 and is scheduled " - "for removal in 3.11.", DeprecationWarning, - stacklevel=2) - - for ((family, proto), - (local_address, remote_address)) in addr_pairs_info: - sock = None - r_addr = None - try: - sock = socket.socket( - family=family, type=socket.SOCK_DGRAM, proto=proto) - if reuse_port: - _set_reuseport(sock) - if allow_broadcast: - sock.setsockopt( - socket.SOL_SOCKET, socket.SO_BROADCAST, 1) - sock.setblocking(False) - - if local_addr: - sock.bind(local_address) - if remote_addr: - if not allow_broadcast: - await self.sock_connect(sock, remote_address) - r_addr = remote_address - except OSError as exc: - if sock is not None: - sock.close() - exceptions.append(exc) - except: - if sock is not None: - sock.close() - raise - else: - break - else: - raise exceptions[0] - - protocol = protocol_factory() - waiter = self.create_future() - transport = self._make_datagram_transport( - sock, protocol, r_addr, waiter) - if self._debug: - if local_addr: - logger.info("Datagram endpoint local_addr=%r remote_addr=%r " - "created: (%r, %r)", - local_addr, remote_addr, transport, protocol) - else: - logger.debug("Datagram endpoint remote_addr=%r created: " - "(%r, %r)", - remote_addr, transport, protocol) - - try: - await waiter - except: - transport.close() - raise - - return transport, protocol - - async def _ensure_resolved(self, address, *, - family=0, type=socket.SOCK_STREAM, - proto=0, flags=0, loop): - host, port = address[:2] - info = _ipaddr_info(host, port, family, type, proto, *address[2:]) - if info is not None: - # "host" is already a resolved IP. - return [info] - else: - return await loop.getaddrinfo(host, port, family=family, type=type, - proto=proto, flags=flags) - - async def _create_server_getaddrinfo(self, host, port, family, flags): - infos = await self._ensure_resolved((host, port), family=family, - type=socket.SOCK_STREAM, - flags=flags, loop=self) - if not infos: - raise OSError(f'getaddrinfo({host!r}) returned empty list') - return infos - - async def create_server( - self, protocol_factory, host=None, port=None, - *, - family=socket.AF_UNSPEC, - flags=socket.AI_PASSIVE, - sock=None, - backlog=100, - ssl=None, - reuse_address=None, - reuse_port=None, - ssl_handshake_timeout=None, - start_serving=True): - """Create a TCP server. - - The host parameter can be a string, in that case the TCP server is - bound to host and port. - - The host parameter can also be a sequence of strings and in that case - the TCP server is bound to all hosts of the sequence. If a host - appears multiple times (possibly indirectly e.g. when hostnames - resolve to the same IP address), the server is only bound once to that - host. - - Return a Server object which can be used to stop the service. - - This method is a coroutine. - """ - if isinstance(ssl, bool): - raise TypeError('ssl argument must be an SSLContext or None') - - if ssl_handshake_timeout is not None and ssl is None: - raise ValueError( - 'ssl_handshake_timeout is only meaningful with ssl') - - if host is not None or port is not None: - if sock is not None: - raise ValueError( - 'host/port and sock can not be specified at the same time') - - if reuse_address is None: - reuse_address = os.name == 'posix' and sys.platform != 'cygwin' - sockets = [] - if host == '': - hosts = [None] - elif (isinstance(host, str) or - not isinstance(host, collections.abc.Iterable)): - hosts = [host] - else: - hosts = host - - fs = [self._create_server_getaddrinfo(host, port, family=family, - flags=flags) - for host in hosts] - infos = await tasks.gather(*fs, loop=self) - infos = set(itertools.chain.from_iterable(infos)) - - completed = False - try: - for res in infos: - af, socktype, proto, canonname, sa = res - try: - sock = socket.socket(af, socktype, proto) - except socket.error: - # Assume it's a bad family/type/protocol combination. - if self._debug: - logger.warning('create_server() failed to create ' - 'socket.socket(%r, %r, %r)', - af, socktype, proto, exc_info=True) - continue - sockets.append(sock) - if reuse_address: - sock.setsockopt( - socket.SOL_SOCKET, socket.SO_REUSEADDR, True) - if reuse_port: - _set_reuseport(sock) - # Disable IPv4/IPv6 dual stack support (enabled by - # default on Linux) which makes a single socket - # listen on both address families. - if (_HAS_IPv6 and - af == socket.AF_INET6 and - hasattr(socket, 'IPPROTO_IPV6')): - sock.setsockopt(socket.IPPROTO_IPV6, - socket.IPV6_V6ONLY, - True) - try: - sock.bind(sa) - except OSError as err: - raise OSError(err.errno, 'error while attempting ' - 'to bind on address %r: %s' - % (sa, err.strerror.lower())) from None - completed = True - finally: - if not completed: - for sock in sockets: - sock.close() - else: - if sock is None: - raise ValueError('Neither host/port nor sock were specified') - if sock.type != socket.SOCK_STREAM: - raise ValueError(f'A Stream Socket was expected, got {sock!r}') - sockets = [sock] - - for sock in sockets: - sock.setblocking(False) - - server = Server(self, sockets, protocol_factory, - ssl, backlog, ssl_handshake_timeout) - if start_serving: - server._start_serving() - # Skip one loop iteration so that all 'loop.add_reader' - # go through. - await tasks.sleep(0, loop=self) - - if self._debug: - logger.info("%r is serving", server) - return server - - async def connect_accepted_socket( - self, protocol_factory, sock, - *, ssl=None, - ssl_handshake_timeout=None): - """Handle an accepted connection. - - This is used by servers that accept connections outside of - asyncio but that use asyncio to handle connections. - - This method is a coroutine. When completed, the coroutine - returns a (transport, protocol) pair. - """ - if sock.type != socket.SOCK_STREAM: - raise ValueError(f'A Stream Socket was expected, got {sock!r}') - - if ssl_handshake_timeout is not None and not ssl: - raise ValueError( - 'ssl_handshake_timeout is only meaningful with ssl') - - transport, protocol = await self._create_connection_transport( - sock, protocol_factory, ssl, '', server_side=True, - ssl_handshake_timeout=ssl_handshake_timeout) - if self._debug: - # Get the socket from the transport because SSL transport closes - # the old socket and creates a new SSL socket - sock = transport.get_extra_info('socket') - logger.debug("%r handled: (%r, %r)", sock, transport, protocol) - return transport, protocol - - async def connect_read_pipe(self, protocol_factory, pipe): - protocol = protocol_factory() - waiter = self.create_future() - transport = self._make_read_pipe_transport(pipe, protocol, waiter) - - try: - await waiter - except: - transport.close() - raise - - if self._debug: - logger.debug('Read pipe %r connected: (%r, %r)', - pipe.fileno(), transport, protocol) - return transport, protocol - - async def connect_write_pipe(self, protocol_factory, pipe): - protocol = protocol_factory() - waiter = self.create_future() - transport = self._make_write_pipe_transport(pipe, protocol, waiter) - - try: - await waiter - except: - transport.close() - raise - - if self._debug: - logger.debug('Write pipe %r connected: (%r, %r)', - pipe.fileno(), transport, protocol) - return transport, protocol - - def _log_subprocess(self, msg, stdin, stdout, stderr): - info = [msg] - if stdin is not None: - info.append(f'stdin={_format_pipe(stdin)}') - if stdout is not None and stderr == subprocess.STDOUT: - info.append(f'stdout=stderr={_format_pipe(stdout)}') - else: - if stdout is not None: - info.append(f'stdout={_format_pipe(stdout)}') - if stderr is not None: - info.append(f'stderr={_format_pipe(stderr)}') - logger.debug(' '.join(info)) - - async def subprocess_shell(self, protocol_factory, cmd, *, - stdin=subprocess.PIPE, - stdout=subprocess.PIPE, - stderr=subprocess.PIPE, - universal_newlines=False, - shell=True, bufsize=0, - encoding=None, errors=None, text=None, - **kwargs): - if not isinstance(cmd, (bytes, str)): - raise ValueError("cmd must be a string") - if universal_newlines: - raise ValueError("universal_newlines must be False") - if not shell: - raise ValueError("shell must be True") - if bufsize != 0: - raise ValueError("bufsize must be 0") - if text: - raise ValueError("text must be False") - if encoding is not None: - raise ValueError("encoding must be None") - if errors is not None: - raise ValueError("errors must be None") - - protocol = protocol_factory() - debug_log = None - if self._debug: - # don't log parameters: they may contain sensitive information - # (password) and may be too long - debug_log = 'run shell command %r' % cmd - self._log_subprocess(debug_log, stdin, stdout, stderr) - transport = await self._make_subprocess_transport( - protocol, cmd, True, stdin, stdout, stderr, bufsize, **kwargs) - if self._debug and debug_log is not None: - logger.info('%s: %r', debug_log, transport) - return transport, protocol - - async def subprocess_exec(self, protocol_factory, program, *args, - stdin=subprocess.PIPE, stdout=subprocess.PIPE, - stderr=subprocess.PIPE, universal_newlines=False, - shell=False, bufsize=0, - encoding=None, errors=None, text=None, - **kwargs): - if universal_newlines: - raise ValueError("universal_newlines must be False") - if shell: - raise ValueError("shell must be False") - if bufsize != 0: - raise ValueError("bufsize must be 0") - if text: - raise ValueError("text must be False") - if encoding is not None: - raise ValueError("encoding must be None") - if errors is not None: - raise ValueError("errors must be None") - - popen_args = (program,) + args - protocol = protocol_factory() - debug_log = None - if self._debug: - # don't log parameters: they may contain sensitive information - # (password) and may be too long - debug_log = f'execute program {program!r}' - self._log_subprocess(debug_log, stdin, stdout, stderr) - transport = await self._make_subprocess_transport( - protocol, popen_args, False, stdin, stdout, stderr, - bufsize, **kwargs) - if self._debug and debug_log is not None: - logger.info('%s: %r', debug_log, transport) - return transport, protocol - - def get_exception_handler(self): - """Return an exception handler, or None if the default one is in use. - """ - return self._exception_handler - - def set_exception_handler(self, handler): - """Set handler as the new event loop exception handler. - - If handler is None, the default exception handler will - be set. - - If handler is a callable object, it should have a - signature matching '(loop, context)', where 'loop' - will be a reference to the active event loop, 'context' - will be a dict object (see `call_exception_handler()` - documentation for details about context). - """ - if handler is not None and not callable(handler): - raise TypeError(f'A callable object or None is expected, ' - f'got {handler!r}') - self._exception_handler = handler - - def default_exception_handler(self, context): - """Default exception handler. - - This is called when an exception occurs and no exception - handler is set, and can be called by a custom exception - handler that wants to defer to the default behavior. - - This default handler logs the error message and other - context-dependent information. In debug mode, a truncated - stack trace is also appended showing where the given object - (e.g. a handle or future or task) was created, if any. - - The context parameter has the same meaning as in - `call_exception_handler()`. - """ - message = context.get('message') - if not message: - message = 'Unhandled exception in event loop' - - exception = context.get('exception') - if exception is not None: - exc_info = (type(exception), exception, exception.__traceback__) - else: - exc_info = False - - if ('source_traceback' not in context and - self._current_handle is not None and - self._current_handle._source_traceback): - context['handle_traceback'] = \ - self._current_handle._source_traceback - - log_lines = [message] - for key in sorted(context): - if key in {'message', 'exception'}: - continue - value = context[key] - if key == 'source_traceback': - tb = ''.join(traceback.format_list(value)) - value = 'Object created at (most recent call last):\n' - value += tb.rstrip() - elif key == 'handle_traceback': - tb = ''.join(traceback.format_list(value)) - value = 'Handle created at (most recent call last):\n' - value += tb.rstrip() - else: - value = repr(value) - log_lines.append(f'{key}: {value}') - - logger.error('\n'.join(log_lines), exc_info=exc_info) - - def call_exception_handler(self, context): - """Call the current event loop's exception handler. - - The context argument is a dict containing the following keys: - - - 'message': Error message; - - 'exception' (optional): Exception object; - - 'future' (optional): Future instance; - - 'task' (optional): Task instance; - - 'handle' (optional): Handle instance; - - 'protocol' (optional): Protocol instance; - - 'transport' (optional): Transport instance; - - 'socket' (optional): Socket instance; - - 'asyncgen' (optional): Asynchronous generator that caused - the exception. - - New keys maybe introduced in the future. - - Note: do not overload this method in an event loop subclass. - For custom exception handling, use the - `set_exception_handler()` method. - """ - if self._exception_handler is None: - try: - self.default_exception_handler(context) - except (SystemExit, KeyboardInterrupt): - raise - except BaseException: - # Second protection layer for unexpected errors - # in the default implementation, as well as for subclassed - # event loops with overloaded "default_exception_handler". - logger.error('Exception in default exception handler', - exc_info=True) - else: - try: - self._exception_handler(self, context) - except (SystemExit, KeyboardInterrupt): - raise - except BaseException as exc: - # Exception in the user set custom exception handler. - try: - # Let's try default handler. - self.default_exception_handler({ - 'message': 'Unhandled error in exception handler', - 'exception': exc, - 'context': context, - }) - except (SystemExit, KeyboardInterrupt): - raise - except BaseException: - # Guard 'default_exception_handler' in case it is - # overloaded. - logger.error('Exception in default exception handler ' - 'while handling an unexpected error ' - 'in custom exception handler', - exc_info=True) - - def _add_callback(self, handle): - """Add a Handle to _scheduled (TimerHandle) or _ready.""" - assert isinstance(handle, events.Handle), 'A Handle is required here' - if handle._cancelled: - return - assert not isinstance(handle, events.TimerHandle) - self._ready.append(handle) - - def _add_callback_signalsafe(self, handle): - """Like _add_callback() but called from a signal handler.""" - self._add_callback(handle) - self._write_to_self() - - def _timer_handle_cancelled(self, handle): - """Notification that a TimerHandle has been cancelled.""" - if handle._scheduled: - self._timer_cancelled_count += 1 - - def _run_once(self): - """Run one full iteration of the event loop. - - This calls all currently ready callbacks, polls for I/O, - schedules the resulting callbacks, and finally schedules - 'call_later' callbacks. - """ - - sched_count = len(self._scheduled) - if (sched_count > _MIN_SCHEDULED_TIMER_HANDLES and - self._timer_cancelled_count / sched_count > - _MIN_CANCELLED_TIMER_HANDLES_FRACTION): - # Remove delayed calls that were cancelled if their number - # is too high - new_scheduled = [] - for handle in self._scheduled: - if handle._cancelled: - handle._scheduled = False - else: - new_scheduled.append(handle) - - heapq.heapify(new_scheduled) - self._scheduled = new_scheduled - self._timer_cancelled_count = 0 - else: - # Remove delayed calls that were cancelled from head of queue. - while self._scheduled and self._scheduled[0]._cancelled: - self._timer_cancelled_count -= 1 - handle = heapq.heappop(self._scheduled) - handle._scheduled = False - - timeout = None - if self._ready or self._stopping: - timeout = 0 - elif self._scheduled: - # Compute the desired timeout. - when = self._scheduled[0]._when - timeout = min(max(0, when - self.time()), MAXIMUM_SELECT_TIMEOUT) - - event_list = self._selector.select(timeout) - self._process_events(event_list) - - # Handle 'later' callbacks that are ready. - end_time = self.time() + self._clock_resolution - while self._scheduled: - handle = self._scheduled[0] - if handle._when >= end_time: - break - handle = heapq.heappop(self._scheduled) - handle._scheduled = False - self._ready.append(handle) - - # This is the only place where callbacks are actually *called*. - # All other places just add them to ready. - # Note: We run all currently scheduled callbacks, but not any - # callbacks scheduled by callbacks run this time around -- - # they will be run the next time (after another I/O poll). - # Use an idiom that is thread-safe without using locks. - ntodo = len(self._ready) - for i in range(ntodo): - handle = self._ready.popleft() - if handle._cancelled: - continue - if self._debug: - try: - self._current_handle = handle - t0 = self.time() - handle._run() - dt = self.time() - t0 - if dt >= self.slow_callback_duration: - logger.warning('Executing %s took %.3f seconds', - _format_handle(handle), dt) - finally: - self._current_handle = None - else: - handle._run() - handle = None # Needed to break cycles when an exception occurs. - - def _set_coroutine_origin_tracking(self, enabled): - if bool(enabled) == bool(self._coroutine_origin_tracking_enabled): - return - - if enabled: - self._coroutine_origin_tracking_saved_depth = ( - sys.get_coroutine_origin_tracking_depth()) - sys.set_coroutine_origin_tracking_depth( - constants.DEBUG_STACK_DEPTH) - else: - sys.set_coroutine_origin_tracking_depth( - self._coroutine_origin_tracking_saved_depth) - - self._coroutine_origin_tracking_enabled = enabled - - def get_debug(self): - return self._debug - - def set_debug(self, enabled): - self._debug = enabled - - if self.is_running(): - self.call_soon_threadsafe(self._set_coroutine_origin_tracking, enabled) |