summaryrefslogtreecommitdiffstats
path: root/Doc/library/multiprocessing.rst
diff options
context:
space:
mode:
Diffstat (limited to 'Doc/library/multiprocessing.rst')
-rw-r--r--Doc/library/multiprocessing.rst375
1 files changed, 269 insertions, 106 deletions
diff --git a/Doc/library/multiprocessing.rst b/Doc/library/multiprocessing.rst
index 4271fc2..a783efc 100644
--- a/Doc/library/multiprocessing.rst
+++ b/Doc/library/multiprocessing.rst
@@ -29,7 +29,7 @@ Windows.
Functionality within this package requires that the ``__main__`` module be
importable by the children. This is covered in :ref:`multiprocessing-programming`
however it is worth pointing out here. This means that some examples, such
- as the :class:`multiprocessing.Pool` examples will not work in the
+ as the :class:`multiprocessing.pool.Pool` examples will not work in the
interactive interpreter. For example::
>>> from multiprocessing import Pool
@@ -121,9 +121,7 @@ processes:
print(q.get()) # prints "[42, None, 'hello']"
p.join()
- Queues are thread and process safe, but note that they must never
- be instantiated as a side effect of importing a module: this can lead
- to a deadlock! (see :ref:`threaded-imports`)
+ Queues are thread and process safe.
**Pipes**
@@ -229,11 +227,11 @@ However, if you really do need to use some shared data then
holds Python objects and allows other processes to manipulate them using
proxies.
- A manager returned by :func:`Manager` will support types :class:`list`,
- :class:`dict`, :class:`Namespace`, :class:`Lock`, :class:`RLock`,
- :class:`Semaphore`, :class:`BoundedSemaphore`, :class:`Condition`,
- :class:`Event`, :class:`Queue`, :class:`Value` and :class:`Array`. For
- example, ::
+ A manager returned by :func:`Manager` will support types
+ :class:`list`, :class:`dict`, :class:`Namespace`, :class:`Lock`,
+ :class:`RLock`, :class:`Semaphore`, :class:`BoundedSemaphore`,
+ :class:`Condition`, :class:`Event`, :class:`Barrier`,
+ :class:`Queue`, :class:`Value` and :class:`Array`. For example, ::
from multiprocessing import Process, Manager
@@ -244,17 +242,16 @@ However, if you really do need to use some shared data then
l.reverse()
if __name__ == '__main__':
- manager = Manager()
+ with Manager() as manager:
+ d = manager.dict()
+ l = manager.list(range(10))
- d = manager.dict()
- l = manager.list(range(10))
+ p = Process(target=f, args=(d, l))
+ p.start()
+ p.join()
- p = Process(target=f, args=(d, l))
- p.start()
- p.join()
-
- print(d)
- print(l)
+ print(d)
+ print(l)
will print ::
@@ -282,10 +279,10 @@ For example::
return x*x
if __name__ == '__main__':
- pool = Pool(processes=4) # start 4 worker processes
- result = pool.apply_async(f, [10]) # evaluate "f(10)" asynchronously
- print(result.get(timeout=1)) # prints "100" unless your computer is *very* slow
- print(pool.map(f, range(10))) # prints "[0, 1, 4,..., 81]"
+ with Pool(processes=4) as pool: # start 4 worker processes
+ result = pool.apply_async(f, [10]) # evaluate "f(10)" asynchronously
+ print(result.get(timeout=1)) # prints "100" unless your computer is *very* slow
+ print(pool.map(f, range(10))) # prints "[0, 1, 4,..., 81]"
Reference
@@ -298,7 +295,8 @@ The :mod:`multiprocessing` package mostly replicates the API of the
:class:`Process` and exceptions
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
-.. class:: Process(group=None, target=None, name=None, args=(), kwargs={})
+.. class:: Process(group=None, target=None, name=None, args=(), kwargs={}, \
+ *, daemon=None)
Process objects represent activity that is run in a separate process. The
:class:`Process` class has equivalents of all the methods of
@@ -308,18 +306,22 @@ The :mod:`multiprocessing` package mostly replicates the API of the
should always be ``None``; it exists solely for compatibility with
:class:`threading.Thread`. *target* is the callable object to be invoked by
the :meth:`run()` method. It defaults to ``None``, meaning nothing is
- called. *name* is the process name. By default, a unique name is constructed
- of the form 'Process-N\ :sub:`1`:N\ :sub:`2`:...:N\ :sub:`k`' where N\
- :sub:`1`,N\ :sub:`2`,...,N\ :sub:`k` is a sequence of integers whose length
- is determined by the *generation* of the process. *args* is the argument
- tuple for the target invocation. *kwargs* is a dictionary of keyword
- arguments for the target invocation. By default, no arguments are passed to
- *target*.
+ called. *name* is the process name (see :attr:`name` for more details).
+ *args* is the argument tuple for the target invocation. *kwargs* is a
+ dictionary of keyword arguments for the target invocation. If provided,
+ the keyword-only *daemon* argument sets the process :attr:`daemon` flag
+ to ``True`` or ``False``. If ``None`` (the default), this flag will be
+ inherited from the creating process.
+
+ By default, no arguments are passed to *target*.
If a subclass overrides the constructor, it must make sure it invokes the
base class constructor (:meth:`Process.__init__`) before doing anything else
to the process.
+ .. versionchanged:: 3.3
+ Added the *daemon* argument.
+
.. method:: run()
Method representing the process's activity.
@@ -338,10 +340,9 @@ The :mod:`multiprocessing` package mostly replicates the API of the
.. method:: join([timeout])
- Block the calling thread until the process whose :meth:`join` method is
- called terminates or until the optional timeout occurs.
-
- If *timeout* is ``None`` then there is no timeout.
+ If the optional argument *timeout* is ``None`` (the default), the method
+ blocks until the process whose :meth:`join` method is called terminates.
+ If *timeout* is a positive number, it blocks at most *timeout* seconds.
A process can be joined many times.
@@ -350,11 +351,14 @@ The :mod:`multiprocessing` package mostly replicates the API of the
.. attribute:: name
- The process's name.
+ The process's name. The name is a string used for identification purposes
+ only. It has no semantics. Multiple processes may be given the same
+ name.
- The name is a string used for identification purposes only. It has no
- semantics. Multiple processes may be given the same name. The initial
- name is set by the constructor.
+ The initial name is set by the constructor. If no explicit name is
+ provided to the constructor, a name of the form
+ 'Process-N\ :sub:`1`:N\ :sub:`2`:...:N\ :sub:`k`' is constructed, where
+ each N\ :sub:`k` is the N-th child of its parent.
.. method:: is_alive
@@ -406,6 +410,21 @@ The :mod:`multiprocessing` package mostly replicates the API of the
See :ref:`multiprocessing-auth-keys`.
+ .. attribute:: sentinel
+
+ A numeric handle of a system object which will become "ready" when
+ the process ends.
+
+ You can use this value if you want to wait on several events at
+ once using :func:`multiprocessing.connection.wait`. Otherwise
+ calling :meth:`join()` is simpler.
+
+ On Windows, this is an OS handle usable with the ``WaitForSingleObject``
+ and ``WaitForMultipleObjects`` family of API calls. On Unix, this is
+ a file descriptor usable with primitives from the :mod:`select` module.
+
+ .. versionadded:: 3.3
+
.. method:: terminate()
Terminate the process. On Unix this is done using the ``SIGTERM`` signal;
@@ -445,6 +464,9 @@ The :mod:`multiprocessing` package mostly replicates the API of the
>>> p.exitcode == -signal.SIGTERM
True
+.. exception:: ProcessError
+
+ The base class of all :mod:`multiprocessing` exceptions.
.. exception:: BufferTooShort
@@ -454,6 +476,13 @@ The :mod:`multiprocessing` package mostly replicates the API of the
If ``e`` is an instance of :exc:`BufferTooShort` then ``e.args[0]`` will give
the message as a byte string.
+.. exception:: AuthenticationError
+
+ Raised when there is an authentication error.
+
+.. exception:: TimeoutError
+
+ Raised by methods with a timeout when the timeout expires.
Pipes and Queues
~~~~~~~~~~~~~~~~
@@ -465,7 +494,7 @@ primitives like locks.
For passing messages one can use :func:`Pipe` (for a connection between two
processes) or a queue (which allows multiple producers and consumers).
-The :class:`Queue`, :class:`multiprocessing.queues.SimpleQueue` and :class:`JoinableQueue` types are multi-producer,
+The :class:`Queue`, :class:`SimpleQueue` and :class:`JoinableQueue` types are multi-producer,
multi-consumer FIFO queues modelled on the :class:`queue.Queue` class in the
standard library. They differ in that :class:`Queue` lacks the
:meth:`~queue.Queue.task_done` and :meth:`~queue.Queue.join` methods introduced
@@ -611,7 +640,7 @@ For an example of the usage of queues for interprocess communication see
exits -- see :meth:`join_thread`.
-.. class:: multiprocessing.queues.SimpleQueue()
+.. class:: SimpleQueue()
It is a simplified :class:`Queue` type, very close to a locked :class:`Pipe`.
@@ -635,8 +664,8 @@ For an example of the usage of queues for interprocess communication see
.. method:: task_done()
- Indicate that a formerly enqueued task is complete. Used by queue consumer
- threads. For each :meth:`~Queue.get` used to fetch a task, a subsequent
+ Indicate that a formerly enqueued task is complete. Used by queue
+ consumers. For each :meth:`~Queue.get` used to fetch a task, a subsequent
call to :meth:`task_done` tells the queue that the processing on the task
is complete.
@@ -653,7 +682,7 @@ For an example of the usage of queues for interprocess communication see
Block until all items in the queue have been gotten and processed.
The count of unfinished tasks goes up whenever an item is added to the
- queue. The count goes down whenever a consumer thread calls
+ queue. The count goes down whenever a consumer calls
:meth:`task_done` to indicate that the item was retrieved and all work on
it is complete. When the count of unfinished tasks drops to zero,
:meth:`~Queue.join` unblocks.
@@ -767,6 +796,9 @@ Connection objects are usually created using :func:`Pipe` -- see also
*timeout* is a number then this specifies the maximum time in seconds to
block. If *timeout* is ``None`` then an infinite timeout is used.
+ Note that multiple connection objects may be polled at once by
+ using :func:`multiprocessing.connection.wait`.
+
.. method:: send_bytes(buffer[, offset[, size]])
Send byte data from an object supporting the buffer interface as a
@@ -785,9 +817,14 @@ Connection objects are usually created using :func:`Pipe` -- see also
to receive and the other end has closed.
If *maxlength* is specified and the message is longer than *maxlength*
- then :exc:`IOError` is raised and the connection will no longer be
+ then :exc:`OSError` is raised and the connection will no longer be
readable.
+ .. versionchanged:: 3.3
+ This function used to raise a :exc:`IOError`, which is now an
+ alias of :exc:`OSError`.
+
+
.. method:: recv_bytes_into(buffer[, offset])
Read into *buffer* a complete message of byte data sent from the other end
@@ -805,6 +842,14 @@ Connection objects are usually created using :func:`Pipe` -- see also
raised and the complete message is available as ``e.args[0]`` where ``e``
is the exception instance.
+ .. versionchanged:: 3.3
+ Connection objects themselves can now be transferred between processes
+ using :meth:`Connection.send` and :meth:`Connection.recv`.
+
+ .. versionadded:: 3.3
+ Connection objects now support the context manager protocol -- see
+ :ref:`typecontextmanager`. :meth:`__enter__` returns the
+ connection object, and :meth:`__exit__` calls :meth:`close`.
For example:
@@ -856,6 +901,12 @@ program as they are in a multithreaded program. See the documentation for
Note that one can also create synchronization primitives by using a manager
object -- see :ref:`multiprocessing-managers`.
+.. class:: Barrier(parties[, action[, timeout]])
+
+ A barrier object: a clone of :class:`threading.Barrier`.
+
+ .. versionadded:: 3.3
+
.. class:: BoundedSemaphore([value])
A bounded semaphore object: a clone of :class:`threading.BoundedSemaphore`.
@@ -865,20 +916,17 @@ object -- see :ref:`multiprocessing-managers`.
.. class:: Condition([lock])
- A condition variable: a clone of :class:`threading.Condition`.
+ A condition variable: an alias for :class:`threading.Condition`.
If *lock* is specified then it should be a :class:`Lock` or :class:`RLock`
object from :mod:`multiprocessing`.
+ .. versionchanged:: 3.3
+ The :meth:`wait_for` method was added.
+
.. class:: Event()
A clone of :class:`threading.Event`.
- This method returns the state of the internal semaphore on exit, so it
- will always return ``True`` except if a timeout is given and the operation
- times out.
-
- .. versionchanged:: 3.1
- Previously, the method always returned ``None``.
.. class:: Lock()
@@ -894,6 +942,12 @@ object -- see :ref:`multiprocessing-managers`.
.. note::
+ The :meth:`acquire` and :meth:`wait` methods of each of these types
+ treat negative timeouts as zero timeouts. This differs from
+ :mod:`threading` where, since version 3.2, the equivalent
+ :meth:`acquire` methods treat negative timeouts as infinite
+ timeouts.
+
On Mac OS X, ``sem_timedwait`` is unsupported, so calling ``acquire()`` with
a timeout will emulate that function's behavior using a sleeping loop.
@@ -915,10 +969,11 @@ Shared :mod:`ctypes` Objects
It is possible to create shared objects using shared memory which can be
inherited by child processes.
-.. function:: Value(typecode_or_type, *args[, lock])
+.. function:: Value(typecode_or_type, *args, lock=True)
Return a :mod:`ctypes` object allocated from shared memory. By default the
- return value is actually a synchronized wrapper for the object.
+ return value is actually a synchronized wrapper for the object. The object
+ itself can be accessed via the *value* attribute of a :class:`Value`.
*typecode_or_type* determines the type of the returned object: it is either a
ctypes type or a one character typecode of the kind used by the :mod:`array`
@@ -1007,7 +1062,7 @@ processes.
attributes which allow one to use it to store and retrieve strings -- see
documentation for :mod:`ctypes`.
-.. function:: Array(typecode_or_type, size_or_initializer, *args[, lock])
+.. function:: Array(typecode_or_type, size_or_initializer, *, lock=True)
The same as :func:`RawArray` except that depending on the value of *lock* a
process-safe synchronization wrapper may be returned instead of a raw ctypes
@@ -1022,7 +1077,7 @@ processes.
Note that *lock* is a keyword-only argument.
-.. function:: Value(typecode_or_type, *args[, lock])
+.. function:: Value(typecode_or_type, *args, lock=True)
The same as :func:`RawValue` except that depending on the value of *lock* a
process-safe synchronization wrapper may be returned instead of a raw ctypes
@@ -1124,8 +1179,10 @@ Managers
~~~~~~~~
Managers provide a way to create data which can be shared between different
-processes. A manager object controls a server process which manages *shared
-objects*. Other processes can access the shared objects by using proxies.
+processes, including sharing over a network between processes running on
+different machines. A manager object controls a server process which manages
+*shared objects*. Other processes can access the shared objects by using
+proxies.
.. function:: multiprocessing.Manager()
@@ -1198,9 +1255,10 @@ their parent process exits. The manager classes are defined in the
type of shared object. This must be a string.
*callable* is a callable used for creating objects for this type
- identifier. If a manager instance will be created using the
- :meth:`from_address` classmethod or if the *create_method* argument is
- ``False`` then this can be left as ``None``.
+ identifier. If a manager instance will be connected to the
+ server using the :meth:`connect` method, or if the
+ *create_method* argument is ``False`` then this can be left as
+ ``None``.
*proxytype* is a subclass of :class:`BaseProxy` which is used to create
proxies for shared objects with this *typeid*. If ``None`` then a proxy
@@ -1232,6 +1290,14 @@ their parent process exits. The manager classes are defined in the
The address used by the manager.
+ .. versionchanged:: 3.3
+ Manager objects support the context manager protocol -- see
+ :ref:`typecontextmanager`. :meth:`__enter__` starts the server
+ process (if it has not already started) and then returns the
+ manager object. :meth:`__exit__` calls :meth:`shutdown`.
+
+ In previous versions :meth:`__enter__` did not start the
+ manager's server process if it was not already started.
.. class:: SyncManager
@@ -1241,6 +1307,13 @@ their parent process exits. The manager classes are defined in the
It also supports creation of shared lists and dictionaries.
+ .. method:: Barrier(parties[, action[, timeout]])
+
+ Create a shared :class:`threading.Barrier` object and return a
+ proxy for it.
+
+ .. versionadded:: 3.3
+
.. method:: BoundedSemaphore([value])
Create a shared :class:`threading.BoundedSemaphore` object and return a
@@ -1254,6 +1327,9 @@ their parent process exits. The manager classes are defined in the
If *lock* is supplied then it should be a proxy for a
:class:`threading.Lock` or :class:`threading.RLock` object.
+ .. versionchanged:: 3.3
+ The :meth:`wait_for` method was added.
+
.. method:: Event()
Create a shared :class:`threading.Event` object and return a proxy for it.
@@ -1359,11 +1435,10 @@ callables with the manager class. For example::
MyManager.register('Maths', MathsClass)
if __name__ == '__main__':
- manager = MyManager()
- manager.start()
- maths = manager.Maths()
- print(maths.add(4, 3)) # prints 7
- print(maths.mul(7, 8)) # prints 56
+ with MyManager() as manager:
+ maths = manager.Maths()
+ print(maths.add(4, 3)) # prints 7
+ print(maths.mul(7, 8)) # prints 56
Using a remote manager
@@ -1563,7 +1638,7 @@ Process Pools
One can create a pool of processes which will carry out tasks submitted to it
with the :class:`Pool` class.
-.. class:: multiprocessing.Pool([processes[, initializer[, initargs[, maxtasksperchild]]]])
+.. class:: Pool([processes[, initializer[, initargs[, maxtasksperchild]]]])
A process pool object which controls a pool of worker processes to which jobs
can be submitted. It supports asynchronous results with timeouts and
@@ -1658,6 +1733,24 @@ with the :class:`Pool` class.
returned iterator should be considered arbitrary. (Only when there is
only one worker process is the order guaranteed to be "correct".)
+ .. method:: starmap(func, iterable[, chunksize])
+
+ Like :meth:`map` except that the elements of the `iterable` are expected
+ to be iterables that are unpacked as arguments.
+
+ Hence an `iterable` of `[(1,2), (3, 4)]` results in `[func(1,2),
+ func(3,4)]`.
+
+ .. versionadded:: 3.3
+
+ .. method:: starmap_async(func, iterable[, chunksize[, callback[, error_back]]])
+
+ A combination of :meth:`starmap` and :meth:`map_async` that iterates over
+ `iterable` of iterables and calls `func` with the iterables unpacked.
+ Returns a result object.
+
+ .. versionadded:: 3.3
+
.. method:: close()
Prevents any more tasks from being submitted to the pool. Once all the
@@ -1674,6 +1767,11 @@ with the :class:`Pool` class.
Wait for the worker processes to exit. One must call :meth:`close` or
:meth:`terminate` before using :meth:`join`.
+ .. versionadded:: 3.3
+ Pool objects now support the context manager protocol -- see
+ :ref:`typecontextmanager`. :meth:`__enter__` returns the pool
+ object, and :meth:`__exit__` calls :meth:`terminate`.
+
.. class:: AsyncResult
@@ -1708,21 +1806,20 @@ The following example demonstrates the use of a pool::
return x*x
if __name__ == '__main__':
- pool = Pool(processes=4) # start 4 worker processes
+ with Pool(processes=4) as pool: # start 4 worker processes
+ result = pool.apply_async(f, (10,)) # evaluate "f(10)" asynchronously
+ print(result.get(timeout=1)) # prints "100" unless your computer is *very* slow
- result = pool.apply_async(f, (10,)) # evaluate "f(10)" asynchronously
- print(result.get(timeout=1)) # prints "100" unless your computer is *very* slow
+ print(pool.map(f, range(10))) # prints "[0, 1, 4,..., 81]"
- print(pool.map(f, range(10))) # prints "[0, 1, 4,..., 81]"
+ it = pool.imap(f, range(10))
+ print(next(it)) # prints "0"
+ print(next(it)) # prints "1"
+ print(it.next(timeout=1)) # prints "4" unless your computer is *very* slow
- it = pool.imap(f, range(10))
- print(next(it)) # prints "0"
- print(next(it)) # prints "1"
- print(it.next(timeout=1)) # prints "4" unless your computer is *very* slow
-
- import time
- result = pool.apply_async(time.sleep, (10,))
- print(result.get(timeout=1)) # raises TimeoutError
+ import time
+ result = pool.apply_async(time.sleep, (10,))
+ print(result.get(timeout=1)) # raises TimeoutError
.. _multiprocessing-listeners-clients:
@@ -1738,8 +1835,9 @@ Usually message passing between processes is done using queues or by using
However, the :mod:`multiprocessing.connection` module allows some extra
flexibility. It basically gives a high level message oriented API for dealing
-with sockets or Windows named pipes, and also has support for *digest
-authentication* using the :mod:`hmac` module.
+with sockets or Windows named pipes. It also has support for *digest
+authentication* using the :mod:`hmac` module, and for polling
+multiple connections at the same time.
.. function:: deliver_challenge(connection, authkey)
@@ -1749,15 +1847,15 @@ authentication* using the :mod:`hmac` module.
If the reply matches the digest of the message using *authkey* as the key
then a welcome message is sent to the other end of the connection. Otherwise
- :exc:`AuthenticationError` is raised.
+ :exc:`~multiprocessing.AuthenticationError` is raised.
.. function:: answerChallenge(connection, authkey)
Receive a message, calculate the digest of the message using *authkey* as the
key, and then send the digest back.
- If a welcome message is not received, then :exc:`AuthenticationError` is
- raised.
+ If a welcome message is not received, then
+ :exc:`~multiprocessing.AuthenticationError` is raised.
.. function:: Client(address[, family[, authenticate[, authkey]]])
@@ -1771,7 +1869,8 @@ authentication* using the :mod:`hmac` module.
If *authenticate* is ``True`` or *authkey* is a byte string then digest
authentication is used. The key used for authentication will be either
*authkey* or ``current_process().authkey`` if *authkey* is ``None``.
- If authentication fails then :exc:`AuthenticationError` is raised. See
+ If authentication fails then
+ :exc:`~multiprocessing.AuthenticationError` is raised. See
:ref:`multiprocessing-auth-keys`.
.. class:: Listener([address[, family[, backlog[, authenticate[, authkey]]]]])
@@ -1812,13 +1911,15 @@ authentication* using the :mod:`hmac` module.
``current_process().authkey`` is used as the authentication key. If
*authkey* is ``None`` and *authenticate* is ``False`` then no
authentication is done. If authentication fails then
- :exc:`AuthenticationError` is raised. See :ref:`multiprocessing-auth-keys`.
+ :exc:`~multiprocessing.AuthenticationError` is raised.
+ See :ref:`multiprocessing-auth-keys`.
.. method:: accept()
Accept a connection on the bound socket or named pipe of the listener
object and return a :class:`Connection` object. If authentication is
- attempted and fails, then :exc:`AuthenticationError` is raised.
+ attempted and fails, then
+ :exc:`~multiprocessing.AuthenticationError` is raised.
.. method:: close()
@@ -1837,12 +1938,44 @@ authentication* using the :mod:`hmac` module.
The address from which the last accepted connection came. If this is
unavailable then it is ``None``.
+ .. versionadded:: 3.3
+ Listener objects now support the context manager protocol -- see
+ :ref:`typecontextmanager`. :meth:`__enter__` returns the
+ listener object, and :meth:`__exit__` calls :meth:`close`.
-The module defines two exceptions:
+.. function:: wait(object_list, timeout=None)
-.. exception:: AuthenticationError
+ Wait till an object in *object_list* is ready. Returns the list of
+ those objects in *object_list* which are ready. If *timeout* is a
+ float then the call blocks for at most that many seconds. If
+ *timeout* is ``None`` then it will block for an unlimited period.
+ A negative timeout is equivalent to a zero timeout.
+
+ For both Unix and Windows, an object can appear in *object_list* if
+ it is
+
+ * a readable :class:`~multiprocessing.Connection` object;
+ * a connected and readable :class:`socket.socket` object; or
+ * the :attr:`~multiprocessing.Process.sentinel` attribute of a
+ :class:`~multiprocessing.Process` object.
+
+ A connection or socket object is ready when there is data available
+ to be read from it, or the other end has been closed.
+
+ **Unix**: ``wait(object_list, timeout)`` almost equivalent
+ ``select.select(object_list, [], [], timeout)``. The difference is
+ that, if :func:`select.select` is interrupted by a signal, it can
+ raise :exc:`OSError` with an error number of ``EINTR``, whereas
+ :func:`wait` will not.
+
+ **Windows**: An item in *object_list* must either be an integer
+ handle which is waitable (according to the definition used by the
+ documentation of the Win32 function ``WaitForMultipleObjects()``)
+ or it can be an object with a :meth:`fileno` method which returns a
+ socket handle or pipe handle. (Note that pipe handles and socket
+ handles are **not** waitable handles.)
- Exception raised when there is an authentication error.
+ .. versionadded:: 3.3
**Examples**
@@ -1855,19 +1988,16 @@ the client::
from array import array
address = ('localhost', 6000) # family is deduced to be 'AF_INET'
- listener = Listener(address, authkey=b'secret password')
- conn = listener.accept()
- print('connection accepted from', listener.last_accepted)
+ with Listener(address, authkey=b'secret password') as listener:
+ with listener.accept() as conn:
+ print('connection accepted from', listener.last_accepted)
- conn.send([2.25, None, 'junk', float])
+ conn.send([2.25, None, 'junk', float])
- conn.send_bytes(b'hello')
+ conn.send_bytes(b'hello')
- conn.send_bytes(array('i', [42, 1729]))
-
- conn.close()
- listener.close()
+ conn.send_bytes(array('i', [42, 1729]))
The following code connects to the server and receives some data from the
server::
@@ -1876,17 +2006,50 @@ server::
from array import array
address = ('localhost', 6000)
- conn = Client(address, authkey=b'secret password')
- print(conn.recv()) # => [2.25, None, 'junk', float]
+ with Client(address, authkey=b'secret password') as conn:
+ print(conn.recv()) # => [2.25, None, 'junk', float]
+
+ print(conn.recv_bytes()) # => 'hello'
+
+ arr = array('i', [0, 0, 0, 0, 0])
+ print(conn.recv_bytes_into(arr)) # => 8
+ print(arr) # => array('i', [42, 1729, 0, 0, 0])
+
+The following code uses :func:`~multiprocessing.connection.wait` to
+wait for messages from multiple processes at once::
- print(conn.recv_bytes()) # => 'hello'
+ import time, random
+ from multiprocessing import Process, Pipe, current_process
+ from multiprocessing.connection import wait
- arr = array('i', [0, 0, 0, 0, 0])
- print(conn.recv_bytes_into(arr)) # => 8
- print(arr) # => array('i', [42, 1729, 0, 0, 0])
+ def foo(w):
+ for i in range(10):
+ w.send((i, current_process().name))
+ w.close()
- conn.close()
+ if __name__ == '__main__':
+ readers = []
+
+ for i in range(4):
+ r, w = Pipe(duplex=False)
+ readers.append(r)
+ p = Process(target=foo, args=(w,))
+ p.start()
+ # We close the writable end of the pipe now to be sure that
+ # p is the only process which owns a handle for it. This
+ # ensures that when p closes its handle for the writable end,
+ # wait() will promptly report the readable end as being ready.
+ w.close()
+
+ while readers:
+ for r in wait(readers):
+ try:
+ msg = r.recv()
+ except EOFError:
+ readers.remove(r)
+ else:
+ print(msg)
.. _multiprocessing-address-formats:
@@ -2047,7 +2210,7 @@ Avoid shared state
It is probably best to stick to using queues or pipes for communication
between processes rather than using the lower level synchronization
- primitives from the :mod:`threading` module.
+ primitives.
Picklability