summaryrefslogtreecommitdiffstats
path: root/Lib/multiprocessing/connection.py
diff options
context:
space:
mode:
Diffstat (limited to 'Lib/multiprocessing/connection.py')
-rw-r--r--Lib/multiprocessing/connection.py65
1 files changed, 44 insertions, 21 deletions
diff --git a/Lib/multiprocessing/connection.py b/Lib/multiprocessing/connection.py
index 846d396..a5dc2a8 100644
--- a/Lib/multiprocessing/connection.py
+++ b/Lib/multiprocessing/connection.py
@@ -94,6 +94,17 @@ def arbitrary_address(family):
else:
raise ValueError('unrecognized family')
+def _validate_family(family):
+ '''
+ Checks if the family is valid for the current environment.
+ '''
+ if sys.platform != 'win32' and family == 'AF_PIPE':
+ raise ValueError('Family %s is not recognized.' % family)
+
+ if sys.platform == 'win32' and family == 'AF_UNIX':
+ # double check
+ if not hasattr(socket, family):
+ raise ValueError('Family %s is not recognized.' % family)
def address_type(address):
'''
@@ -126,6 +137,7 @@ class Listener(object):
or default_family
address = address or arbitrary_address(family)
+ _validate_family(family)
if family == 'AF_PIPE':
self._listener = PipeListener(address, backlog)
else:
@@ -163,6 +175,7 @@ def Client(address, family=None, authkey=None):
Returns a connection to the address of a `Listener`
'''
family = family or address_type(address)
+ _validate_family(family)
if family == 'AF_PIPE':
c = PipeClient(address)
else:
@@ -186,6 +199,8 @@ if sys.platform != 'win32':
'''
if duplex:
s1, s2 = socket.socketpair()
+ s1.setblocking(True)
+ s2.setblocking(True)
c1 = _multiprocessing.Connection(os.dup(s1.fileno()))
c2 = _multiprocessing.Connection(os.dup(s2.fileno()))
s1.close()
@@ -198,7 +213,6 @@ if sys.platform != 'win32':
return c1, c2
else:
-
from _multiprocessing import win32
def Pipe(duplex=True):
@@ -249,10 +263,15 @@ class SocketListener(object):
'''
def __init__(self, address, family, backlog=1):
self._socket = socket.socket(getattr(socket, family))
- self._socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
- self._socket.bind(address)
- self._socket.listen(backlog)
- self._address = self._socket.getsockname()
+ try:
+ self._socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
+ self._socket.setblocking(True)
+ self._socket.bind(address)
+ self._socket.listen(backlog)
+ self._address = self._socket.getsockname()
+ except socket.error:
+ self._socket.close()
+ raise
self._family = family
self._last_accepted = None
@@ -265,6 +284,7 @@ class SocketListener(object):
def accept(self):
s, self._last_accepted = self._socket.accept()
+ s.setblocking(True)
fd = duplicate(s.fileno())
conn = _multiprocessing.Connection(fd)
s.close()
@@ -281,25 +301,25 @@ def SocketClient(address):
Return a connection object connected to the socket given by `address`
'''
family = address_type(address)
- s = socket.socket( getattr(socket, family) )
- t = _init_timeout()
+ with socket.socket( getattr(socket, family) ) as s:
+ s.setblocking(True)
+ t = _init_timeout()
- while 1:
- try:
- s.connect(address)
- except socket.error as e:
- if e.args[0] != errno.ECONNREFUSED or _check_timeout(t):
- debug('failed to connect to address %s', address)
- raise
- time.sleep(0.01)
+ while 1:
+ try:
+ s.connect(address)
+ except socket.error as e:
+ if e.args[0] != errno.ECONNREFUSED or _check_timeout(t):
+ debug('failed to connect to address %s', address)
+ raise
+ time.sleep(0.01)
+ else:
+ break
else:
- break
- else:
- raise
+ raise
- fd = duplicate(s.fileno())
+ fd = duplicate(s.fileno())
conn = _multiprocessing.Connection(fd)
- s.close()
return conn
#
@@ -344,7 +364,10 @@ if sys.platform == 'win32':
try:
win32.ConnectNamedPipe(handle, win32.NULL)
except WindowsError as e:
- if e.args[0] != win32.ERROR_PIPE_CONNECTED:
+ # ERROR_NO_DATA can occur if a client has already connected,
+ # written data and then disconnected -- see Issue 14725.
+ if e.args[0] not in (win32.ERROR_PIPE_CONNECTED,
+ win32.ERROR_NO_DATA):
raise
return _multiprocessing.PipeConnection(handle)