diff options
Diffstat (limited to 'Lib/multiprocessing/connection.py')
-rw-r--r-- | Lib/multiprocessing/connection.py | 61 |
1 files changed, 41 insertions, 20 deletions
diff --git a/Lib/multiprocessing/connection.py b/Lib/multiprocessing/connection.py index 1093d9f..443fa7e 100644 --- a/Lib/multiprocessing/connection.py +++ b/Lib/multiprocessing/connection.py @@ -21,9 +21,13 @@ import tempfile import itertools import _multiprocessing -from multiprocessing import current_process, AuthenticationError, BufferTooShort -from multiprocessing.util import get_temp_dir, Finalize, sub_debug, debug -from multiprocessing.forking import ForkingPickler + +from . import reduction +from . import util + +from . import AuthenticationError, BufferTooShort +from .reduction import ForkingPickler + try: import _winapi from _winapi import WAIT_OBJECT_0, WAIT_TIMEOUT, INFINITE @@ -71,7 +75,7 @@ def arbitrary_address(family): if family == 'AF_INET': return ('localhost', 0) elif family == 'AF_UNIX': - return tempfile.mktemp(prefix='listener-', dir=get_temp_dir()) + return tempfile.mktemp(prefix='listener-', dir=util.get_temp_dir()) elif family == 'AF_PIPE': return tempfile.mktemp(prefix=r'\\.\pipe\pyc-%d-%d-' % (os.getpid(), next(_mmap_counter))) @@ -505,7 +509,7 @@ if sys.platform != 'win32': c1 = Connection(s1.detach()) c2 = Connection(s2.detach()) else: - fd1, fd2 = os.pipe() + fd1, fd2 = util.pipe() c1 = Connection(fd1, writable=False) c2 = Connection(fd2, readable=False) @@ -577,7 +581,7 @@ class SocketListener(object): self._last_accepted = None if family == 'AF_UNIX': - self._unlink = Finalize( + self._unlink = util.Finalize( self, os.unlink, args=(address,), exitpriority=0 ) else: @@ -625,8 +629,8 @@ if sys.platform == 'win32': self._handle_queue = [self._new_handle(first=True)] self._last_accepted = None - sub_debug('listener created with address=%r', self._address) - self.close = Finalize( + util.sub_debug('listener created with address=%r', self._address) + self.close = util.Finalize( self, PipeListener._finalize_pipe_listener, args=(self._handle_queue, self._address), exitpriority=0 ) @@ -668,7 +672,7 @@ if sys.platform == 'win32': @staticmethod def _finalize_pipe_listener(queue, address): - sub_debug('closing listener with address=%r', address) + util.sub_debug('closing listener with address=%r', address) for handle in queue: _winapi.CloseHandle(handle) @@ -919,15 +923,32 @@ else: # if sys.platform == 'win32': - from . import reduction - ForkingPickler.register(socket.socket, reduction.reduce_socket) - ForkingPickler.register(Connection, reduction.reduce_connection) - ForkingPickler.register(PipeConnection, reduction.reduce_pipe_connection) + def reduce_connection(conn): + handle = conn.fileno() + with socket.fromfd(handle, socket.AF_INET, socket.SOCK_STREAM) as s: + from . import resource_sharer + ds = resource_sharer.DupSocket(s) + return rebuild_connection, (ds, conn.readable, conn.writable) + def rebuild_connection(ds, readable, writable): + sock = ds.detach() + return Connection(sock.detach(), readable, writable) + reduction.register(Connection, reduce_connection) + + def reduce_pipe_connection(conn): + access = ((_winapi.FILE_GENERIC_READ if conn.readable else 0) | + (_winapi.FILE_GENERIC_WRITE if conn.writable else 0)) + dh = reduction.DupHandle(conn.fileno(), access) + return rebuild_pipe_connection, (dh, conn.readable, conn.writable) + def rebuild_pipe_connection(dh, readable, writable): + handle = dh.detach() + return PipeConnection(handle, readable, writable) + reduction.register(PipeConnection, reduce_pipe_connection) + else: - try: - from . import reduction - except ImportError: - pass - else: - ForkingPickler.register(socket.socket, reduction.reduce_socket) - ForkingPickler.register(Connection, reduction.reduce_connection) + def reduce_connection(conn): + df = reduction.DupFd(conn.fileno()) + return rebuild_connection, (df, conn.readable, conn.writable) + def rebuild_connection(df, readable, writable): + fd = df.detach() + return Connection(fd, readable, writable) + reduction.register(Connection, reduce_connection) |