diff options
author | Antoine Pitrou <solipsis@pitrou.net> | 2012-03-05 18:28:37 (GMT) |
---|---|---|
committer | Antoine Pitrou <solipsis@pitrou.net> | 2012-03-05 18:28:37 (GMT) |
commit | bdb1cf1ca56db25b33fb15dd91eef2cc32cd8973 (patch) | |
tree | 54137f9699833726def7c803cff7c995af22cfa5 /Lib/concurrent/futures | |
parent | 1e88f3faa61dbaa9ea0d2404aa8563c1eeceba54 (diff) | |
download | cpython-bdb1cf1ca56db25b33fb15dd91eef2cc32cd8973.zip cpython-bdb1cf1ca56db25b33fb15dd91eef2cc32cd8973.tar.gz cpython-bdb1cf1ca56db25b33fb15dd91eef2cc32cd8973.tar.bz2 |
Issue #12328: Fix multiprocessing's use of overlapped I/O on Windows.
Also, add a multiprocessing.connection.wait(rlist, timeout=None) function
for polling multiple objects at once. Patch by sbt.
Complete changelist from sbt's patch:
* Adds a wait(rlist, timeout=None) function for polling multiple
objects at once. On Unix this is just a wrapper for
select(rlist, [], [], timeout=None).
* Removes use of the SentinelReady exception and the sentinels argument
to certain methods. concurrent.futures.process has been changed to
use wait() instead of SentinelReady.
* Fixes bugs concerning PipeConnection.poll() and messages of zero
length.
* Fixes PipeListener.accept() to call ConnectNamedPipe() with
overlapped=True.
* Fixes Queue.empty() and SimpleQueue.empty() so that they are
threadsafe on Windows.
* Now PipeConnection.poll() and wait() will not modify the pipe except
possibly by consuming a zero length message. (Previously poll()
could consume a partial message.)
* All of multiprocesing's pipe related blocking functions/methods are
now interruptible by SIGINT on Windows.
Diffstat (limited to 'Lib/concurrent/futures')
-rw-r--r-- | Lib/concurrent/futures/process.py | 12 |
1 files changed, 8 insertions, 4 deletions
diff --git a/Lib/concurrent/futures/process.py b/Lib/concurrent/futures/process.py index 7f31ec2..04238a7 100644 --- a/Lib/concurrent/futures/process.py +++ b/Lib/concurrent/futures/process.py @@ -50,7 +50,8 @@ import os from concurrent.futures import _base import queue import multiprocessing -from multiprocessing.queues import SimpleQueue, SentinelReady, Full +from multiprocessing.queues import SimpleQueue, Full +from multiprocessing.connection import wait import threading import weakref @@ -212,6 +213,8 @@ def _queue_management_worker(executor_reference, for p in processes.values(): p.join() + reader = result_queue._reader + while True: _add_call_item_to_queue(pending_work_items, work_ids_queue, @@ -219,9 +222,10 @@ def _queue_management_worker(executor_reference, sentinels = [p.sentinel for p in processes.values()] assert sentinels - try: - result_item = result_queue.get(sentinels=sentinels) - except SentinelReady: + ready = wait([reader] + sentinels) + if reader in ready: + result_item = reader.recv() + else: # Mark the process pool broken so that submits fail right now. executor = executor_reference() if executor is not None: |