diff options
Diffstat (limited to 'Lib/multiprocessing/connection.py')
| -rw-r--r-- | Lib/multiprocessing/connection.py | 65 |
1 files changed, 44 insertions, 21 deletions
diff --git a/Lib/multiprocessing/connection.py b/Lib/multiprocessing/connection.py index 846d396..a5dc2a8 100644 --- a/Lib/multiprocessing/connection.py +++ b/Lib/multiprocessing/connection.py @@ -94,6 +94,17 @@ def arbitrary_address(family): else: raise ValueError('unrecognized family') +def _validate_family(family): + ''' + Checks if the family is valid for the current environment. + ''' + if sys.platform != 'win32' and family == 'AF_PIPE': + raise ValueError('Family %s is not recognized.' % family) + + if sys.platform == 'win32' and family == 'AF_UNIX': + # double check + if not hasattr(socket, family): + raise ValueError('Family %s is not recognized.' % family) def address_type(address): ''' @@ -126,6 +137,7 @@ class Listener(object): or default_family address = address or arbitrary_address(family) + _validate_family(family) if family == 'AF_PIPE': self._listener = PipeListener(address, backlog) else: @@ -163,6 +175,7 @@ def Client(address, family=None, authkey=None): Returns a connection to the address of a `Listener` ''' family = family or address_type(address) + _validate_family(family) if family == 'AF_PIPE': c = PipeClient(address) else: @@ -186,6 +199,8 @@ if sys.platform != 'win32': ''' if duplex: s1, s2 = socket.socketpair() + s1.setblocking(True) + s2.setblocking(True) c1 = _multiprocessing.Connection(os.dup(s1.fileno())) c2 = _multiprocessing.Connection(os.dup(s2.fileno())) s1.close() @@ -198,7 +213,6 @@ if sys.platform != 'win32': return c1, c2 else: - from _multiprocessing import win32 def Pipe(duplex=True): @@ -249,10 +263,15 @@ class SocketListener(object): ''' def __init__(self, address, family, backlog=1): self._socket = socket.socket(getattr(socket, family)) - self._socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) - self._socket.bind(address) - self._socket.listen(backlog) - self._address = self._socket.getsockname() + try: + self._socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) + self._socket.setblocking(True) + self._socket.bind(address) + self._socket.listen(backlog) + self._address = self._socket.getsockname() + except socket.error: + self._socket.close() + raise self._family = family self._last_accepted = None @@ -265,6 +284,7 @@ class SocketListener(object): def accept(self): s, self._last_accepted = self._socket.accept() + s.setblocking(True) fd = duplicate(s.fileno()) conn = _multiprocessing.Connection(fd) s.close() @@ -281,25 +301,25 @@ def SocketClient(address): Return a connection object connected to the socket given by `address` ''' family = address_type(address) - s = socket.socket( getattr(socket, family) ) - t = _init_timeout() + with socket.socket( getattr(socket, family) ) as s: + s.setblocking(True) + t = _init_timeout() - while 1: - try: - s.connect(address) - except socket.error as e: - if e.args[0] != errno.ECONNREFUSED or _check_timeout(t): - debug('failed to connect to address %s', address) - raise - time.sleep(0.01) + while 1: + try: + s.connect(address) + except socket.error as e: + if e.args[0] != errno.ECONNREFUSED or _check_timeout(t): + debug('failed to connect to address %s', address) + raise + time.sleep(0.01) + else: + break else: - break - else: - raise + raise - fd = duplicate(s.fileno()) + fd = duplicate(s.fileno()) conn = _multiprocessing.Connection(fd) - s.close() return conn # @@ -344,7 +364,10 @@ if sys.platform == 'win32': try: win32.ConnectNamedPipe(handle, win32.NULL) except WindowsError as e: - if e.args[0] != win32.ERROR_PIPE_CONNECTED: + # ERROR_NO_DATA can occur if a client has already connected, + # written data and then disconnected -- see Issue 14725. + if e.args[0] not in (win32.ERROR_PIPE_CONNECTED, + win32.ERROR_NO_DATA): raise return _multiprocessing.PipeConnection(handle) |
