diff options
author | Charles-François Natali <cf.natali@gmail.com> | 2013-09-04 17:02:49 (GMT) |
---|---|---|
committer | Charles-François Natali <cf.natali@gmail.com> | 2013-09-04 17:02:49 (GMT) |
commit | 243d8d85debaa319a2be0143003a9e881a0f5646 (patch) | |
tree | 351cdb6690d4c6cc8bdd34ec56c15cdf882e23f6 | |
parent | af722bf9cf7094576303d4a7a2fc38200a12d773 (diff) | |
download | cpython-243d8d85debaa319a2be0143003a9e881a0f5646.zip cpython-243d8d85debaa319a2be0143003a9e881a0f5646.tar.gz cpython-243d8d85debaa319a2be0143003a9e881a0f5646.tar.bz2 |
Issue #16853: Add new selectors module.
-rw-r--r-- | Doc/library/concurrency.rst | 1 | ||||
-rw-r--r-- | Doc/library/selectors.rst | 231 | ||||
-rw-r--r-- | Doc/whatsnew/3.4.rst | 7 | ||||
-rw-r--r-- | Lib/selectors.py | 405 | ||||
-rw-r--r-- | Lib/test/test_selectors.py | 390 | ||||
-rw-r--r-- | Misc/NEWS | 2 |
6 files changed, 1033 insertions, 3 deletions
diff --git a/Doc/library/concurrency.rst b/Doc/library/concurrency.rst index f2a512f..2d69107 100644 --- a/Doc/library/concurrency.rst +++ b/Doc/library/concurrency.rst @@ -21,6 +21,7 @@ multitasking). Here's an overview: sched.rst queue.rst select.rst + selectors.rst The following are support modules for some of the above services: diff --git a/Doc/library/selectors.rst b/Doc/library/selectors.rst new file mode 100644 index 0000000..ece5e7d --- /dev/null +++ b/Doc/library/selectors.rst @@ -0,0 +1,231 @@ +:mod:`selectors` -- High-level I/O multiplexing +=============================================== + +.. module:: selectors + :synopsis: High-level I/O multiplexing. + +.. versionadded:: 3.4 + + +Introduction +------------ + +This module allows high-level and efficient I/O multiplexing, built upon the +:mod:`select` module primitives. Users are encouraged to use this module +instead, unless they want precise control over the OS-level primitives used. + +It defines a :class:`BaseSelector` abstract base class, along with several +concrete implementations (:class:`KqueueSelector`, :class:`EpollSelector`...), +that can be used to wait for I/O readiness notification on multiple file +objects. In the following, "file object" refers to any object with a +:meth:`fileno()` method, or a raw file descriptor. See :term:`file object`. + +:class:`DefaultSelector` is an alias to the most efficient implementation +available on the current platform: this should be the default choice for most +users. + +.. note:: + The type of file objects supported depends on the platform: on Windows, + sockets are supported, but not pipes, whereas on Unix, both are supported + (some other types may be supported as well, such as fifos or special file + devices). + +.. seealso:: + + :mod:`select` + Low-level I/O multiplexing module. + + +Classes +------- + +Classes hierarchy:: + + BaseSelector + +-- SelectSelector + +-- PollSelector + +-- EpollSelector + +-- KqueueSelector + + +In the following, *events* is a bitwise mask indicating which I/O events should +be waited for on a given file object. It can be a combination of the constants +below: + + +-----------------------+-----------------------------------------------+ + | Constant | Meaning | + +=======================+===============================================+ + | :const:`EVENT_READ` | Available for read | + +-----------------------+-----------------------------------------------+ + | :const:`EVENT_WRITE` | Available for write | + +-----------------------+-----------------------------------------------+ + + +.. class:: SelectorKey + + A :class:`SelectorKey` is a :class:`~collections.namedtuple` used to + associate a file object to its underlying file decriptor, selected event + mask and attached data. It is returned by several :class:`BaseSelector` + methods. + + .. attribute:: fileobj + + File object registered. + + .. attribute:: fd + + Underlying file descriptor. + + .. attribute:: events + + Events that must be waited for this file object. + + .. attribute:: data + + Optional opaque data associated to this file object: for example, this + could be used to store per-client session. + + +.. class:: BaseSelector + + A :class:`BaseSelector` is used to wait for I/O event readiness on multiple + file objects. It supports file stream registration, unregistration, and a + method to wait for I/O events on those streams, with an optional timeout. + It's an abstract base class, so cannot be instantiated. Use + :class:`DefaultSelector` instead, or one of :class:`SelectSelector`, + :class:`KqueueSelector` etc. if you want to specifically use an + implementation, and your platform supports it. + :class:`BaseSelector` and its concrete implementations support the + :term:`context manager` protocol. + + .. method:: register(fileobj, events, data=None) + + Register a file object for selection, monitoring it for I/O events. + + *fileobj* is the file object to monitor. + *events* is a bitwise mask of events to monitor. + *data* is an opaque object. + + This returns a new :class:`SelectorKey` instance, or raises a + :exc:`ValueError` in case of invalid event mask or file descriptor, or + :exc:`KeyError` if the file object is already registered. + + .. method:: unregister(fileobj) + + Unregister a file object from selection, removing it from monitoring. A + file object shall be unregistered prior to being closed. + + *fileobj* must be a file object previously registered. + + This returns the associated :class:`SelectorKey` instance, or raises a + :exc:`KeyError` if the file object is not registered. + + .. method:: modify(fileobj, events, data=None) + + Change a registered file object monitored events or attached data. + + This is equivalent to :meth:`BaseSelector.unregister(fileobj)` followed + by :meth:`BaseSelector.register(fileobj, events, data)`, except that it + can be implemented more efficiently. + + This returns a new :class:`SelectorKey` instance, or raises a + :exc:`ValueError` in case of invalid event mask or file descriptor, or + :exc:`KeyError` if the file object is not registered. + + .. method:: select(timeout=None) + + Wait until some registered file objects become ready, or the timeout + expires. + + If ``timeout > 0``, this specifies the maximum wait time, in seconds. + If ``timeout <= 0``, the call won't block, and will report the currently + ready file objects. + If *timeout* is ``None``, the call will block until a monitored file object + becomes ready. + + This returns a list of ``(key, events)`` tuple, one for each ready file + object. + + *key* is the :class:`SelectorKey` instance corresponding to a ready file + object. + *events* is a bitmask of events ready on this file object. + + .. method:: close() + + Close the selector. + + This must be called to make sure that any underlying resource is freed. + The selector shall not be used once it has been closed. + + .. method:: get_key(fileobj) + + Return the key associated to a registered file object. + + This returns the :class:`SelectorKey` instance associated to this file + object, or raises :exc:`KeyError` if the file object is not registered. + + +.. class:: DefaultSelector() + + The default selector class, using the most efficient implementation + available on the current platform. This should be the default choice for + most users. + + +.. class:: SelectSelector() + + :func:`select.select`-based selector. + + +.. class:: PollSelector() + + :func:`select.poll`-based selector. + + +.. class:: EpollSelector() + + :func:`select.epoll`-based selector. + + .. method:: fileno() + + This returns the file descriptor used by the underlying + :func:`select.epoll` object. + + +.. class:: KqueueSelector() + + :func:`select.kqueue`-based selector. + + .. method:: fileno() + + This returns the file descriptor used by the underlying + :func:`select.kqueue` object. + + +Examples of selector usage:: + + >>> import selectors + >>> import socket + >>> + >>> s = selectors.DefaultSelector() + >>> r, w = socket.socketpair() + >>> + >>> s.register(r, selectors.EVENT_READ) + SelectorKey(fileobj=<socket.socket fd=4, family=1, type=1, proto=0>, fd=4, events=1, data=None) + >>> s.register(w, selectors.EVENT_WRITE) + SelectorKey(fileobj=<socket.socket fd=5, family=1, type=1, proto=0>, fd=5, events=2, data=None) + >>> + >>> print(s.select()) + [(SelectorKey(fileobj=<socket.socket fd=5, family=1, type=1, proto=0>, fd=5, events=2, data=None), 2)] + >>> + >>> for key, events in s.select(): + ... if events & selectors.EVENT_WRITE: + ... key.fileobj.send(b'spam') + ... + 4 + >>> for key, events in s.select(): + ... if events & selectors.EVENT_READ: + ... print(key.fileobj.recv(1024)) + ... + b'spam' + >>> s.close() diff --git a/Doc/whatsnew/3.4.rst b/Doc/whatsnew/3.4.rst index 427083a..e15e534 100644 --- a/Doc/whatsnew/3.4.rst +++ b/Doc/whatsnew/3.4.rst @@ -174,10 +174,11 @@ Some smaller changes made to the core Python language are: New Modules =========== -.. module name -.. ----------- +selectors +--------- -* None yet. +The new :mod:`selectors` module allows high-level and efficient I/O +multiplexing, built upon the :mod:`select` module primitives. Improved Modules diff --git a/Lib/selectors.py b/Lib/selectors.py new file mode 100644 index 0000000..fe027f0 --- /dev/null +++ b/Lib/selectors.py @@ -0,0 +1,405 @@ +"""Selectors module. + +This module allows high-level and efficient I/O multiplexing, built upon the +`select` module primitives. +""" + + +from abc import ABCMeta, abstractmethod +from collections import namedtuple +import functools +import select +import sys + + +# generic events, that must be mapped to implementation-specific ones +EVENT_READ = (1 << 0) +EVENT_WRITE = (1 << 1) + + +def _fileobj_to_fd(fileobj): + """Return a file descriptor from a file object. + + Parameters: + fileobj -- file object or file descriptor + + Returns: + corresponding file descriptor + """ + if isinstance(fileobj, int): + fd = fileobj + else: + try: + fd = int(fileobj.fileno()) + except (AttributeError, TypeError, ValueError): + raise ValueError("Invalid file object: " + "{!r}".format(fileobj)) from None + if fd < 0: + raise ValueError("Invalid file descriptor: {}".format(fd)) + return fd + + +SelectorKey = namedtuple('SelectorKey', ['fileobj', 'fd', 'events', 'data']) +"""Object used to associate a file object to its backing file descriptor, +selected event mask and attached data.""" + + +class BaseSelector(metaclass=ABCMeta): + """Base selector class. + + A selector supports registering file objects to be monitored for specific + I/O events. + + A file object is a file descriptor or any object with a `fileno()` method. + An arbitrary object can be attached to the file object, which can be used + for example to store context information, a callback, etc. + + A selector can use various implementations (select(), poll(), epoll()...) + depending on the platform. The default `Selector` class uses the most + performant implementation on the current platform. + """ + + def __init__(self): + # this maps file descriptors to keys + self._fd_to_key = {} + + def register(self, fileobj, events, data=None): + """Register a file object. + + Parameters: + fileobj -- file object or file descriptor + events -- events to monitor (bitwise mask of EVENT_READ|EVENT_WRITE) + data -- attached data + + Returns: + SelectorKey instance + """ + if (not events) or (events & ~(EVENT_READ | EVENT_WRITE)): + raise ValueError("Invalid events: {!r}".format(events)) + + key = SelectorKey(fileobj, _fileobj_to_fd(fileobj), events, data) + + if key.fd in self._fd_to_key: + raise KeyError("{!r} (FD {}) is already " + "registered".format(fileobj, key.fd)) + + self._fd_to_key[key.fd] = key + return key + + def unregister(self, fileobj): + """Unregister a file object. + + Parameters: + fileobj -- file object or file descriptor + + Returns: + SelectorKey instance + """ + try: + key = self._fd_to_key.pop(_fileobj_to_fd(fileobj)) + except KeyError: + raise KeyError("{!r} is not registered".format(fileobj)) from None + return key + + def modify(self, fileobj, events, data=None): + """Change a registered file object monitored events or attached data. + + Parameters: + fileobj -- file object or file descriptor + events -- events to monitor (bitwise mask of EVENT_READ|EVENT_WRITE) + data -- attached data + + Returns: + SelectorKey instance + """ + # TODO: Subclasses can probably optimize this even further. + try: + key = self._fd_to_key[_fileobj_to_fd(fileobj)] + except KeyError: + raise KeyError("{!r} is not registered".format(fileobj)) from None + if events != key.events or data != key.data: + # TODO: If only the data changed, use a shortcut that only + # updates the data. + self.unregister(fileobj) + return self.register(fileobj, events, data) + else: + return key + + @abstractmethod + def select(self, timeout=None): + """Perform the actual selection, until some monitored file objects are + ready or a timeout expires. + + Parameters: + timeout -- if timeout > 0, this specifies the maximum wait time, in + seconds + if timeout <= 0, the select() call won't block, and will + report the currently ready file objects + if timeout is None, select() will block until a monitored + file object becomes ready + + Returns: + list of (key, events) for ready file objects + `events` is a bitwise mask of EVENT_READ|EVENT_WRITE + """ + raise NotImplementedError() + + def close(self): + """Close the selector. + + This must be called to make sure that any underlying resource is freed. + """ + self._fd_to_key.clear() + + def get_key(self, fileobj): + """Return the key associated to a registered file object. + + Returns: + SelectorKey for this file object + """ + try: + return self._fd_to_key[_fileobj_to_fd(fileobj)] + except KeyError: + raise KeyError("{!r} is not registered".format(fileobj)) from None + + def __enter__(self): + return self + + def __exit__(self, *args): + self.close() + + def _key_from_fd(self, fd): + """Return the key associated to a given file descriptor. + + Parameters: + fd -- file descriptor + + Returns: + corresponding key, or None if not found + """ + try: + return self._fd_to_key[fd] + except KeyError: + return None + + +class SelectSelector(BaseSelector): + """Select-based selector.""" + + def __init__(self): + super().__init__() + self._readers = set() + self._writers = set() + + def register(self, fileobj, events, data=None): + key = super().register(fileobj, events, data) + if events & EVENT_READ: + self._readers.add(key.fd) + if events & EVENT_WRITE: + self._writers.add(key.fd) + return key + + def unregister(self, fileobj): + key = super().unregister(fileobj) + self._readers.discard(key.fd) + self._writers.discard(key.fd) + return key + + if sys.platform == 'win32': + def _select(self, r, w, _, timeout=None): + r, w, x = select.select(r, w, w, timeout) + return r, w + x, [] + else: + _select = select.select + + def select(self, timeout=None): + timeout = None if timeout is None else max(timeout, 0) + ready = [] + try: + r, w, _ = self._select(self._readers, self._writers, [], timeout) + except InterruptedError: + return ready + r = set(r) + w = set(w) + for fd in r | w: + events = 0 + if fd in r: + events |= EVENT_READ + if fd in w: + events |= EVENT_WRITE + + key = self._key_from_fd(fd) + if key: + ready.append((key, events & key.events)) + return ready + + +if hasattr(select, 'poll'): + + class PollSelector(BaseSelector): + """Poll-based selector.""" + + def __init__(self): + super().__init__() + self._poll = select.poll() + + def register(self, fileobj, events, data=None): + key = super().register(fileobj, events, data) + poll_events = 0 + if events & EVENT_READ: + poll_events |= select.POLLIN + if events & EVENT_WRITE: + poll_events |= select.POLLOUT + self._poll.register(key.fd, poll_events) + return key + + def unregister(self, fileobj): + key = super().unregister(fileobj) + self._poll.unregister(key.fd) + return key + + def select(self, timeout=None): + timeout = None if timeout is None else max(int(1000 * timeout), 0) + ready = [] + try: + fd_event_list = self._poll.poll(timeout) + except InterruptedError: + return ready + for fd, event in fd_event_list: + events = 0 + if event & ~select.POLLIN: + events |= EVENT_WRITE + if event & ~select.POLLOUT: + events |= EVENT_READ + + key = self._key_from_fd(fd) + if key: + ready.append((key, events & key.events)) + return ready + + +if hasattr(select, 'epoll'): + + class EpollSelector(BaseSelector): + """Epoll-based selector.""" + + def __init__(self): + super().__init__() + self._epoll = select.epoll() + + def fileno(self): + return self._epoll.fileno() + + def register(self, fileobj, events, data=None): + key = super().register(fileobj, events, data) + epoll_events = 0 + if events & EVENT_READ: + epoll_events |= select.EPOLLIN + if events & EVENT_WRITE: + epoll_events |= select.EPOLLOUT + self._epoll.register(key.fd, epoll_events) + return key + + def unregister(self, fileobj): + key = super().unregister(fileobj) + self._epoll.unregister(key.fd) + return key + + def select(self, timeout=None): + timeout = -1 if timeout is None else max(timeout, 0) + max_ev = len(self._fd_to_key) + ready = [] + try: + fd_event_list = self._epoll.poll(timeout, max_ev) + except InterruptedError: + return ready + for fd, event in fd_event_list: + events = 0 + if event & ~select.EPOLLIN: + events |= EVENT_WRITE + if event & ~select.EPOLLOUT: + events |= EVENT_READ + + key = self._key_from_fd(fd) + if key: + ready.append((key, events & key.events)) + return ready + + def close(self): + super().close() + self._epoll.close() + + +if hasattr(select, 'kqueue'): + + class KqueueSelector(BaseSelector): + """Kqueue-based selector.""" + + def __init__(self): + super().__init__() + self._kqueue = select.kqueue() + + def fileno(self): + return self._kqueue.fileno() + + def register(self, fileobj, events, data=None): + key = super().register(fileobj, events, data) + if events & EVENT_READ: + kev = select.kevent(key.fd, select.KQ_FILTER_READ, + select.KQ_EV_ADD) + self._kqueue.control([kev], 0, 0) + if events & EVENT_WRITE: + kev = select.kevent(key.fd, select.KQ_FILTER_WRITE, + select.KQ_EV_ADD) + self._kqueue.control([kev], 0, 0) + return key + + def unregister(self, fileobj): + key = super().unregister(fileobj) + if key.events & EVENT_READ: + kev = select.kevent(key.fd, select.KQ_FILTER_READ, + select.KQ_EV_DELETE) + self._kqueue.control([kev], 0, 0) + if key.events & EVENT_WRITE: + kev = select.kevent(key.fd, select.KQ_FILTER_WRITE, + select.KQ_EV_DELETE) + self._kqueue.control([kev], 0, 0) + return key + + def select(self, timeout=None): + timeout = None if timeout is None else max(timeout, 0) + max_ev = len(self._fd_to_key) + ready = [] + try: + kev_list = self._kqueue.control(None, max_ev, timeout) + except InterruptedError: + return ready + for kev in kev_list: + fd = kev.ident + flag = kev.filter + events = 0 + if flag == select.KQ_FILTER_READ: + events |= EVENT_READ + if flag == select.KQ_FILTER_WRITE: + events |= EVENT_WRITE + + key = self._key_from_fd(fd) + if key: + ready.append((key, events & key.events)) + return ready + + def close(self): + super().close() + self._kqueue.close() + + +# Choose the best implementation: roughly, epoll|kqueue > poll > select. +# select() also can't accept a FD > FD_SETSIZE (usually around 1024) +if 'KqueueSelector' in globals(): + DefaultSelector = KqueueSelector +elif 'EpollSelector' in globals(): + DefaultSelector = EpollSelector +elif 'PollSelector' in globals(): + DefaultSelector = PollSelector +else: + DefaultSelector = SelectSelector diff --git a/Lib/test/test_selectors.py b/Lib/test/test_selectors.py new file mode 100644 index 0000000..2657a50 --- /dev/null +++ b/Lib/test/test_selectors.py @@ -0,0 +1,390 @@ +import errno +import random +import selectors +import signal +import socket +from test import support +from time import sleep +import unittest +try: + from time import monotonic as time +except ImportError: + from time import time as time +try: + import resource +except ImportError: + resource = None + + +if hasattr(socket, 'socketpair'): + socketpair = socket.socketpair +else: + def socketpair(family=socket.AF_INET, type=socket.SOCK_STREAM, proto=0): + with socket.socket(family, type, proto) as l: + l.bind((support.HOST, 0)) + l.listen(3) + c = socket.socket(family, type, proto) + try: + c.connect(l.getsockname()) + caddr = c.getsockname() + while True: + a, addr = l.accept() + # check that we've got the correct client + if addr == caddr: + return c, a + a.close() + except OSError: + c.close() + raise + + +def find_ready_matching(ready, flag): + match = [] + for key, events in ready: + if events & flag: + match.append(key.fileobj) + return match + + +class BaseSelectorTestCase(unittest.TestCase): + + def test_register(self): + s = self.SELECTOR() + self.addCleanup(s.close) + + rd, wr = socketpair() + self.addCleanup(rd.close) + self.addCleanup(wr.close) + + key = s.register(rd, selectors.EVENT_READ, "data") + self.assertIsInstance(key, selectors.SelectorKey) + self.assertEqual(key.fileobj, rd) + self.assertEqual(key.fd, rd.fileno()) + self.assertEqual(key.events, selectors.EVENT_READ) + self.assertEqual(key.data, "data") + + # register an unknown event + self.assertRaises(ValueError, s.register, 0, 999999) + + # register an invalid FD + self.assertRaises(ValueError, s.register, -10, selectors.EVENT_READ) + + # register twice + self.assertRaises(KeyError, s.register, rd, selectors.EVENT_READ) + + # register the same FD, but with a different object + self.assertRaises(KeyError, s.register, rd.fileno(), + selectors.EVENT_READ) + + def test_unregister(self): + s = self.SELECTOR() + self.addCleanup(s.close) + + rd, wr = socketpair() + self.addCleanup(rd.close) + self.addCleanup(wr.close) + + s.register(rd, selectors.EVENT_READ) + s.unregister(rd) + + # unregister an unknown file obj + self.assertRaises(KeyError, s.unregister, 999999) + + # unregister twice + self.assertRaises(KeyError, s.unregister, rd) + + def test_modify(self): + s = self.SELECTOR() + self.addCleanup(s.close) + + rd, wr = socketpair() + self.addCleanup(rd.close) + self.addCleanup(wr.close) + + key = s.register(rd, selectors.EVENT_READ) + + # modify events + key2 = s.modify(rd, selectors.EVENT_WRITE) + self.assertNotEqual(key.events, key2.events) + self.assertEqual(key2, s.get_key(rd)) + + s.unregister(rd) + + # modify data + d1 = object() + d2 = object() + + key = s.register(rd, selectors.EVENT_READ, d1) + key2 = s.modify(rd, selectors.EVENT_READ, d2) + self.assertEqual(key.events, key2.events) + self.assertNotEqual(key.data, key2.data) + self.assertEqual(key2, s.get_key(rd)) + self.assertEqual(key2.data, d2) + + # modify unknown file obj + self.assertRaises(KeyError, s.modify, 999999, selectors.EVENT_READ) + + def test_close(self): + s = self.SELECTOR() + self.addCleanup(s.close) + + rd, wr = socketpair() + self.addCleanup(rd.close) + self.addCleanup(wr.close) + + s.register(rd, selectors.EVENT_READ) + s.register(wr, selectors.EVENT_WRITE) + + s.close() + self.assertRaises(KeyError, s.get_key, rd) + self.assertRaises(KeyError, s.get_key, wr) + + def test_get_key(self): + s = self.SELECTOR() + self.addCleanup(s.close) + + rd, wr = socketpair() + self.addCleanup(rd.close) + self.addCleanup(wr.close) + + key = s.register(rd, selectors.EVENT_READ, "data") + self.assertEqual(key, s.get_key(rd)) + + # unknown file obj + self.assertRaises(KeyError, s.get_key, 999999) + + def test_select(self): + s = self.SELECTOR() + self.addCleanup(s.close) + + rd, wr = socketpair() + self.addCleanup(rd.close) + self.addCleanup(wr.close) + + s.register(rd, selectors.EVENT_READ) + wr_key = s.register(wr, selectors.EVENT_WRITE) + + result = s.select() + for key, events in result: + self.assertTrue(isinstance(key, selectors.SelectorKey)) + self.assertTrue(events) + self.assertFalse(events & ~(selectors.EVENT_READ | + selectors.EVENT_WRITE)) + + self.assertEqual([(wr_key, selectors.EVENT_WRITE)], result) + + def test_context_manager(self): + s = self.SELECTOR() + self.addCleanup(s.close) + + rd, wr = socketpair() + self.addCleanup(rd.close) + self.addCleanup(wr.close) + + with s as sel: + sel.register(rd, selectors.EVENT_READ) + sel.register(wr, selectors.EVENT_WRITE) + + self.assertRaises(KeyError, s.get_key, rd) + self.assertRaises(KeyError, s.get_key, wr) + + def test_fileno(self): + s = self.SELECTOR() + self.addCleanup(s.close) + + if hasattr(s, 'fileno'): + fd = s.fileno() + self.assertTrue(isinstance(fd, int)) + self.assertGreaterEqual(fd, 0) + + def test_selector(self): + s = self.SELECTOR() + self.addCleanup(s.close) + + NUM_SOCKETS = 12 + MSG = b" This is a test." + MSG_LEN = len(MSG) + readers = [] + writers = [] + r2w = {} + w2r = {} + + for i in range(NUM_SOCKETS): + rd, wr = socketpair() + self.addCleanup(rd.close) + self.addCleanup(wr.close) + s.register(rd, selectors.EVENT_READ) + s.register(wr, selectors.EVENT_WRITE) + readers.append(rd) + writers.append(wr) + r2w[rd] = wr + w2r[wr] = rd + + bufs = [] + + while writers: + ready = s.select() + ready_writers = find_ready_matching(ready, selectors.EVENT_WRITE) + if not ready_writers: + self.fail("no sockets ready for writing") + wr = random.choice(ready_writers) + wr.send(MSG) + + for i in range(10): + ready = s.select() + ready_readers = find_ready_matching(ready, + selectors.EVENT_READ) + if ready_readers: + break + # there might be a delay between the write to the write end and + # the read end is reported ready + sleep(0.1) + else: + self.fail("no sockets ready for reading") + self.assertEqual([w2r[wr]], ready_readers) + rd = ready_readers[0] + buf = rd.recv(MSG_LEN) + self.assertEqual(len(buf), MSG_LEN) + bufs.append(buf) + s.unregister(r2w[rd]) + s.unregister(rd) + writers.remove(r2w[rd]) + + self.assertEqual(bufs, [MSG] * NUM_SOCKETS) + + def test_timeout(self): + s = self.SELECTOR() + self.addCleanup(s.close) + + rd, wr = socketpair() + self.addCleanup(rd.close) + self.addCleanup(wr.close) + + s.register(wr, selectors.EVENT_WRITE) + t = time() + self.assertEqual(1, len(s.select(0))) + self.assertEqual(1, len(s.select(-1))) + self.assertTrue(time() - t < 0.5) + + s.unregister(wr) + s.register(rd, selectors.EVENT_READ) + t = time() + self.assertFalse(s.select(0)) + self.assertFalse(s.select(-1)) + self.assertTrue(time() - t < 0.5) + + t = time() + self.assertFalse(s.select(1)) + self.assertTrue(0.5 < time() - t < 1.5) + + @unittest.skipUnless(hasattr(signal, "alarm"), + "signal.alarm() required for this test") + def test_interrupted_retry(self): + s = self.SELECTOR() + self.addCleanup(s.close) + + rd, wr = socketpair() + self.addCleanup(rd.close) + self.addCleanup(wr.close) + + orig_alrm_handler = signal.signal(signal.SIGALRM, lambda *args: None) + self.addCleanup(signal.signal, signal.SIGALRM, orig_alrm_handler) + self.addCleanup(signal.alarm, 0) + + signal.alarm(1) + + s.register(rd, selectors.EVENT_READ) + t = time() + self.assertFalse(s.select(2)) + self.assertLess(time() - t, 2.5) + + +class ScalableSelectorMixIn: + + @support.requires_mac_ver(10, 5) + @unittest.skipUnless(resource, "Test needs resource module") + def test_above_fd_setsize(self): + # A scalable implementation should have no problem with more than + # FD_SETSIZE file descriptors. Since we don't know the value, we just + # try to set the soft RLIMIT_NOFILE to the hard RLIMIT_NOFILE ceiling. + soft, hard = resource.getrlimit(resource.RLIMIT_NOFILE) + try: + resource.setrlimit(resource.RLIMIT_NOFILE, (hard, hard)) + self.addCleanup(resource.setrlimit, resource.RLIMIT_NOFILE, + (soft, hard)) + NUM_FDS = hard + except OSError: + NUM_FDS = soft + + # guard for already allocated FDs (stdin, stdout...) + NUM_FDS -= 32 + + s = self.SELECTOR() + self.addCleanup(s.close) + + for i in range(NUM_FDS // 2): + try: + rd, wr = socketpair() + except OSError: + # too many FDs, skip - note that we should only catch EMFILE + # here, but apparently *BSD and Solaris can fail upon connect() + # or bind() with EADDRNOTAVAIL, so let's be safe + self.skipTest("FD limit reached") + + self.addCleanup(rd.close) + self.addCleanup(wr.close) + + try: + s.register(rd, selectors.EVENT_READ) + s.register(wr, selectors.EVENT_WRITE) + except OSError as e: + if e.errno == errno.ENOSPC: + # this can be raised by epoll if we go over + # fs.epoll.max_user_watches sysctl + self.skipTest("FD limit reached") + raise + + self.assertEqual(NUM_FDS // 2, len(s.select())) + + +class DefaultSelectorTestCase(BaseSelectorTestCase): + + SELECTOR = selectors.DefaultSelector + + +class SelectSelectorTestCase(BaseSelectorTestCase): + + SELECTOR = selectors.SelectSelector + + +@unittest.skipUnless(hasattr(selectors, 'PollSelector'), + "Test needs selectors.PollSelector") +class PollSelectorTestCase(BaseSelectorTestCase, ScalableSelectorMixIn): + + SELECTOR = getattr(selectors, 'PollSelector', None) + + +@unittest.skipUnless(hasattr(selectors, 'EpollSelector'), + "Test needs selectors.EpollSelector") +class EpollSelectorTestCase(BaseSelectorTestCase, ScalableSelectorMixIn): + + SELECTOR = getattr(selectors, 'EpollSelector', None) + + +@unittest.skipUnless(hasattr(selectors, 'KqueueSelector'), + "Test needs selectors.KqueueSelector)") +class KqueueSelectorTestCase(BaseSelectorTestCase, ScalableSelectorMixIn): + + SELECTOR = getattr(selectors, 'KqueueSelector', None) + + +def test_main(): + tests = [DefaultSelectorTestCase, SelectSelectorTestCase, + PollSelectorTestCase, EpollSelectorTestCase, + KqueueSelectorTestCase] + support.run_unittest(*tests) + support.reap_children() + + +if __name__ == "__main__": + test_main() @@ -54,6 +54,8 @@ Core and Builtins Library ------- +- Issue #16853: Add new selectors module. + - Issue #18882: Add threading.main_thread() function. - Issue #18901: The sunau getparams method now returns a namedtuple rather than |