diff options
Diffstat (limited to 'Lib/asyncio/events.py')
-rw-r--r-- | Lib/asyncio/events.py | 395 |
1 files changed, 395 insertions, 0 deletions
diff --git a/Lib/asyncio/events.py b/Lib/asyncio/events.py new file mode 100644 index 0000000..9724615 --- /dev/null +++ b/Lib/asyncio/events.py @@ -0,0 +1,395 @@ +"""Event loop and event loop policy.""" + +__all__ = ['AbstractEventLoopPolicy', 'DefaultEventLoopPolicy', + 'AbstractEventLoop', 'AbstractServer', + 'Handle', 'TimerHandle', + 'get_event_loop_policy', 'set_event_loop_policy', + 'get_event_loop', 'set_event_loop', 'new_event_loop', + ] + +import subprocess +import sys +import threading +import socket + +from .log import asyncio_log + + +class Handle: + """Object returned by callback registration methods.""" + + def __init__(self, callback, args): + self._callback = callback + self._args = args + self._cancelled = False + + def __repr__(self): + res = 'Handle({}, {})'.format(self._callback, self._args) + if self._cancelled: + res += '<cancelled>' + return res + + def cancel(self): + self._cancelled = True + + def _run(self): + try: + self._callback(*self._args) + except Exception: + asyncio_log.exception('Exception in callback %s %r', + self._callback, self._args) + self = None # Needed to break cycles when an exception occurs. + + +def make_handle(callback, args): + # TODO: Inline this? + assert not isinstance(callback, Handle), 'A Handle is not a callback' + return Handle(callback, args) + + +class TimerHandle(Handle): + """Object returned by timed callback registration methods.""" + + def __init__(self, when, callback, args): + assert when is not None + super().__init__(callback, args) + + self._when = when + + def __repr__(self): + res = 'TimerHandle({}, {}, {})'.format(self._when, + self._callback, + self._args) + if self._cancelled: + res += '<cancelled>' + + return res + + def __hash__(self): + return hash(self._when) + + def __lt__(self, other): + return self._when < other._when + + def __le__(self, other): + if self._when < other._when: + return True + return self.__eq__(other) + + def __gt__(self, other): + return self._when > other._when + + def __ge__(self, other): + if self._when > other._when: + return True + return self.__eq__(other) + + def __eq__(self, other): + if isinstance(other, TimerHandle): + return (self._when == other._when and + self._callback == other._callback and + self._args == other._args and + self._cancelled == other._cancelled) + return NotImplemented + + def __ne__(self, other): + equal = self.__eq__(other) + return NotImplemented if equal is NotImplemented else not equal + + +class AbstractServer: + """Abstract server returned by create_service().""" + + def close(self): + """Stop serving. This leaves existing connections open.""" + return NotImplemented + + def wait_closed(self): + """Coroutine to wait until service is closed.""" + return NotImplemented + + +class AbstractEventLoop: + """Abstract event loop.""" + + # Running and stopping the event loop. + + def run_forever(self): + """Run the event loop until stop() is called.""" + raise NotImplementedError + + def run_until_complete(self, future): + """Run the event loop until a Future is done. + + Return the Future's result, or raise its exception. + """ + raise NotImplementedError + + def stop(self): + """Stop the event loop as soon as reasonable. + + Exactly how soon that is may depend on the implementation, but + no more I/O callbacks should be scheduled. + """ + raise NotImplementedError + + def is_running(self): + """Return whether the event loop is currently running.""" + raise NotImplementedError + + # Methods scheduling callbacks. All these return Handles. + + def call_soon(self, callback, *args): + return self.call_later(0, callback, *args) + + def call_later(self, delay, callback, *args): + raise NotImplementedError + + def call_at(self, when, callback, *args): + raise NotImplementedError + + def time(self): + raise NotImplementedError + + # Methods for interacting with threads. + + def call_soon_threadsafe(self, callback, *args): + raise NotImplementedError + + def run_in_executor(self, executor, callback, *args): + raise NotImplementedError + + def set_default_executor(self, executor): + raise NotImplementedError + + # Network I/O methods returning Futures. + + def getaddrinfo(self, host, port, *, family=0, type=0, proto=0, flags=0): + raise NotImplementedError + + def getnameinfo(self, sockaddr, flags=0): + raise NotImplementedError + + def create_connection(self, protocol_factory, host=None, port=None, *, + ssl=None, family=0, proto=0, flags=0, sock=None, + local_addr=None): + raise NotImplementedError + + def create_server(self, protocol_factory, host=None, port=None, *, + family=socket.AF_UNSPEC, flags=socket.AI_PASSIVE, + sock=None, backlog=100, ssl=None, reuse_address=None): + """A coroutine which creates a TCP server bound to host and port. + + The return value is a Server object which can be used to stop + the service. + + If host is an empty string or None all interfaces are assumed + and a list of multiple sockets will be returned (most likely + one for IPv4 and another one for IPv6). + + family can be set to either AF_INET or AF_INET6 to force the + socket to use IPv4 or IPv6. If not set it will be determined + from host (defaults to AF_UNSPEC). + + flags is a bitmask for getaddrinfo(). + + sock can optionally be specified in order to use a preexisting + socket object. + + backlog is the maximum number of queued connections passed to + listen() (defaults to 100). + + ssl can be set to an SSLContext to enable SSL over the + accepted connections. + + reuse_address tells the kernel to reuse a local socket in + TIME_WAIT state, without waiting for its natural timeout to + expire. If not specified will automatically be set to True on + UNIX. + """ + raise NotImplementedError + + def create_datagram_endpoint(self, protocol_factory, + local_addr=None, remote_addr=None, *, + family=0, proto=0, flags=0): + raise NotImplementedError + + def connect_read_pipe(self, protocol_factory, pipe): + """Register read pipe in eventloop. + + protocol_factory should instantiate object with Protocol interface. + pipe is file-like object already switched to nonblocking. + Return pair (transport, protocol), where transport support + ReadTransport ABC""" + # The reason to accept file-like object instead of just file descriptor + # is: we need to own pipe and close it at transport finishing + # Can got complicated errors if pass f.fileno(), + # close fd in pipe transport then close f and vise versa. + raise NotImplementedError + + def connect_write_pipe(self, protocol_factory, pipe): + """Register write pipe in eventloop. + + protocol_factory should instantiate object with BaseProtocol interface. + Pipe is file-like object already switched to nonblocking. + Return pair (transport, protocol), where transport support + WriteTransport ABC""" + # The reason to accept file-like object instead of just file descriptor + # is: we need to own pipe and close it at transport finishing + # Can got complicated errors if pass f.fileno(), + # close fd in pipe transport then close f and vise versa. + raise NotImplementedError + + def subprocess_shell(self, protocol_factory, cmd, *, stdin=subprocess.PIPE, + stdout=subprocess.PIPE, stderr=subprocess.PIPE, + **kwargs): + raise NotImplementedError + + def subprocess_exec(self, protocol_factory, *args, stdin=subprocess.PIPE, + stdout=subprocess.PIPE, stderr=subprocess.PIPE, + **kwargs): + raise NotImplementedError + + # Ready-based callback registration methods. + # The add_*() methods return None. + # The remove_*() methods return True if something was removed, + # False if there was nothing to delete. + + def add_reader(self, fd, callback, *args): + raise NotImplementedError + + def remove_reader(self, fd): + raise NotImplementedError + + def add_writer(self, fd, callback, *args): + raise NotImplementedError + + def remove_writer(self, fd): + raise NotImplementedError + + # Completion based I/O methods returning Futures. + + def sock_recv(self, sock, nbytes): + raise NotImplementedError + + def sock_sendall(self, sock, data): + raise NotImplementedError + + def sock_connect(self, sock, address): + raise NotImplementedError + + def sock_accept(self, sock): + raise NotImplementedError + + # Signal handling. + + def add_signal_handler(self, sig, callback, *args): + raise NotImplementedError + + def remove_signal_handler(self, sig): + raise NotImplementedError + + +class AbstractEventLoopPolicy: + """Abstract policy for accessing the event loop.""" + + def get_event_loop(self): + """XXX""" + raise NotImplementedError + + def set_event_loop(self, loop): + """XXX""" + raise NotImplementedError + + def new_event_loop(self): + """XXX""" + raise NotImplementedError + + +class DefaultEventLoopPolicy(threading.local, AbstractEventLoopPolicy): + """Default policy implementation for accessing the event loop. + + In this policy, each thread has its own event loop. However, we + only automatically create an event loop by default for the main + thread; other threads by default have no event loop. + + Other policies may have different rules (e.g. a single global + event loop, or automatically creating an event loop per thread, or + using some other notion of context to which an event loop is + associated). + """ + + _loop = None + _set_called = False + + def get_event_loop(self): + """Get the event loop. + + This may be None or an instance of EventLoop. + """ + if (self._loop is None and + not self._set_called and + isinstance(threading.current_thread(), threading._MainThread)): + self._loop = self.new_event_loop() + assert self._loop is not None, \ + ('There is no current event loop in thread %r.' % + threading.current_thread().name) + return self._loop + + def set_event_loop(self, loop): + """Set the event loop.""" + # TODO: The isinstance() test violates the PEP. + self._set_called = True + assert loop is None or isinstance(loop, AbstractEventLoop) + self._loop = loop + + def new_event_loop(self): + """Create a new event loop. + + You must call set_event_loop() to make this the current event + loop. + """ + if sys.platform == 'win32': # pragma: no cover + from . import windows_events + return windows_events.SelectorEventLoop() + else: # pragma: no cover + from . import unix_events + return unix_events.SelectorEventLoop() + + +# Event loop policy. The policy itself is always global, even if the +# policy's rules say that there is an event loop per thread (or other +# notion of context). The default policy is installed by the first +# call to get_event_loop_policy(). +_event_loop_policy = None + + +def get_event_loop_policy(): + """XXX""" + global _event_loop_policy + if _event_loop_policy is None: + _event_loop_policy = DefaultEventLoopPolicy() + return _event_loop_policy + + +def set_event_loop_policy(policy): + """XXX""" + global _event_loop_policy + # TODO: The isinstance() test violates the PEP. + assert policy is None or isinstance(policy, AbstractEventLoopPolicy) + _event_loop_policy = policy + + +def get_event_loop(): + """XXX""" + return get_event_loop_policy().get_event_loop() + + +def set_event_loop(loop): + """XXX""" + get_event_loop_policy().set_event_loop(loop) + + +def new_event_loop(): + """XXX""" + return get_event_loop_policy().new_event_loop() |