diff options
author | Antoine Pitrou <solipsis@pitrou.net> | 2012-03-05 18:28:37 (GMT) |
---|---|---|
committer | Antoine Pitrou <solipsis@pitrou.net> | 2012-03-05 18:28:37 (GMT) |
commit | bdb1cf1ca56db25b33fb15dd91eef2cc32cd8973 (patch) | |
tree | 54137f9699833726def7c803cff7c995af22cfa5 /Doc/library/multiprocessing.rst | |
parent | 1e88f3faa61dbaa9ea0d2404aa8563c1eeceba54 (diff) | |
download | cpython-bdb1cf1ca56db25b33fb15dd91eef2cc32cd8973.zip cpython-bdb1cf1ca56db25b33fb15dd91eef2cc32cd8973.tar.gz cpython-bdb1cf1ca56db25b33fb15dd91eef2cc32cd8973.tar.bz2 |
Issue #12328: Fix multiprocessing's use of overlapped I/O on Windows.
Also, add a multiprocessing.connection.wait(rlist, timeout=None) function
for polling multiple objects at once. Patch by sbt.
Complete changelist from sbt's patch:
* Adds a wait(rlist, timeout=None) function for polling multiple
objects at once. On Unix this is just a wrapper for
select(rlist, [], [], timeout=None).
* Removes use of the SentinelReady exception and the sentinels argument
to certain methods. concurrent.futures.process has been changed to
use wait() instead of SentinelReady.
* Fixes bugs concerning PipeConnection.poll() and messages of zero
length.
* Fixes PipeListener.accept() to call ConnectNamedPipe() with
overlapped=True.
* Fixes Queue.empty() and SimpleQueue.empty() so that they are
threadsafe on Windows.
* Now PipeConnection.poll() and wait() will not modify the pipe except
possibly by consuming a zero length message. (Previously poll()
could consume a partial message.)
* All of multiprocesing's pipe related blocking functions/methods are
now interruptible by SIGINT on Windows.
Diffstat (limited to 'Doc/library/multiprocessing.rst')
-rw-r--r-- | Doc/library/multiprocessing.rst | 82 |
1 files changed, 77 insertions, 5 deletions
diff --git a/Doc/library/multiprocessing.rst b/Doc/library/multiprocessing.rst index 5019eff..d8e1d92 100644 --- a/Doc/library/multiprocessing.rst +++ b/Doc/library/multiprocessing.rst @@ -415,13 +415,14 @@ The :mod:`multiprocessing` package mostly replicates the API of the A numeric handle of a system object which will become "ready" when the process ends. + You can use this value if you want to wait on several events at + once using :func:`multiprocessing.connection.wait`. Otherwise + calling :meth:`join()` is simpler. + On Windows, this is an OS handle usable with the ``WaitForSingleObject`` and ``WaitForMultipleObjects`` family of API calls. On Unix, this is a file descriptor usable with primitives from the :mod:`select` module. - You can use this value if you want to wait on several events at once. - Otherwise calling :meth:`join()` is simpler. - .. versionadded:: 3.3 .. method:: terminate() @@ -785,6 +786,9 @@ Connection objects are usually created using :func:`Pipe` -- see also *timeout* is a number then this specifies the maximum time in seconds to block. If *timeout* is ``None`` then an infinite timeout is used. + Note that multiple connection objects may be polled at once by + using :func:`multiprocessing.connection.wait`. + .. method:: send_bytes(buffer[, offset[, size]]) Send byte data from an object supporting the buffer interface as a @@ -1779,8 +1783,9 @@ Usually message passing between processes is done using queues or by using However, the :mod:`multiprocessing.connection` module allows some extra flexibility. It basically gives a high level message oriented API for dealing -with sockets or Windows named pipes, and also has support for *digest -authentication* using the :mod:`hmac` module. +with sockets or Windows named pipes. It also has support for *digest +authentication* using the :mod:`hmac` module, and for polling +multiple connections at the same time. .. function:: deliver_challenge(connection, authkey) @@ -1878,6 +1883,38 @@ authentication* using the :mod:`hmac` module. The address from which the last accepted connection came. If this is unavailable then it is ``None``. +.. function:: wait(object_list, timeout=None) + + Wait till an object in *object_list* is ready. Returns the list of + those objects in *object_list* which are ready. If *timeout* is a + float then the call blocks for at most that many seconds. If + *timeout* is ``None`` then it will block for an unlimited period. + + For both Unix and Windows, an object can appear in *object_list* if + it is + + * a readable :class:`~multiprocessing.Connection` object; + * a connected and readable :class:`socket.socket` object; or + * the :attr:`~multiprocessing.Process.sentinel` attribute of a + :class:`~multiprocessing.Process` object. + + A connection or socket object is ready when there is data available + to be read from it, or the other end has been closed. + + **Unix**: ``wait(object_list, timeout)`` almost equivalent + ``select.select(object_list, [], [], timeout)``. The difference is + that, if :func:`select.select` is interrupted by a signal, it can + raise :exc:`OSError` with an error number of ``EINTR``, whereas + :func:`wait` will not. + + **Windows**: An item in *object_list* must either be an integer + handle which is waitable (according to the definition used by the + documentation of the Win32 function ``WaitForMultipleObjects()``) + or it can be an object with a :meth:`fileno` method which returns a + socket handle or pipe handle. (Note that pipe handles and socket + handles are **not** waitable handles.) + + .. versionadded:: 3.3 The module defines two exceptions: @@ -1929,6 +1966,41 @@ server:: conn.close() +The following code uses :func:`~multiprocessing.connection.wait` to +wait for messages from multiple processes at once:: + + import time, random + from multiprocessing import Process, Pipe, current_process + from multiprocessing.connection import wait + + def foo(w): + for i in range(10): + w.send((i, current_process().name)) + w.close() + + if __name__ == '__main__': + readers = [] + + for i in range(4): + r, w = Pipe(duplex=False) + readers.append(r) + p = Process(target=foo, args=(w,)) + p.start() + # We close the writable end of the pipe now to be sure that + # p is the only process which owns a handle for it. This + # ensures that when p closes its handle for the writable end, + # wait() will promptly report the readable end as being ready. + w.close() + + while readers: + for r in wait(readers): + try: + msg = r.recv() + except EOFError: + readers.remove(r) + else: + print(msg) + .. _multiprocessing-address-formats: |