diff options
Diffstat (limited to 'Lib/asyncio/events.py')
-rw-r--r-- | Lib/asyncio/events.py | 796 |
1 files changed, 0 insertions, 796 deletions
diff --git a/Lib/asyncio/events.py b/Lib/asyncio/events.py deleted file mode 100644 index c7343f5..0000000 --- a/Lib/asyncio/events.py +++ /dev/null @@ -1,796 +0,0 @@ -"""Event loop and event loop policy.""" - -__all__ = ( - 'AbstractEventLoopPolicy', - 'AbstractEventLoop', 'AbstractServer', - 'Handle', 'TimerHandle', - 'get_event_loop_policy', 'set_event_loop_policy', - 'get_event_loop', 'set_event_loop', 'new_event_loop', - 'get_child_watcher', 'set_child_watcher', - '_set_running_loop', 'get_running_loop', - '_get_running_loop', -) - -import contextvars -import os -import socket -import subprocess -import sys -import threading - -from . import format_helpers -from . import exceptions - - -class Handle: - """Object returned by callback registration methods.""" - - __slots__ = ('_callback', '_args', '_cancelled', '_loop', - '_source_traceback', '_repr', '__weakref__', - '_context') - - def __init__(self, callback, args, loop, context=None): - if context is None: - context = contextvars.copy_context() - self._context = context - self._loop = loop - self._callback = callback - self._args = args - self._cancelled = False - self._repr = None - if self._loop.get_debug(): - self._source_traceback = format_helpers.extract_stack( - sys._getframe(1)) - else: - self._source_traceback = None - - def _repr_info(self): - info = [self.__class__.__name__] - if self._cancelled: - info.append('cancelled') - if self._callback is not None: - info.append(format_helpers._format_callback_source( - self._callback, self._args)) - if self._source_traceback: - frame = self._source_traceback[-1] - info.append(f'created at {frame[0]}:{frame[1]}') - return info - - def __repr__(self): - if self._repr is not None: - return self._repr - info = self._repr_info() - return '<{}>'.format(' '.join(info)) - - def cancel(self): - if not self._cancelled: - self._cancelled = True - if self._loop.get_debug(): - # Keep a representation in debug mode to keep callback and - # parameters. For example, to log the warning - # "Executing <Handle...> took 2.5 second" - self._repr = repr(self) - self._callback = None - self._args = None - - def cancelled(self): - return self._cancelled - - def _run(self): - try: - self._context.run(self._callback, *self._args) - except (SystemExit, KeyboardInterrupt): - raise - except BaseException as exc: - cb = format_helpers._format_callback_source( - self._callback, self._args) - msg = f'Exception in callback {cb}' - context = { - 'message': msg, - 'exception': exc, - 'handle': self, - } - if self._source_traceback: - context['source_traceback'] = self._source_traceback - self._loop.call_exception_handler(context) - self = None # Needed to break cycles when an exception occurs. - - -class TimerHandle(Handle): - """Object returned by timed callback registration methods.""" - - __slots__ = ['_scheduled', '_when'] - - def __init__(self, when, callback, args, loop, context=None): - assert when is not None - super().__init__(callback, args, loop, context) - if self._source_traceback: - del self._source_traceback[-1] - self._when = when - self._scheduled = False - - def _repr_info(self): - info = super()._repr_info() - pos = 2 if self._cancelled else 1 - info.insert(pos, f'when={self._when}') - return info - - def __hash__(self): - return hash(self._when) - - def __lt__(self, other): - if isinstance(other, TimerHandle): - return self._when < other._when - return NotImplemented - - def __le__(self, other): - if isinstance(other, TimerHandle): - return self._when < other._when or self.__eq__(other) - return NotImplemented - - def __gt__(self, other): - if isinstance(other, TimerHandle): - return self._when > other._when - return NotImplemented - - def __ge__(self, other): - if isinstance(other, TimerHandle): - return self._when > other._when or self.__eq__(other) - return NotImplemented - - def __eq__(self, other): - if isinstance(other, TimerHandle): - return (self._when == other._when and - self._callback == other._callback and - self._args == other._args and - self._cancelled == other._cancelled) - return NotImplemented - - def cancel(self): - if not self._cancelled: - self._loop._timer_handle_cancelled(self) - super().cancel() - - def when(self): - """Return a scheduled callback time. - - The time is an absolute timestamp, using the same time - reference as loop.time(). - """ - return self._when - - -class AbstractServer: - """Abstract server returned by create_server().""" - - def close(self): - """Stop serving. This leaves existing connections open.""" - raise NotImplementedError - - def get_loop(self): - """Get the event loop the Server object is attached to.""" - raise NotImplementedError - - def is_serving(self): - """Return True if the server is accepting connections.""" - raise NotImplementedError - - async def start_serving(self): - """Start accepting connections. - - This method is idempotent, so it can be called when - the server is already being serving. - """ - raise NotImplementedError - - async def serve_forever(self): - """Start accepting connections until the coroutine is cancelled. - - The server is closed when the coroutine is cancelled. - """ - raise NotImplementedError - - async def wait_closed(self): - """Coroutine to wait until service is closed.""" - raise NotImplementedError - - async def __aenter__(self): - return self - - async def __aexit__(self, *exc): - self.close() - await self.wait_closed() - - -class AbstractEventLoop: - """Abstract event loop.""" - - # Running and stopping the event loop. - - def run_forever(self): - """Run the event loop until stop() is called.""" - raise NotImplementedError - - def run_until_complete(self, future): - """Run the event loop until a Future is done. - - Return the Future's result, or raise its exception. - """ - raise NotImplementedError - - def stop(self): - """Stop the event loop as soon as reasonable. - - Exactly how soon that is may depend on the implementation, but - no more I/O callbacks should be scheduled. - """ - raise NotImplementedError - - def is_running(self): - """Return whether the event loop is currently running.""" - raise NotImplementedError - - def is_closed(self): - """Returns True if the event loop was closed.""" - raise NotImplementedError - - def close(self): - """Close the loop. - - The loop should not be running. - - This is idempotent and irreversible. - - No other methods should be called after this one. - """ - raise NotImplementedError - - async def shutdown_asyncgens(self): - """Shutdown all active asynchronous generators.""" - raise NotImplementedError - - async def shutdown_default_executor(self): - """Schedule the shutdown of the default executor.""" - raise NotImplementedError - - # Methods scheduling callbacks. All these return Handles. - - def _timer_handle_cancelled(self, handle): - """Notification that a TimerHandle has been cancelled.""" - raise NotImplementedError - - def call_soon(self, callback, *args): - return self.call_later(0, callback, *args) - - def call_later(self, delay, callback, *args): - raise NotImplementedError - - def call_at(self, when, callback, *args): - raise NotImplementedError - - def time(self): - raise NotImplementedError - - def create_future(self): - raise NotImplementedError - - # Method scheduling a coroutine object: create a task. - - def create_task(self, coro, *, name=None): - raise NotImplementedError - - # Methods for interacting with threads. - - def call_soon_threadsafe(self, callback, *args): - raise NotImplementedError - - async def run_in_executor(self, executor, func, *args): - raise NotImplementedError - - def set_default_executor(self, executor): - raise NotImplementedError - - # Network I/O methods returning Futures. - - async def getaddrinfo(self, host, port, *, - family=0, type=0, proto=0, flags=0): - raise NotImplementedError - - async def getnameinfo(self, sockaddr, flags=0): - raise NotImplementedError - - async def create_connection( - self, protocol_factory, host=None, port=None, - *, ssl=None, family=0, proto=0, - flags=0, sock=None, local_addr=None, - server_hostname=None, - ssl_handshake_timeout=None, - happy_eyeballs_delay=None, interleave=None): - raise NotImplementedError - - async def create_server( - self, protocol_factory, host=None, port=None, - *, family=socket.AF_UNSPEC, - flags=socket.AI_PASSIVE, sock=None, backlog=100, - ssl=None, reuse_address=None, reuse_port=None, - ssl_handshake_timeout=None, - start_serving=True): - """A coroutine which creates a TCP server bound to host and port. - - The return value is a Server object which can be used to stop - the service. - - If host is an empty string or None all interfaces are assumed - and a list of multiple sockets will be returned (most likely - one for IPv4 and another one for IPv6). The host parameter can also be - a sequence (e.g. list) of hosts to bind to. - - family can be set to either AF_INET or AF_INET6 to force the - socket to use IPv4 or IPv6. If not set it will be determined - from host (defaults to AF_UNSPEC). - - flags is a bitmask for getaddrinfo(). - - sock can optionally be specified in order to use a preexisting - socket object. - - backlog is the maximum number of queued connections passed to - listen() (defaults to 100). - - ssl can be set to an SSLContext to enable SSL over the - accepted connections. - - reuse_address tells the kernel to reuse a local socket in - TIME_WAIT state, without waiting for its natural timeout to - expire. If not specified will automatically be set to True on - UNIX. - - reuse_port tells the kernel to allow this endpoint to be bound to - the same port as other existing endpoints are bound to, so long as - they all set this flag when being created. This option is not - supported on Windows. - - ssl_handshake_timeout is the time in seconds that an SSL server - will wait for completion of the SSL handshake before aborting the - connection. Default is 60s. - - start_serving set to True (default) causes the created server - to start accepting connections immediately. When set to False, - the user should await Server.start_serving() or Server.serve_forever() - to make the server to start accepting connections. - """ - raise NotImplementedError - - async def sendfile(self, transport, file, offset=0, count=None, - *, fallback=True): - """Send a file through a transport. - - Return an amount of sent bytes. - """ - raise NotImplementedError - - async def start_tls(self, transport, protocol, sslcontext, *, - server_side=False, - server_hostname=None, - ssl_handshake_timeout=None): - """Upgrade a transport to TLS. - - Return a new transport that *protocol* should start using - immediately. - """ - raise NotImplementedError - - async def create_unix_connection( - self, protocol_factory, path=None, *, - ssl=None, sock=None, - server_hostname=None, - ssl_handshake_timeout=None): - raise NotImplementedError - - async def create_unix_server( - self, protocol_factory, path=None, *, - sock=None, backlog=100, ssl=None, - ssl_handshake_timeout=None, - start_serving=True): - """A coroutine which creates a UNIX Domain Socket server. - - The return value is a Server object, which can be used to stop - the service. - - path is a str, representing a file systsem path to bind the - server socket to. - - sock can optionally be specified in order to use a preexisting - socket object. - - backlog is the maximum number of queued connections passed to - listen() (defaults to 100). - - ssl can be set to an SSLContext to enable SSL over the - accepted connections. - - ssl_handshake_timeout is the time in seconds that an SSL server - will wait for the SSL handshake to complete (defaults to 60s). - - start_serving set to True (default) causes the created server - to start accepting connections immediately. When set to False, - the user should await Server.start_serving() or Server.serve_forever() - to make the server to start accepting connections. - """ - raise NotImplementedError - - async def create_datagram_endpoint(self, protocol_factory, - local_addr=None, remote_addr=None, *, - family=0, proto=0, flags=0, - reuse_address=None, reuse_port=None, - allow_broadcast=None, sock=None): - """A coroutine which creates a datagram endpoint. - - This method will try to establish the endpoint in the background. - When successful, the coroutine returns a (transport, protocol) pair. - - protocol_factory must be a callable returning a protocol instance. - - socket family AF_INET, socket.AF_INET6 or socket.AF_UNIX depending on - host (or family if specified), socket type SOCK_DGRAM. - - reuse_address tells the kernel to reuse a local socket in - TIME_WAIT state, without waiting for its natural timeout to - expire. If not specified it will automatically be set to True on - UNIX. - - reuse_port tells the kernel to allow this endpoint to be bound to - the same port as other existing endpoints are bound to, so long as - they all set this flag when being created. This option is not - supported on Windows and some UNIX's. If the - :py:data:`~socket.SO_REUSEPORT` constant is not defined then this - capability is unsupported. - - allow_broadcast tells the kernel to allow this endpoint to send - messages to the broadcast address. - - sock can optionally be specified in order to use a preexisting - socket object. - """ - raise NotImplementedError - - # Pipes and subprocesses. - - async def connect_read_pipe(self, protocol_factory, pipe): - """Register read pipe in event loop. Set the pipe to non-blocking mode. - - protocol_factory should instantiate object with Protocol interface. - pipe is a file-like object. - Return pair (transport, protocol), where transport supports the - ReadTransport interface.""" - # The reason to accept file-like object instead of just file descriptor - # is: we need to own pipe and close it at transport finishing - # Can got complicated errors if pass f.fileno(), - # close fd in pipe transport then close f and vise versa. - raise NotImplementedError - - async def connect_write_pipe(self, protocol_factory, pipe): - """Register write pipe in event loop. - - protocol_factory should instantiate object with BaseProtocol interface. - Pipe is file-like object already switched to nonblocking. - Return pair (transport, protocol), where transport support - WriteTransport interface.""" - # The reason to accept file-like object instead of just file descriptor - # is: we need to own pipe and close it at transport finishing - # Can got complicated errors if pass f.fileno(), - # close fd in pipe transport then close f and vise versa. - raise NotImplementedError - - async def subprocess_shell(self, protocol_factory, cmd, *, - stdin=subprocess.PIPE, - stdout=subprocess.PIPE, - stderr=subprocess.PIPE, - **kwargs): - raise NotImplementedError - - async def subprocess_exec(self, protocol_factory, *args, - stdin=subprocess.PIPE, - stdout=subprocess.PIPE, - stderr=subprocess.PIPE, - **kwargs): - raise NotImplementedError - - # Ready-based callback registration methods. - # The add_*() methods return None. - # The remove_*() methods return True if something was removed, - # False if there was nothing to delete. - - def add_reader(self, fd, callback, *args): - raise NotImplementedError - - def remove_reader(self, fd): - raise NotImplementedError - - def add_writer(self, fd, callback, *args): - raise NotImplementedError - - def remove_writer(self, fd): - raise NotImplementedError - - # Completion based I/O methods returning Futures. - - async def sock_recv(self, sock, nbytes): - raise NotImplementedError - - async def sock_recv_into(self, sock, buf): - raise NotImplementedError - - async def sock_sendall(self, sock, data): - raise NotImplementedError - - async def sock_connect(self, sock, address): - raise NotImplementedError - - async def sock_accept(self, sock): - raise NotImplementedError - - async def sock_sendfile(self, sock, file, offset=0, count=None, - *, fallback=None): - raise NotImplementedError - - # Signal handling. - - def add_signal_handler(self, sig, callback, *args): - raise NotImplementedError - - def remove_signal_handler(self, sig): - raise NotImplementedError - - # Task factory. - - def set_task_factory(self, factory): - raise NotImplementedError - - def get_task_factory(self): - raise NotImplementedError - - # Error handlers. - - def get_exception_handler(self): - raise NotImplementedError - - def set_exception_handler(self, handler): - raise NotImplementedError - - def default_exception_handler(self, context): - raise NotImplementedError - - def call_exception_handler(self, context): - raise NotImplementedError - - # Debug flag management. - - def get_debug(self): - raise NotImplementedError - - def set_debug(self, enabled): - raise NotImplementedError - - -class AbstractEventLoopPolicy: - """Abstract policy for accessing the event loop.""" - - def get_event_loop(self): - """Get the event loop for the current context. - - Returns an event loop object implementing the BaseEventLoop interface, - or raises an exception in case no event loop has been set for the - current context and the current policy does not specify to create one. - - It should never return None.""" - raise NotImplementedError - - def set_event_loop(self, loop): - """Set the event loop for the current context to loop.""" - raise NotImplementedError - - def new_event_loop(self): - """Create and return a new event loop object according to this - policy's rules. If there's need to set this loop as the event loop for - the current context, set_event_loop must be called explicitly.""" - raise NotImplementedError - - # Child processes handling (Unix only). - - def get_child_watcher(self): - "Get the watcher for child processes." - raise NotImplementedError - - def set_child_watcher(self, watcher): - """Set the watcher for child processes.""" - raise NotImplementedError - - -class BaseDefaultEventLoopPolicy(AbstractEventLoopPolicy): - """Default policy implementation for accessing the event loop. - - In this policy, each thread has its own event loop. However, we - only automatically create an event loop by default for the main - thread; other threads by default have no event loop. - - Other policies may have different rules (e.g. a single global - event loop, or automatically creating an event loop per thread, or - using some other notion of context to which an event loop is - associated). - """ - - _loop_factory = None - - class _Local(threading.local): - _loop = None - _set_called = False - - def __init__(self): - self._local = self._Local() - - def get_event_loop(self): - """Get the event loop for the current context. - - Returns an instance of EventLoop or raises an exception. - """ - if (self._local._loop is None and - not self._local._set_called and - threading.current_thread() is threading.main_thread()): - self.set_event_loop(self.new_event_loop()) - - if self._local._loop is None: - raise RuntimeError('There is no current event loop in thread %r.' - % threading.current_thread().name) - - return self._local._loop - - def set_event_loop(self, loop): - """Set the event loop.""" - self._local._set_called = True - assert loop is None or isinstance(loop, AbstractEventLoop) - self._local._loop = loop - - def new_event_loop(self): - """Create a new event loop. - - You must call set_event_loop() to make this the current event - loop. - """ - return self._loop_factory() - - -# Event loop policy. The policy itself is always global, even if the -# policy's rules say that there is an event loop per thread (or other -# notion of context). The default policy is installed by the first -# call to get_event_loop_policy(). -_event_loop_policy = None - -# Lock for protecting the on-the-fly creation of the event loop policy. -_lock = threading.Lock() - - -# A TLS for the running event loop, used by _get_running_loop. -class _RunningLoop(threading.local): - loop_pid = (None, None) - - -_running_loop = _RunningLoop() - - -def get_running_loop(): - """Return the running event loop. Raise a RuntimeError if there is none. - - This function is thread-specific. - """ - # NOTE: this function is implemented in C (see _asynciomodule.c) - loop = _get_running_loop() - if loop is None: - raise RuntimeError('no running event loop') - return loop - - -def _get_running_loop(): - """Return the running event loop or None. - - This is a low-level function intended to be used by event loops. - This function is thread-specific. - """ - # NOTE: this function is implemented in C (see _asynciomodule.c) - running_loop, pid = _running_loop.loop_pid - if running_loop is not None and pid == os.getpid(): - return running_loop - - -def _set_running_loop(loop): - """Set the running event loop. - - This is a low-level function intended to be used by event loops. - This function is thread-specific. - """ - # NOTE: this function is implemented in C (see _asynciomodule.c) - _running_loop.loop_pid = (loop, os.getpid()) - - -def _init_event_loop_policy(): - global _event_loop_policy - with _lock: - if _event_loop_policy is None: # pragma: no branch - from . import DefaultEventLoopPolicy - _event_loop_policy = DefaultEventLoopPolicy() - - -def get_event_loop_policy(): - """Get the current event loop policy.""" - if _event_loop_policy is None: - _init_event_loop_policy() - return _event_loop_policy - - -def set_event_loop_policy(policy): - """Set the current event loop policy. - - If policy is None, the default policy is restored.""" - global _event_loop_policy - assert policy is None or isinstance(policy, AbstractEventLoopPolicy) - _event_loop_policy = policy - - -def get_event_loop(): - """Return an asyncio event loop. - - When called from a coroutine or a callback (e.g. scheduled with call_soon - or similar API), this function will always return the running event loop. - - If there is no running event loop set, the function will return - the result of `get_event_loop_policy().get_event_loop()` call. - """ - # NOTE: this function is implemented in C (see _asynciomodule.c) - current_loop = _get_running_loop() - if current_loop is not None: - return current_loop - return get_event_loop_policy().get_event_loop() - - -def set_event_loop(loop): - """Equivalent to calling get_event_loop_policy().set_event_loop(loop).""" - get_event_loop_policy().set_event_loop(loop) - - -def new_event_loop(): - """Equivalent to calling get_event_loop_policy().new_event_loop().""" - return get_event_loop_policy().new_event_loop() - - -def get_child_watcher(): - """Equivalent to calling get_event_loop_policy().get_child_watcher().""" - return get_event_loop_policy().get_child_watcher() - - -def set_child_watcher(watcher): - """Equivalent to calling - get_event_loop_policy().set_child_watcher(watcher).""" - return get_event_loop_policy().set_child_watcher(watcher) - - -# Alias pure-Python implementations for testing purposes. -_py__get_running_loop = _get_running_loop -_py__set_running_loop = _set_running_loop -_py_get_running_loop = get_running_loop -_py_get_event_loop = get_event_loop - - -try: - # get_event_loop() is one of the most frequently called - # functions in asyncio. Pure Python implementation is - # about 4 times slower than C-accelerated. - from _asyncio import (_get_running_loop, _set_running_loop, - get_running_loop, get_event_loop) -except ImportError: - pass -else: - # Alias C implementations for testing purposes. - _c__get_running_loop = _get_running_loop - _c__set_running_loop = _set_running_loop - _c_get_running_loop = get_running_loop - _c_get_event_loop = get_event_loop |