diff options
Diffstat (limited to 'Lib/multiprocessing/reduction.py')
| -rw-r--r-- | Lib/multiprocessing/reduction.py | 407 | 
1 files changed, 236 insertions, 171 deletions
| diff --git a/Lib/multiprocessing/reduction.py b/Lib/multiprocessing/reduction.py index 6e5e5bc..656fa8f 100644 --- a/Lib/multiprocessing/reduction.py +++ b/Lib/multiprocessing/reduction.py @@ -5,53 +5,29 @@  # multiprocessing/reduction.py  #  # Copyright (c) 2006-2008, R Oudkerk -# All rights reserved. -# -# Redistribution and use in source and binary forms, with or without -# modification, are permitted provided that the following conditions -# are met: -# -# 1. Redistributions of source code must retain the above copyright -#    notice, this list of conditions and the following disclaimer. -# 2. Redistributions in binary form must reproduce the above copyright -#    notice, this list of conditions and the following disclaimer in the -#    documentation and/or other materials provided with the distribution. -# 3. Neither the name of author nor the names of any contributors may be -#    used to endorse or promote products derived from this software -#    without specific prior written permission. -# -# THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS "AS IS" AND -# ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE -# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE -# ARE DISCLAIMED.  IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE -# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL -# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS -# OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) -# HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT -# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY -# OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF -# SUCH DAMAGE. +# Licensed to PSF under a Contributor Agreement.  # -__all__ = [] +__all__ = ['reduce_socket', 'reduce_connection', 'send_handle', 'recv_handle']  import os  import sys  import socket  import threading +import struct +import signal -import _multiprocessing  from multiprocessing import current_process -from multiprocessing.forking import Popen, duplicate, close, ForkingPickler  from multiprocessing.util import register_after_fork, debug, sub_debug -from multiprocessing.connection import Client, Listener +from multiprocessing.util import is_exiting, sub_warning  #  #  # -if not(sys.platform == 'win32' or hasattr(_multiprocessing, 'recvfd')): +if not(sys.platform == 'win32' or (hasattr(socket, 'CMSG_LEN') and +                                   hasattr(socket, 'SCM_RIGHTS'))):      raise ImportError('pickling of connections not supported')  # @@ -59,157 +35,246 @@ if not(sys.platform == 'win32' or hasattr(_multiprocessing, 'recvfd')):  #  if sys.platform == 'win32': -    import _subprocess -    from _multiprocessing import win32 - -    def send_handle(conn, handle, destination_pid): -        process_handle = win32.OpenProcess( -            win32.PROCESS_ALL_ACCESS, False, destination_pid -            ) -        try: -            new_handle = duplicate(handle, process_handle) -            conn.send(new_handle) -        finally: -            close(process_handle) - -    def recv_handle(conn): -        return conn.recv() +    # Windows +    __all__ += ['reduce_pipe_connection'] +    import _winapi -else:      def send_handle(conn, handle, destination_pid): -        _multiprocessing.sendfd(conn.fileno(), handle) +        dh = DupHandle(handle, _winapi.DUPLICATE_SAME_ACCESS, destination_pid) +        conn.send(dh)      def recv_handle(conn): -        return _multiprocessing.recvfd(conn.fileno()) - -# -# Support for a per-process server thread which caches pickled handles -# - -_cache = set() - -def _reset(obj): -    global _lock, _listener, _cache -    for h in _cache: -        close(h) -    _cache.clear() -    _lock = threading.Lock() -    _listener = None - -_reset(None) -register_after_fork(_reset, _reset) - -def _get_listener(): -    global _listener - -    if _listener is None: -        _lock.acquire() -        try: -            if _listener is None: -                debug('starting listener and thread for sending handles') -                _listener = Listener(authkey=current_process().authkey) -                t = threading.Thread(target=_serve) -                t.daemon = True -                t.start() -        finally: -            _lock.release() - -    return _listener - -def _serve(): -    from .util import is_exiting, sub_warning - -    while 1: -        try: -            conn = _listener.accept() -            handle_wanted, destination_pid = conn.recv() -            _cache.remove(handle_wanted) -            send_handle(conn, handle_wanted, destination_pid) -            close(handle_wanted) -            conn.close() -        except: -            if not is_exiting(): -                import traceback -                sub_warning( -                    'thread for sharing handles raised exception :\n' + -                    '-'*79 + '\n' + traceback.format_exc() + '-'*79 -                    ) - -# -# Functions to be used for pickling/unpickling objects with handles -# - -def reduce_handle(handle): -    if Popen.thread_is_spawning(): -        return (None, Popen.duplicate_for_child(handle), True) -    dup_handle = duplicate(handle) -    _cache.add(dup_handle) -    sub_debug('reducing handle %d', handle) -    return (_get_listener().address, dup_handle, False) - -def rebuild_handle(pickled_data): -    address, handle, inherited = pickled_data -    if inherited: -        return handle -    sub_debug('rebuilding handle %d', handle) -    conn = Client(address, authkey=current_process().authkey) -    conn.send((handle, os.getpid())) -    new_handle = recv_handle(conn) -    conn.close() -    return new_handle +        return conn.recv().detach() + +    class DupHandle(object): +        def __init__(self, handle, access, pid=None): +            # duplicate handle for process with given pid +            if pid is None: +                pid = os.getpid() +            proc = _winapi.OpenProcess(_winapi.PROCESS_DUP_HANDLE, False, pid) +            try: +                self._handle = _winapi.DuplicateHandle( +                    _winapi.GetCurrentProcess(), +                    handle, proc, access, False, 0) +            finally: +                _winapi.CloseHandle(proc) +            self._access = access +            self._pid = pid + +        def detach(self): +            # retrieve handle from process which currently owns it +            if self._pid == os.getpid(): +                return self._handle +            proc = _winapi.OpenProcess(_winapi.PROCESS_DUP_HANDLE, False, +                                       self._pid) +            try: +                return _winapi.DuplicateHandle( +                    proc, self._handle, _winapi.GetCurrentProcess(), +                    self._access, False, _winapi.DUPLICATE_CLOSE_SOURCE) +            finally: +                _winapi.CloseHandle(proc) + +    class DupSocket(object): +        def __init__(self, sock): +            new_sock = sock.dup() +            def send(conn, pid): +                share = new_sock.share(pid) +                conn.send_bytes(share) +            self._id = resource_sharer.register(send, new_sock.close) + +        def detach(self): +            conn = resource_sharer.get_connection(self._id) +            try: +                share = conn.recv_bytes() +                return socket.fromshare(share) +            finally: +                conn.close() + +    def reduce_socket(s): +        return rebuild_socket, (DupSocket(s),) + +    def rebuild_socket(ds): +        return ds.detach() + +    def reduce_connection(conn): +        handle = conn.fileno() +        with socket.fromfd(handle, socket.AF_INET, socket.SOCK_STREAM) as s: +            ds = DupSocket(s) +            return rebuild_connection, (ds, conn.readable, conn.writable) + +    def rebuild_connection(ds, readable, writable): +        from .connection import Connection +        sock = ds.detach() +        return Connection(sock.detach(), readable, writable) -# -# Register `_multiprocessing.Connection` with `ForkingPickler` -# - -def reduce_connection(conn): -    rh = reduce_handle(conn.fileno()) -    return rebuild_connection, (rh, conn.readable, conn.writable) - -def rebuild_connection(reduced_handle, readable, writable): -    handle = rebuild_handle(reduced_handle) -    return _multiprocessing.Connection( -        handle, readable=readable, writable=writable -        ) - -ForkingPickler.register(_multiprocessing.Connection, reduce_connection) +    def reduce_pipe_connection(conn): +        access = ((_winapi.FILE_GENERIC_READ if conn.readable else 0) | +                  (_winapi.FILE_GENERIC_WRITE if conn.writable else 0)) +        dh = DupHandle(conn.fileno(), access) +        return rebuild_pipe_connection, (dh, conn.readable, conn.writable) -# -# Register `socket.socket` with `ForkingPickler` -# +    def rebuild_pipe_connection(dh, readable, writable): +        from .connection import PipeConnection +        handle = dh.detach() +        return PipeConnection(handle, readable, writable) -def fromfd(fd, family, type_, proto=0): -    s = socket.fromfd(fd, family, type_, proto) -    if s.__class__ is not socket.socket: -        s = socket.socket(_sock=s) -    return s +else: +    # Unix -def reduce_socket(s): -    reduced_handle = reduce_handle(s.fileno()) -    return rebuild_socket, (reduced_handle, s.family, s.type, s.proto) +    # On MacOSX we should acknowledge receipt of fds -- see Issue14669 +    ACKNOWLEDGE = sys.platform == 'darwin' -def rebuild_socket(reduced_handle, family, type_, proto): -    fd = rebuild_handle(reduced_handle) -    _sock = fromfd(fd, family, type_, proto) -    close(fd) -    return _sock +    def send_handle(conn, handle, destination_pid): +        with socket.fromfd(conn.fileno(), socket.AF_UNIX, socket.SOCK_STREAM) as s: +            s.sendmsg([b'x'], [(socket.SOL_SOCKET, socket.SCM_RIGHTS, +                                struct.pack("@i", handle))]) +        if ACKNOWLEDGE and conn.recv_bytes() != b'ACK': +            raise RuntimeError('did not receive acknowledgement of fd') -ForkingPickler.register(socket.socket, reduce_socket) +    def recv_handle(conn): +        size = struct.calcsize("@i") +        with socket.fromfd(conn.fileno(), socket.AF_UNIX, socket.SOCK_STREAM) as s: +            msg, ancdata, flags, addr = s.recvmsg(1, socket.CMSG_LEN(size)) +            try: +                if ACKNOWLEDGE: +                    conn.send_bytes(b'ACK') +                cmsg_level, cmsg_type, cmsg_data = ancdata[0] +                if (cmsg_level == socket.SOL_SOCKET and +                    cmsg_type == socket.SCM_RIGHTS): +                    return struct.unpack("@i", cmsg_data[:size])[0] +            except (ValueError, IndexError, struct.error): +                pass +            raise RuntimeError('Invalid data received') + +    class DupFd(object): +        def __init__(self, fd): +            new_fd = os.dup(fd) +            def send(conn, pid): +                send_handle(conn, new_fd, pid) +            def close(): +                os.close(new_fd) +            self._id = resource_sharer.register(send, close) + +        def detach(self): +            conn = resource_sharer.get_connection(self._id) +            try: +                return recv_handle(conn) +            finally: +                conn.close() + +    def reduce_socket(s): +        df = DupFd(s.fileno()) +        return rebuild_socket, (df, s.family, s.type, s.proto) + +    def rebuild_socket(df, family, type, proto): +        fd = df.detach() +        s = socket.fromfd(fd, family, type, proto) +        os.close(fd) +        return s + +    def reduce_connection(conn): +        df = DupFd(conn.fileno()) +        return rebuild_connection, (df, conn.readable, conn.writable) + +    def rebuild_connection(df, readable, writable): +        from .connection import Connection +        fd = df.detach() +        return Connection(fd, readable, writable)  # -# Register `_multiprocessing.PipeConnection` with `ForkingPickler` +# Server which shares registered resources with clients  # -if sys.platform == 'win32': - -    def reduce_pipe_connection(conn): -        rh = reduce_handle(conn.fileno()) -        return rebuild_pipe_connection, (rh, conn.readable, conn.writable) - -    def rebuild_pipe_connection(reduced_handle, readable, writable): -        handle = rebuild_handle(reduced_handle) -        return _multiprocessing.PipeConnection( -            handle, readable=readable, writable=writable -            ) - -    ForkingPickler.register(_multiprocessing.PipeConnection, reduce_pipe_connection) +class ResourceSharer(object): +    def __init__(self): +        self._key = 0 +        self._cache = {} +        self._old_locks = [] +        self._lock = threading.Lock() +        self._listener = None +        self._address = None +        self._thread = None +        register_after_fork(self, ResourceSharer._afterfork) + +    def register(self, send, close): +        with self._lock: +            if self._address is None: +                self._start() +            self._key += 1 +            self._cache[self._key] = (send, close) +            return (self._address, self._key) + +    @staticmethod +    def get_connection(ident): +        from .connection import Client +        address, key = ident +        c = Client(address, authkey=current_process().authkey) +        c.send((key, os.getpid())) +        return c + +    def stop(self, timeout=None): +        from .connection import Client +        with self._lock: +            if self._address is not None: +                c = Client(self._address, authkey=current_process().authkey) +                c.send(None) +                c.close() +                self._thread.join(timeout) +                if self._thread.is_alive(): +                    sub_warn('ResourceSharer thread did not stop when asked') +                self._listener.close() +                self._thread = None +                self._address = None +                self._listener = None +                for key, (send, close) in self._cache.items(): +                    close() +                self._cache.clear() + +    def _afterfork(self): +        for key, (send, close) in self._cache.items(): +            close() +        self._cache.clear() +        # If self._lock was locked at the time of the fork, it may be broken +        # -- see issue 6721.  Replace it without letting it be gc'ed. +        self._old_locks.append(self._lock) +        self._lock = threading.Lock() +        if self._listener is not None: +            self._listener.close() +        self._listener = None +        self._address = None +        self._thread = None + +    def _start(self): +        from .connection import Listener +        assert self._listener is None +        debug('starting listener and thread for sending handles') +        self._listener = Listener(authkey=current_process().authkey) +        self._address = self._listener.address +        t = threading.Thread(target=self._serve) +        t.daemon = True +        t.start() +        self._thread = t + +    def _serve(self): +        if hasattr(signal, 'pthread_sigmask'): +            signal.pthread_sigmask(signal.SIG_BLOCK, range(1, signal.NSIG)) +        while 1: +            try: +                conn = self._listener.accept() +                msg = conn.recv() +                if msg is None: +                    break +                key, destination_pid = msg +                send, close = self._cache.pop(key) +                send(conn, destination_pid) +                close() +                conn.close() +            except: +                if not is_exiting(): +                    import traceback +                    sub_warning( +                        'thread for sharing handles raised exception :\n' + +                        '-'*79 + '\n' + traceback.format_exc() + '-'*79 +                        ) + +resource_sharer = ResourceSharer() | 
