diff options
author | Antoine Pitrou <solipsis@pitrou.net> | 2011-06-08 15:21:55 (GMT) |
---|---|---|
committer | Antoine Pitrou <solipsis@pitrou.net> | 2011-06-08 15:21:55 (GMT) |
commit | dd696496605883a44da983ad81e55a01e996a004 (patch) | |
tree | 505cc588dc520311ff2935b8804c04663b26c054 | |
parent | 4a5e5de03f47b7076fc0eabc9817a59efd20049d (diff) | |
download | cpython-dd696496605883a44da983ad81e55a01e996a004.zip cpython-dd696496605883a44da983ad81e55a01e996a004.tar.gz cpython-dd696496605883a44da983ad81e55a01e996a004.tar.bz2 |
Issue #9205: concurrent.futures.ProcessPoolExecutor now detects killed
children and raises BrokenProcessPool in such a situation. Previously it
would reliably freeze/deadlock.
-rw-r--r-- | Doc/library/concurrent.futures.rst | 19 | ||||
-rw-r--r-- | Lib/concurrent/futures/process.py | 105 | ||||
-rw-r--r-- | Lib/multiprocessing/connection.py | 149 | ||||
-rw-r--r-- | Lib/multiprocessing/forking.py | 1 | ||||
-rw-r--r-- | Lib/multiprocessing/queues.py | 6 | ||||
-rw-r--r-- | Lib/test/test_concurrent_futures.py | 20 | ||||
-rw-r--r-- | Misc/NEWS | 4 | ||||
-rw-r--r-- | Modules/_multiprocessing/win32_functions.c | 390 |
8 files changed, 587 insertions, 107 deletions
diff --git a/Doc/library/concurrent.futures.rst b/Doc/library/concurrent.futures.rst index e5d13f3..3bd4531 100644 --- a/Doc/library/concurrent.futures.rst +++ b/Doc/library/concurrent.futures.rst @@ -169,6 +169,12 @@ to a :class:`ProcessPoolExecutor` will result in deadlock. of at most *max_workers* processes. If *max_workers* is ``None`` or not given, it will default to the number of processors on the machine. + .. versionchanged:: 3.3 + When one of the worker processes terminates abruptly, a + :exc:`BrokenProcessPool` error is now raised. Previously, behaviour + was undefined but operations on the executor or its futures would often + freeze or deadlock. + .. _processpoolexecutor-example: @@ -369,3 +375,16 @@ Module Functions :pep:`3148` -- futures - execute computations asynchronously The proposal which described this feature for inclusion in the Python standard library. + + +Exception classes +----------------- + +.. exception:: BrokenProcessPool + + Derived from :exc:`RuntimeError`, this exception class is raised when + one of the workers of a :class:`ProcessPoolExecutor` has terminated + in a non-clean fashion (for example, if it was killed from the outside). + + .. versionadded:: 3.3 + diff --git a/Lib/concurrent/futures/process.py b/Lib/concurrent/futures/process.py index f0bf6d5..c2331e7 100644 --- a/Lib/concurrent/futures/process.py +++ b/Lib/concurrent/futures/process.py @@ -46,10 +46,11 @@ Process #1..n: __author__ = 'Brian Quinlan (brian@sweetapp.com)' import atexit +import os from concurrent.futures import _base import queue import multiprocessing -from multiprocessing.queues import SimpleQueue +from multiprocessing.queues import SimpleQueue, SentinelReady import threading import weakref @@ -122,7 +123,7 @@ def _process_worker(call_queue, result_queue): call_item = call_queue.get(block=True) if call_item is None: # Wake up queue management thread - result_queue.put(None) + result_queue.put(os.getpid()) return try: r = call_item.fn(*call_item.args, **call_item.kwargs) @@ -194,29 +195,63 @@ def _queue_management_worker(executor_reference, result_queue: A multiprocessing.Queue of _ResultItems generated by the process workers. """ - nb_shutdown_processes = 0 - def shutdown_one_process(): - """Tell a worker to terminate, which will in turn wake us again""" - nonlocal nb_shutdown_processes - call_queue.put(None) - nb_shutdown_processes += 1 + + def shutdown_worker(): + # This is an upper bound + nb_children_alive = sum(p.is_alive() for p in processes.values()) + for i in range(0, nb_children_alive): + call_queue.put(None) + # If .join() is not called on the created processes then + # some multiprocessing.Queue methods may deadlock on Mac OS + # X. + for p in processes.values(): + p.join() + while True: _add_call_item_to_queue(pending_work_items, work_ids_queue, call_queue) - result_item = result_queue.get() - if result_item is not None: - work_item = pending_work_items[result_item.work_id] - del pending_work_items[result_item.work_id] - - if result_item.exception: - work_item.future.set_exception(result_item.exception) - else: - work_item.future.set_result(result_item.result) - continue - # If we come here, we either got a timeout or were explicitly woken up. - # In either case, check whether we should start shutting down. + sentinels = [p.sentinel for p in processes.values()] + assert sentinels + try: + result_item = result_queue.get(sentinels=sentinels) + except SentinelReady as e: + # Mark the process pool broken so that submits fail right now. + executor = executor_reference() + if executor is not None: + executor._broken = True + executor._shutdown_thread = True + del executor + # All futures in flight must be marked failed + for work_id, work_item in pending_work_items.items(): + work_item.future.set_exception( + BrokenProcessPool( + "A process in the process pool was " + "terminated abruptly while the future was " + "running or pending." + )) + pending_work_items.clear() + # Terminate remaining workers forcibly: the queues or their + # locks may be in a dirty state and block forever. + for p in processes.values(): + p.terminate() + for p in processes.values(): + p.join() + return + if isinstance(result_item, int): + # Clean shutdown of a worker using its PID + # (avoids marking the executor broken) + del processes[result_item] + elif result_item is not None: + work_item = pending_work_items.pop(result_item.work_id, None) + # work_item can be None if another process terminated (see above) + if work_item is not None: + if result_item.exception: + work_item.future.set_exception(result_item.exception) + else: + work_item.future.set_result(result_item.result) + # Check whether we should start shutting down. executor = executor_reference() # No more work items can be added if: # - The interpreter is shutting down OR @@ -226,17 +261,11 @@ def _queue_management_worker(executor_reference, # Since no new work items can be added, it is safe to shutdown # this thread if there are no pending work items. if not pending_work_items: - while nb_shutdown_processes < len(processes): - shutdown_one_process() - # If .join() is not called on the created processes then - # some multiprocessing.Queue methods may deadlock on Mac OS - # X. - for p in processes: - p.join() + shutdown_worker() return else: # Start shutting down by telling a process it can exit. - shutdown_one_process() + call_queue.put(None) del executor _system_limits_checked = False @@ -264,6 +293,14 @@ def _check_system_limits(): _system_limited = "system provides too few semaphores (%d available, 256 necessary)" % nsems_max raise NotImplementedError(_system_limited) + +class BrokenProcessPool(RuntimeError): + """ + Raised when a process in a ProcessPoolExecutor terminated abruptly + while a future was in the running state. + """ + + class ProcessPoolExecutor(_base.Executor): def __init__(self, max_workers=None): """Initializes a new ProcessPoolExecutor instance. @@ -288,11 +325,13 @@ class ProcessPoolExecutor(_base.Executor): self._result_queue = SimpleQueue() self._work_ids = queue.Queue() self._queue_management_thread = None - self._processes = set() + # Map of pids to processes + self._processes = {} # Shutdown is a two-step process. self._shutdown_thread = False self._shutdown_lock = threading.Lock() + self._broken = False self._queue_count = 0 self._pending_work_items = {} @@ -302,6 +341,8 @@ class ProcessPoolExecutor(_base.Executor): def weakref_cb(_, q=self._result_queue): q.put(None) if self._queue_management_thread is None: + # Start the processes so that their sentinels are known. + self._adjust_process_count() self._queue_management_thread = threading.Thread( target=_queue_management_worker, args=(weakref.ref(self, weakref_cb), @@ -321,10 +362,13 @@ class ProcessPoolExecutor(_base.Executor): args=(self._call_queue, self._result_queue)) p.start() - self._processes.add(p) + self._processes[p.pid] = p def submit(self, fn, *args, **kwargs): with self._shutdown_lock: + if self._broken: + raise BrokenProcessPool('A child process terminated ' + 'abruptly, the process pool is not usable anymore') if self._shutdown_thread: raise RuntimeError('cannot schedule new futures after shutdown') @@ -338,7 +382,6 @@ class ProcessPoolExecutor(_base.Executor): self._result_queue.put(None) self._start_queue_management_thread() - self._adjust_process_count() return f submit.__doc__ = _base.Executor.submit.__doc__ diff --git a/Lib/multiprocessing/connection.py b/Lib/multiprocessing/connection.py index 415e210..ede2908 100644 --- a/Lib/multiprocessing/connection.py +++ b/Lib/multiprocessing/connection.py @@ -48,14 +48,18 @@ import itertools import _multiprocessing from multiprocessing import current_process, AuthenticationError, BufferTooShort -from multiprocessing.util import get_temp_dir, Finalize, sub_debug, debug +from multiprocessing.util import ( + get_temp_dir, Finalize, sub_debug, debug, _eintr_retry) try: from _multiprocessing import win32 + from _subprocess import WAIT_OBJECT_0, WAIT_TIMEOUT, INFINITE except ImportError: if sys.platform == 'win32': raise win32 = None +_select = _eintr_retry(select.select) + # # # @@ -118,6 +122,15 @@ def address_type(address): else: raise ValueError('address type of %r unrecognized' % address) + +class SentinelReady(Exception): + """ + Raised when a sentinel is ready when polling. + """ + def __init__(self, *args): + Exception.__init__(self, *args) + self.sentinels = args[0] + # # Connection classes # @@ -253,19 +266,17 @@ class _ConnectionBase: (offset + size) // itemsize]) return size - def recv(self): + def recv(self, sentinels=None): """Receive a (picklable) object""" self._check_closed() self._check_readable() - buf = self._recv_bytes() + buf = self._recv_bytes(sentinels=sentinels) return pickle.loads(buf.getbuffer()) def poll(self, timeout=0.0): """Whether there is any input available to be read""" self._check_closed() self._check_readable() - if timeout < 0.0: - timeout = None return self._poll(timeout) @@ -274,61 +285,88 @@ if win32: class PipeConnection(_ConnectionBase): """ Connection class based on a Windows named pipe. + Overlapped I/O is used, so the handles must have been created + with FILE_FLAG_OVERLAPPED. """ + _buffered = b'' def _close(self): win32.CloseHandle(self._handle) def _send_bytes(self, buf): - nwritten = win32.WriteFile(self._handle, buf) + overlapped = win32.WriteFile(self._handle, buf, overlapped=True) + nwritten, complete = overlapped.GetOverlappedResult(True) + assert complete assert nwritten == len(buf) - def _recv_bytes(self, maxsize=None): + def _recv_bytes(self, maxsize=None, sentinels=()): + if sentinels: + self._poll(-1.0, sentinels) buf = io.BytesIO() - bufsize = 512 - if maxsize is not None: - bufsize = min(bufsize, maxsize) - try: - firstchunk, complete = win32.ReadFile(self._handle, bufsize) - except IOError as e: - if e.errno == win32.ERROR_BROKEN_PIPE: - raise EOFError - raise - lenfirstchunk = len(firstchunk) - buf.write(firstchunk) - if complete: - return buf + firstchunk = self._buffered + if firstchunk: + lenfirstchunk = len(firstchunk) + buf.write(firstchunk) + self._buffered = b'' + else: + # A reasonable size for the first chunk transfer + bufsize = 128 + if maxsize is not None and maxsize < bufsize: + bufsize = maxsize + try: + overlapped = win32.ReadFile(self._handle, bufsize, overlapped=True) + lenfirstchunk, complete = overlapped.GetOverlappedResult(True) + firstchunk = overlapped.getbuffer() + assert lenfirstchunk == len(firstchunk) + except IOError as e: + if e.errno == win32.ERROR_BROKEN_PIPE: + raise EOFError + raise + buf.write(firstchunk) + if complete: + return buf navail, nleft = win32.PeekNamedPipe(self._handle) if maxsize is not None and lenfirstchunk + nleft > maxsize: return None - lastchunk, complete = win32.ReadFile(self._handle, nleft) - assert complete - buf.write(lastchunk) + if nleft > 0: + overlapped = win32.ReadFile(self._handle, nleft, overlapped=True) + res, complete = overlapped.GetOverlappedResult(True) + assert res == nleft + assert complete + buf.write(overlapped.getbuffer()) return buf - def _poll(self, timeout): + def _poll(self, timeout, sentinels=()): + # Fast non-blocking path navail, nleft = win32.PeekNamedPipe(self._handle) if navail > 0: return True elif timeout == 0.0: return False - # Setup a polling loop (translated straight from old - # pipe_connection.c) + # Blocking: use overlapped I/O if timeout < 0.0: - deadline = None + timeout = INFINITE else: - deadline = time.time() + timeout - delay = 0.001 - max_delay = 0.02 - while True: - time.sleep(delay) - navail, nleft = win32.PeekNamedPipe(self._handle) - if navail > 0: - return True - if deadline and time.time() > deadline: - return False - if delay < max_delay: - delay += 0.001 + timeout = int(timeout * 1000 + 0.5) + overlapped = win32.ReadFile(self._handle, 1, overlapped=True) + try: + handles = [overlapped.event] + handles += sentinels + res = win32.WaitForMultipleObjects(handles, False, timeout) + finally: + # Always cancel overlapped I/O in the same thread + # (because CancelIoEx() appears only in Vista) + overlapped.cancel() + if res == WAIT_TIMEOUT: + return False + idx = res - WAIT_OBJECT_0 + if idx == 0: + # I/O was successful, store received data + overlapped.GetOverlappedResult(True) + self._buffered += overlapped.getbuffer() + return True + assert 0 < idx < len(handles) + raise SentinelReady([handles[idx]]) class Connection(_ConnectionBase): @@ -357,11 +395,18 @@ class Connection(_ConnectionBase): break buf = buf[n:] - def _recv(self, size, read=_read): + def _recv(self, size, sentinels=(), read=_read): buf = io.BytesIO() + handle = self._handle + if sentinels: + handles = [handle] + sentinels remaining = size while remaining > 0: - chunk = read(self._handle, remaining) + if sentinels: + r = _select(handles, [], [])[0] + if handle not in r: + raise SentinelReady(r) + chunk = read(handle, remaining) n = len(chunk) if n == 0: if remaining == size: @@ -381,15 +426,17 @@ class Connection(_ConnectionBase): if n > 0: self._send(buf) - def _recv_bytes(self, maxsize=None): - buf = self._recv(4) + def _recv_bytes(self, maxsize=None, sentinels=()): + buf = self._recv(4, sentinels) size, = struct.unpack("=i", buf.getvalue()) if maxsize is not None and size > maxsize: return None - return self._recv(size) + return self._recv(size, sentinels) def _poll(self, timeout): - r = select.select([self._handle], [], [], timeout)[0] + if timeout < 0.0: + timeout = None + r = _select([self._handle], [], [], timeout)[0] return bool(r) @@ -495,23 +542,21 @@ else: obsize, ibsize = 0, BUFSIZE h1 = win32.CreateNamedPipe( - address, openmode, + address, openmode | win32.FILE_FLAG_OVERLAPPED, win32.PIPE_TYPE_MESSAGE | win32.PIPE_READMODE_MESSAGE | win32.PIPE_WAIT, 1, obsize, ibsize, win32.NMPWAIT_WAIT_FOREVER, win32.NULL ) h2 = win32.CreateFile( - address, access, 0, win32.NULL, win32.OPEN_EXISTING, 0, win32.NULL + address, access, 0, win32.NULL, win32.OPEN_EXISTING, + win32.FILE_FLAG_OVERLAPPED, win32.NULL ) win32.SetNamedPipeHandleState( h2, win32.PIPE_READMODE_MESSAGE, None, None ) - try: - win32.ConnectNamedPipe(h1, win32.NULL) - except WindowsError as e: - if e.args[0] != win32.ERROR_PIPE_CONNECTED: - raise + overlapped = win32.ConnectNamedPipe(h1, overlapped=True) + overlapped.GetOverlappedResult(True) c1 = PipeConnection(h1, writable=duplex) c2 = PipeConnection(h2, readable=duplex) diff --git a/Lib/multiprocessing/forking.py b/Lib/multiprocessing/forking.py index 3c359cb..a2c61ef 100644 --- a/Lib/multiprocessing/forking.py +++ b/Lib/multiprocessing/forking.py @@ -35,6 +35,7 @@ import os import sys import signal +import select from multiprocessing import util, process diff --git a/Lib/multiprocessing/queues.py b/Lib/multiprocessing/queues.py index 3280a25..3324363 100644 --- a/Lib/multiprocessing/queues.py +++ b/Lib/multiprocessing/queues.py @@ -44,7 +44,7 @@ import weakref from queue import Empty, Full import _multiprocessing -from multiprocessing import Pipe +from multiprocessing.connection import Pipe, SentinelReady from multiprocessing.synchronize import Lock, BoundedSemaphore, Semaphore, Condition from multiprocessing.util import debug, info, Finalize, register_after_fork from multiprocessing.forking import assert_spawning @@ -372,10 +372,10 @@ class SimpleQueue(object): def _make_methods(self): recv = self._reader.recv racquire, rrelease = self._rlock.acquire, self._rlock.release - def get(): + def get(*, sentinels=None): racquire() try: - return recv() + return recv(sentinels) finally: rrelease() self.get = get diff --git a/Lib/test/test_concurrent_futures.py b/Lib/test/test_concurrent_futures.py index 7457f39..5968980 100644 --- a/Lib/test/test_concurrent_futures.py +++ b/Lib/test/test_concurrent_futures.py @@ -19,7 +19,7 @@ import unittest from concurrent import futures from concurrent.futures._base import ( PENDING, RUNNING, CANCELLED, CANCELLED_AND_NOTIFIED, FINISHED, Future) -import concurrent.futures.process +from concurrent.futures.process import BrokenProcessPool def create_future(state=PENDING, exception=None, result=None): @@ -154,7 +154,7 @@ class ProcessPoolShutdownTest(ProcessPoolMixin, ExecutorShutdownTest): processes = self.executor._processes self.executor.shutdown() - for p in processes: + for p in processes.values(): p.join() def test_context_manager_shutdown(self): @@ -163,7 +163,7 @@ class ProcessPoolShutdownTest(ProcessPoolMixin, ExecutorShutdownTest): self.assertEqual(list(e.map(abs, range(-5, 5))), [5, 4, 3, 2, 1, 0, 1, 2, 3, 4]) - for p in processes: + for p in processes.values(): p.join() def test_del_shutdown(self): @@ -174,7 +174,7 @@ class ProcessPoolShutdownTest(ProcessPoolMixin, ExecutorShutdownTest): del executor queue_management_thread.join() - for p in processes: + for p in processes.values(): p.join() class WaitTests(unittest.TestCase): @@ -381,7 +381,17 @@ class ThreadPoolExecutorTest(ThreadPoolMixin, ExecutorTest): class ProcessPoolExecutorTest(ProcessPoolMixin, ExecutorTest): - pass + def test_killed_child(self): + # When a child process is abruptly terminated, the whole pool gets + # "broken". + futures = [self.executor.submit(time.sleep, 3)] + # Get one of the processes, and terminate (kill) it + p = next(iter(self.executor._processes.values())) + p.terminate() + for fut in futures: + self.assertRaises(BrokenProcessPool, fut.result) + # Submitting other jobs fails as well. + self.assertRaises(BrokenProcessPool, self.executor.submit, pow, 2, 8) class FutureTests(unittest.TestCase): @@ -187,6 +187,10 @@ Core and Builtins Library ------- +- Issue #9205: concurrent.futures.ProcessPoolExecutor now detects killed + children and raises BrokenProcessPool in such a situation. Previously it + would reliably freeze/deadlock. + - Issue #12040: Expose a new attribute ``sentinel`` on instances of :class:`multiprocessing.Process`. Also, fix Process.join() to not use polling anymore, when given a timeout. diff --git a/Modules/_multiprocessing/win32_functions.c b/Modules/_multiprocessing/win32_functions.c index 12dc0cd..c017b2a 100644 --- a/Modules/_multiprocessing/win32_functions.c +++ b/Modules/_multiprocessing/win32_functions.c @@ -12,10 +12,223 @@ #define WIN32_FUNCTION(func) \ {#func, (PyCFunction)win32_ ## func, METH_VARARGS | METH_STATIC, ""} +#define WIN32_KWARGS_FUNCTION(func) \ + {#func, (PyCFunction)win32_ ## func, METH_VARARGS | METH_KEYWORDS | METH_STATIC, ""} + #define WIN32_CONSTANT(fmt, con) \ PyDict_SetItemString(Win32Type.tp_dict, #con, Py_BuildValue(fmt, con)) +/* Grab CancelIoEx dynamically from kernel32 */ +static int has_CancelIoEx = -1; +static BOOL (CALLBACK *Py_CancelIoEx)(HANDLE, LPOVERLAPPED); + +static int +check_CancelIoEx() +{ + if (has_CancelIoEx == -1) + { + HINSTANCE hKernel32 = GetModuleHandle("KERNEL32"); + * (FARPROC *) &Py_CancelIoEx = GetProcAddress(hKernel32, + "CancelIoEx"); + has_CancelIoEx = (Py_CancelIoEx != NULL); + } + return has_CancelIoEx; +} + + +/* + * A Python object wrapping an OVERLAPPED structure and other useful data + * for overlapped I/O + */ + +typedef struct { + PyObject_HEAD + OVERLAPPED overlapped; + /* For convenience, we store the file handle too */ + HANDLE handle; + /* Whether there's I/O in flight */ + int pending; + /* Whether I/O completed successfully */ + int completed; + /* Buffer used for reading (optional) */ + PyObject *read_buffer; + /* Buffer used for writing (optional) */ + Py_buffer write_buffer; +} OverlappedObject; + +static void +overlapped_dealloc(OverlappedObject *self) +{ + int err = GetLastError(); + if (self->pending) { + if (check_CancelIoEx()) + Py_CancelIoEx(self->handle, &self->overlapped); + else { + PyErr_SetString(PyExc_RuntimeError, + "I/O operations still in flight while destroying " + "Overlapped object, the process may crash"); + PyErr_WriteUnraisable(NULL); + } + } + CloseHandle(self->overlapped.hEvent); + SetLastError(err); + if (self->write_buffer.obj) + PyBuffer_Release(&self->write_buffer); + Py_CLEAR(self->read_buffer); + PyObject_Del(self); +} + +static PyObject * +overlapped_GetOverlappedResult(OverlappedObject *self, PyObject *waitobj) +{ + int wait; + BOOL res; + DWORD transferred = 0; + + wait = PyObject_IsTrue(waitobj); + if (wait < 0) + return NULL; + Py_BEGIN_ALLOW_THREADS + res = GetOverlappedResult(self->handle, &self->overlapped, &transferred, + wait != 0); + Py_END_ALLOW_THREADS + + if (!res) { + int err = GetLastError(); + if (err == ERROR_IO_INCOMPLETE) + Py_RETURN_NONE; + if (err != ERROR_MORE_DATA) { + self->pending = 0; + return PyErr_SetExcFromWindowsErr(PyExc_IOError, err); + } + } + self->pending = 0; + self->completed = 1; + if (self->read_buffer) { + assert(PyBytes_CheckExact(self->read_buffer)); + if (_PyBytes_Resize(&self->read_buffer, transferred)) + return NULL; + } + return Py_BuildValue("lN", (long) transferred, PyBool_FromLong(res)); +} + +static PyObject * +overlapped_getbuffer(OverlappedObject *self) +{ + PyObject *res; + if (!self->completed) { + PyErr_SetString(PyExc_ValueError, + "can't get read buffer before GetOverlappedResult() " + "signals the operation completed"); + return NULL; + } + res = self->read_buffer ? self->read_buffer : Py_None; + Py_INCREF(res); + return res; +} + +static PyObject * +overlapped_cancel(OverlappedObject *self) +{ + BOOL res = TRUE; + + if (self->pending) { + Py_BEGIN_ALLOW_THREADS + if (check_CancelIoEx()) + res = Py_CancelIoEx(self->handle, &self->overlapped); + else + res = CancelIo(self->handle); + Py_END_ALLOW_THREADS + } + + /* CancelIoEx returns ERROR_NOT_FOUND if the I/O completed in-between */ + if (!res && GetLastError() != ERROR_NOT_FOUND) + return PyErr_SetExcFromWindowsErr(PyExc_IOError, 0); + self->pending = 0; + Py_RETURN_NONE; +} + +static PyMethodDef overlapped_methods[] = { + {"GetOverlappedResult", (PyCFunction) overlapped_GetOverlappedResult, + METH_O, NULL}, + {"getbuffer", (PyCFunction) overlapped_getbuffer, METH_NOARGS, NULL}, + {"cancel", (PyCFunction) overlapped_cancel, METH_NOARGS, NULL}, + {NULL} +}; + +static PyMemberDef overlapped_members[] = { + {"event", T_HANDLE, + offsetof(OverlappedObject, overlapped) + offsetof(OVERLAPPED, hEvent), + READONLY, "overlapped event handle"}, + {NULL} +}; + +PyTypeObject OverlappedType = { + PyVarObject_HEAD_INIT(NULL, 0) + /* tp_name */ "_multiprocessing.win32.Overlapped", + /* tp_basicsize */ sizeof(OverlappedObject), + /* tp_itemsize */ 0, + /* tp_dealloc */ (destructor) overlapped_dealloc, + /* tp_print */ 0, + /* tp_getattr */ 0, + /* tp_setattr */ 0, + /* tp_reserved */ 0, + /* tp_repr */ 0, + /* tp_as_number */ 0, + /* tp_as_sequence */ 0, + /* tp_as_mapping */ 0, + /* tp_hash */ 0, + /* tp_call */ 0, + /* tp_str */ 0, + /* tp_getattro */ 0, + /* tp_setattro */ 0, + /* tp_as_buffer */ 0, + /* tp_flags */ Py_TPFLAGS_DEFAULT, + /* tp_doc */ "OVERLAPPED structure wrapper", + /* tp_traverse */ 0, + /* tp_clear */ 0, + /* tp_richcompare */ 0, + /* tp_weaklistoffset */ 0, + /* tp_iter */ 0, + /* tp_iternext */ 0, + /* tp_methods */ overlapped_methods, + /* tp_members */ overlapped_members, + /* tp_getset */ 0, + /* tp_base */ 0, + /* tp_dict */ 0, + /* tp_descr_get */ 0, + /* tp_descr_set */ 0, + /* tp_dictoffset */ 0, + /* tp_init */ 0, + /* tp_alloc */ 0, + /* tp_new */ 0, +}; + +static OverlappedObject * +new_overlapped(HANDLE handle) +{ + OverlappedObject *self; + + self = PyObject_New(OverlappedObject, &OverlappedType); + if (!self) + return NULL; + self->handle = handle; + self->read_buffer = NULL; + self->pending = 0; + self->completed = 0; + memset(&self->overlapped, 0, sizeof(OVERLAPPED)); + memset(&self->write_buffer, 0, sizeof(Py_buffer)); + /* Manual reset, initially non-signalled */ + self->overlapped.hEvent = CreateEvent(NULL, TRUE, FALSE, NULL); + return self; +} + + +/* + * Module functions + */ + static PyObject * win32_CloseHandle(PyObject *self, PyObject *args) { @@ -36,20 +249,44 @@ win32_CloseHandle(PyObject *self, PyObject *args) } static PyObject * -win32_ConnectNamedPipe(PyObject *self, PyObject *args) +win32_ConnectNamedPipe(PyObject *self, PyObject *args, PyObject *kwds) { HANDLE hNamedPipe; - LPOVERLAPPED lpOverlapped; + int use_overlapped = 0; BOOL success; + OverlappedObject *overlapped = NULL; + static char *kwlist[] = {"handle", "overlapped", NULL}; - if (!PyArg_ParseTuple(args, F_HANDLE F_POINTER, - &hNamedPipe, &lpOverlapped)) + if (!PyArg_ParseTupleAndKeywords(args, kwds, + F_HANDLE "|i", kwlist, + &hNamedPipe, &use_overlapped)) return NULL; + if (use_overlapped) { + overlapped = new_overlapped(hNamedPipe); + if (!overlapped) + return NULL; + } + Py_BEGIN_ALLOW_THREADS - success = ConnectNamedPipe(hNamedPipe, lpOverlapped); + success = ConnectNamedPipe(hNamedPipe, + overlapped ? &overlapped->overlapped : NULL); Py_END_ALLOW_THREADS + if (overlapped) { + int err = GetLastError(); + /* Overlapped ConnectNamedPipe never returns a success code */ + assert(success == 0); + if (err == ERROR_IO_PENDING) + overlapped->pending = 1; + else if (err == ERROR_PIPE_CONNECTED) + SetEvent(overlapped->overlapped.hEvent); + else { + Py_DECREF(overlapped); + return PyErr_SetFromWindowsErr(err); + } + return (PyObject *) overlapped; + } if (!success) return PyErr_SetFromWindowsErr(0); @@ -280,46 +517,109 @@ win32_send(PyObject *self, PyObject *args) } static PyObject * -win32_WriteFile(PyObject *self, PyObject *args) +win32_WriteFile(PyObject *self, PyObject *args, PyObject *kwds) { HANDLE handle; - Py_buffer buf; + Py_buffer _buf, *buf; + PyObject *bufobj; int written; BOOL ret; + int use_overlapped = 0; + OverlappedObject *overlapped = NULL; + static char *kwlist[] = {"handle", "buffer", "overlapped", NULL}; + + /* First get handle and use_overlapped to know which Py_buffer to use */ + if (!PyArg_ParseTupleAndKeywords(args, kwds, + F_HANDLE "O|i:WriteFile", kwlist, + &handle, &bufobj, &use_overlapped)) + return NULL; + + if (use_overlapped) { + overlapped = new_overlapped(handle); + if (!overlapped) + return NULL; + buf = &overlapped->write_buffer; + } + else + buf = &_buf; - if (!PyArg_ParseTuple(args, F_HANDLE "y*:WriteFile" , &handle, &buf)) + if (!PyArg_Parse(bufobj, "y*", buf)) { + Py_XDECREF(overlapped); return NULL; + } Py_BEGIN_ALLOW_THREADS - ret = WriteFile(handle, buf.buf, buf.len, &written, NULL); + ret = WriteFile(handle, buf->buf, buf->len, &written, + overlapped ? &overlapped->overlapped : NULL); Py_END_ALLOW_THREADS - PyBuffer_Release(&buf); + if (overlapped) { + int err = GetLastError(); + if (!ret) { + if (err == ERROR_IO_PENDING) + overlapped->pending = 1; + else { + Py_DECREF(overlapped); + return PyErr_SetExcFromWindowsErr(PyExc_IOError, 0); + } + } + return (PyObject *) overlapped; + } + + PyBuffer_Release(buf); if (!ret) return PyErr_SetExcFromWindowsErr(PyExc_IOError, 0); return PyLong_FromLong(written); } static PyObject * -win32_ReadFile(PyObject *self, PyObject *args) +win32_ReadFile(PyObject *self, PyObject *args, PyObject *kwds) { HANDLE handle; int size; DWORD nread; PyObject *buf; BOOL ret; + int use_overlapped = 0; + OverlappedObject *overlapped = NULL; + static char *kwlist[] = {"handle", "size", "overlapped", NULL}; - if (!PyArg_ParseTuple(args, F_HANDLE "i:ReadFile" , &handle, &size)) + if (!PyArg_ParseTupleAndKeywords(args, kwds, + F_HANDLE "i|i:ReadFile", kwlist, + &handle, &size, &use_overlapped)) return NULL; buf = PyBytes_FromStringAndSize(NULL, size); if (!buf) return NULL; + if (use_overlapped) { + overlapped = new_overlapped(handle); + if (!overlapped) { + Py_DECREF(buf); + return NULL; + } + /* Steals reference to buf */ + overlapped->read_buffer = buf; + } Py_BEGIN_ALLOW_THREADS - ret = ReadFile(handle, PyBytes_AS_STRING(buf), size, &nread, NULL); + ret = ReadFile(handle, PyBytes_AS_STRING(buf), size, &nread, + overlapped ? &overlapped->overlapped : NULL); Py_END_ALLOW_THREADS + if (overlapped) { + int err = GetLastError(); + if (!ret) { + if (err == ERROR_IO_PENDING) + overlapped->pending = 1; + else if (err != ERROR_MORE_DATA) { + Py_DECREF(overlapped); + return PyErr_SetExcFromWindowsErr(PyExc_IOError, 0); + } + } + return (PyObject *) overlapped; + } + if (!ret && GetLastError() != ERROR_MORE_DATA) { Py_DECREF(buf); return PyErr_SetExcFromWindowsErr(PyExc_IOError, 0); @@ -373,19 +673,71 @@ win32_PeekNamedPipe(PyObject *self, PyObject *args) } } +static PyObject * +win32_WaitForMultipleObjects(PyObject* self, PyObject* args) +{ + DWORD result; + PyObject *handle_seq; + HANDLE handles[MAXIMUM_WAIT_OBJECTS]; + Py_ssize_t nhandles, i; + int wait_flag; + int milliseconds = INFINITE; + + if (!PyArg_ParseTuple(args, "Oi|i:WaitForMultipleObjects", + &handle_seq, &wait_flag, &milliseconds)) + return NULL; + + if (!PySequence_Check(handle_seq)) { + PyErr_Format(PyExc_TypeError, + "sequence type expected, got '%s'", + Py_TYPE(handle_seq)->tp_doc); + return NULL; + } + nhandles = PySequence_Length(handle_seq); + if (nhandles == -1) + return NULL; + if (nhandles < 0 || nhandles >= MAXIMUM_WAIT_OBJECTS) { + PyErr_Format(PyExc_ValueError, + "need at most %zd handles, got a sequence of length %zd", + MAXIMUM_WAIT_OBJECTS, nhandles); + return NULL; + } + for (i = 0; i < nhandles; i++) { + HANDLE h; + PyObject *v = PySequence_GetItem(handle_seq, i); + if (v == NULL) + return NULL; + if (!PyArg_Parse(v, F_HANDLE, &h)) + return NULL; + handles[i] = h; + } + + Py_BEGIN_ALLOW_THREADS + result = WaitForMultipleObjects((DWORD) nhandles, handles, + (BOOL) wait_flag, (DWORD) milliseconds); + Py_END_ALLOW_THREADS + + if (result == WAIT_FAILED) + return PyErr_SetExcFromWindowsErr(PyExc_IOError, 0); + + return PyLong_FromLong((int) result); +} + + static PyMethodDef win32_methods[] = { WIN32_FUNCTION(CloseHandle), WIN32_FUNCTION(GetLastError), WIN32_FUNCTION(OpenProcess), WIN32_FUNCTION(ExitProcess), - WIN32_FUNCTION(ConnectNamedPipe), + WIN32_KWARGS_FUNCTION(ConnectNamedPipe), WIN32_FUNCTION(CreateFile), WIN32_FUNCTION(CreateNamedPipe), - WIN32_FUNCTION(ReadFile), + WIN32_KWARGS_FUNCTION(ReadFile), WIN32_FUNCTION(PeekNamedPipe), WIN32_FUNCTION(SetNamedPipeHandleState), + WIN32_FUNCTION(WaitForMultipleObjects), WIN32_FUNCTION(WaitNamedPipe), - WIN32_FUNCTION(WriteFile), + WIN32_KWARGS_FUNCTION(WriteFile), WIN32_FUNCTION(closesocket), WIN32_FUNCTION(recv), WIN32_FUNCTION(send), @@ -407,12 +759,18 @@ create_win32_namespace(void) return NULL; Py_INCREF(&Win32Type); + if (PyType_Ready(&OverlappedType) < 0) + return NULL; + PyDict_SetItemString(Win32Type.tp_dict, "Overlapped", + (PyObject *) &OverlappedType); + WIN32_CONSTANT(F_DWORD, ERROR_ALREADY_EXISTS); WIN32_CONSTANT(F_DWORD, ERROR_BROKEN_PIPE); WIN32_CONSTANT(F_DWORD, ERROR_NO_SYSTEM_RESOURCES); WIN32_CONSTANT(F_DWORD, ERROR_PIPE_BUSY); WIN32_CONSTANT(F_DWORD, ERROR_PIPE_CONNECTED); WIN32_CONSTANT(F_DWORD, ERROR_SEM_TIMEOUT); + WIN32_CONSTANT(F_DWORD, FILE_FLAG_OVERLAPPED); WIN32_CONSTANT(F_DWORD, GENERIC_READ); WIN32_CONSTANT(F_DWORD, GENERIC_WRITE); WIN32_CONSTANT(F_DWORD, INFINITE); |