summaryrefslogtreecommitdiffstats
path: root/Lib/concurrent/futures
diff options
context:
space:
mode:
authorAntoine Pitrou <solipsis@pitrou.net>2012-03-05 18:28:37 (GMT)
committerAntoine Pitrou <solipsis@pitrou.net>2012-03-05 18:28:37 (GMT)
commitbdb1cf1ca56db25b33fb15dd91eef2cc32cd8973 (patch)
tree54137f9699833726def7c803cff7c995af22cfa5 /Lib/concurrent/futures
parent1e88f3faa61dbaa9ea0d2404aa8563c1eeceba54 (diff)
downloadcpython-bdb1cf1ca56db25b33fb15dd91eef2cc32cd8973.zip
cpython-bdb1cf1ca56db25b33fb15dd91eef2cc32cd8973.tar.gz
cpython-bdb1cf1ca56db25b33fb15dd91eef2cc32cd8973.tar.bz2
Issue #12328: Fix multiprocessing's use of overlapped I/O on Windows.
Also, add a multiprocessing.connection.wait(rlist, timeout=None) function for polling multiple objects at once. Patch by sbt. Complete changelist from sbt's patch: * Adds a wait(rlist, timeout=None) function for polling multiple objects at once. On Unix this is just a wrapper for select(rlist, [], [], timeout=None). * Removes use of the SentinelReady exception and the sentinels argument to certain methods. concurrent.futures.process has been changed to use wait() instead of SentinelReady. * Fixes bugs concerning PipeConnection.poll() and messages of zero length. * Fixes PipeListener.accept() to call ConnectNamedPipe() with overlapped=True. * Fixes Queue.empty() and SimpleQueue.empty() so that they are threadsafe on Windows. * Now PipeConnection.poll() and wait() will not modify the pipe except possibly by consuming a zero length message. (Previously poll() could consume a partial message.) * All of multiprocesing's pipe related blocking functions/methods are now interruptible by SIGINT on Windows.
Diffstat (limited to 'Lib/concurrent/futures')
-rw-r--r--Lib/concurrent/futures/process.py12
1 files changed, 8 insertions, 4 deletions
diff --git a/Lib/concurrent/futures/process.py b/Lib/concurrent/futures/process.py
index 7f31ec2..04238a7 100644
--- a/Lib/concurrent/futures/process.py
+++ b/Lib/concurrent/futures/process.py
@@ -50,7 +50,8 @@ import os
from concurrent.futures import _base
import queue
import multiprocessing
-from multiprocessing.queues import SimpleQueue, SentinelReady, Full
+from multiprocessing.queues import SimpleQueue, Full
+from multiprocessing.connection import wait
import threading
import weakref
@@ -212,6 +213,8 @@ def _queue_management_worker(executor_reference,
for p in processes.values():
p.join()
+ reader = result_queue._reader
+
while True:
_add_call_item_to_queue(pending_work_items,
work_ids_queue,
@@ -219,9 +222,10 @@ def _queue_management_worker(executor_reference,
sentinels = [p.sentinel for p in processes.values()]
assert sentinels
- try:
- result_item = result_queue.get(sentinels=sentinels)
- except SentinelReady:
+ ready = wait([reader] + sentinels)
+ if reader in ready:
+ result_item = reader.recv()
+ else:
# Mark the process pool broken so that submits fail right now.
executor = executor_reference()
if executor is not None: