diff options
author | Benjamin Peterson <benjamin@python.org> | 2008-06-11 16:44:04 (GMT) |
---|---|---|
committer | Benjamin Peterson <benjamin@python.org> | 2008-06-11 16:44:04 (GMT) |
commit | e711cafab13efc9c1fe6c5cd75826401445eb585 (patch) | |
tree | 091a6334fdf6ccdcb93027302c5e038570ca04a4 /Lib/multiprocessing/connection.py | |
parent | eec3d7137929611b98dd593cd2f122cd91b723b2 (diff) | |
download | cpython-e711cafab13efc9c1fe6c5cd75826401445eb585.zip cpython-e711cafab13efc9c1fe6c5cd75826401445eb585.tar.gz cpython-e711cafab13efc9c1fe6c5cd75826401445eb585.tar.bz2 |
Merged revisions 64104,64117 via svnmerge from
svn+ssh://pythondev@svn.python.org/python/trunk
........
r64104 | benjamin.peterson | 2008-06-10 21:40:25 -0500 (Tue, 10 Jun 2008) | 2 lines
add the multiprocessing package to fulfill PEP 371
........
r64117 | benjamin.peterson | 2008-06-11 07:26:31 -0500 (Wed, 11 Jun 2008) | 2 lines
fix import of multiprocessing by juggling imports
........
Diffstat (limited to 'Lib/multiprocessing/connection.py')
-rw-r--r-- | Lib/multiprocessing/connection.py | 425 |
1 files changed, 425 insertions, 0 deletions
diff --git a/Lib/multiprocessing/connection.py b/Lib/multiprocessing/connection.py new file mode 100644 index 0000000..27c44cd --- /dev/null +++ b/Lib/multiprocessing/connection.py @@ -0,0 +1,425 @@ +# +# A higher level module for using sockets (or Windows named pipes) +# +# multiprocessing/connection.py +# +# Copyright (c) 2006-2008, R Oudkerk --- see COPYING.txt +# + +__all__ = [ 'Client', 'Listener', 'Pipe' ] + +import os +import sys +import socket +import time +import tempfile +import itertools + +import _multiprocessing +from multiprocessing import current_process +from multiprocessing.util import get_temp_dir, Finalize, sub_debug, debug +from multiprocessing.forking import duplicate, close + + +# +# +# + +BUFSIZE = 8192 + +_mmap_counter = itertools.count() + +default_family = 'AF_INET' +families = ['AF_INET'] + +if hasattr(socket, 'AF_UNIX'): + default_family = 'AF_UNIX' + families += ['AF_UNIX'] + +if sys.platform == 'win32': + default_family = 'AF_PIPE' + families += ['AF_PIPE'] + +# +# +# + +def arbitrary_address(family): + ''' + Return an arbitrary free address for the given family + ''' + if family == 'AF_INET': + return ('localhost', 0) + elif family == 'AF_UNIX': + return tempfile.mktemp(prefix='listener-', dir=get_temp_dir()) + elif family == 'AF_PIPE': + return tempfile.mktemp(prefix=r'\\.\pipe\pyc-%d-%d-' % + (os.getpid(), next(_mmap_counter))) + else: + raise ValueError('unrecognized family') + + +def address_type(address): + ''' + Return the types of the address + + This can be 'AF_INET', 'AF_UNIX', or 'AF_PIPE' + ''' + if type(address) == tuple: + return 'AF_INET' + elif type(address) is str and address.startswith('\\\\'): + return 'AF_PIPE' + elif type(address) is str: + return 'AF_UNIX' + else: + raise ValueError('address type of %r unrecognized' % address) + +# +# Public functions +# + +class Listener(object): + ''' + Returns a listener object. + + This is a wrapper for a bound socket which is 'listening' for + connections, or for a Windows named pipe. + ''' + def __init__(self, address=None, family=None, backlog=1, authkey=None): + family = family or (address and address_type(address)) \ + or default_family + address = address or arbitrary_address(family) + + if family == 'AF_PIPE': + self._listener = PipeListener(address, backlog) + else: + self._listener = SocketListener(address, family, backlog) + + if authkey is not None and not isinstance(authkey, bytes): + raise TypeError('authkey should be a byte string') + + self._authkey = authkey + + def accept(self): + ''' + Accept a connection on the bound socket or named pipe of `self`. + + Returns a `Connection` object. + ''' + c = self._listener.accept() + if self._authkey: + deliver_challenge(c, self._authkey) + answer_challenge(c, self._authkey) + return c + + def close(self): + ''' + Close the bound socket or named pipe of `self`. + ''' + return self._listener.close() + + address = property(lambda self: self._listener._address) + last_accepted = property(lambda self: self._listener._last_accepted) + + +def Client(address, family=None, authkey=None): + ''' + Returns a connection to the address of a `Listener` + ''' + family = family or address_type(address) + if family == 'AF_PIPE': + c = PipeClient(address) + else: + c = SocketClient(address) + + if authkey is not None and not isinstance(authkey, bytes): + raise TypeError('authkey should be a byte string') + + if authkey is not None: + answer_challenge(c, authkey) + deliver_challenge(c, authkey) + + return c + + +if sys.platform != 'win32': + + def Pipe(duplex=True): + ''' + Returns pair of connection objects at either end of a pipe + ''' + if duplex: + s1, s2 = socket.socketpair() + c1 = _multiprocessing.Connection(os.dup(s1.fileno())) + c2 = _multiprocessing.Connection(os.dup(s2.fileno())) + s1.close() + s2.close() + else: + fd1, fd2 = os.pipe() + c1 = _multiprocessing.Connection(fd1, writable=False) + c2 = _multiprocessing.Connection(fd2, readable=False) + + return c1, c2 + +else: + + from ._multiprocessing import win32 + + def Pipe(duplex=True): + ''' + Returns pair of connection objects at either end of a pipe + ''' + address = arbitrary_address('AF_PIPE') + if duplex: + openmode = win32.PIPE_ACCESS_DUPLEX + access = win32.GENERIC_READ | win32.GENERIC_WRITE + obsize, ibsize = BUFSIZE, BUFSIZE + else: + openmode = win32.PIPE_ACCESS_INBOUND + access = win32.GENERIC_WRITE + obsize, ibsize = 0, BUFSIZE + + h1 = win32.CreateNamedPipe( + address, openmode, + win32.PIPE_TYPE_MESSAGE | win32.PIPE_READMODE_MESSAGE | + win32.PIPE_WAIT, + 1, obsize, ibsize, win32.NMPWAIT_WAIT_FOREVER, win32.NULL + ) + h2 = win32.CreateFile( + address, access, 0, win32.NULL, win32.OPEN_EXISTING, 0, win32.NULL + ) + win32.SetNamedPipeHandleState( + h2, win32.PIPE_READMODE_MESSAGE, None, None + ) + + try: + win32.ConnectNamedPipe(h1, win32.NULL) + except WindowsError as e: + if e.args[0] != win32.ERROR_PIPE_CONNECTED: + raise + + c1 = _multiprocessing.PipeConnection(h1, writable=duplex) + c2 = _multiprocessing.PipeConnection(h2, readable=duplex) + + return c1, c2 + +# +# Definitions for connections based on sockets +# + +class SocketListener(object): + ''' + Represtation of a socket which is bound to an address and listening + ''' + def __init__(self, address, family, backlog=1): + self._socket = socket.socket(getattr(socket, family)) + self._socket.bind(address) + self._socket.listen(backlog) + address = self._socket.getsockname() + if type(address) is tuple: + address = (socket.getfqdn(address[0]),) + address[1:] + self._address = address + self._family = family + self._last_accepted = None + + sub_debug('listener bound to address %r', self._address) + + if family == 'AF_UNIX': + self._unlink = Finalize( + self, os.unlink, args=(self._address,), exitpriority=0 + ) + else: + self._unlink = None + + def accept(self): + s, self._last_accepted = self._socket.accept() + fd = duplicate(s.fileno()) + conn = _multiprocessing.Connection(fd) + s.close() + return conn + + def close(self): + self._socket.close() + if self._unlink is not None: + self._unlink() + + +def SocketClient(address): + ''' + Return a connection object connected to the socket given by `address` + ''' + family = address_type(address) + s = socket.socket( getattr(socket, family) ) + + while 1: + try: + s.connect(address) + except socket.error as e: + if e.args[0] != 10061: # 10061 => connection refused + debug('failed to connect to address %s', address) + raise + time.sleep(0.01) + else: + break + else: + raise + + fd = duplicate(s.fileno()) + conn = _multiprocessing.Connection(fd) + s.close() + return conn + +# +# Definitions for connections based on named pipes +# + +if sys.platform == 'win32': + + class PipeListener(object): + ''' + Representation of a named pipe + ''' + def __init__(self, address, backlog=None): + self._address = address + handle = win32.CreateNamedPipe( + address, win32.PIPE_ACCESS_DUPLEX, + win32.PIPE_TYPE_MESSAGE | win32.PIPE_READMODE_MESSAGE | + win32.PIPE_WAIT, + win32.PIPE_UNLIMITED_INSTANCES, BUFSIZE, BUFSIZE, + win32.NMPWAIT_WAIT_FOREVER, win32.NULL + ) + self._handle_queue = [handle] + self._last_accepted = None + + sub_debug('listener created with address=%r', self._address) + + self.close = Finalize( + self, PipeListener._finalize_pipe_listener, + args=(self._handle_queue, self._address), exitpriority=0 + ) + + def accept(self): + newhandle = win32.CreateNamedPipe( + self._address, win32.PIPE_ACCESS_DUPLEX, + win32.PIPE_TYPE_MESSAGE | win32.PIPE_READMODE_MESSAGE | + win32.PIPE_WAIT, + win32.PIPE_UNLIMITED_INSTANCES, BUFSIZE, BUFSIZE, + win32.NMPWAIT_WAIT_FOREVER, win32.NULL + ) + self._handle_queue.append(newhandle) + handle = self._handle_queue.pop(0) + try: + win32.ConnectNamedPipe(handle, win32.NULL) + except WindowsError as e: + if e.args[0] != win32.ERROR_PIPE_CONNECTED: + raise + return _multiprocessing.PipeConnection(handle) + + @staticmethod + def _finalize_pipe_listener(queue, address): + sub_debug('closing listener with address=%r', address) + for handle in queue: + close(handle) + + def PipeClient(address): + ''' + Return a connection object connected to the pipe given by `address` + ''' + while 1: + try: + win32.WaitNamedPipe(address, 1000) + h = win32.CreateFile( + address, win32.GENERIC_READ | win32.GENERIC_WRITE, + 0, win32.NULL, win32.OPEN_EXISTING, 0, win32.NULL + ) + except WindowsError as e: + if e.args[0] not in (win32.ERROR_SEM_TIMEOUT, + win32.ERROR_PIPE_BUSY): + raise + else: + break + else: + raise + + win32.SetNamedPipeHandleState( + h, win32.PIPE_READMODE_MESSAGE, None, None + ) + return _multiprocessing.PipeConnection(h) + +# +# Authentication stuff +# + +MESSAGE_LENGTH = 20 + +CHALLENGE = '#CHALLENGE#' +WELCOME = '#WELCOME#' +FAILURE = '#FAILURE#' + +if sys.version_info >= (3, 0): # XXX can use bytes literals in 2.6/3.0 + CHALLENGE = CHALLENGE.encode('ascii') + WELCOME = WELCOME.encode('ascii') + FAILURE = FAILURE.encode('ascii') + +def deliver_challenge(connection, authkey): + import hmac + assert isinstance(authkey, bytes) + message = os.urandom(MESSAGE_LENGTH) + connection.send_bytes(CHALLENGE + message) + digest = hmac.new(authkey, message).digest() + response = connection.recv_bytes(256) # reject large message + if response == digest: + connection.send_bytes(WELCOME) + else: + connection.send_bytes(FAILURE) + raise AuthenticationError('digest received was wrong') + +def answer_challenge(connection, authkey): + import hmac + assert isinstance(authkey, bytes) + message = connection.recv_bytes(256) # reject large message + assert message[:len(CHALLENGE)] == CHALLENGE, 'message = %r' % message + message = message[len(CHALLENGE):] + digest = hmac.new(authkey, message).digest() + connection.send_bytes(digest) + response = connection.recv_bytes(256) # reject large message + if response != WELCOME: + raise AuthenticationError('digest sent was rejected') + +# +# Support for using xmlrpclib for serialization +# + +class ConnectionWrapper(object): + def __init__(self, conn, dumps, loads): + self._conn = conn + self._dumps = dumps + self._loads = loads + for attr in ('fileno', 'close', 'poll', 'recv_bytes', 'send_bytes'): + obj = getattr(conn, attr) + setattr(self, attr, obj) + def send(self, obj): + s = self._dumps(obj) + self._conn.send_bytes(s) + def recv(self): + s = self._conn.recv_bytes() + return self._loads(s) + +def _xml_dumps(obj): + return xmlrpclib.dumps((obj,), None, None, None, 1).encode('utf8') + +def _xml_loads(s): + (obj,), method = xmlrpclib.loads(s.decode('utf8')) + return obj + +class XmlListener(Listener): + def accept(self): + global xmlrpclib + import xmlrpc.client as xmlrpclib + obj = Listener.accept(self) + return ConnectionWrapper(obj, _xml_dumps, _xml_loads) + +def XmlClient(*args, **kwds): + global xmlrpclib + import xmlrpc.client as xmlrpclib + return ConnectionWrapper(Client(*args, **kwds), _xml_dumps, _xml_loads) |