summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorAntoine Pitrou <solipsis@pitrou.net>2011-06-08 15:21:55 (GMT)
committerAntoine Pitrou <solipsis@pitrou.net>2011-06-08 15:21:55 (GMT)
commitdd696496605883a44da983ad81e55a01e996a004 (patch)
tree505cc588dc520311ff2935b8804c04663b26c054
parent4a5e5de03f47b7076fc0eabc9817a59efd20049d (diff)
downloadcpython-dd696496605883a44da983ad81e55a01e996a004.zip
cpython-dd696496605883a44da983ad81e55a01e996a004.tar.gz
cpython-dd696496605883a44da983ad81e55a01e996a004.tar.bz2
Issue #9205: concurrent.futures.ProcessPoolExecutor now detects killed
children and raises BrokenProcessPool in such a situation. Previously it would reliably freeze/deadlock.
-rw-r--r--Doc/library/concurrent.futures.rst19
-rw-r--r--Lib/concurrent/futures/process.py105
-rw-r--r--Lib/multiprocessing/connection.py149
-rw-r--r--Lib/multiprocessing/forking.py1
-rw-r--r--Lib/multiprocessing/queues.py6
-rw-r--r--Lib/test/test_concurrent_futures.py20
-rw-r--r--Misc/NEWS4
-rw-r--r--Modules/_multiprocessing/win32_functions.c390
8 files changed, 587 insertions, 107 deletions
diff --git a/Doc/library/concurrent.futures.rst b/Doc/library/concurrent.futures.rst
index e5d13f3..3bd4531 100644
--- a/Doc/library/concurrent.futures.rst
+++ b/Doc/library/concurrent.futures.rst
@@ -169,6 +169,12 @@ to a :class:`ProcessPoolExecutor` will result in deadlock.
of at most *max_workers* processes. If *max_workers* is ``None`` or not
given, it will default to the number of processors on the machine.
+ .. versionchanged:: 3.3
+ When one of the worker processes terminates abruptly, a
+ :exc:`BrokenProcessPool` error is now raised. Previously, behaviour
+ was undefined but operations on the executor or its futures would often
+ freeze or deadlock.
+
.. _processpoolexecutor-example:
@@ -369,3 +375,16 @@ Module Functions
:pep:`3148` -- futures - execute computations asynchronously
The proposal which described this feature for inclusion in the Python
standard library.
+
+
+Exception classes
+-----------------
+
+.. exception:: BrokenProcessPool
+
+ Derived from :exc:`RuntimeError`, this exception class is raised when
+ one of the workers of a :class:`ProcessPoolExecutor` has terminated
+ in a non-clean fashion (for example, if it was killed from the outside).
+
+ .. versionadded:: 3.3
+
diff --git a/Lib/concurrent/futures/process.py b/Lib/concurrent/futures/process.py
index f0bf6d5..c2331e7 100644
--- a/Lib/concurrent/futures/process.py
+++ b/Lib/concurrent/futures/process.py
@@ -46,10 +46,11 @@ Process #1..n:
__author__ = 'Brian Quinlan (brian@sweetapp.com)'
import atexit
+import os
from concurrent.futures import _base
import queue
import multiprocessing
-from multiprocessing.queues import SimpleQueue
+from multiprocessing.queues import SimpleQueue, SentinelReady
import threading
import weakref
@@ -122,7 +123,7 @@ def _process_worker(call_queue, result_queue):
call_item = call_queue.get(block=True)
if call_item is None:
# Wake up queue management thread
- result_queue.put(None)
+ result_queue.put(os.getpid())
return
try:
r = call_item.fn(*call_item.args, **call_item.kwargs)
@@ -194,29 +195,63 @@ def _queue_management_worker(executor_reference,
result_queue: A multiprocessing.Queue of _ResultItems generated by the
process workers.
"""
- nb_shutdown_processes = 0
- def shutdown_one_process():
- """Tell a worker to terminate, which will in turn wake us again"""
- nonlocal nb_shutdown_processes
- call_queue.put(None)
- nb_shutdown_processes += 1
+
+ def shutdown_worker():
+ # This is an upper bound
+ nb_children_alive = sum(p.is_alive() for p in processes.values())
+ for i in range(0, nb_children_alive):
+ call_queue.put(None)
+ # If .join() is not called on the created processes then
+ # some multiprocessing.Queue methods may deadlock on Mac OS
+ # X.
+ for p in processes.values():
+ p.join()
+
while True:
_add_call_item_to_queue(pending_work_items,
work_ids_queue,
call_queue)
- result_item = result_queue.get()
- if result_item is not None:
- work_item = pending_work_items[result_item.work_id]
- del pending_work_items[result_item.work_id]
-
- if result_item.exception:
- work_item.future.set_exception(result_item.exception)
- else:
- work_item.future.set_result(result_item.result)
- continue
- # If we come here, we either got a timeout or were explicitly woken up.
- # In either case, check whether we should start shutting down.
+ sentinels = [p.sentinel for p in processes.values()]
+ assert sentinels
+ try:
+ result_item = result_queue.get(sentinels=sentinels)
+ except SentinelReady as e:
+ # Mark the process pool broken so that submits fail right now.
+ executor = executor_reference()
+ if executor is not None:
+ executor._broken = True
+ executor._shutdown_thread = True
+ del executor
+ # All futures in flight must be marked failed
+ for work_id, work_item in pending_work_items.items():
+ work_item.future.set_exception(
+ BrokenProcessPool(
+ "A process in the process pool was "
+ "terminated abruptly while the future was "
+ "running or pending."
+ ))
+ pending_work_items.clear()
+ # Terminate remaining workers forcibly: the queues or their
+ # locks may be in a dirty state and block forever.
+ for p in processes.values():
+ p.terminate()
+ for p in processes.values():
+ p.join()
+ return
+ if isinstance(result_item, int):
+ # Clean shutdown of a worker using its PID
+ # (avoids marking the executor broken)
+ del processes[result_item]
+ elif result_item is not None:
+ work_item = pending_work_items.pop(result_item.work_id, None)
+ # work_item can be None if another process terminated (see above)
+ if work_item is not None:
+ if result_item.exception:
+ work_item.future.set_exception(result_item.exception)
+ else:
+ work_item.future.set_result(result_item.result)
+ # Check whether we should start shutting down.
executor = executor_reference()
# No more work items can be added if:
# - The interpreter is shutting down OR
@@ -226,17 +261,11 @@ def _queue_management_worker(executor_reference,
# Since no new work items can be added, it is safe to shutdown
# this thread if there are no pending work items.
if not pending_work_items:
- while nb_shutdown_processes < len(processes):
- shutdown_one_process()
- # If .join() is not called on the created processes then
- # some multiprocessing.Queue methods may deadlock on Mac OS
- # X.
- for p in processes:
- p.join()
+ shutdown_worker()
return
else:
# Start shutting down by telling a process it can exit.
- shutdown_one_process()
+ call_queue.put(None)
del executor
_system_limits_checked = False
@@ -264,6 +293,14 @@ def _check_system_limits():
_system_limited = "system provides too few semaphores (%d available, 256 necessary)" % nsems_max
raise NotImplementedError(_system_limited)
+
+class BrokenProcessPool(RuntimeError):
+ """
+ Raised when a process in a ProcessPoolExecutor terminated abruptly
+ while a future was in the running state.
+ """
+
+
class ProcessPoolExecutor(_base.Executor):
def __init__(self, max_workers=None):
"""Initializes a new ProcessPoolExecutor instance.
@@ -288,11 +325,13 @@ class ProcessPoolExecutor(_base.Executor):
self._result_queue = SimpleQueue()
self._work_ids = queue.Queue()
self._queue_management_thread = None
- self._processes = set()
+ # Map of pids to processes
+ self._processes = {}
# Shutdown is a two-step process.
self._shutdown_thread = False
self._shutdown_lock = threading.Lock()
+ self._broken = False
self._queue_count = 0
self._pending_work_items = {}
@@ -302,6 +341,8 @@ class ProcessPoolExecutor(_base.Executor):
def weakref_cb(_, q=self._result_queue):
q.put(None)
if self._queue_management_thread is None:
+ # Start the processes so that their sentinels are known.
+ self._adjust_process_count()
self._queue_management_thread = threading.Thread(
target=_queue_management_worker,
args=(weakref.ref(self, weakref_cb),
@@ -321,10 +362,13 @@ class ProcessPoolExecutor(_base.Executor):
args=(self._call_queue,
self._result_queue))
p.start()
- self._processes.add(p)
+ self._processes[p.pid] = p
def submit(self, fn, *args, **kwargs):
with self._shutdown_lock:
+ if self._broken:
+ raise BrokenProcessPool('A child process terminated '
+ 'abruptly, the process pool is not usable anymore')
if self._shutdown_thread:
raise RuntimeError('cannot schedule new futures after shutdown')
@@ -338,7 +382,6 @@ class ProcessPoolExecutor(_base.Executor):
self._result_queue.put(None)
self._start_queue_management_thread()
- self._adjust_process_count()
return f
submit.__doc__ = _base.Executor.submit.__doc__
diff --git a/Lib/multiprocessing/connection.py b/Lib/multiprocessing/connection.py
index 415e210..ede2908 100644
--- a/Lib/multiprocessing/connection.py
+++ b/Lib/multiprocessing/connection.py
@@ -48,14 +48,18 @@ import itertools
import _multiprocessing
from multiprocessing import current_process, AuthenticationError, BufferTooShort
-from multiprocessing.util import get_temp_dir, Finalize, sub_debug, debug
+from multiprocessing.util import (
+ get_temp_dir, Finalize, sub_debug, debug, _eintr_retry)
try:
from _multiprocessing import win32
+ from _subprocess import WAIT_OBJECT_0, WAIT_TIMEOUT, INFINITE
except ImportError:
if sys.platform == 'win32':
raise
win32 = None
+_select = _eintr_retry(select.select)
+
#
#
#
@@ -118,6 +122,15 @@ def address_type(address):
else:
raise ValueError('address type of %r unrecognized' % address)
+
+class SentinelReady(Exception):
+ """
+ Raised when a sentinel is ready when polling.
+ """
+ def __init__(self, *args):
+ Exception.__init__(self, *args)
+ self.sentinels = args[0]
+
#
# Connection classes
#
@@ -253,19 +266,17 @@ class _ConnectionBase:
(offset + size) // itemsize])
return size
- def recv(self):
+ def recv(self, sentinels=None):
"""Receive a (picklable) object"""
self._check_closed()
self._check_readable()
- buf = self._recv_bytes()
+ buf = self._recv_bytes(sentinels=sentinels)
return pickle.loads(buf.getbuffer())
def poll(self, timeout=0.0):
"""Whether there is any input available to be read"""
self._check_closed()
self._check_readable()
- if timeout < 0.0:
- timeout = None
return self._poll(timeout)
@@ -274,61 +285,88 @@ if win32:
class PipeConnection(_ConnectionBase):
"""
Connection class based on a Windows named pipe.
+ Overlapped I/O is used, so the handles must have been created
+ with FILE_FLAG_OVERLAPPED.
"""
+ _buffered = b''
def _close(self):
win32.CloseHandle(self._handle)
def _send_bytes(self, buf):
- nwritten = win32.WriteFile(self._handle, buf)
+ overlapped = win32.WriteFile(self._handle, buf, overlapped=True)
+ nwritten, complete = overlapped.GetOverlappedResult(True)
+ assert complete
assert nwritten == len(buf)
- def _recv_bytes(self, maxsize=None):
+ def _recv_bytes(self, maxsize=None, sentinels=()):
+ if sentinels:
+ self._poll(-1.0, sentinels)
buf = io.BytesIO()
- bufsize = 512
- if maxsize is not None:
- bufsize = min(bufsize, maxsize)
- try:
- firstchunk, complete = win32.ReadFile(self._handle, bufsize)
- except IOError as e:
- if e.errno == win32.ERROR_BROKEN_PIPE:
- raise EOFError
- raise
- lenfirstchunk = len(firstchunk)
- buf.write(firstchunk)
- if complete:
- return buf
+ firstchunk = self._buffered
+ if firstchunk:
+ lenfirstchunk = len(firstchunk)
+ buf.write(firstchunk)
+ self._buffered = b''
+ else:
+ # A reasonable size for the first chunk transfer
+ bufsize = 128
+ if maxsize is not None and maxsize < bufsize:
+ bufsize = maxsize
+ try:
+ overlapped = win32.ReadFile(self._handle, bufsize, overlapped=True)
+ lenfirstchunk, complete = overlapped.GetOverlappedResult(True)
+ firstchunk = overlapped.getbuffer()
+ assert lenfirstchunk == len(firstchunk)
+ except IOError as e:
+ if e.errno == win32.ERROR_BROKEN_PIPE:
+ raise EOFError
+ raise
+ buf.write(firstchunk)
+ if complete:
+ return buf
navail, nleft = win32.PeekNamedPipe(self._handle)
if maxsize is not None and lenfirstchunk + nleft > maxsize:
return None
- lastchunk, complete = win32.ReadFile(self._handle, nleft)
- assert complete
- buf.write(lastchunk)
+ if nleft > 0:
+ overlapped = win32.ReadFile(self._handle, nleft, overlapped=True)
+ res, complete = overlapped.GetOverlappedResult(True)
+ assert res == nleft
+ assert complete
+ buf.write(overlapped.getbuffer())
return buf
- def _poll(self, timeout):
+ def _poll(self, timeout, sentinels=()):
+ # Fast non-blocking path
navail, nleft = win32.PeekNamedPipe(self._handle)
if navail > 0:
return True
elif timeout == 0.0:
return False
- # Setup a polling loop (translated straight from old
- # pipe_connection.c)
+ # Blocking: use overlapped I/O
if timeout < 0.0:
- deadline = None
+ timeout = INFINITE
else:
- deadline = time.time() + timeout
- delay = 0.001
- max_delay = 0.02
- while True:
- time.sleep(delay)
- navail, nleft = win32.PeekNamedPipe(self._handle)
- if navail > 0:
- return True
- if deadline and time.time() > deadline:
- return False
- if delay < max_delay:
- delay += 0.001
+ timeout = int(timeout * 1000 + 0.5)
+ overlapped = win32.ReadFile(self._handle, 1, overlapped=True)
+ try:
+ handles = [overlapped.event]
+ handles += sentinels
+ res = win32.WaitForMultipleObjects(handles, False, timeout)
+ finally:
+ # Always cancel overlapped I/O in the same thread
+ # (because CancelIoEx() appears only in Vista)
+ overlapped.cancel()
+ if res == WAIT_TIMEOUT:
+ return False
+ idx = res - WAIT_OBJECT_0
+ if idx == 0:
+ # I/O was successful, store received data
+ overlapped.GetOverlappedResult(True)
+ self._buffered += overlapped.getbuffer()
+ return True
+ assert 0 < idx < len(handles)
+ raise SentinelReady([handles[idx]])
class Connection(_ConnectionBase):
@@ -357,11 +395,18 @@ class Connection(_ConnectionBase):
break
buf = buf[n:]
- def _recv(self, size, read=_read):
+ def _recv(self, size, sentinels=(), read=_read):
buf = io.BytesIO()
+ handle = self._handle
+ if sentinels:
+ handles = [handle] + sentinels
remaining = size
while remaining > 0:
- chunk = read(self._handle, remaining)
+ if sentinels:
+ r = _select(handles, [], [])[0]
+ if handle not in r:
+ raise SentinelReady(r)
+ chunk = read(handle, remaining)
n = len(chunk)
if n == 0:
if remaining == size:
@@ -381,15 +426,17 @@ class Connection(_ConnectionBase):
if n > 0:
self._send(buf)
- def _recv_bytes(self, maxsize=None):
- buf = self._recv(4)
+ def _recv_bytes(self, maxsize=None, sentinels=()):
+ buf = self._recv(4, sentinels)
size, = struct.unpack("=i", buf.getvalue())
if maxsize is not None and size > maxsize:
return None
- return self._recv(size)
+ return self._recv(size, sentinels)
def _poll(self, timeout):
- r = select.select([self._handle], [], [], timeout)[0]
+ if timeout < 0.0:
+ timeout = None
+ r = _select([self._handle], [], [], timeout)[0]
return bool(r)
@@ -495,23 +542,21 @@ else:
obsize, ibsize = 0, BUFSIZE
h1 = win32.CreateNamedPipe(
- address, openmode,
+ address, openmode | win32.FILE_FLAG_OVERLAPPED,
win32.PIPE_TYPE_MESSAGE | win32.PIPE_READMODE_MESSAGE |
win32.PIPE_WAIT,
1, obsize, ibsize, win32.NMPWAIT_WAIT_FOREVER, win32.NULL
)
h2 = win32.CreateFile(
- address, access, 0, win32.NULL, win32.OPEN_EXISTING, 0, win32.NULL
+ address, access, 0, win32.NULL, win32.OPEN_EXISTING,
+ win32.FILE_FLAG_OVERLAPPED, win32.NULL
)
win32.SetNamedPipeHandleState(
h2, win32.PIPE_READMODE_MESSAGE, None, None
)
- try:
- win32.ConnectNamedPipe(h1, win32.NULL)
- except WindowsError as e:
- if e.args[0] != win32.ERROR_PIPE_CONNECTED:
- raise
+ overlapped = win32.ConnectNamedPipe(h1, overlapped=True)
+ overlapped.GetOverlappedResult(True)
c1 = PipeConnection(h1, writable=duplex)
c2 = PipeConnection(h2, readable=duplex)
diff --git a/Lib/multiprocessing/forking.py b/Lib/multiprocessing/forking.py
index 3c359cb..a2c61ef 100644
--- a/Lib/multiprocessing/forking.py
+++ b/Lib/multiprocessing/forking.py
@@ -35,6 +35,7 @@
import os
import sys
import signal
+import select
from multiprocessing import util, process
diff --git a/Lib/multiprocessing/queues.py b/Lib/multiprocessing/queues.py
index 3280a25..3324363 100644
--- a/Lib/multiprocessing/queues.py
+++ b/Lib/multiprocessing/queues.py
@@ -44,7 +44,7 @@ import weakref
from queue import Empty, Full
import _multiprocessing
-from multiprocessing import Pipe
+from multiprocessing.connection import Pipe, SentinelReady
from multiprocessing.synchronize import Lock, BoundedSemaphore, Semaphore, Condition
from multiprocessing.util import debug, info, Finalize, register_after_fork
from multiprocessing.forking import assert_spawning
@@ -372,10 +372,10 @@ class SimpleQueue(object):
def _make_methods(self):
recv = self._reader.recv
racquire, rrelease = self._rlock.acquire, self._rlock.release
- def get():
+ def get(*, sentinels=None):
racquire()
try:
- return recv()
+ return recv(sentinels)
finally:
rrelease()
self.get = get
diff --git a/Lib/test/test_concurrent_futures.py b/Lib/test/test_concurrent_futures.py
index 7457f39..5968980 100644
--- a/Lib/test/test_concurrent_futures.py
+++ b/Lib/test/test_concurrent_futures.py
@@ -19,7 +19,7 @@ import unittest
from concurrent import futures
from concurrent.futures._base import (
PENDING, RUNNING, CANCELLED, CANCELLED_AND_NOTIFIED, FINISHED, Future)
-import concurrent.futures.process
+from concurrent.futures.process import BrokenProcessPool
def create_future(state=PENDING, exception=None, result=None):
@@ -154,7 +154,7 @@ class ProcessPoolShutdownTest(ProcessPoolMixin, ExecutorShutdownTest):
processes = self.executor._processes
self.executor.shutdown()
- for p in processes:
+ for p in processes.values():
p.join()
def test_context_manager_shutdown(self):
@@ -163,7 +163,7 @@ class ProcessPoolShutdownTest(ProcessPoolMixin, ExecutorShutdownTest):
self.assertEqual(list(e.map(abs, range(-5, 5))),
[5, 4, 3, 2, 1, 0, 1, 2, 3, 4])
- for p in processes:
+ for p in processes.values():
p.join()
def test_del_shutdown(self):
@@ -174,7 +174,7 @@ class ProcessPoolShutdownTest(ProcessPoolMixin, ExecutorShutdownTest):
del executor
queue_management_thread.join()
- for p in processes:
+ for p in processes.values():
p.join()
class WaitTests(unittest.TestCase):
@@ -381,7 +381,17 @@ class ThreadPoolExecutorTest(ThreadPoolMixin, ExecutorTest):
class ProcessPoolExecutorTest(ProcessPoolMixin, ExecutorTest):
- pass
+ def test_killed_child(self):
+ # When a child process is abruptly terminated, the whole pool gets
+ # "broken".
+ futures = [self.executor.submit(time.sleep, 3)]
+ # Get one of the processes, and terminate (kill) it
+ p = next(iter(self.executor._processes.values()))
+ p.terminate()
+ for fut in futures:
+ self.assertRaises(BrokenProcessPool, fut.result)
+ # Submitting other jobs fails as well.
+ self.assertRaises(BrokenProcessPool, self.executor.submit, pow, 2, 8)
class FutureTests(unittest.TestCase):
diff --git a/Misc/NEWS b/Misc/NEWS
index 874498b..8a70ce9 100644
--- a/Misc/NEWS
+++ b/Misc/NEWS
@@ -187,6 +187,10 @@ Core and Builtins
Library
-------
+- Issue #9205: concurrent.futures.ProcessPoolExecutor now detects killed
+ children and raises BrokenProcessPool in such a situation. Previously it
+ would reliably freeze/deadlock.
+
- Issue #12040: Expose a new attribute ``sentinel`` on instances of
:class:`multiprocessing.Process`. Also, fix Process.join() to not use
polling anymore, when given a timeout.
diff --git a/Modules/_multiprocessing/win32_functions.c b/Modules/_multiprocessing/win32_functions.c
index 12dc0cd..c017b2a 100644
--- a/Modules/_multiprocessing/win32_functions.c
+++ b/Modules/_multiprocessing/win32_functions.c
@@ -12,10 +12,223 @@
#define WIN32_FUNCTION(func) \
{#func, (PyCFunction)win32_ ## func, METH_VARARGS | METH_STATIC, ""}
+#define WIN32_KWARGS_FUNCTION(func) \
+ {#func, (PyCFunction)win32_ ## func, METH_VARARGS | METH_KEYWORDS | METH_STATIC, ""}
+
#define WIN32_CONSTANT(fmt, con) \
PyDict_SetItemString(Win32Type.tp_dict, #con, Py_BuildValue(fmt, con))
+/* Grab CancelIoEx dynamically from kernel32 */
+static int has_CancelIoEx = -1;
+static BOOL (CALLBACK *Py_CancelIoEx)(HANDLE, LPOVERLAPPED);
+
+static int
+check_CancelIoEx()
+{
+ if (has_CancelIoEx == -1)
+ {
+ HINSTANCE hKernel32 = GetModuleHandle("KERNEL32");
+ * (FARPROC *) &Py_CancelIoEx = GetProcAddress(hKernel32,
+ "CancelIoEx");
+ has_CancelIoEx = (Py_CancelIoEx != NULL);
+ }
+ return has_CancelIoEx;
+}
+
+
+/*
+ * A Python object wrapping an OVERLAPPED structure and other useful data
+ * for overlapped I/O
+ */
+
+typedef struct {
+ PyObject_HEAD
+ OVERLAPPED overlapped;
+ /* For convenience, we store the file handle too */
+ HANDLE handle;
+ /* Whether there's I/O in flight */
+ int pending;
+ /* Whether I/O completed successfully */
+ int completed;
+ /* Buffer used for reading (optional) */
+ PyObject *read_buffer;
+ /* Buffer used for writing (optional) */
+ Py_buffer write_buffer;
+} OverlappedObject;
+
+static void
+overlapped_dealloc(OverlappedObject *self)
+{
+ int err = GetLastError();
+ if (self->pending) {
+ if (check_CancelIoEx())
+ Py_CancelIoEx(self->handle, &self->overlapped);
+ else {
+ PyErr_SetString(PyExc_RuntimeError,
+ "I/O operations still in flight while destroying "
+ "Overlapped object, the process may crash");
+ PyErr_WriteUnraisable(NULL);
+ }
+ }
+ CloseHandle(self->overlapped.hEvent);
+ SetLastError(err);
+ if (self->write_buffer.obj)
+ PyBuffer_Release(&self->write_buffer);
+ Py_CLEAR(self->read_buffer);
+ PyObject_Del(self);
+}
+
+static PyObject *
+overlapped_GetOverlappedResult(OverlappedObject *self, PyObject *waitobj)
+{
+ int wait;
+ BOOL res;
+ DWORD transferred = 0;
+
+ wait = PyObject_IsTrue(waitobj);
+ if (wait < 0)
+ return NULL;
+ Py_BEGIN_ALLOW_THREADS
+ res = GetOverlappedResult(self->handle, &self->overlapped, &transferred,
+ wait != 0);
+ Py_END_ALLOW_THREADS
+
+ if (!res) {
+ int err = GetLastError();
+ if (err == ERROR_IO_INCOMPLETE)
+ Py_RETURN_NONE;
+ if (err != ERROR_MORE_DATA) {
+ self->pending = 0;
+ return PyErr_SetExcFromWindowsErr(PyExc_IOError, err);
+ }
+ }
+ self->pending = 0;
+ self->completed = 1;
+ if (self->read_buffer) {
+ assert(PyBytes_CheckExact(self->read_buffer));
+ if (_PyBytes_Resize(&self->read_buffer, transferred))
+ return NULL;
+ }
+ return Py_BuildValue("lN", (long) transferred, PyBool_FromLong(res));
+}
+
+static PyObject *
+overlapped_getbuffer(OverlappedObject *self)
+{
+ PyObject *res;
+ if (!self->completed) {
+ PyErr_SetString(PyExc_ValueError,
+ "can't get read buffer before GetOverlappedResult() "
+ "signals the operation completed");
+ return NULL;
+ }
+ res = self->read_buffer ? self->read_buffer : Py_None;
+ Py_INCREF(res);
+ return res;
+}
+
+static PyObject *
+overlapped_cancel(OverlappedObject *self)
+{
+ BOOL res = TRUE;
+
+ if (self->pending) {
+ Py_BEGIN_ALLOW_THREADS
+ if (check_CancelIoEx())
+ res = Py_CancelIoEx(self->handle, &self->overlapped);
+ else
+ res = CancelIo(self->handle);
+ Py_END_ALLOW_THREADS
+ }
+
+ /* CancelIoEx returns ERROR_NOT_FOUND if the I/O completed in-between */
+ if (!res && GetLastError() != ERROR_NOT_FOUND)
+ return PyErr_SetExcFromWindowsErr(PyExc_IOError, 0);
+ self->pending = 0;
+ Py_RETURN_NONE;
+}
+
+static PyMethodDef overlapped_methods[] = {
+ {"GetOverlappedResult", (PyCFunction) overlapped_GetOverlappedResult,
+ METH_O, NULL},
+ {"getbuffer", (PyCFunction) overlapped_getbuffer, METH_NOARGS, NULL},
+ {"cancel", (PyCFunction) overlapped_cancel, METH_NOARGS, NULL},
+ {NULL}
+};
+
+static PyMemberDef overlapped_members[] = {
+ {"event", T_HANDLE,
+ offsetof(OverlappedObject, overlapped) + offsetof(OVERLAPPED, hEvent),
+ READONLY, "overlapped event handle"},
+ {NULL}
+};
+
+PyTypeObject OverlappedType = {
+ PyVarObject_HEAD_INIT(NULL, 0)
+ /* tp_name */ "_multiprocessing.win32.Overlapped",
+ /* tp_basicsize */ sizeof(OverlappedObject),
+ /* tp_itemsize */ 0,
+ /* tp_dealloc */ (destructor) overlapped_dealloc,
+ /* tp_print */ 0,
+ /* tp_getattr */ 0,
+ /* tp_setattr */ 0,
+ /* tp_reserved */ 0,
+ /* tp_repr */ 0,
+ /* tp_as_number */ 0,
+ /* tp_as_sequence */ 0,
+ /* tp_as_mapping */ 0,
+ /* tp_hash */ 0,
+ /* tp_call */ 0,
+ /* tp_str */ 0,
+ /* tp_getattro */ 0,
+ /* tp_setattro */ 0,
+ /* tp_as_buffer */ 0,
+ /* tp_flags */ Py_TPFLAGS_DEFAULT,
+ /* tp_doc */ "OVERLAPPED structure wrapper",
+ /* tp_traverse */ 0,
+ /* tp_clear */ 0,
+ /* tp_richcompare */ 0,
+ /* tp_weaklistoffset */ 0,
+ /* tp_iter */ 0,
+ /* tp_iternext */ 0,
+ /* tp_methods */ overlapped_methods,
+ /* tp_members */ overlapped_members,
+ /* tp_getset */ 0,
+ /* tp_base */ 0,
+ /* tp_dict */ 0,
+ /* tp_descr_get */ 0,
+ /* tp_descr_set */ 0,
+ /* tp_dictoffset */ 0,
+ /* tp_init */ 0,
+ /* tp_alloc */ 0,
+ /* tp_new */ 0,
+};
+
+static OverlappedObject *
+new_overlapped(HANDLE handle)
+{
+ OverlappedObject *self;
+
+ self = PyObject_New(OverlappedObject, &OverlappedType);
+ if (!self)
+ return NULL;
+ self->handle = handle;
+ self->read_buffer = NULL;
+ self->pending = 0;
+ self->completed = 0;
+ memset(&self->overlapped, 0, sizeof(OVERLAPPED));
+ memset(&self->write_buffer, 0, sizeof(Py_buffer));
+ /* Manual reset, initially non-signalled */
+ self->overlapped.hEvent = CreateEvent(NULL, TRUE, FALSE, NULL);
+ return self;
+}
+
+
+/*
+ * Module functions
+ */
+
static PyObject *
win32_CloseHandle(PyObject *self, PyObject *args)
{
@@ -36,20 +249,44 @@ win32_CloseHandle(PyObject *self, PyObject *args)
}
static PyObject *
-win32_ConnectNamedPipe(PyObject *self, PyObject *args)
+win32_ConnectNamedPipe(PyObject *self, PyObject *args, PyObject *kwds)
{
HANDLE hNamedPipe;
- LPOVERLAPPED lpOverlapped;
+ int use_overlapped = 0;
BOOL success;
+ OverlappedObject *overlapped = NULL;
+ static char *kwlist[] = {"handle", "overlapped", NULL};
- if (!PyArg_ParseTuple(args, F_HANDLE F_POINTER,
- &hNamedPipe, &lpOverlapped))
+ if (!PyArg_ParseTupleAndKeywords(args, kwds,
+ F_HANDLE "|i", kwlist,
+ &hNamedPipe, &use_overlapped))
return NULL;
+ if (use_overlapped) {
+ overlapped = new_overlapped(hNamedPipe);
+ if (!overlapped)
+ return NULL;
+ }
+
Py_BEGIN_ALLOW_THREADS
- success = ConnectNamedPipe(hNamedPipe, lpOverlapped);
+ success = ConnectNamedPipe(hNamedPipe,
+ overlapped ? &overlapped->overlapped : NULL);
Py_END_ALLOW_THREADS
+ if (overlapped) {
+ int err = GetLastError();
+ /* Overlapped ConnectNamedPipe never returns a success code */
+ assert(success == 0);
+ if (err == ERROR_IO_PENDING)
+ overlapped->pending = 1;
+ else if (err == ERROR_PIPE_CONNECTED)
+ SetEvent(overlapped->overlapped.hEvent);
+ else {
+ Py_DECREF(overlapped);
+ return PyErr_SetFromWindowsErr(err);
+ }
+ return (PyObject *) overlapped;
+ }
if (!success)
return PyErr_SetFromWindowsErr(0);
@@ -280,46 +517,109 @@ win32_send(PyObject *self, PyObject *args)
}
static PyObject *
-win32_WriteFile(PyObject *self, PyObject *args)
+win32_WriteFile(PyObject *self, PyObject *args, PyObject *kwds)
{
HANDLE handle;
- Py_buffer buf;
+ Py_buffer _buf, *buf;
+ PyObject *bufobj;
int written;
BOOL ret;
+ int use_overlapped = 0;
+ OverlappedObject *overlapped = NULL;
+ static char *kwlist[] = {"handle", "buffer", "overlapped", NULL};
+
+ /* First get handle and use_overlapped to know which Py_buffer to use */
+ if (!PyArg_ParseTupleAndKeywords(args, kwds,
+ F_HANDLE "O|i:WriteFile", kwlist,
+ &handle, &bufobj, &use_overlapped))
+ return NULL;
+
+ if (use_overlapped) {
+ overlapped = new_overlapped(handle);
+ if (!overlapped)
+ return NULL;
+ buf = &overlapped->write_buffer;
+ }
+ else
+ buf = &_buf;
- if (!PyArg_ParseTuple(args, F_HANDLE "y*:WriteFile" , &handle, &buf))
+ if (!PyArg_Parse(bufobj, "y*", buf)) {
+ Py_XDECREF(overlapped);
return NULL;
+ }
Py_BEGIN_ALLOW_THREADS
- ret = WriteFile(handle, buf.buf, buf.len, &written, NULL);
+ ret = WriteFile(handle, buf->buf, buf->len, &written,
+ overlapped ? &overlapped->overlapped : NULL);
Py_END_ALLOW_THREADS
- PyBuffer_Release(&buf);
+ if (overlapped) {
+ int err = GetLastError();
+ if (!ret) {
+ if (err == ERROR_IO_PENDING)
+ overlapped->pending = 1;
+ else {
+ Py_DECREF(overlapped);
+ return PyErr_SetExcFromWindowsErr(PyExc_IOError, 0);
+ }
+ }
+ return (PyObject *) overlapped;
+ }
+
+ PyBuffer_Release(buf);
if (!ret)
return PyErr_SetExcFromWindowsErr(PyExc_IOError, 0);
return PyLong_FromLong(written);
}
static PyObject *
-win32_ReadFile(PyObject *self, PyObject *args)
+win32_ReadFile(PyObject *self, PyObject *args, PyObject *kwds)
{
HANDLE handle;
int size;
DWORD nread;
PyObject *buf;
BOOL ret;
+ int use_overlapped = 0;
+ OverlappedObject *overlapped = NULL;
+ static char *kwlist[] = {"handle", "size", "overlapped", NULL};
- if (!PyArg_ParseTuple(args, F_HANDLE "i:ReadFile" , &handle, &size))
+ if (!PyArg_ParseTupleAndKeywords(args, kwds,
+ F_HANDLE "i|i:ReadFile", kwlist,
+ &handle, &size, &use_overlapped))
return NULL;
buf = PyBytes_FromStringAndSize(NULL, size);
if (!buf)
return NULL;
+ if (use_overlapped) {
+ overlapped = new_overlapped(handle);
+ if (!overlapped) {
+ Py_DECREF(buf);
+ return NULL;
+ }
+ /* Steals reference to buf */
+ overlapped->read_buffer = buf;
+ }
Py_BEGIN_ALLOW_THREADS
- ret = ReadFile(handle, PyBytes_AS_STRING(buf), size, &nread, NULL);
+ ret = ReadFile(handle, PyBytes_AS_STRING(buf), size, &nread,
+ overlapped ? &overlapped->overlapped : NULL);
Py_END_ALLOW_THREADS
+ if (overlapped) {
+ int err = GetLastError();
+ if (!ret) {
+ if (err == ERROR_IO_PENDING)
+ overlapped->pending = 1;
+ else if (err != ERROR_MORE_DATA) {
+ Py_DECREF(overlapped);
+ return PyErr_SetExcFromWindowsErr(PyExc_IOError, 0);
+ }
+ }
+ return (PyObject *) overlapped;
+ }
+
if (!ret && GetLastError() != ERROR_MORE_DATA) {
Py_DECREF(buf);
return PyErr_SetExcFromWindowsErr(PyExc_IOError, 0);
@@ -373,19 +673,71 @@ win32_PeekNamedPipe(PyObject *self, PyObject *args)
}
}
+static PyObject *
+win32_WaitForMultipleObjects(PyObject* self, PyObject* args)
+{
+ DWORD result;
+ PyObject *handle_seq;
+ HANDLE handles[MAXIMUM_WAIT_OBJECTS];
+ Py_ssize_t nhandles, i;
+ int wait_flag;
+ int milliseconds = INFINITE;
+
+ if (!PyArg_ParseTuple(args, "Oi|i:WaitForMultipleObjects",
+ &handle_seq, &wait_flag, &milliseconds))
+ return NULL;
+
+ if (!PySequence_Check(handle_seq)) {
+ PyErr_Format(PyExc_TypeError,
+ "sequence type expected, got '%s'",
+ Py_TYPE(handle_seq)->tp_doc);
+ return NULL;
+ }
+ nhandles = PySequence_Length(handle_seq);
+ if (nhandles == -1)
+ return NULL;
+ if (nhandles < 0 || nhandles >= MAXIMUM_WAIT_OBJECTS) {
+ PyErr_Format(PyExc_ValueError,
+ "need at most %zd handles, got a sequence of length %zd",
+ MAXIMUM_WAIT_OBJECTS, nhandles);
+ return NULL;
+ }
+ for (i = 0; i < nhandles; i++) {
+ HANDLE h;
+ PyObject *v = PySequence_GetItem(handle_seq, i);
+ if (v == NULL)
+ return NULL;
+ if (!PyArg_Parse(v, F_HANDLE, &h))
+ return NULL;
+ handles[i] = h;
+ }
+
+ Py_BEGIN_ALLOW_THREADS
+ result = WaitForMultipleObjects((DWORD) nhandles, handles,
+ (BOOL) wait_flag, (DWORD) milliseconds);
+ Py_END_ALLOW_THREADS
+
+ if (result == WAIT_FAILED)
+ return PyErr_SetExcFromWindowsErr(PyExc_IOError, 0);
+
+ return PyLong_FromLong((int) result);
+}
+
+
static PyMethodDef win32_methods[] = {
WIN32_FUNCTION(CloseHandle),
WIN32_FUNCTION(GetLastError),
WIN32_FUNCTION(OpenProcess),
WIN32_FUNCTION(ExitProcess),
- WIN32_FUNCTION(ConnectNamedPipe),
+ WIN32_KWARGS_FUNCTION(ConnectNamedPipe),
WIN32_FUNCTION(CreateFile),
WIN32_FUNCTION(CreateNamedPipe),
- WIN32_FUNCTION(ReadFile),
+ WIN32_KWARGS_FUNCTION(ReadFile),
WIN32_FUNCTION(PeekNamedPipe),
WIN32_FUNCTION(SetNamedPipeHandleState),
+ WIN32_FUNCTION(WaitForMultipleObjects),
WIN32_FUNCTION(WaitNamedPipe),
- WIN32_FUNCTION(WriteFile),
+ WIN32_KWARGS_FUNCTION(WriteFile),
WIN32_FUNCTION(closesocket),
WIN32_FUNCTION(recv),
WIN32_FUNCTION(send),
@@ -407,12 +759,18 @@ create_win32_namespace(void)
return NULL;
Py_INCREF(&Win32Type);
+ if (PyType_Ready(&OverlappedType) < 0)
+ return NULL;
+ PyDict_SetItemString(Win32Type.tp_dict, "Overlapped",
+ (PyObject *) &OverlappedType);
+
WIN32_CONSTANT(F_DWORD, ERROR_ALREADY_EXISTS);
WIN32_CONSTANT(F_DWORD, ERROR_BROKEN_PIPE);
WIN32_CONSTANT(F_DWORD, ERROR_NO_SYSTEM_RESOURCES);
WIN32_CONSTANT(F_DWORD, ERROR_PIPE_BUSY);
WIN32_CONSTANT(F_DWORD, ERROR_PIPE_CONNECTED);
WIN32_CONSTANT(F_DWORD, ERROR_SEM_TIMEOUT);
+ WIN32_CONSTANT(F_DWORD, FILE_FLAG_OVERLAPPED);
WIN32_CONSTANT(F_DWORD, GENERIC_READ);
WIN32_CONSTANT(F_DWORD, GENERIC_WRITE);
WIN32_CONSTANT(F_DWORD, INFINITE);