summaryrefslogtreecommitdiffstats
path: root/Lib/multiprocessing/reduction.py
diff options
context:
space:
mode:
Diffstat (limited to 'Lib/multiprocessing/reduction.py')
-rw-r--r--Lib/multiprocessing/reduction.py407
1 files changed, 236 insertions, 171 deletions
diff --git a/Lib/multiprocessing/reduction.py b/Lib/multiprocessing/reduction.py
index 6e5e5bc..656fa8f 100644
--- a/Lib/multiprocessing/reduction.py
+++ b/Lib/multiprocessing/reduction.py
@@ -5,53 +5,29 @@
# multiprocessing/reduction.py
#
# Copyright (c) 2006-2008, R Oudkerk
-# All rights reserved.
-#
-# Redistribution and use in source and binary forms, with or without
-# modification, are permitted provided that the following conditions
-# are met:
-#
-# 1. Redistributions of source code must retain the above copyright
-# notice, this list of conditions and the following disclaimer.
-# 2. Redistributions in binary form must reproduce the above copyright
-# notice, this list of conditions and the following disclaimer in the
-# documentation and/or other materials provided with the distribution.
-# 3. Neither the name of author nor the names of any contributors may be
-# used to endorse or promote products derived from this software
-# without specific prior written permission.
-#
-# THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS "AS IS" AND
-# ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
-# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
-# ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE
-# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
-# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
-# OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
-# HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
-# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
-# OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
-# SUCH DAMAGE.
+# Licensed to PSF under a Contributor Agreement.
#
-__all__ = []
+__all__ = ['reduce_socket', 'reduce_connection', 'send_handle', 'recv_handle']
import os
import sys
import socket
import threading
+import struct
+import signal
-import _multiprocessing
from multiprocessing import current_process
-from multiprocessing.forking import Popen, duplicate, close, ForkingPickler
from multiprocessing.util import register_after_fork, debug, sub_debug
-from multiprocessing.connection import Client, Listener
+from multiprocessing.util import is_exiting, sub_warning
#
#
#
-if not(sys.platform == 'win32' or hasattr(_multiprocessing, 'recvfd')):
+if not(sys.platform == 'win32' or (hasattr(socket, 'CMSG_LEN') and
+ hasattr(socket, 'SCM_RIGHTS'))):
raise ImportError('pickling of connections not supported')
#
@@ -59,157 +35,246 @@ if not(sys.platform == 'win32' or hasattr(_multiprocessing, 'recvfd')):
#
if sys.platform == 'win32':
- import _subprocess
- from _multiprocessing import win32
-
- def send_handle(conn, handle, destination_pid):
- process_handle = win32.OpenProcess(
- win32.PROCESS_ALL_ACCESS, False, destination_pid
- )
- try:
- new_handle = duplicate(handle, process_handle)
- conn.send(new_handle)
- finally:
- close(process_handle)
-
- def recv_handle(conn):
- return conn.recv()
+ # Windows
+ __all__ += ['reduce_pipe_connection']
+ import _winapi
-else:
def send_handle(conn, handle, destination_pid):
- _multiprocessing.sendfd(conn.fileno(), handle)
+ dh = DupHandle(handle, _winapi.DUPLICATE_SAME_ACCESS, destination_pid)
+ conn.send(dh)
def recv_handle(conn):
- return _multiprocessing.recvfd(conn.fileno())
-
-#
-# Support for a per-process server thread which caches pickled handles
-#
-
-_cache = set()
-
-def _reset(obj):
- global _lock, _listener, _cache
- for h in _cache:
- close(h)
- _cache.clear()
- _lock = threading.Lock()
- _listener = None
-
-_reset(None)
-register_after_fork(_reset, _reset)
-
-def _get_listener():
- global _listener
-
- if _listener is None:
- _lock.acquire()
- try:
- if _listener is None:
- debug('starting listener and thread for sending handles')
- _listener = Listener(authkey=current_process().authkey)
- t = threading.Thread(target=_serve)
- t.daemon = True
- t.start()
- finally:
- _lock.release()
-
- return _listener
-
-def _serve():
- from .util import is_exiting, sub_warning
-
- while 1:
- try:
- conn = _listener.accept()
- handle_wanted, destination_pid = conn.recv()
- _cache.remove(handle_wanted)
- send_handle(conn, handle_wanted, destination_pid)
- close(handle_wanted)
- conn.close()
- except:
- if not is_exiting():
- import traceback
- sub_warning(
- 'thread for sharing handles raised exception :\n' +
- '-'*79 + '\n' + traceback.format_exc() + '-'*79
- )
-
-#
-# Functions to be used for pickling/unpickling objects with handles
-#
-
-def reduce_handle(handle):
- if Popen.thread_is_spawning():
- return (None, Popen.duplicate_for_child(handle), True)
- dup_handle = duplicate(handle)
- _cache.add(dup_handle)
- sub_debug('reducing handle %d', handle)
- return (_get_listener().address, dup_handle, False)
-
-def rebuild_handle(pickled_data):
- address, handle, inherited = pickled_data
- if inherited:
- return handle
- sub_debug('rebuilding handle %d', handle)
- conn = Client(address, authkey=current_process().authkey)
- conn.send((handle, os.getpid()))
- new_handle = recv_handle(conn)
- conn.close()
- return new_handle
+ return conn.recv().detach()
+
+ class DupHandle(object):
+ def __init__(self, handle, access, pid=None):
+ # duplicate handle for process with given pid
+ if pid is None:
+ pid = os.getpid()
+ proc = _winapi.OpenProcess(_winapi.PROCESS_DUP_HANDLE, False, pid)
+ try:
+ self._handle = _winapi.DuplicateHandle(
+ _winapi.GetCurrentProcess(),
+ handle, proc, access, False, 0)
+ finally:
+ _winapi.CloseHandle(proc)
+ self._access = access
+ self._pid = pid
+
+ def detach(self):
+ # retrieve handle from process which currently owns it
+ if self._pid == os.getpid():
+ return self._handle
+ proc = _winapi.OpenProcess(_winapi.PROCESS_DUP_HANDLE, False,
+ self._pid)
+ try:
+ return _winapi.DuplicateHandle(
+ proc, self._handle, _winapi.GetCurrentProcess(),
+ self._access, False, _winapi.DUPLICATE_CLOSE_SOURCE)
+ finally:
+ _winapi.CloseHandle(proc)
+
+ class DupSocket(object):
+ def __init__(self, sock):
+ new_sock = sock.dup()
+ def send(conn, pid):
+ share = new_sock.share(pid)
+ conn.send_bytes(share)
+ self._id = resource_sharer.register(send, new_sock.close)
+
+ def detach(self):
+ conn = resource_sharer.get_connection(self._id)
+ try:
+ share = conn.recv_bytes()
+ return socket.fromshare(share)
+ finally:
+ conn.close()
+
+ def reduce_socket(s):
+ return rebuild_socket, (DupSocket(s),)
+
+ def rebuild_socket(ds):
+ return ds.detach()
+
+ def reduce_connection(conn):
+ handle = conn.fileno()
+ with socket.fromfd(handle, socket.AF_INET, socket.SOCK_STREAM) as s:
+ ds = DupSocket(s)
+ return rebuild_connection, (ds, conn.readable, conn.writable)
+
+ def rebuild_connection(ds, readable, writable):
+ from .connection import Connection
+ sock = ds.detach()
+ return Connection(sock.detach(), readable, writable)
-#
-# Register `_multiprocessing.Connection` with `ForkingPickler`
-#
-
-def reduce_connection(conn):
- rh = reduce_handle(conn.fileno())
- return rebuild_connection, (rh, conn.readable, conn.writable)
-
-def rebuild_connection(reduced_handle, readable, writable):
- handle = rebuild_handle(reduced_handle)
- return _multiprocessing.Connection(
- handle, readable=readable, writable=writable
- )
-
-ForkingPickler.register(_multiprocessing.Connection, reduce_connection)
+ def reduce_pipe_connection(conn):
+ access = ((_winapi.FILE_GENERIC_READ if conn.readable else 0) |
+ (_winapi.FILE_GENERIC_WRITE if conn.writable else 0))
+ dh = DupHandle(conn.fileno(), access)
+ return rebuild_pipe_connection, (dh, conn.readable, conn.writable)
-#
-# Register `socket.socket` with `ForkingPickler`
-#
+ def rebuild_pipe_connection(dh, readable, writable):
+ from .connection import PipeConnection
+ handle = dh.detach()
+ return PipeConnection(handle, readable, writable)
-def fromfd(fd, family, type_, proto=0):
- s = socket.fromfd(fd, family, type_, proto)
- if s.__class__ is not socket.socket:
- s = socket.socket(_sock=s)
- return s
+else:
+ # Unix
-def reduce_socket(s):
- reduced_handle = reduce_handle(s.fileno())
- return rebuild_socket, (reduced_handle, s.family, s.type, s.proto)
+ # On MacOSX we should acknowledge receipt of fds -- see Issue14669
+ ACKNOWLEDGE = sys.platform == 'darwin'
-def rebuild_socket(reduced_handle, family, type_, proto):
- fd = rebuild_handle(reduced_handle)
- _sock = fromfd(fd, family, type_, proto)
- close(fd)
- return _sock
+ def send_handle(conn, handle, destination_pid):
+ with socket.fromfd(conn.fileno(), socket.AF_UNIX, socket.SOCK_STREAM) as s:
+ s.sendmsg([b'x'], [(socket.SOL_SOCKET, socket.SCM_RIGHTS,
+ struct.pack("@i", handle))])
+ if ACKNOWLEDGE and conn.recv_bytes() != b'ACK':
+ raise RuntimeError('did not receive acknowledgement of fd')
-ForkingPickler.register(socket.socket, reduce_socket)
+ def recv_handle(conn):
+ size = struct.calcsize("@i")
+ with socket.fromfd(conn.fileno(), socket.AF_UNIX, socket.SOCK_STREAM) as s:
+ msg, ancdata, flags, addr = s.recvmsg(1, socket.CMSG_LEN(size))
+ try:
+ if ACKNOWLEDGE:
+ conn.send_bytes(b'ACK')
+ cmsg_level, cmsg_type, cmsg_data = ancdata[0]
+ if (cmsg_level == socket.SOL_SOCKET and
+ cmsg_type == socket.SCM_RIGHTS):
+ return struct.unpack("@i", cmsg_data[:size])[0]
+ except (ValueError, IndexError, struct.error):
+ pass
+ raise RuntimeError('Invalid data received')
+
+ class DupFd(object):
+ def __init__(self, fd):
+ new_fd = os.dup(fd)
+ def send(conn, pid):
+ send_handle(conn, new_fd, pid)
+ def close():
+ os.close(new_fd)
+ self._id = resource_sharer.register(send, close)
+
+ def detach(self):
+ conn = resource_sharer.get_connection(self._id)
+ try:
+ return recv_handle(conn)
+ finally:
+ conn.close()
+
+ def reduce_socket(s):
+ df = DupFd(s.fileno())
+ return rebuild_socket, (df, s.family, s.type, s.proto)
+
+ def rebuild_socket(df, family, type, proto):
+ fd = df.detach()
+ s = socket.fromfd(fd, family, type, proto)
+ os.close(fd)
+ return s
+
+ def reduce_connection(conn):
+ df = DupFd(conn.fileno())
+ return rebuild_connection, (df, conn.readable, conn.writable)
+
+ def rebuild_connection(df, readable, writable):
+ from .connection import Connection
+ fd = df.detach()
+ return Connection(fd, readable, writable)
#
-# Register `_multiprocessing.PipeConnection` with `ForkingPickler`
+# Server which shares registered resources with clients
#
-if sys.platform == 'win32':
-
- def reduce_pipe_connection(conn):
- rh = reduce_handle(conn.fileno())
- return rebuild_pipe_connection, (rh, conn.readable, conn.writable)
-
- def rebuild_pipe_connection(reduced_handle, readable, writable):
- handle = rebuild_handle(reduced_handle)
- return _multiprocessing.PipeConnection(
- handle, readable=readable, writable=writable
- )
-
- ForkingPickler.register(_multiprocessing.PipeConnection, reduce_pipe_connection)
+class ResourceSharer(object):
+ def __init__(self):
+ self._key = 0
+ self._cache = {}
+ self._old_locks = []
+ self._lock = threading.Lock()
+ self._listener = None
+ self._address = None
+ self._thread = None
+ register_after_fork(self, ResourceSharer._afterfork)
+
+ def register(self, send, close):
+ with self._lock:
+ if self._address is None:
+ self._start()
+ self._key += 1
+ self._cache[self._key] = (send, close)
+ return (self._address, self._key)
+
+ @staticmethod
+ def get_connection(ident):
+ from .connection import Client
+ address, key = ident
+ c = Client(address, authkey=current_process().authkey)
+ c.send((key, os.getpid()))
+ return c
+
+ def stop(self, timeout=None):
+ from .connection import Client
+ with self._lock:
+ if self._address is not None:
+ c = Client(self._address, authkey=current_process().authkey)
+ c.send(None)
+ c.close()
+ self._thread.join(timeout)
+ if self._thread.is_alive():
+ sub_warn('ResourceSharer thread did not stop when asked')
+ self._listener.close()
+ self._thread = None
+ self._address = None
+ self._listener = None
+ for key, (send, close) in self._cache.items():
+ close()
+ self._cache.clear()
+
+ def _afterfork(self):
+ for key, (send, close) in self._cache.items():
+ close()
+ self._cache.clear()
+ # If self._lock was locked at the time of the fork, it may be broken
+ # -- see issue 6721. Replace it without letting it be gc'ed.
+ self._old_locks.append(self._lock)
+ self._lock = threading.Lock()
+ if self._listener is not None:
+ self._listener.close()
+ self._listener = None
+ self._address = None
+ self._thread = None
+
+ def _start(self):
+ from .connection import Listener
+ assert self._listener is None
+ debug('starting listener and thread for sending handles')
+ self._listener = Listener(authkey=current_process().authkey)
+ self._address = self._listener.address
+ t = threading.Thread(target=self._serve)
+ t.daemon = True
+ t.start()
+ self._thread = t
+
+ def _serve(self):
+ if hasattr(signal, 'pthread_sigmask'):
+ signal.pthread_sigmask(signal.SIG_BLOCK, range(1, signal.NSIG))
+ while 1:
+ try:
+ conn = self._listener.accept()
+ msg = conn.recv()
+ if msg is None:
+ break
+ key, destination_pid = msg
+ send, close = self._cache.pop(key)
+ send(conn, destination_pid)
+ close()
+ conn.close()
+ except:
+ if not is_exiting():
+ import traceback
+ sub_warning(
+ 'thread for sharing handles raised exception :\n' +
+ '-'*79 + '\n' + traceback.format_exc() + '-'*79
+ )
+
+resource_sharer = ResourceSharer()