summaryrefslogtreecommitdiffstats
path: root/Lib/multiprocessing
diff options
context:
space:
mode:
authorAntoine Pitrou <solipsis@pitrou.net>2011-06-08 15:21:55 (GMT)
committerAntoine Pitrou <solipsis@pitrou.net>2011-06-08 15:21:55 (GMT)
commitdd696496605883a44da983ad81e55a01e996a004 (patch)
tree505cc588dc520311ff2935b8804c04663b26c054 /Lib/multiprocessing
parent4a5e5de03f47b7076fc0eabc9817a59efd20049d (diff)
downloadcpython-dd696496605883a44da983ad81e55a01e996a004.zip
cpython-dd696496605883a44da983ad81e55a01e996a004.tar.gz
cpython-dd696496605883a44da983ad81e55a01e996a004.tar.bz2
Issue #9205: concurrent.futures.ProcessPoolExecutor now detects killed
children and raises BrokenProcessPool in such a situation. Previously it would reliably freeze/deadlock.
Diffstat (limited to 'Lib/multiprocessing')
-rw-r--r--Lib/multiprocessing/connection.py149
-rw-r--r--Lib/multiprocessing/forking.py1
-rw-r--r--Lib/multiprocessing/queues.py6
3 files changed, 101 insertions, 55 deletions
diff --git a/Lib/multiprocessing/connection.py b/Lib/multiprocessing/connection.py
index 415e210..ede2908 100644
--- a/Lib/multiprocessing/connection.py
+++ b/Lib/multiprocessing/connection.py
@@ -48,14 +48,18 @@ import itertools
import _multiprocessing
from multiprocessing import current_process, AuthenticationError, BufferTooShort
-from multiprocessing.util import get_temp_dir, Finalize, sub_debug, debug
+from multiprocessing.util import (
+ get_temp_dir, Finalize, sub_debug, debug, _eintr_retry)
try:
from _multiprocessing import win32
+ from _subprocess import WAIT_OBJECT_0, WAIT_TIMEOUT, INFINITE
except ImportError:
if sys.platform == 'win32':
raise
win32 = None
+_select = _eintr_retry(select.select)
+
#
#
#
@@ -118,6 +122,15 @@ def address_type(address):
else:
raise ValueError('address type of %r unrecognized' % address)
+
+class SentinelReady(Exception):
+ """
+ Raised when a sentinel is ready when polling.
+ """
+ def __init__(self, *args):
+ Exception.__init__(self, *args)
+ self.sentinels = args[0]
+
#
# Connection classes
#
@@ -253,19 +266,17 @@ class _ConnectionBase:
(offset + size) // itemsize])
return size
- def recv(self):
+ def recv(self, sentinels=None):
"""Receive a (picklable) object"""
self._check_closed()
self._check_readable()
- buf = self._recv_bytes()
+ buf = self._recv_bytes(sentinels=sentinels)
return pickle.loads(buf.getbuffer())
def poll(self, timeout=0.0):
"""Whether there is any input available to be read"""
self._check_closed()
self._check_readable()
- if timeout < 0.0:
- timeout = None
return self._poll(timeout)
@@ -274,61 +285,88 @@ if win32:
class PipeConnection(_ConnectionBase):
"""
Connection class based on a Windows named pipe.
+ Overlapped I/O is used, so the handles must have been created
+ with FILE_FLAG_OVERLAPPED.
"""
+ _buffered = b''
def _close(self):
win32.CloseHandle(self._handle)
def _send_bytes(self, buf):
- nwritten = win32.WriteFile(self._handle, buf)
+ overlapped = win32.WriteFile(self._handle, buf, overlapped=True)
+ nwritten, complete = overlapped.GetOverlappedResult(True)
+ assert complete
assert nwritten == len(buf)
- def _recv_bytes(self, maxsize=None):
+ def _recv_bytes(self, maxsize=None, sentinels=()):
+ if sentinels:
+ self._poll(-1.0, sentinels)
buf = io.BytesIO()
- bufsize = 512
- if maxsize is not None:
- bufsize = min(bufsize, maxsize)
- try:
- firstchunk, complete = win32.ReadFile(self._handle, bufsize)
- except IOError as e:
- if e.errno == win32.ERROR_BROKEN_PIPE:
- raise EOFError
- raise
- lenfirstchunk = len(firstchunk)
- buf.write(firstchunk)
- if complete:
- return buf
+ firstchunk = self._buffered
+ if firstchunk:
+ lenfirstchunk = len(firstchunk)
+ buf.write(firstchunk)
+ self._buffered = b''
+ else:
+ # A reasonable size for the first chunk transfer
+ bufsize = 128
+ if maxsize is not None and maxsize < bufsize:
+ bufsize = maxsize
+ try:
+ overlapped = win32.ReadFile(self._handle, bufsize, overlapped=True)
+ lenfirstchunk, complete = overlapped.GetOverlappedResult(True)
+ firstchunk = overlapped.getbuffer()
+ assert lenfirstchunk == len(firstchunk)
+ except IOError as e:
+ if e.errno == win32.ERROR_BROKEN_PIPE:
+ raise EOFError
+ raise
+ buf.write(firstchunk)
+ if complete:
+ return buf
navail, nleft = win32.PeekNamedPipe(self._handle)
if maxsize is not None and lenfirstchunk + nleft > maxsize:
return None
- lastchunk, complete = win32.ReadFile(self._handle, nleft)
- assert complete
- buf.write(lastchunk)
+ if nleft > 0:
+ overlapped = win32.ReadFile(self._handle, nleft, overlapped=True)
+ res, complete = overlapped.GetOverlappedResult(True)
+ assert res == nleft
+ assert complete
+ buf.write(overlapped.getbuffer())
return buf
- def _poll(self, timeout):
+ def _poll(self, timeout, sentinels=()):
+ # Fast non-blocking path
navail, nleft = win32.PeekNamedPipe(self._handle)
if navail > 0:
return True
elif timeout == 0.0:
return False
- # Setup a polling loop (translated straight from old
- # pipe_connection.c)
+ # Blocking: use overlapped I/O
if timeout < 0.0:
- deadline = None
+ timeout = INFINITE
else:
- deadline = time.time() + timeout
- delay = 0.001
- max_delay = 0.02
- while True:
- time.sleep(delay)
- navail, nleft = win32.PeekNamedPipe(self._handle)
- if navail > 0:
- return True
- if deadline and time.time() > deadline:
- return False
- if delay < max_delay:
- delay += 0.001
+ timeout = int(timeout * 1000 + 0.5)
+ overlapped = win32.ReadFile(self._handle, 1, overlapped=True)
+ try:
+ handles = [overlapped.event]
+ handles += sentinels
+ res = win32.WaitForMultipleObjects(handles, False, timeout)
+ finally:
+ # Always cancel overlapped I/O in the same thread
+ # (because CancelIoEx() appears only in Vista)
+ overlapped.cancel()
+ if res == WAIT_TIMEOUT:
+ return False
+ idx = res - WAIT_OBJECT_0
+ if idx == 0:
+ # I/O was successful, store received data
+ overlapped.GetOverlappedResult(True)
+ self._buffered += overlapped.getbuffer()
+ return True
+ assert 0 < idx < len(handles)
+ raise SentinelReady([handles[idx]])
class Connection(_ConnectionBase):
@@ -357,11 +395,18 @@ class Connection(_ConnectionBase):
break
buf = buf[n:]
- def _recv(self, size, read=_read):
+ def _recv(self, size, sentinels=(), read=_read):
buf = io.BytesIO()
+ handle = self._handle
+ if sentinels:
+ handles = [handle] + sentinels
remaining = size
while remaining > 0:
- chunk = read(self._handle, remaining)
+ if sentinels:
+ r = _select(handles, [], [])[0]
+ if handle not in r:
+ raise SentinelReady(r)
+ chunk = read(handle, remaining)
n = len(chunk)
if n == 0:
if remaining == size:
@@ -381,15 +426,17 @@ class Connection(_ConnectionBase):
if n > 0:
self._send(buf)
- def _recv_bytes(self, maxsize=None):
- buf = self._recv(4)
+ def _recv_bytes(self, maxsize=None, sentinels=()):
+ buf = self._recv(4, sentinels)
size, = struct.unpack("=i", buf.getvalue())
if maxsize is not None and size > maxsize:
return None
- return self._recv(size)
+ return self._recv(size, sentinels)
def _poll(self, timeout):
- r = select.select([self._handle], [], [], timeout)[0]
+ if timeout < 0.0:
+ timeout = None
+ r = _select([self._handle], [], [], timeout)[0]
return bool(r)
@@ -495,23 +542,21 @@ else:
obsize, ibsize = 0, BUFSIZE
h1 = win32.CreateNamedPipe(
- address, openmode,
+ address, openmode | win32.FILE_FLAG_OVERLAPPED,
win32.PIPE_TYPE_MESSAGE | win32.PIPE_READMODE_MESSAGE |
win32.PIPE_WAIT,
1, obsize, ibsize, win32.NMPWAIT_WAIT_FOREVER, win32.NULL
)
h2 = win32.CreateFile(
- address, access, 0, win32.NULL, win32.OPEN_EXISTING, 0, win32.NULL
+ address, access, 0, win32.NULL, win32.OPEN_EXISTING,
+ win32.FILE_FLAG_OVERLAPPED, win32.NULL
)
win32.SetNamedPipeHandleState(
h2, win32.PIPE_READMODE_MESSAGE, None, None
)
- try:
- win32.ConnectNamedPipe(h1, win32.NULL)
- except WindowsError as e:
- if e.args[0] != win32.ERROR_PIPE_CONNECTED:
- raise
+ overlapped = win32.ConnectNamedPipe(h1, overlapped=True)
+ overlapped.GetOverlappedResult(True)
c1 = PipeConnection(h1, writable=duplex)
c2 = PipeConnection(h2, readable=duplex)
diff --git a/Lib/multiprocessing/forking.py b/Lib/multiprocessing/forking.py
index 3c359cb..a2c61ef 100644
--- a/Lib/multiprocessing/forking.py
+++ b/Lib/multiprocessing/forking.py
@@ -35,6 +35,7 @@
import os
import sys
import signal
+import select
from multiprocessing import util, process
diff --git a/Lib/multiprocessing/queues.py b/Lib/multiprocessing/queues.py
index 3280a25..3324363 100644
--- a/Lib/multiprocessing/queues.py
+++ b/Lib/multiprocessing/queues.py
@@ -44,7 +44,7 @@ import weakref
from queue import Empty, Full
import _multiprocessing
-from multiprocessing import Pipe
+from multiprocessing.connection import Pipe, SentinelReady
from multiprocessing.synchronize import Lock, BoundedSemaphore, Semaphore, Condition
from multiprocessing.util import debug, info, Finalize, register_after_fork
from multiprocessing.forking import assert_spawning
@@ -372,10 +372,10 @@ class SimpleQueue(object):
def _make_methods(self):
recv = self._reader.recv
racquire, rrelease = self._rlock.acquire, self._rlock.release
- def get():
+ def get(*, sentinels=None):
racquire()
try:
- return recv()
+ return recv(sentinels)
finally:
rrelease()
self.get = get