author     Pablo Galindo <Pablogsal@gmail.com>  2019-03-16 22:34:24 (GMT)
committer  GitHub <noreply@github.com>  2019-03-16 22:34:24 (GMT)
commit     7c994549dcffd0d9d3bb37475e6374f356e7240e (patch)
tree       7b59c744c1900c05e920b1eeca1526d9fa886f87
parent     962bdeab191ee64459caa199209331005797ea7a (diff)
bpo-35493: Use Process.sentinel instead of sleeping for polling worker status in multiprocessing.Pool (#11488)
* bpo-35493: Use Process.sentinel instead of sleeping for polling worker status in multiprocessing.Pool
* Use self-pipe pattern to avoid polling for changes
* Refactor some variable names and add comments
* Restore timeout and poll
* Use reader object only on wait()
* Recompute worker sentinels every time
* Remove timeout and use change notifier
* Refactor some methods to be overloaded by the ThreadPool, document the cache class and fix typos
-rw-r--r--  Lib/multiprocessing/pool.py                                         88
-rw-r--r--  Misc/NEWS.d/next/Library/2019-01-09-23-43-08.bpo-35493.kEcRGE.rst    3
2 files changed, 80 insertions, 11 deletions
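
The core primitive behind this change is multiprocessing.connection.wait() applied to the workers' Process.sentinel handles, so the worker handler blocks until something actually happens instead of sleeping on a fixed interval. A minimal standalone sketch of that primitive (not part of the patch; the worker function and timeout value are illustrative):

    from multiprocessing import Process, connection
    import time

    def work():
        time.sleep(1)

    if __name__ == '__main__':
        procs = [Process(target=work) for _ in range(3)]
        for p in procs:
            p.start()

        # Block until at least one worker exits (or the timeout elapses),
        # rather than waking up periodically to check.
        ready = connection.wait([p.sentinel for p in procs], timeout=5)
        print(len(ready), 'worker(s) finished')
        for p in procs:
            p.join()

wait() returns the subset of handles that are ready, which is why the patch recomputes the worker sentinels on every loop iteration before calling it.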
diff --git a/Lib/multiprocessing/pool.py b/Lib/multiprocessing/pool.py
index 18a56f8..665ca06 100644
--- a/Lib/multiprocessing/pool.py
+++ b/Lib/multiprocessing/pool.py
@@ -21,11 +21,13 @@ import threading
import time
import traceback
import warnings
+from queue import Empty
# If threading is available then ThreadPool should be provided. Therefore
# we avoid top-level imports which are liable to fail on some systems.
from . import util
from . import get_context, TimeoutError
+from .connection import wait
#
# Constants representing the state of a pool
@@ -145,6 +147,29 @@ def _helper_reraises_exception(ex):
# Class representing a process pool
#
+class _PoolCache(dict):
+ """
+ Class that implements a cache for the Pool class that will notify
+ the pool management threads every time the cache is emptied. The
+ notification is done by the use of a queue that is provided when
+ instantiating the cache.
+ """
+ def __init__(self, *args, notifier=None, **kwds):
+ self.notifier = notifier
+ super().__init__(*args, **kwds)
+
+ def __delitem__(self, item):
+ super().__delitem__(item)
+
+ # Notify that the cache is empty. This is important because the
+ # pool keeps maintaining workers until the cache gets drained. This
+ # eliminates a race condition in which a task is finished after the
+ # pool's _handle_workers method has entered another iteration of the
+ # loop. In this situation, the only event that can wake up the pool
+ # is for the cache to be emptied (no more tasks available).
+ if not self:
+ self.notifier.put(None)
+
class Pool(object):
'''
Class which supports an async version of applying functions to arguments.
@@ -165,7 +190,11 @@ class Pool(object):
self._ctx = context or get_context()
self._setup_queues()
self._taskqueue = queue.SimpleQueue()
- self._cache = {}
+ # The _change_notifier queue exists to wake up self._handle_workers()
+ # when the cache (self._cache) is empty or when there is a change in
+ # the _state variable of the thread that runs _handle_workers.
+ self._change_notifier = self._ctx.SimpleQueue()
+ self._cache = _PoolCache(notifier=self._change_notifier)
self._maxtasksperchild = maxtasksperchild
self._initializer = initializer
self._initargs = initargs
@@ -189,12 +218,14 @@ class Pool(object):
p.join()
raise
+ sentinels = self._get_sentinels()
+
self._worker_handler = threading.Thread(
target=Pool._handle_workers,
args=(self._cache, self._taskqueue, self._ctx, self.Process,
self._processes, self._pool, self._inqueue, self._outqueue,
self._initializer, self._initargs, self._maxtasksperchild,
- self._wrap_exception)
+ self._wrap_exception, sentinels, self._change_notifier)
)
self._worker_handler.daemon = True
self._worker_handler._state = RUN
@@ -221,7 +252,7 @@ class Pool(object):
self._terminate = util.Finalize(
self, self._terminate_pool,
args=(self._taskqueue, self._inqueue, self._outqueue, self._pool,
- self._worker_handler, self._task_handler,
+ self._change_notifier, self._worker_handler, self._task_handler,
self._result_handler, self._cache),
exitpriority=15
)
@@ -233,6 +264,8 @@ class Pool(object):
if self._state == RUN:
_warn(f"unclosed running multiprocessing pool {self!r}",
ResourceWarning, source=self)
+ if getattr(self, '_change_notifier', None) is not None:
+ self._change_notifier.put(None)
def __repr__(self):
cls = self.__class__
@@ -240,6 +273,16 @@ class Pool(object):
f'state={self._state} '
f'pool_size={len(self._pool)}>')
+ def _get_sentinels(self):
+ task_queue_sentinels = [self._outqueue._reader]
+ self_notifier_sentinels = [self._change_notifier._reader]
+ return [*task_queue_sentinels, *self_notifier_sentinels]
+
+ @staticmethod
+ def _get_worker_sentinels(workers):
+ return [worker.sentinel for worker in
+ workers if hasattr(worker, "sentinel")]
+
@staticmethod
def _join_exited_workers(pool):
"""Cleanup after any worker processes which have exited due to reaching
@@ -452,18 +495,28 @@ class Pool(object):
return result
@staticmethod
- def _handle_workers(cache, taskqueue, ctx, Process, processes, pool,
- inqueue, outqueue, initializer, initargs,
- maxtasksperchild, wrap_exception):
+ def _wait_for_updates(sentinels, change_notifier, timeout=None):
+ wait(sentinels, timeout=timeout)
+ while not change_notifier.empty():
+ change_notifier.get()
+
+ @classmethod
+ def _handle_workers(cls, cache, taskqueue, ctx, Process, processes,
+ pool, inqueue, outqueue, initializer, initargs,
+ maxtasksperchild, wrap_exception, sentinels,
+ change_notifier):
thread = threading.current_thread()
# Keep maintaining workers until the cache gets drained, unless the pool
# is terminated.
while thread._state == RUN or (cache and thread._state != TERMINATE):
- Pool._maintain_pool(ctx, Process, processes, pool, inqueue,
- outqueue, initializer, initargs,
- maxtasksperchild, wrap_exception)
- time.sleep(0.1)
+ cls._maintain_pool(ctx, Process, processes, pool, inqueue,
+ outqueue, initializer, initargs,
+ maxtasksperchild, wrap_exception)
+
+ current_sentinels = [*cls._get_worker_sentinels(pool), *sentinels]
+
+ cls._wait_for_updates(current_sentinels, change_notifier)
# send sentinel to stop workers
taskqueue.put(None)
util.debug('worker handler exiting')
@@ -593,11 +646,13 @@ class Pool(object):
if self._state == RUN:
self._state = CLOSE
self._worker_handler._state = CLOSE
+ self._change_notifier.put(None)
def terminate(self):
util.debug('terminating pool')
self._state = TERMINATE
self._worker_handler._state = TERMINATE
+ self._change_notifier.put(None)
self._terminate()
def join(self):
@@ -622,7 +677,7 @@ class Pool(object):
time.sleep(0)
@classmethod
- def _terminate_pool(cls, taskqueue, inqueue, outqueue, pool,
+ def _terminate_pool(cls, taskqueue, inqueue, outqueue, pool, change_notifier,
worker_handler, task_handler, result_handler, cache):
# this is guaranteed to only be called once
util.debug('finalizing pool')
@@ -638,6 +693,7 @@ class Pool(object):
"Cannot have cache with result_hander not alive")
result_handler._state = TERMINATE
+ change_notifier.put(None)
outqueue.put(None) # sentinel
# We must wait for the worker handler to exit before terminating
@@ -871,6 +927,13 @@ class ThreadPool(Pool):
self._quick_put = self._inqueue.put
self._quick_get = self._outqueue.get
+ def _get_sentinels(self):
+ return [self._change_notifier._reader]
+
+ @staticmethod
+ def _get_worker_sentinels(workers):
+ return []
+
@staticmethod
def _help_stuff_finish(inqueue, task_handler, size):
# drain inqueue, and put sentinels at its head to make workers finish
@@ -881,3 +944,6 @@ class ThreadPool(Pool):
pass
for i in range(size):
inqueue.put(None)
+
+ def _wait_for_updates(self, sentinels, change_notifier, timeout):
+ time.sleep(timeout)
diff --git a/Misc/NEWS.d/next/Library/2019-01-09-23-43-08.bpo-35493.kEcRGE.rst b/Misc/NEWS.d/next/Library/2019-01-09-23-43-08.bpo-35493.kEcRGE.rst
new file mode 100644
index 0000000..fa408c8
--- /dev/null
+++ b/Misc/NEWS.d/next/Library/2019-01-09-23-43-08.bpo-35493.kEcRGE.rst
@@ -0,0 +1,3 @@
+Use :func:`multiprocessing.connection.wait` instead of polling each 0.2
+seconds for worker updates in :class:`multiprocessing.Pool`. Patch by Pablo
+Galindo.
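
The other half of the wake-up mechanism is the change notifier: _handle_workers() also waits on the reader of self._change_notifier, a SimpleQueue that _PoolCache.__delitem__ fills when the cache becomes empty and that close()/terminate() fill on state changes. A rough standalone sketch of that notification idea (hypothetical names, simplified from the patch):

    from multiprocessing import SimpleQueue
    from multiprocessing.connection import wait

    class NotifyingCache(dict):
        """Dict that signals a queue whenever it becomes empty."""
        def __init__(self, *args, notifier=None, **kwds):
            super().__init__(*args, **kwds)
            self.notifier = notifier

        def __delitem__(self, key):
            super().__delitem__(key)
            if not self:                 # last pending job just finished
                self.notifier.put(None)  # wake anyone blocked in wait()

    notifier = SimpleQueue()
    cache = NotifyingCache({1: 'pending job'}, notifier=notifier)
    del cache[1]
    # A management thread blocked like this wakes immediately instead of
    # sleeping and re-checking:
    ready = wait([notifier._reader], timeout=1.0)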
"""Event loop using a selector and related classes.

A selector is a "notify-when-ready" multiplexer.  For a subclass which
also includes support for signal handling, see the unix_events sub-module.
"""

__all__ = ['BaseSelectorEventLoop']

import collections
import errno
import socket
try:
    import ssl
except ImportError:  # pragma: no cover
    ssl = None

from . import base_events
from . import constants
from . import events
from . import futures
from . import selectors
from . import transports
from .log import logger


class BaseSelectorEventLoop(base_events.BaseEventLoop):
    """Selector event loop.

    See events.EventLoop for API specification.
    """

    def __init__(self, selector=None):
        super().__init__()

        if selector is None:
            selector = selectors.DefaultSelector()
        logger.debug('Using selector: %s', selector.__class__.__name__)
        self._selector = selector
        self._make_self_pipe()

    def _make_socket_transport(self, sock, protocol, waiter=None, *,
                               extra=None, server=None):
        return _SelectorSocketTransport(self, sock, protocol, waiter,
                                        extra, server)

    def _make_ssl_transport(self, rawsock, protocol, sslcontext, waiter, *,
                            server_side=False, server_hostname=None,
                            extra=None, server=None):
        return _SelectorSslTransport(
            self, rawsock, protocol, sslcontext, waiter,
            server_side, server_hostname, extra, server)

    def _make_datagram_transport(self, sock, protocol,
                                 address=None, extra=None):
        return _SelectorDatagramTransport(self, sock, protocol, address, extra)

    def close(self):
        if self._selector is not None:
            self._close_self_pipe()
            self._selector.close()
            self._selector = None
            super().close()

    def _socketpair(self):
        raise NotImplementedError

    def _close_self_pipe(self):
        self.remove_reader(self._ssock.fileno())
        self._ssock.close()
        self._ssock = None
        self._csock.close()
        self._csock = None
        self._internal_fds -= 1

    def _make_self_pipe(self):
        # A self-socket, really. :-)
        self._ssock, self._csock = self._socketpair()
        self._ssock.setblocking(False)
        self._csock.setblocking(False)
        self._internal_fds += 1
        self.add_reader(self._ssock.fileno(), self._read_from_self)

    def _read_from_self(self):
        try:
            self._ssock.recv(1)
        except (BlockingIOError, InterruptedError):
            pass

    def _write_to_self(self):
        try:
            self._csock.send(b'x')
        except (BlockingIOError, InterruptedError):
            pass
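    # Together, _read_from_self() and _write_to_self() form the loop's
    # self-pipe: code running in another thread can wake a select() call
    # blocked in this loop by sending a single byte to self._csock, and the
    # reader registered on self._ssock simply drains it.  A hedged sketch of
    # a caller (call_soon_threadsafe() in base_events is the real consumer;
    # this helper is purely illustrative):
    #
    #     def wake_loop_from_another_thread(loop):
    #         loop._write_to_self()  # interrupts the pending poll promptly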

    def _start_serving(self, protocol_factory, sock,
                       sslcontext=None, server=None):
        self.add_reader(sock.fileno(), self._accept_connection,
                        protocol_factory, sock, sslcontext, server)

    def _accept_connection(self, protocol_factory, sock,
                           sslcontext=None, server=None):
        try:
            conn, addr = sock.accept()
            conn.setblocking(False)
        except (BlockingIOError, InterruptedError, ConnectionAbortedError):
            pass  # False alarm.
        except OSError as exc:
            # There's nowhere to send the error, so just log it.
            # TODO: Someone will want an error handler for this.
            if exc.errno in (errno.EMFILE, errno.ENFILE,
                             errno.ENOBUFS, errno.ENOMEM):
                # Some platforms (e.g. Linux) keep reporting the FD as
                # ready, so we remove the read handler temporarily.
                # We'll try again in a while.
                logger.exception('Accept out of system resource (%s)', exc)
                self.remove_reader(sock.fileno())
                self.call_later(constants.ACCEPT_RETRY_DELAY,
                                self._start_serving,
                                protocol_factory, sock, sslcontext, server)
            else:
                raise  # The event loop will catch, log and ignore it.
        else:
            if sslcontext:
                self._make_ssl_transport(
                    conn, protocol_factory(), sslcontext, None,
                    server_side=True, extra={'peername': addr}, server=server)
            else:
                self._make_socket_transport(
                    conn, protocol_factory(), extra={'peername': addr},
                    server=server)
        # It's now up to the protocol to handle the connection.

    def add_reader(self, fd, callback, *args):
        """Add a reader callback."""
        handle = events.Handle(callback, args)
        try:
            key = self._selector.get_key(fd)
        except KeyError:
            self._selector.register(fd, selectors.EVENT_READ,
                                    (handle, None))
        else:
            mask, (reader, writer) = key.events, key.data
            self._selector.modify(fd, mask | selectors.EVENT_READ,
                                  (handle, writer))
            if reader is not None:
                reader.cancel()

    def remove_reader(self, fd):
        """Remove a reader callback."""
        try:
            key = self._selector.get_key(fd)
        except KeyError:
            return False
        else:
            mask, (reader, writer) = key.events, key.data
            mask &= ~selectors.EVENT_READ
            if not mask:
                self._selector.unregister(fd)
            else:
                self._selector.modify(fd, mask, (None, writer))

            if reader is not None:
                reader.cancel()
                return True
            else:
                return False

    def add_writer(self, fd, callback, *args):
        """Add a writer callback.."""
        handle = events.Handle(callback, args)
        try:
            key = self._selector.get_key(fd)
        except KeyError:
            self._selector.register(fd, selectors.EVENT_WRITE,
                                    (None, handle))
        else:
            mask, (reader, writer) = key.events, key.data
            self._selector.modify(fd, mask | selectors.EVENT_WRITE,
                                  (reader, handle))
            if writer is not None:
                writer.cancel()

    def remove_writer(self, fd):
        """Remove a writer callback."""
        try:
            key = self._selector.get_key(fd)
        except KeyError:
            return False
        else:
            mask, (reader, writer) = key.events, key.data
            # Remove both writer and connector.
            mask &= ~selectors.EVENT_WRITE
            if not mask:
                self._selector.unregister(fd)
            else:
                self._selector.modify(fd, mask, (reader, None))

            if writer is not None:
                writer.cancel()
                return True
            else:
                return False

    def sock_recv(self, sock, n):
        """XXX"""
        fut = futures.Future(loop=self)
        self._sock_recv(fut, False, sock, n)
        return fut

    def _sock_recv(self, fut, registered, sock, n):
        # _sock_recv() can add itself as an I/O callback if the operation can't
        # be done immediately. Don't use it directly, call sock_recv().
        fd = sock.fileno()
        if registered:
            # Remove the callback early.  It should be rare that the
            # selector says the fd is ready but the call still returns
            # EAGAIN, and I am willing to take a hit in that case in
            # order to simplify the common case.
            self.remove_reader(fd)
        if fut.cancelled():
            return
        try:
            data = sock.recv(n)
        except (BlockingIOError, InterruptedError):
            self.add_reader(fd, self._sock_recv, fut, True, sock, n)
        except Exception as exc:
            fut.set_exception(exc)
        else:
            fut.set_result(data)

    def sock_sendall(self, sock, data):
        """XXX"""
        fut = futures.Future(loop=self)
        if data:
            self._sock_sendall(fut, False, sock, data)
        else:
            fut.set_result(None)
        return fut

    def _sock_sendall(self, fut, registered, sock, data):
        fd = sock.fileno()

        if registered:
            self.remove_writer(fd)
        if fut.cancelled():
            return

        try:
            n = sock.send(data)
        except (BlockingIOError, InterruptedError):
            n = 0
        except Exception as exc:
            fut.set_exception(exc)
            return

        if n == len(data):
            fut.set_result(None)
        else:
            if n:
                data = data[n:]
            self.add_writer(fd, self._sock_sendall, fut, True, sock, data)

    def sock_connect(self, sock, address):
        """XXX"""
        fut = futures.Future(loop=self)
        try:
            base_events._check_resolved_address(sock, address)
        except ValueError as err:
            fut.set_exception(err)
        else:
            self._sock_connect(fut, False, sock, address)
        return fut

    def _sock_connect(self, fut, registered, sock, address):
        fd = sock.fileno()
        if registered:
            self.remove_writer(fd)
        if fut.cancelled():
            return
        try:
            if not registered:
                # First time around.
                sock.connect(address)
            else:
                err = sock.getsockopt(socket.SOL_SOCKET, socket.SO_ERROR)
                if err != 0:
                    # Jump to the except clause below.
                    raise OSError(err, 'Connect call failed %s' % (address,))
        except (BlockingIOError, InterruptedError):
            self.add_writer(fd, self._sock_connect, fut, True, sock, address)
        except Exception as exc:
            fut.set_exception(exc)
        else:
            fut.set_result(None)

    def sock_accept(self, sock):
        """XXX"""
        fut = futures.Future(loop=self)
        self._sock_accept(fut, False, sock)
        return fut

    def _sock_accept(self, fut, registered, sock):
        fd = sock.fileno()
        if registered:
            self.remove_reader(fd)
        if fut.cancelled():
            return
        try:
            conn, address = sock.accept()
            conn.setblocking(False)
        except (BlockingIOError, InterruptedError):
            self.add_reader(fd, self._sock_accept, fut, True, sock)
        except Exception as exc:
            fut.set_exception(exc)
        else:
            fut.set_result((conn, address))

    def _process_events(self, event_list):
        for key, mask in event_list:
            fileobj, (reader, writer) = key.fileobj, key.data
            if mask & selectors.EVENT_READ and reader is not None:
                if reader._cancelled:
                    self.remove_reader(fileobj)
                else:
                    self._add_callback(reader)
            if mask & selectors.EVENT_WRITE and writer is not None:
                if writer._cancelled:
                    self.remove_writer(fileobj)
                else:
                    self._add_callback(writer)

    def _stop_serving(self, sock):
        self.remove_reader(sock.fileno())
        sock.close()


class _FlowControlMixin(transports.Transport):
    """All the logic for (write) flow control in a mix-in base class.

    The subclass must implement get_write_buffer_size().  It must call
    _maybe_pause_protocol() whenever the write buffer size increases,
    and _maybe_resume_protocol() whenever it decreases.  It may also
    override set_write_buffer_limits() (e.g. to specify different
    defaults).

    The subclass constructor must call super().__init__(extra).  This
    will call set_write_buffer_limits().

    The user may call set_write_buffer_limits() and
    get_write_buffer_size(), and their protocol's pause_writing() and
    resume_writing() may be called.
    """

    def __init__(self, extra=None):
        super().__init__(extra)
        self._protocol_paused = False
        self.set_write_buffer_limits()

    def _maybe_pause_protocol(self):
        size = self.get_write_buffer_size()
        if size <= self._high_water:
            return
        if not self._protocol_paused:
            self._protocol_paused = True
            try:
                self._protocol.pause_writing()
            except Exception:
                logger.exception('pause_writing() failed')

    def _maybe_resume_protocol(self):
        if (self._protocol_paused and
            self.get_write_buffer_size() <= self._low_water):
            self._protocol_paused = False
            try:
                self._protocol.resume_writing()
            except Exception:
                logger.exception('resume_writing() failed')

    def set_write_buffer_limits(self, high=None, low=None):
        if high is None:
            if low is None:
                high = 64*1024
            else:
                high = 4*low
        if low is None:
            low = high // 4
        if not high >= low >= 0:
            raise ValueError('high (%r) must be >= low (%r) must be >= 0' %
                             (high, low))
        self._high_water = high
        self._low_water = low

    def get_write_buffer_size(self):
        raise NotImplementedError


class _SelectorTransport(_FlowControlMixin, transports.Transport):

    max_size = 256 * 1024  # Buffer size passed to recv().

    _buffer_factory = bytearray  # Constructs initial value for self._buffer.

    def __init__(self, loop, sock, protocol, extra, server=None):
        super().__init__(extra)
        self._extra['socket'] = sock
        self._extra['sockname'] = sock.getsockname()
        if 'peername' not in self._extra:
            try:
                self._extra['peername'] = sock.getpeername()
            except socket.error:
                self._extra['peername'] = None
        self._loop = loop
        self._sock = sock
        self._sock_fd = sock.fileno()
        self._protocol = protocol
        self._server = server
        self._buffer = self._buffer_factory()
        self._conn_lost = 0  # Set when call to connection_lost scheduled.
        self._closing = False  # Set when close() called.
        if self._server is not None:
            self._server.attach(self)

    def abort(self):
        self._force_close(None)

    def close(self):
        if self._closing:
            return
        self._closing = True
        self._loop.remove_reader(self._sock_fd)
        if not self._buffer:
            self._conn_lost += 1
            self._loop.call_soon(self._call_connection_lost, None)

    def _fatal_error(self, exc):
        # Should be called from exception handler only.
        if not isinstance(exc, (BrokenPipeError, ConnectionResetError)):
            logger.exception('Fatal error for %s', self)
        self._force_close(exc)

    def _force_close(self, exc):
        if self._conn_lost:
            return
        if self._buffer:
            self._buffer.clear()
            self._loop.remove_writer(self._sock_fd)
        if not self._closing:
            self._closing = True
            self._loop.remove_reader(self._sock_fd)
        self._conn_lost += 1
        self._loop.call_soon(self._call_connection_lost, exc)

    def _call_connection_lost(self, exc):
        try:
            self._protocol.connection_lost(exc)
        finally:
            self._sock.close()
            self._sock = None
            self._protocol = None
            self._loop = None
            server = self._server
            if server is not None:
                server.detach(self)
                self._server = None

    def get_write_buffer_size(self):
        return len(self._buffer)


class _SelectorSocketTransport(_SelectorTransport):

    def __init__(self, loop, sock, protocol, waiter=None,
                 extra=None, server=None):
        super().__init__(loop, sock, protocol, extra, server)
        self._eof = False
        self._paused = False

        self._loop.add_reader(self._sock_fd, self._read_ready)
        self._loop.call_soon(self._protocol.connection_made, self)
        if waiter is not None:
            self._loop.call_soon(waiter.set_result, None)

    def pause_reading(self):
        if self._closing:
            raise RuntimeError('Cannot pause_reading() when closing')
        if self._paused:
            raise RuntimeError('Already paused')
        self._paused = True
        self._loop.remove_reader(self._sock_fd)

    def resume_reading(self):
        if not self._paused:
            raise RuntimeError('Not paused')
        self._paused = False
        if self._closing:
            return
        self._loop.add_reader(self._sock_fd, self._read_ready)

    def _read_ready(self):
        try:
            data = self._sock.recv(self.max_size)
        except (BlockingIOError, InterruptedError):
            pass
        except Exception as exc:
            self._fatal_error(exc)
        else:
            if data:
                self._protocol.data_received(data)
            else:
                keep_open = self._protocol.eof_received()
                if keep_open:
                    # We're keeping the connection open so the
                    # protocol can write more, but we still can't
                    # receive more, so remove the reader callback.
                    self._loop.remove_reader(self._sock_fd)
                else:
                    self.close()

    def write(self, data):
        if not isinstance(data, (bytes, bytearray, memoryview)):
            raise TypeError('data argument must be byte-ish (%r)' %
                            type(data))
        if self._eof:
            raise RuntimeError('Cannot call write() after write_eof()')
        if not data:
            return

        if self._conn_lost:
            if self._conn_lost >= constants.LOG_THRESHOLD_FOR_CONNLOST_WRITES:
                logger.warning('socket.send() raised exception.')
            self._conn_lost += 1
            return

        if not self._buffer:
            # Optimization: try to send now.
            try:
                n = self._sock.send(data)
            except (BlockingIOError, InterruptedError):
                pass
            except Exception as exc:
                self._fatal_error(exc)
                return
            else:
                data = data[n:]
                if not data:
                    return
            # Not all was written; register write handler.
            self._loop.add_writer(self._sock_fd, self._write_ready)

        # Add it to the buffer.
        self._buffer.extend(data)
        self._maybe_pause_protocol()

    def _write_ready(self):
        assert self._buffer, 'Data should not be empty'

        try:
            n = self._sock.send(self._buffer)
        except (BlockingIOError, InterruptedError):
            pass
        except Exception as exc:
            self._loop.remove_writer(self._sock_fd)
            self._buffer.clear()
            self._fatal_error(exc)
        else:
            if n:
                del self._buffer[:n]
            self._maybe_resume_protocol()  # May append to buffer.
            if not self._buffer:
                self._loop.remove_writer(self._sock_fd)
                if self._closing:
                    self._call_connection_lost(None)
                elif self._eof:
                    self._sock.shutdown(socket.SHUT_WR)

    def write_eof(self):
        if self._eof:
            return
        self._eof = True
        if not self._buffer:
            self._sock.shutdown(socket.SHUT_WR)

    def can_write_eof(self):
        return True


class _SelectorSslTransport(_SelectorTransport):

    _buffer_factory = bytearray

    def __init__(self, loop, rawsock, protocol, sslcontext, waiter=None,
                 server_side=False, server_hostname=None,
                 extra=None, server=None):
        if ssl is None:
            raise RuntimeError('stdlib ssl module not available')

        if server_side:
            if not sslcontext:
                raise ValueError('Server side ssl needs a valid SSLContext')
        else:
            if not sslcontext:
                # Client side may pass ssl=True to use a default
                # context; in that case the sslcontext passed is None.
                # The default is the same as used by urllib with
                # cadefault=True.
                if hasattr(ssl, '_create_stdlib_context'):
                    sslcontext = ssl._create_stdlib_context(
                        cert_reqs=ssl.CERT_REQUIRED,
                        check_hostname=bool(server_hostname))
                else:
                    # Fallback for Python 3.3.
                    sslcontext = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
                    sslcontext.options |= ssl.OP_NO_SSLv2
                    sslcontext.set_default_verify_paths()
                    sslcontext.verify_mode = ssl.CERT_REQUIRED

        wrap_kwargs = {
            'server_side': server_side,
            'do_handshake_on_connect': False,
        }
        if server_hostname and not server_side and ssl.HAS_SNI:
            wrap_kwargs['server_hostname'] = server_hostname
        sslsock = sslcontext.wrap_socket(rawsock, **wrap_kwargs)

        super().__init__(loop, sslsock, protocol, extra, server)

        self._server_hostname = server_hostname
        self._waiter = waiter
        self._rawsock = rawsock
        self._sslcontext = sslcontext
        self._paused = False

        # SSL-specific extra info.  (peercert is set later)
        self._extra.update(sslcontext=sslcontext)

        self._on_handshake()

    def _on_handshake(self):
        try:
            self._sock.do_handshake()
        except ssl.SSLWantReadError:
            self._loop.add_reader(self._sock_fd, self._on_handshake)
            return
        except ssl.SSLWantWriteError:
            self._loop.add_writer(self._sock_fd, self._on_handshake)
            return
        except Exception as exc:
            self._loop.remove_reader(self._sock_fd)
            self._loop.remove_writer(self._sock_fd)
            self._sock.close()
            if self._waiter is not None:
                self._waiter.set_exception(exc)
            return
        except BaseException as exc:
            self._loop.remove_reader(self._sock_fd)
            self._loop.remove_writer(self._sock_fd)
            self._sock.close()
            if self._waiter is not None:
                self._waiter.set_exception(exc)
            raise

        self._loop.remove_reader(self._sock_fd)
        self._loop.remove_writer(self._sock_fd)

        peercert = self._sock.getpeercert()
        if not hasattr(self._sslcontext, 'check_hostname'):
            # Verify hostname if requested; Python 3.4+ uses check_hostname
            # and checks the hostname in do_handshake().
            if (self._server_hostname and
                self._sslcontext.verify_mode != ssl.CERT_NONE):
                try:
                    ssl.match_hostname(peercert, self._server_hostname)
                except Exception as exc:
                    self._sock.close()
                    if self._waiter is not None:
                        self._waiter.set_exception(exc)
                    return

        # Add extra info that becomes available after handshake.
        self._extra.update(peercert=peercert,
                           cipher=self._sock.cipher(),
                           compression=self._sock.compression(),
                           )

        self._read_wants_write = False
        self._write_wants_read = False
        self._loop.add_reader(self._sock_fd, self._read_ready)
        self._loop.call_soon(self._protocol.connection_made, self)
        if self._waiter is not None:
            self._loop.call_soon(self._waiter.set_result, None)

    def pause_reading(self):
        # XXX This is a bit icky, given the comment at the top of
        # _read_ready().  Is it possible to evoke a deadlock?  I don't
        # know, although it doesn't look like it; write() will still
        # accept more data for the buffer and eventually the app will
        # call resume_reading() again, and things will flow again.

        if self._closing:
            raise RuntimeError('Cannot pause_reading() when closing')
        if self._paused:
            raise RuntimeError('Already paused')
        self._paused = True
        self._loop.remove_reader(self._sock_fd)

    def resume_reading(self):
        if not self._paused:
            raise RuntimeError('Not paused')
        self._paused = False
        if self._closing:
            return
        self._loop.add_reader(self._sock_fd, self._read_ready)

    def _read_ready(self):
        if self._write_wants_read:
            self._write_wants_read = False
            self._write_ready()

            if self._buffer:
                self._loop.add_writer(self._sock_fd, self._write_ready)

        try:
            data = self._sock.recv(self.max_size)
        except (BlockingIOError, InterruptedError, ssl.SSLWantReadError):
            pass
        except ssl.SSLWantWriteError:
            self._read_wants_write = True
            self._loop.remove_reader(self._sock_fd)
            self._loop.add_writer(self._sock_fd, self._write_ready)
        except Exception as exc:
            self._fatal_error(exc)
        else:
            if data:
                self._protocol.data_received(data)
            else:
                try:
                    keep_open = self._protocol.eof_received()
                    if keep_open:
                        logger.warning('returning true from eof_received() '
                                       'has no effect when using ssl')
                finally:
                    self.close()

    def _write_ready(self):
        if self._read_wants_write:
            self._read_wants_write = False
            self._read_ready()

            if not (self._paused or self._closing):
                self._loop.add_reader(self._sock_fd, self._read_ready)

        if self._buffer:
            try:
                n = self._sock.send(self._buffer)
            except (BlockingIOError, InterruptedError,
                    ssl.SSLWantWriteError):
                n = 0
            except ssl.SSLWantReadError:
                n = 0
                self._loop.remove_writer(self._sock_fd)
                self._write_wants_read = True
            except Exception as exc:
                self._loop.remove_writer(self._sock_fd)
                self._buffer.clear()
                self._fatal_error(exc)
                return

            if n:
                del self._buffer[:n]

        self._maybe_resume_protocol()  # May append to buffer.

        if not self._buffer:
            self._loop.remove_writer(self._sock_fd)
            if self._closing:
                self._call_connection_lost(None)

    def write(self, data):
        if not isinstance(data, (bytes, bytearray, memoryview)):
            raise TypeError('data argument must be byte-ish (%r)' %
                            type(data))
        if not data:
            return

        if self._conn_lost:
            if self._conn_lost >= constants.LOG_THRESHOLD_FOR_CONNLOST_WRITES:
                logger.warning('socket.send() raised exception.')
            self._conn_lost += 1
            return

        if not self._buffer:
            self._loop.add_writer(self._sock_fd, self._write_ready)

        # Add it to the buffer.
        self._buffer.extend(data)
        self._maybe_pause_protocol()

    def can_write_eof(self):
        return False


class _SelectorDatagramTransport(_SelectorTransport):

    _buffer_factory = collections.deque

    def __init__(self, loop, sock, protocol, address=None, extra=None):
        super().__init__(loop, sock, protocol, extra)
        self._address = address
        self._loop.add_reader(self._sock_fd, self._read_ready)
        self._loop.call_soon(self._protocol.connection_made, self)

    def get_write_buffer_size(self):
        return sum(len(data) for data, _ in self._buffer)

    def _read_ready(self):
        try:
            data, addr = self._sock.recvfrom(self.max_size)
        except (BlockingIOError, InterruptedError):
            pass
        except OSError as exc:
            self._protocol.error_received(exc)
        except Exception as exc:
            self._fatal_error(exc)
        else:
            self._protocol.datagram_received(data, addr)

    def sendto(self, data, addr=None):
        if not isinstance(data, (bytes, bytearray, memoryview)):
            raise TypeError('data argument must be byte-ish (%r)' %
                            type(data))
        if not data:
            return

        if self._address and addr not in (None, self._address):
            raise ValueError('Invalid address: must be None or %s' %
                             (self._address,))

        if self._conn_lost and self._address:
            if self._conn_lost >= constants.LOG_THRESHOLD_FOR_CONNLOST_WRITES:
                logger.warning('socket.send() raised exception.')
            self._conn_lost += 1
            return

        if not self._buffer:
            # Attempt to send it right away first.
            try:
                if self._address:
                    self._sock.send(data)
                else:
                    self._sock.sendto(data, addr)
                return
            except (BlockingIOError, InterruptedError):
                self._loop.add_writer(self._sock_fd, self._sendto_ready)
            except OSError as exc:
                self._protocol.error_received(exc)
                return
            except Exception as exc:
                self._fatal_error(exc)
                return

        # Ensure that what we buffer is immutable.
        self._buffer.append((bytes(data), addr))
        self._maybe_pause_protocol()

    def _sendto_ready(self):
        while self._buffer:
            data, addr = self._buffer.popleft()
            try:
                if self._address:
                    self._sock.send(data)
                else:
                    self._sock.sendto(data, addr)
            except (BlockingIOError, InterruptedError):
                self._buffer.appendleft((data, addr))  # Try again later.
                break
            except OSError as exc:
                self._protocol.error_received(exc)
                return
            except Exception as exc:
                self._fatal_error(exc)
                return

        self._maybe_resume_protocol()  # May append to buffer.
        if not self._buffer:
            self._loop.remove_writer(self._sock_fd)
            if self._closing:
                self._call_connection_lost(None)