diff options
Diffstat (limited to 'Doc/library/multiprocessing.rst')
-rw-r--r-- | Doc/library/multiprocessing.rst | 2108 |
1 files changed, 2108 insertions, 0 deletions
diff --git a/Doc/library/multiprocessing.rst b/Doc/library/multiprocessing.rst new file mode 100644 index 0000000..bb374b3 --- /dev/null +++ b/Doc/library/multiprocessing.rst @@ -0,0 +1,2108 @@ +:mod:`multiprocessing` --- Process-based "threading" interface +============================================================== + +.. module:: multiprocessing + :synopsis: Process-based "threading" interface. + +.. versionadded:: 2.6 + +:mod:`multiprocessing` is a package for the Python language which supports the +spawning of processes using a similar API of the :mod:`threading` module. It +runs on both Unix and Windows. + +The :mod:`multiprocessing` module offers the capability of both local and remote +concurrency effectively side-stepping the Global Interpreter Lock by utilizing +subprocesses for "threads". Due to this, the :mod:`multiprocessing` module +allows the programmer to fully leverage multiple processors on a given machine. + + +Introduction +------------ + + +Threads, processes and the GIL +~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ + +To run more than one piece of code at the same time on the same computer one has +the choice of either using multiple processes or multiple threads. + +Although a program can be made up of multiple processes, these processes are in +effect completely independent of one another: different processes are not able +to cooperate with one another unless one sets up some means of communication +between them (such as by using sockets). If a lot of data must be transferred +between processes then this can be inefficient. + +On the other hand, multiple threads within a single process are intimately +connected: they share their data but often can interfere badly with one another. +It is often argued that the only way to make multithreaded programming "easy" is +to avoid relying on any shared state and for the threads to only communicate by +passing messages to each other. + +CPython has a *Global Interpreter Lock* (GIL) which in many ways makes threading +easier than it is in most languages by making sure that only one thread can +manipulate the interpreter's objects at a time. As a result, it is often safe +to let multiple threads access data without using any additional locking as one +would need to in a language such as C. + +One downside of the GIL is that on multi-processor (or multi-core) systems a +multithreaded Python program can only make use of one processor at a time unless +your application makes heavy use of I/O which effectively side-steps this. This +is a problem that can be overcome by using multiple processes instead. + +This package allows one to write multi-process programs using much the same API +that one uses for writing threaded programs. + + +Forking and spawning +~~~~~~~~~~~~~~~~~~~~ + +There are two ways of creating a new process in Python: + +* The current process can *fork* a new child process by using the + :func:`os.fork` function. This effectively creates an identical copy of the + current process which is now able to go off and perform some task set by the + parent process. This means that the child process inherits *copies* of all + variables that the parent process had. However, :func:`os.fork` is not + available on every platform: in particular Windows does not support it. + +* Alternatively, the current process can spawn a completely new Python + interpreter by using the :mod:`subprocess` module or one of the + :func:`os.spawn*` functions. Getting this new interpreter in to a fit state + to perform the task set for it by its parent process is, however, a bit of a + challenge. + +The :mod:`multiprocessing` module uses :func:`os.fork` if it is available since +it makes life a lot simpler. Forking the process is also more efficient in +terms of memory usage and the time needed to create the new process. + + +The :class:`Process` class +~~~~~~~~~~~~~~~~~~~~~~~~~~ + +In :mod:`multiprocessing`, processes are spawned by creating a :class:`Process` +object and then calling its :meth:`Process.start` method. :class:`Process` +follows the API of :class:`threading.Thread`. A trivial example of a +multiprocess program is :: + + from multiprocessing import Process + + def f(name): + print 'hello', name + + if __name__ == '__main__': + p = Process(target=f, args=('bob',)) + p.start() + p.join() + +Here the function ``f`` is run in a child process. + +For an explanation of why (on Windows) the ``if __name__ == '__main__'`` part is +necessary, see :ref:`multiprocessing-programming`. + + + +Exchanging objects between processes +~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ + +:mod:`multiprocessing` supports two types of communication channel between +processes: + +**Queues** + + The :class:`Queue` class is a near clone of :class:`Queue.Queue`. For + example:: + + from multiprocessing import Process, Queue + + def f(q): + q.put([42, None, 'hello']) + + if __name__ == '__main__': + q = Queue() + p = Process(target=f, args=(q,)) + p.start() + print q.get() # prints "[42, None, 'hello']" + p.join() + + Queues are thread and process safe. + +**Pipes** + + The :func:`Pipe` function returns a pair of connection objects connected by a + pipe which by default is duplex (two-way). For example:: + + from multiprocessing import Process, Pipe + + def f(conn): + conn.send([42, None, 'hello']) + conn.close() + + if __name__ == '__main__': + parent_conn, child_conn = Pipe() + p = Process(target=f, args=(child_conn,)) + p.start() + print parent_conn.recv() # prints "[42, None, 'hello']" + p.join() + + The two connection objects returned by :func:`Pipe` represent the two ends of + the pipe. Each connection object has :meth:`send` and :meth:`recv` methods + (among others). Note that data in a pipe may become corrupted if two + processes (or threads) try to read from or write to the *same* end of the + pipe at the same time. Of course there is no risk of corruption from + processes using different ends of the pipe at the same time. + + +Synchronization between processes +~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ + +:mod:`multiprocessing` contains equivalents of all the synchronization +primitives from :mod:`threading`. For instance one can use a lock to ensure +that only one process prints to standard output at a time:: + + from multiprocessing import Process, Lock + + def f(l, i): + l.acquire() + print 'hello world', i + l.release() + + if __name__ == '__main__': + lock = Lock() + + for num in range(10): + Process(target=f, args=(lock, num)).start() + +Without using the lock output from the different processes is liable to get all +mixed up. + + +Sharing state between processes +~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ + +As mentioned above, when doing concurrent programming it is usually best to +avoid using shared state as far as possible. This is particularly true when +using multiple processes. + +However, if you really do need to use some shared data then +:mod:`multiprocessing` provides a couple of ways of doing so. + +**Shared memory** + + Data can be stored in a shared memory map using :class:`Value` or + :class:`Array`. For example, the following code :: + + from multiprocessing import Process, Value, Array + + def f(n, a): + n.value = 3.1415927 + for i in range(len(a)): + a[i] = -a[i] + + if __name__ == '__main__': + num = Value('d', 0.0) + arr = Array('i', range(10)) + + p = Process(target=f, args=(num, arr)) + p.start() + p.join() + + print num.value + print arr[:] + + will print :: + + 3.1415927 + [0, -1, -2, -3, -4, -5, -6, -7, -8, -9] + + The ``'d'`` and ``'i'`` arguments used when creating ``num`` and ``arr`` are + typecodes of the kind used by the :mod:`array` module: ``'d'`` indicates a + double precision float and ``'i'`` inidicates a signed integer. These shared + objects will be process and thread safe. + + For more flexibility in using shared memory one can use the + :mod:`multiprocessing.sharedctypes` module which supports the creation of + arbitrary ctypes objects allocated from shared memory. + +**Server process** + + A manager object returned by :func:`Manager` controls a server process which + holds python objects and allows other processes to manipulate them using + proxies. + + A manager returned by :func:`Manager` will support types :class:`list`, + :class:`dict`, :class:`Namespace`, :class:`Lock`, :class:`RLock`, + :class:`Semaphore`, :class:`BoundedSemaphore`, :class:`Condition`, + :class:`Event`, :class:`Queue`, :class:`Value` and :class:`Array`. For + example, :: + + from multiprocessing import Process, Manager + + def f(d, l): + d[1] = '1' + d['2'] = 2 + d[0.25] = None + l.reverse() + + if __name__ == '__main__': + manager = Manager() + + d = manager.dict() + l = manager.list(range(10)) + + p = Process(target=f, args=(d, l)) + p.start() + p.join() + + print d + print l + + will print :: + + {0.25: None, 1: '1', '2': 2} + [9, 8, 7, 6, 5, 4, 3, 2, 1, 0] + + Server process managers are more flexible than using shared memory objects + because they can be made to support arbitrary object types. Also, a single + manager can be shared by processes on different computers over a network. + They are, however, slower than using shared memory. + + +Using a pool of workers +~~~~~~~~~~~~~~~~~~~~~~~ + +The :class:`multiprocessing.pool.Pool()` class represens a pool of worker +processes. It has methods which allows tasks to be offloaded to the worker +processes in a few different ways. + +For example:: + + from multiprocessing import Pool + + def f(x): + return x*x + + if __name__ == '__main__': + pool = Pool(processes=4) # start 4 worker processes + result = pool.applyAsync(f, [10]) # evaluate "f(10)" asynchronously + print result.get(timeout=1) # prints "100" unless your computer is *very* slow + print pool.map(f, range(10)) # prints "[0, 1, 4,..., 81]" + + +Reference +--------- + +The :mod:`multiprocessing` package mostly replicates the API of the +:mod:`threading` module. + + +:class:`Process` and exceptions +~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ + +.. class:: Process([group[, target[, name[, args[, kwargs]]]]]) + + Process objects represent activity that is run in a separate process. The + :class:`Process` class has equivalents of all the methods of + :class:`threading.Thread`. + + The constructor should always be called with keyword arguments. *group* + should always be ``None``; it exists soley for compatibility with + :class:`threading.Thread`. *target* is the callable object to be invoked by + the :meth:`run()` method. It defaults to None, meaning nothing is + called. *name* is the process name. By default, a unique name is constructed + of the form 'Process-N\ :sub:`1`:N\ :sub:`2`:...:N\ :sub:`k`' where N\ + :sub:`1`,N\ :sub:`2`,...,N\ :sub:`k` is a sequence of integers whose length + is determined by the *generation* of the process. *args* is the argument + tuple for the target invocation. *kwargs* is a dictionary of keyword + arguments for the target invocation. By default, no arguments are passed to + *target*. + + If a subclass overrides the constructor, it must make sure it invokes the + base class constructor (:meth:`Process.__init__`) before doing anything else + to the process. + + .. method:: run() + + Method representing the process's activity. + + You may override this method in a subclass. The standard :meth:`run` + method invokes the callable object passed to the object's constructor as + the target argument, if any, with sequential and keyword arguments taken + from the *args* and *kwargs* arguments, respectively. + + .. method:: start() + + Start the process's activity. + + This must be called at most once per process object. It arranges for the + object's :meth:`run` method to be invoked in a separate process. + + .. method:: join([timeout]) + + Block the calling thread until the process whose :meth:`join` method is + called terminates or until the optional timeout occurs. + + If *timeout* is ``None`` then there is no timeout. + + A process can be joined many times. + + A process cannot join itself because this would cause a deadlock. It is + an error to attempt to join a process before it has been started. + + .. method:: get_name() + + Return the process's name. + + .. method:: set_name(name) + + Set the process's name. + + The name is a string used for identification purposes only. It has no + semantics. Multiple processes may be given the same name. The initial + name is set by the constructor. + + .. method:: is_alive() + + Return whether the process is alive. + + Roughly, a process object is alive from the moment the :meth:`start` + method returns until the child process terminates. + + .. method:: is_daemon() + + Return the process's daemon flag. + + .. method:: set_daemon(daemonic) + + Set the process's daemon flag to the Boolean value *daemonic*. This must + be called before :meth:`start` is called. + + The initial value is inherited from the creating process. + + When a process exits, it attempts to terminate all of its daemonic child + processes. + + Note that a daemonic process is not allowed to create child processes. + Otherwise a daemonic process would leave its children orphaned if it gets + terminated when its parent process exits. + + In addition process objects also support the following methods: + + .. method:: get_pid() + + Return the process ID. Before the process is spawned, this will be + ``None``. + + .. method:: get_exit_code() + + Return the child's exit code. This will be ``None`` if the process has + not yet terminated. A negative value *-N* indicates that the child was + terminated by signal *N*. + + .. method:: get_auth_key() + + Return the process's authentication key (a byte string). + + When :mod:`multiprocessing` is initialized the main process is assigned a + random string using :func:`os.random`. + + When a :class:`Process` object is created, it will inherit the + authentication key of its parent process, although this may be changed + using :meth:`set_auth_key` below. + + See :ref:`multiprocessing-auth-keys`. + + .. method:: set_auth_key(authkey) + + Set the process's authentication key which must be a byte string. + + .. method:: terminate()` + + Terminate the process. On Unix this is done using the ``SIGTERM`` signal, + on Windows ``TerminateProcess()`` is used. Note that exit handlers and + finally clauses etc will not be executed. + + Note that descendant processes of the process will *not* be terminated -- + they will simply become orphaned. + + .. warning:: + + If this method is used when the associated process is using a pipe or + queue then the pipe or queue is liable to become corrupted and may + become unusable by other process. Similarly, if the process has + acquired a lock or semaphore etc. then terminating it is liable to + cause other processes to deadlock. + + Note that the :meth:`start`, :meth:`join`, :meth:`is_alive` and + :meth:`get_exit_code` methods should only be called by the process that + created the process object. + + Example usage of some of the methods of :class:`Process`:: + + >>> import processing, time, signal + >>> p = processing.Process(target=time.sleep, args=(1000,)) + >>> print p, p.is_alive() + <Process(Process-1, initial)> False + >>> p.start() + >>> print p, p.is_alive() + <Process(Process-1, started)> True + >>> p.terminate() + >>> print p, p.is_alive() + <Process(Process-1, stopped[SIGTERM])> False + >>> p.get_exit_code() == -signal.SIGTERM + True + + +.. exception:: BufferTooShort + + Exception raised by :meth:`Connection.recv_bytes_into()` when the supplied + buffer object is too small for the message read. + + If ``e`` is an instance of :exc:`BufferTooShort` then ``e.args[0]`` will give + the message as a byte string. + + +Pipes and Queues +~~~~~~~~~~~~~~~~ + +When using multiple processes, one generally uses message passing for +communication between processes and avoids having to use any synchronization +primitives like locks. + +For passing messages one can use :func:`Pipe` (for a connection between two +processes) or a queue (which allows multiple producers and consumers). + +The :class:`Queue` and :class:`JoinableQueue` types are multi-producer, +multi-consumer FIFO queues modelled on the :class:`Queue.Queue` class in the +standard library. They differ in that :class:`Queue` lacks the +:meth:`task_done` and :meth:`join` methods introduced into Python 2.5's +:class:`Queue.Queue` class. + +If you use :class:`JoinableQueue` then you **must** call +:meth:`JoinableQueue.task_done` for each task removed from the queue or else the +semaphore used to count the number of unfinished tasks may eventually overflow +raising an exception. + +.. note:: + + :mod:`multiprocessing` uses the usual :exc:`Queue.Empty` and + :exc:`Queue.Full` exceptions to signal a timeout. They are not available in + the :mod:`multiprocessing` namespace so you need to import them from + :mod:`Queue`. + + +.. warning:: + + If a process is killed using :meth:`Process.terminate` or :func:`os.kill` + while it is trying to use a :class:`Queue`, then the data in the queue is + likely to become corrupted. This may cause any other processes to get an + exception when it tries to use the queue later on. + +.. warning:: + + As mentioned above, if a child process has put items on a queue (and it has + not used :meth:`JoinableQueue.cancel_join_thread`), then that process will + not terminate until all buffered items have been flushed to the pipe. + + This means that if you try joining that process you may get a deadlock unless + you are sure that all items which have been put on the queue have been + consumed. Similarly, if the child process is non-daemonic then the parent + process may hang on exit when it tries to join all it non-daemonic children. + + Note that a queue created using a manager does not have this issue. See + :ref:`multiprocessing-programming`. + +Note that one can also create a shared queue by using a manager object -- see +:ref:`multiprocessing-managers`. + +For an example of the usage of queues for interprocess communication see +:ref:`multiprocessing-examples`. + + +.. function:: Pipe([duplex]) + + Returns a pair ``(conn1, conn2)`` of :class:`Connection` objects representing + the ends of a pipe. + + If *duplex* is ``True`` (the default) then the pipe is bidirectional. If + *duplex* is ``False`` then the pipe is unidirectional: ``conn1`` can only be + used for receiving messages and ``conn2`` can only be used for sending + messages. + + +.. class:: Queue([maxsize]) + + Returns a process shared queue implemented using a pipe and a few + locks/semaphores. When a process first puts an item on the queue a feeder + thread is started which transfers objects from a buffer into the pipe. + + The usual :exc:`Queue.Empty` and :exc:`Queue.Full` exceptions from the + standard library's :mod:`Queue` module are raised to signal timeouts. + + :class:`Queue` implements all the methods of :class:`Queue.Queue` except for + :meth:`task_done` and :meth:`join`. + + .. method:: qsize() + + Return the approximate size of the queue. Because of + multithreading/multiprocessing semantics, this number is not reliable. + + Note that this may raise :exc:`NotImplementedError` on Unix platforms like + MacOS X where ``sem_getvalue()`` is not implemented. + + .. method:: empty() + + Return ``True`` if the queue is empty, ``False`` otherwise. Because of + multithreading/multiprocessing semantics, this is not reliable. + + .. method:: full() + + Return ``True`` if the queue is full, ``False`` otherwise. Because of + multithreading/multiprocessing semantics, this is not reliable. + + .. method:: put(item[, block[, timeout]])` + + Put item into the queue. If optional args *block* is ``True`` (the + default) and *timeout* is ``None`` (the default), block if necessary until + a free slot is available. If *timeout* is a positive number, it blocks at + most *timeout* seconds and raises the :exc:`Queue.Full` exception if no + free slot was available within that time. Otherwise (*block* is + ``False``), put an item on the queue if a free slot is immediately + available, else raise the :exc:`Queue.Full` exception (*timeout* is + ignored in that case). + + .. method:: put_nowait(item) + + Equivalent to ``put(item, False)``. + + .. method:: get([block[, timeout]]) + + Remove and return an item from the queue. If optional args *block* is + ``True`` (the default) and *timeout* is ``None`` (the default), block if + necessary until an item is available. If *timeout* is a positive number, + it blocks at most *timeout* seconds and raises the :exc:`Queue.Empty` + exception if no item was available within that time. Otherwise (block is + ``False``), return an item if one is immediately available, else raise the + :exc:`Queue.Empty` exception (*timeout* is ignored in that case). + + .. method:: get_nowait() + get_no_wait() + + Equivalent to ``get(False)``. + + :class:`multiprocessing.Queue` has a few additional methods not found in + :class:`Queue.Queue` which are usually unnecessary: + + .. method:: close() + + Indicate that no more data will be put on this queue by the current + process. The background thread will quit once it has flushed all buffered + data to the pipe. This is called automatically when the queue is garbage + collected. + + .. method:: join_thread() + + Join the background thread. This can only be used after :meth:`close` has + been called. It blocks until the background thread exits, ensuring that + all data in the buffer has been flushed to the pipe. + + By default if a process is not the creator of the queue then on exit it + will attempt to join the queue's background thread. The process can call + :meth:`cancel_join_thread()` to make :meth:`join_thread()` do nothing. + + .. method:: cancel_join_thread() + + Prevent :meth:`join_thread` from blocking. In particular, this prevents + the background thread from being joined automatically when the process + exits -- see :meth:`join_thread()`. + + +.. class:: JoinableQueue([maxsize]) + + :class:`JoinableQueue`, a :class:`Queue` subclass, is a queue which + additionally has :meth:`task_done` and :meth:`join` methods. + + .. method:: task_done() + + Indicate that a formerly enqueued task is complete. Used by queue consumer + threads. For each :meth:`get` used to fetch a task, a subsequent call to + :meth:`task_done` tells the queue that the processing on the task is + complete. + + If a :meth:`join` is currently blocking, it will resume when all items + have been processed (meaning that a :meth:`task_done` call was received + for every item that had been :meth:`put` into the queue). + + Raises a :exc:`ValueError` if called more times than there were items + placed in the queue. + + + .. method:: join() + + Block until all items in the queue have been gotten and processed. + + The count of unfinished tasks goes up whenever an item is added to the + queue. The count goes down whenever a consumer thread calls + :meth:`task_done` to indicate that the item was retrieved and all work on + it is complete. When the count of unfinished tasks drops to zero, + :meth:`join` unblocks. + + +Miscellaneous +~~~~~~~~~~~~~ + +.. function:: active_children() + + Return list of all live children of the current process. + + Calling this has the side affect of "joining" any processes which have + already finished. + +.. function:: cpu_count() + + Return the number of CPUs in the system. May raise + :exc:`NotImplementedError`. + +.. function:: current_process() + + Return the :class:`Process` object corresponding to the current process. + + An analogue of :func:`threading.current_thread`. + +.. function:: freeze_support() + + Add support for when a program which uses :mod:`multiprocessing` has been + frozen to produce a Windows executable. (Has been tested with **py2exe**, + **PyInstaller** and **cx_Freeze**.) + + One needs to call this function straight after the ``if __name__ == + '__main__'`` line of the main module. For example:: + + from multiprocessing import Process, freeze_support + + def f(): + print 'hello world!' + + if __name__ == '__main__': + freeze_support() + Process(target=f).start() + + If the :func:`freeze_support()` line is missed out then trying to run the + frozen executable will raise :exc:`RuntimeError`. + + If the module is being run normally by the Python interpreter then + :func:`freeze_support()` has no effect. + +.. function:: set_executable() + + Sets the path of the python interpreter to use when starting a child process. + (By default `sys.executable` is used). Embedders will probably need to do + some thing like :: + + setExecutable(os.path.join(sys.exec_prefix, 'pythonw.exe')) + + before they can create child processes. (Windows only) + + +.. note:: + + :mod:`multiprocessing` contains no analogues of + :func:`threading.active_count`, :func:`threading.enumerate`, + :func:`threading.settrace`, :func:`threading.setprofile`, + :class:`threading.Timer`, or :class:`threading.local`. + + +Connection Objects +~~~~~~~~~~~~~~~~~~ + +Connection objects allow the sending and receiving of picklable objects or +strings. They can be thought of as message oriented connected sockets. + +Connection objects usually created using :func:`Pipe()` -- see also +:ref:`multiprocessing-listeners-clients`. + +.. class:: Connection + + .. method:: send(obj) + + Send an object to the other end of the connection which should be read + using :meth:`recv`. + + The object must be picklable. + + .. method:: recv() + + Return an object sent from the other end of the connection using + :meth:`send`. Raises :exc:`EOFError` if there is nothing left to receive + and the other end was closed. + + .. method:: fileno() + + Returns the file descriptor or handle used by the connection. + + .. method:: close() + + Close the connection. + + This is called automatically when the connection is garbage collected. + + .. method:: poll([timeout]) + + Return whether there is any data available to be read. + + If *timeout* is not specified then it will return immediately. If + *timeout* is a number then this specifies the maximum time in seconds to + block. If *timeout* is ``None`` then an infinite timeout is used. + + .. method:: send_bytes(buffer[, offset[, size]]) + + Send byte data from an object supporting the buffer interface as a + complete message. + + If *offset* is given then data is read from that position in *buffer*. If + *size* is given then that many bytes will be read from buffer. + + .. method:: recv_bytes([maxlength]) + + Return a complete message of byte data sent from the other end of the + connection as a string. Raises :exc:`EOFError` if there is nothing left + to receive and the other end has closed. + + If *maxlength* is specified and the message is longer than *maxlength* + then :exc:`IOError` is raised and the connection will no longer be + readable. + + .. method:: recv_bytes_into(buffer[, offset]) + + Read into *buffer* a complete message of byte data sent from the other end + of the connection and return the number of bytes in the message. Raises + :exc:`EOFError` if there is nothing left to receive and the other end was + closed. + + *buffer* must be an object satisfying the writable buffer interface. If + *offset* is given then the message will be written into the buffer from + *that position. Offset must be a non-negative integer less than the + *length of *buffer* (in bytes). + + If the buffer is too short then a :exc:`BufferTooShort` exception is + raised and the complete message is available as ``e.args[0]`` where ``e`` + is the exception instance. + + +For example: + + >>> from multiprocessing import Pipe + >>> a, b = Pipe() + >>> a.send([1, 'hello', None]) + >>> b.recv() + [1, 'hello', None] + >>> b.send_bytes('thank you') + >>> a.recv_bytes() + 'thank you' + >>> import array + >>> arr1 = array.array('i', range(5)) + >>> arr2 = array.array('i', [0] * 10) + >>> a.send_bytes(arr1) + >>> count = b.recv_bytes_into(arr2) + >>> assert count == len(arr1) * arr1.itemsize + >>> arr2 + array('i', [0, 1, 2, 3, 4, 0, 0, 0, 0, 0]) + + +.. warning:: + + The :meth:`Connection.recv` method automatically unpickles the data it + receives, which can be a security risk unless you can trust the process + which sent the message. + + Therefore, unless the connection object was produced using :func:`Pipe()` + you should only use the `recv()` and `send()` methods after performing some + sort of authentication. See :ref:`multiprocessing-auth-keys`. + +.. warning:: + + If a process is killed while it is trying to read or write to a pipe then + the data in the pipe is likely to become corrupted, because it may become + impossible to be sure where the message boundaries lie. + + +Synchronization primitives +~~~~~~~~~~~~~~~~~~~~~~~~~~ + +Generally synchronization primitives are not as necessary in a multiprocess +program as they are in a mulithreaded program. See the documentation for the +standard library's :mod:`threading` module. + +Note that one can also create synchronization primitives by using a manager +object -- see :ref:`multiprocessing-managers`. + +.. class:: BoundedSemaphore([value]) + + A bounded semaphore object: a clone of :class:`threading.BoundedSemaphore`. + + (On Mac OSX this is indistiguishable from :class:`Semaphore` because + ``sem_getvalue()`` is not implemented on that platform). + +.. class:: Condition([lock]) + + A condition variable: a clone of `threading.Condition`. + + If *lock* is specified then it should be a :class:`Lock` or :class:`RLock` + object from :mod:`multiprocessing`. + +.. class:: Event() + + A clone of :class:`threading.Event`. + +.. class:: Lock() + + A non-recursive lock object: a clone of :class:`threading.Lock`. + +.. class:: RLock() + + A recursive lock object: a clone of :class:`threading.RLock`. + +.. class:: Semaphore([value]) + + A bounded semaphore object: a clone of :class:`threading.Semaphore`. + +.. note:: + + The :meth:`acquire()` method of :class:`BoundedSemaphore`, :class:`Lock`, + :class:`RLock` and :class:`Semaphore` has a timeout parameter not supported + by the equivalents in :mod:`threading`. The signature is + ``acquire(block=True, timeout=None)`` with keyword parameters being + acceptable. If *block* is ``True`` and *timeout* is not ``None`` then it + specifies a timeout in seconds. If *block* is ``False`` then *timeout* is + ignored. + +.. note:: + + If the SIGINT signal generated by Ctrl-C arrives while the main thread is + blocked by a call to :meth:`BoundedSemaphore.acquire`, :meth:`Lock.acquire`, + :meth:`RLock.acquire`, :meth:`Semaphore.acquire`, :meth:`Condition.acquire` + or :meth:`Condition.wait` then the call will be immediately interrupted and + :exc:`KeyboardInterrupt` will be raised. + + This differs from the behaviour of :mod:`threading` where SIGINT will be + ignored while the equivalent blocking calls are in progress. + + +Shared :mod:`ctypes` Objects +~~~~~~~~~~~~~~~~~~~~~~~~~~~~ + +It is possible to create shared objects using shared memory which can be +inherited by child processes. + +.. function:: Value(typecode_or_type[, lock[, *args]]) + + Return a :mod:`ctypes` object allocated from shared memory. By default the + return value is actually a synchronized wrapper for the object. + + *typecode_or_type* determines the type of the returned object: it is either a + ctypes type or a one character typecode of the kind used by the :mod:`array` + module. *\*args* is passed on to the constructor for the type. + + If *lock* is ``True`` (the default) then a new lock object is created to + synchronize access to the value. If *lock* is a :class:`Lock` or + :class:`RLock` object then that will be used to synchronize access to the + value. If *lock* is ``False`` then access to the returned object will not be + automatically protected by a lock, so it will not necessarily be + "process-safe". + + Note that *lock* is a keyword-only argument. + +.. function:: Array(typecode_or_type, size_or_initializer, *, lock=True) + + Return a ctypes array allocated from shared memory. By default the return + value is actually a synchronized wrapper for the array. + + *typecode_or_type* determines the type of the elements of the returned array: + it is either a ctypes type or a one character typecode of the kind used by + the :mod:`array` module. If *size_or_initializer* is an integer, then it + determines the length of the array, and the array will be initially zeroed. + Otherwise, *size_or_initializer* is a sequence which is used to initialize + the array and whose length determines the length of the array. + + If *lock* is ``True`` (the default) then a new lock object is created to + synchronize access to the value. If *lock* is a :class:`Lock` or + :class:`RLock` object then that will be used to synchronize access to the + value. If *lock* is ``False`` then access to the returned object will not be + automatically protected by a lock, so it will not necessarily be + "process-safe". + + Note that *lock* is a keyword only argument. + + Note that an array of :data:`ctypes.c_char` has *value* and *rawvalue* + attributes which allow one to use it to store and retrieve strings. + + +The :mod:`multiprocessing.sharedctypes` module +>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> + +.. module:: multiprocessing.sharedctypes + :synopsis: Allocate ctypes objects from shared memory. + +The :mod:`multiprocessing.sharedctypes` module provides functions for allocating +:mod:`ctypes` objects from shared memory which can be inherited by child +processes. + +.. note:: + + Although it is posible to store a pointer in shared memory remember that this + will refer to a location in the address space of a specific process. + However, the pointer is quite likely to be invalid in the context of a second + process and trying to dereference the pointer from the second process may + cause a crash. + +.. function:: RawArray(typecode_or_type, size_or_initializer) + + Return a ctypes array allocated from shared memory. + + *typecode_or_type* determines the type of the elements of the returned array: + it is either a ctypes type or a one character typecode of the kind used by + the :mod:`array` module. If *size_or_initializer* is an integer then it + determines the length of the array, and the array will be initially zeroed. + Otherwise *size_or_initializer* is a sequence which is used to initialize the + array and whose length determines the length of the array. + + Note that setting and getting an element is potentially non-atomic -- use + :func:`Array` instead to make sure that access is automatically synchronized + using a lock. + +.. function:: RawValue(typecode_or_type, *args) + + Return a ctypes object allocated from shared memory. + + *typecode_or_type* determines the type of the returned object: it is either a + ctypes type or a one character typecode of the kind used by the :mod:`array` + module. */*args* is passed on to the constructor for the type. + + Note that setting and getting the value is potentially non-atomic -- use + :func:`Value` instead to make sure that access is automatically synchronized + using a lock. + + Note that an array of :data:`ctypes.c_char` has ``value`` and ``rawvalue`` + attributes which allow one to use it to store and retrieve strings -- see + documentation for :mod:`ctypes`. + +.. function:: Array(typecode_or_type, size_or_initializer[, lock[, *args]]) + + The same as :func:`RawArray` except that depending on the value of *lock* a + process-safe synchronization wrapper may be returned instead of a raw ctypes + array. + + If *lock* is ``True`` (the default) then a new lock object is created to + synchronize access to the value. If *lock* is a :class:`Lock` or + :class:`RLock` object then that will be used to synchronize access to the + value. If *lock* is ``False`` then access to the returned object will not be + automatically protected by a lock, so it will not necessarily be + "process-safe". + + Note that *lock* is a keyword-only argument. + +.. function:: Value(typecode_or_type, *args[, lock]) + + The same as :func:`RawValue` except that depending on the value of *lock* a + process-safe synchronization wrapper may be returned instead of a raw ctypes + object. + + If *lock* is ``True`` (the default) then a new lock object is created to + synchronize access to the value. If *lock* is a :class:`Lock` or + :class:`RLock` object then that will be used to synchronize access to the + value. If *lock* is ``False`` then access to the returned object will not be + automatically protected by a lock, so it will not necessarily be + "process-safe". + + Note that *lock* is a keyword-only argument. + +.. function:: copy(obj) + + Return a ctypes object allocated from shared memory which is a copy of the + ctypes object *obj*. + +.. function:: synchronized(obj[, lock]) + + Return a process-safe wrapper object for a ctypes object which uses *lock* to + synchronize access. If *lock* is ``None`` (the default) then a + :class:`multiprocessing.RLock` object is created automatically. + + A synchronized wrapper will have two methods in addition to those of the + object it wraps: :meth:`get_obj()` returns the wrapped object and + :meth:`get_lock()` returns the lock object used for synchronization. + + Note that accessing the ctypes object through the wrapper can be a lot slower + han accessing the raw ctypes object. + + +The table below compares the syntax for creating shared ctypes objects from +shared memory with the normal ctypes syntax. (In the table ``MyStruct`` is some +subclass of :class:`ctypes.Structure`.) + +==================== ========================== =========================== +ctypes sharedctypes using type sharedctypes using typecode +==================== ========================== =========================== +c_double(2.4) RawValue(c_double, 2.4) RawValue('d', 2.4) +MyStruct(4, 6) RawValue(MyStruct, 4, 6) +(c_short * 7)() RawArray(c_short, 7) RawArray('h', 7) +(c_int * 3)(9, 2, 8) RawArray(c_int, (9, 2, 8)) RawArray('i', (9, 2, 8)) +==================== ========================== =========================== + + +Below is an example where a number of ctypes objects are modified by a child +process:: + + from multiprocessing import Process, Lock + from multiprocessing.sharedctypes import Value, Array + from ctypes import Structure, c_double + + class Point(Structure): + _fields_ = [('x', c_double), ('y', c_double)] + + def modify(n, x, s, A): + n.value **= 2 + x.value **= 2 + s.value = s.value.upper() + for a in A: + a.x **= 2 + a.y **= 2 + + if __name__ == '__main__': + lock = Lock() + + n = Value('i', 7) + x = Value(ctypes.c_double, 1.0/3.0, lock=False) + s = Array('c', 'hello world', lock=lock) + A = Array(Point, [(1.875,-6.25), (-5.75,2.0), (2.375,9.5)], lock=lock) + + p = Process(target=modify, args=(n, x, s, A)) + p.start() + p.join() + + print n.value + print x.value + print s.value + print [(a.x, a.y) for a in A] + + +.. highlightlang:: none + +The results printed are :: + + 49 + 0.1111111111111111 + HELLO WORLD + [(3.515625, 39.0625), (33.0625, 4.0), (5.640625, 90.25)] + +.. highlightlang:: python + + +.. _multiprocessing-managers: + +Managers +~~~~~~~~ + +Managers provide a way to create data which can be shared between different +processes. A manager object controls a server process which manages *shared +objects*. Other processes can access the shared objects by using proxies. + +.. function:: multiprocessing.Manager() + + Returns a started :class:`SyncManager` object which can be used for sharing + objects between processes. The returned manager object corresponds to a + spawned child process and has methods which will create shared objects and + return corresponding proxies. + +.. module:: multiprocessing.managers + :synopsis: Share data between process with shared objects. + +Manager processes will be shutdown as soon as they are garbage collected or +their parent process exits. The manager classes are defined in the +:mod:`multiprocessing.managers` module: + +.. class:: BaseManager([address[, authkey]]) + + Create a BaseManager object. + + Once created one should call :meth:`start` or :meth:`serve_forever` to ensure + that the manager object refers to a started manager process. + + *address* is the address on which the manager process listens for new + connections. If *address* is ``None`` then an arbitrary one is chosen. + + *authkey* is the authentication key which will be used to check the validity + of incoming connections to the server process. If *authkey* is ``None`` then + ``current_process().get_auth_key()``. Otherwise *authkey* is used and it + must be a string. + + .. method:: start() + + Start a subprocess to start the manager. + + .. method:: server_forever() + + Run the server in the current process. + + .. method:: from_address(address, authkey) + + A class method which creates a manager object referring to a pre-existing + server process which is using the given address and authentication key. + + .. method:: shutdown() + + Stop the process used by the manager. This is only available if + meth:`start` has been used to start the server process. + + This can be called multiple times. + + .. method:: register(typeid[, callable[, proxytype[, exposed[, method_to_typeid[, create_method]]]]]) + + A classmethod which can be used for registering a type or callable with + the manager class. + + *typeid* is a "type identifier" which is used to identify a particular + type of shared object. This must be a string. + + *callable* is a callable used for creating objects for this type + identifier. If a manager instance will be created using the + :meth:`from_address()` classmethod or if the *create_method* argument is + ``False`` then this can be left as ``None``. + + *proxytype* is a subclass of :class:`multiprocessing.managers.BaseProxy` + which is used to create proxies for shared objects with this *typeid*. If + ``None`` then a proxy class is created automatically. + + *exposed* is used to specify a sequence of method names which proxies for + this typeid should be allowed to access using + :meth:`BaseProxy._callMethod`. (If *exposed* is ``None`` then + :attr:`proxytype._exposed_` is used instead if it exists.) In the case + where no exposed list is specified, all "public methods" of the shared + object will be accessible. (Here a "public method" means any attribute + which has a ``__call__()`` method and whose name does not begin with + ``'_'``.) + + *method_to_typeid* is a mapping used to specify the return type of those + exposed methods which should return a proxy. It maps method names to + typeid strings. (If *method_to_typeid* is ``None`` then + :attr:`proxytype._method_to_typeid_` is used instead if it exists.) If a + method's name is not a key of this mapping or if the mapping is ``None`` + then the object returned by the method will be copied by value. + + *create_method* determines whether a method should be created with name + *typeid* which can be used to tell the server process to create a new + shared object and return a proxy for it. By default it is ``True``. + + :class:`BaseManager` instances also have one read-only property: + + .. attribute:: address + + The address used by the manager. + + +.. class:: SyncManager + + A subclass of :class:`BaseManager` which can be used for the synchronization + of processes. Objects of this type are returned by + :func:`multiprocessing.Manager()`. + + It also supports creation of shared lists and dictionaries. + + .. method:: BoundedSemaphore([value]) + + Create a shared :class:`threading.BoundedSemaphore` object and return a + proxy for it. + + .. method:: Condition([lock]) + + Create a shared :class:`threading.Condition` object and return a proxy for + it. + + If *lock* is supplied then it should be a proxy for a + :class:`threading.Lock` or :class:`threading.RLock` object. + + .. method:: Event() + + Create a shared :class:`threading.Event` object and return a proxy for it. + + .. method:: Lock() + + Create a shared :class:`threading.Lock` object and return a proxy for it. + + .. method:: Namespace() + + Create a shared :class:`Namespace` object and return a proxy for it. + + .. method:: Queue([maxsize]) + + Create a shared `Queue.Queue` object and return a proxy for it. + + .. method:: RLock() + + Create a shared :class:`threading.RLock` object and return a proxy for it. + + .. method:: Semaphore([value]) + + Create a shared :class:`threading.Semaphore` object and return a proxy for + it. + + .. method:: Array(typecode, sequence) + + Create an array and return a proxy for it. (*format* is ignored.) + + .. method:: Value(typecode, value) + + Create an object with a writable ``value`` attribute and return a proxy + for it. + + .. method:: dict() + dict(mapping) + dict(sequence) + + Create a shared ``dict`` object and return a proxy for it. + + .. method:: list() + list(sequence) + + Create a shared ``list`` object and return a proxy for it. + + +Namespace objects +>>>>>>>>>>>>>>>>> + +A namespace object has no public methods, but does have writable attributes. +Its representation shows the values of its attributes. + +However, when using a proxy for a namespace object, an attribute beginning with +``'_'`` will be an attribute of the proxy and not an attribute of the referent:: + + >>> manager = multiprocessing.Manager() + >>> Global = manager.Namespace() + >>> Global.x = 10 + >>> Global.y = 'hello' + >>> Global._z = 12.3 # this is an attribute of the proxy + >>> print Global + Namespace(x=10, y='hello') + + +Customized managers +>>>>>>>>>>>>>>>>>>> + +To create one's own manager, one creates a subclass of :class:`BaseManager` and +use the :meth:`resgister()` classmethod to register new types or callables with +the manager class. For example:: + + from multiprocessing.managers import BaseManager + + class MathsClass(object): + def add(self, x, y): + return x + y + def mul(self, x, y): + return x * y + + class MyManager(BaseManager): + pass + + MyManager.register('Maths', MathsClass) + + if __name__ == '__main__': + manager = MyManager() + manager.start() + maths = manager.Maths() + print maths.add(4, 3) # prints 7 + print maths.mul(7, 8) # prints 56 + + +Using a remote manager +>>>>>>>>>>>>>>>>>>>>>> + +It is possible to run a manager server on one machine and have clients use it +from other machines (assuming that the firewalls involved allow it). + +Running the following commands creates a server for a single shared queue which +remote clients can access:: + + >>> from multiprocessing.managers import BaseManager + >>> import Queue + >>> queue = Queue.Queue() + >>> class QueueManager(BaseManager): pass + ... + >>> QueueManager.register('getQueue', callable=lambda:queue) + >>> m = QueueManager(address=('', 50000), authkey='abracadabra') + >>> m.serveForever() + +One client can access the server as follows:: + + >>> from multiprocessing.managers import BaseManager + >>> class QueueManager(BaseManager): pass + ... + >>> QueueManager.register('getQueue') + >>> m = QueueManager.from_address(address=('foo.bar.org', 50000), + >>> authkey='abracadabra') + >>> queue = m.getQueue() + >>> queue.put('hello') + +Another client can also use it:: + + >>> from multiprocessing.managers import BaseManager + >>> class QueueManager(BaseManager): pass + ... + >>> QueueManager.register('getQueue') + >>> m = QueueManager.from_address(address=('foo.bar.org', 50000), authkey='abracadabra') + >>> queue = m.getQueue() + >>> queue.get() + 'hello' + + +Proxy Objects +~~~~~~~~~~~~~ + +A proxy is an object which *refers* to a shared object which lives (presumably) +in a different process. The shared object is said to be the *referent* of the +proxy. Multiple proxy objects may have the same referent. + +A proxy object has methods which invoke corresponding methods of its referent +(although not every method of the referent will necessarily be available through +the proxy). A proxy can usually be used in most of the same ways that its +referent can:: + + >>> from multiprocessing import Manager + >>> manager = Manager() + >>> l = manager.list([i*i for i in range(10)]) + >>> print l + [0, 1, 4, 9, 16, 25, 36, 49, 64, 81] + >>> print repr(l) + <ListProxy object, typeid 'list' at 0xb799974c> + >>> l[4] + 16 + >>> l[2:5] + [4, 9, 16] + +Notice that applying :func:`str` to a proxy will return the representation of +the referent, whereas applying :func:`repr` will return the representation of +the proxy. + +An important feature of proxy objects is that they are picklable so they can be +passed between processes. Note, however, that if a proxy is sent to the +corresponding manager's process then unpickling it will produce the referent +itself. This means, for example, that one shared object can contain a second:: + + >>> a = manager.list() + >>> b = manager.list() + >>> a.append(b) # referent of `a` now contains referent of `b` + >>> print a, b + [[]] [] + >>> b.append('hello') + >>> print a, b + [['hello']] ['hello'] + +.. note:: + + The proxy types in :mod:`multiprocessing` do nothing to support comparisons + by value. So, for instance, :: + + manager.list([1,2,3]) == [1,2,3] + + will return ``False``. One should just use a copy of the referent instead + when making comparisons. + +.. class:: BaseProxy + + Proxy objects are instances of subclasses of :class:`BaseProxy`. + + .. method:: _call_method(methodname[, args[, kwds]]) + + Call and return the result of a method of the proxy's referent. + + If ``proxy`` is a proxy whose referent is ``obj`` then the expression :: + + proxy._call_method(methodname, args, kwds) + + will evaluate the expression :: + + getattr(obj, methodname)(*args, **kwds) + + in the manager's process. + + The returned value will be a copy of the result of the call or a proxy to + a new shared object -- see documentation for the *method_to_typeid* + argument of :meth:`BaseManager.register`. + + If an exception is raised by the call, then then is re-raised by + :meth:`_call_method`. If some other exception is raised in the manager's + process then this is converted into a :exc:`RemoteError` exception and is + raised by :meth:`_call_method`. + + Note in particular that an exception will be raised if *methodname* has + not been *exposed* + + An example of the usage of :meth:`_call_method()`:: + + >>> l = manager.list(range(10)) + >>> l._call_method('__len__') + 10 + >>> l._call_method('__getslice__', (2, 7)) # equiv to `l[2:7]` + [2, 3, 4, 5, 6] + >>> l._call_method('__getitem__', (20,)) # equiv to `l[20]` + Traceback (most recent call last): + ... + IndexError: list index out of range + + .. method:: _get_value() + + Return a copy of the referent. + + If the referent is unpicklable then this will raise an exception. + + .. method:: __repr__ + + Return a representation of the proxy object. + + .. method:: __str__ + + Return the representation of the referent. + + +Cleanup +>>>>>>> + +A proxy object uses a weakref callback so that when it gets garbage collected it +deregisters itself from the manager which owns its referent. + +A shared object gets deleted from the manager process when there are no longer +any proxies referring to it. + + +Process Pools +~~~~~~~~~~~~~ + +.. module:: multiprocessing.pool + :synopsis: Create pools of processes. + +One can create a pool of processes which will carry out tasks submitted to it +with the :class:`Pool` class in :mod:`multiprocess.pool`. + +.. class:: multiprocessing.Pool([processes[, initializer[, initargs]]]) + + A process pool object which controls a pool of worker processes to which jobs + can be submitted. It supports asynchronous results with timeouts and + callbacks and has a parallel map implementation. + + *processes* is the number of worker processes to use. If *processes* is + ``None`` then the number returned by :func:`cpu_count` is used. If + *initializer* is not ``None`` then each worker process will call + ``initializer(*initargs)`` when it starts. + + .. method:: apply(func[, args[, kwds]]) + + Equivalent of the :func:`apply` builtin function. It blocks till the + result is ready. + + .. method:: apply_async(func[, args[, kwds[, callback]]]) + + A variant of the :meth:`apply` method which returns a result object. + + If *callback* is specified then it should be a callable which accepts a + single argument. When the result becomes ready *callback* is applied to + it (unless the call failed). *callback* should complete immediately since + otherwise the thread which handles the results will get blocked. + + .. method:: map(func, iterable[, chunksize]) + + A parallel equivalent of the :func:`map` builtin function. It blocks till + the result is ready. + + This method chops the iterable into a number of chunks which it submits to + the process pool as separate tasks. The (approximate) size of these + chunks can be specified by setting *chunksize* to a positive integer. + + .. method:: map_async(func, iterable[, chunksize[, callback]]) + + A variant of the :meth:`.map` method which returns a result object. + + If *callback* is specified then it should be a callable which accepts a + single argument. When the result becomes ready *callback* is applied to + it (unless the call failed). *callback* should complete immediately since + otherwise the thread which handles the results will get blocked. + + .. method:: imap(func, iterable[, chunksize]) + + An equivalent of :func:`itertools.imap`. + + The *chunksize* argument is the same as the one used by the :meth:`.map` + method. For very long iterables using a large value for *chunksize* can + make make the job complete **much** faster than using the default value of + ``1``. + + Also if *chunksize* is ``1`` then the :meth:`next` method of the iterator + returned by the :meth:`imap` method has an optional *timeout* parameter: + ``next(timeout)`` will raise :exc:`multiprocessing.TimeoutError` if the + result cannot be returned within *timeout* seconds. + + .. method:: imap_unordered(func, iterable[, chunksize]) + + The same as :meth:`imap` except that the ordering of the results from the + returned iterator should be considered arbitrary. (Only when there is + only one worker process is the order guaranteed to be "correct".) + + .. method:: close() + + Prevents any more tasks from being submitted to the pool. Once all the + tasks have been completed the worker processes will exit. + + .. method:: terminate() + + Stops the worker processes immediately without completing outstanding + work. When the pool object is garbage collected :meth:`terminate` will be + called immediately. + + .. method:: join() + + Wait for the worker processes to exit. One must call :meth:`close` or + :meth:`terminate` before using :meth:`join`. + + +.. class:: AsyncResult + + The class of the result returned by :meth:`Pool.apply_async` and + :meth:`Pool.map_async`. + + .. method:: get([timeout) + + Return the result when it arrives. If *timeout* is not ``None`` and the + result does not arrive within *timeout* seconds then + :exc:`multiprocessing.TimeoutError` is raised. If the remote call raised + an exception then that exception will be reraised by :meth:`get`. + + .. method:: wait([timeout]) + + Wait until the result is available or until *timeout* seconds pass. + + .. method:: ready() + + Return whether the call has completed. + + .. method:: successful() + + Return whether the call completed without raising an exception. Will + raise :exc:`AssertionError` if the result is not ready. + +The following example demonstrates the use of a pool:: + + from multiprocessing import Pool + + def f(x): + return x*x + + if __name__ == '__main__': + pool = Pool(processes=4) # start 4 worker processes + + result = pool.applyAsync(f, (10,)) # evaluate "f(10)" asynchronously + print result.get(timeout=1) # prints "100" unless your computer is *very* slow + + print pool.map(f, range(10)) # prints "[0, 1, 4,..., 81]" + + it = pool.imap(f, range(10)) + print it.next() # prints "0" + print it.next() # prints "1" + print it.next(timeout=1) # prints "4" unless your computer is *very* slow + + import time + result = pool.applyAsync(time.sleep, (10,)) + print result.get(timeout=1) # raises TimeoutError + + +.. _multiprocessing-listeners-clients: + +Listeners and Clients +~~~~~~~~~~~~~~~~~~~~~ + +.. module:: multiprocessing.connection + :synopsis: API for dealing with sockets. + +Usually message passing between processes is done using queues or by using +:class:`Connection` objects returned by :func:`Pipe`. + +However, the :mod:`multiprocessing.connection` module allows some extra +flexibility. It basically gives a high level message oriented API for dealing +with sockets or Windows named pipes, and also has support for *digest +authentication* using the :mod:`hmac` module from the standard library. + + +.. function:: deliver_challenge(connection, authkey) + + Send a randomly generated message to the other end of the connection and wait + for a reply. + + If the reply matches the digest of the message using *authkey* as the key + then a welcome message is sent to the other end of the connection. Otherwise + :exc:`AuthenticationError` is raised. + +.. function:: answerChallenge(connection, authkey) + + Receive a message, calculate the digest of the message using *authkey* as the + key, and then send the digest back. + + If a welcome message is not received, then :exc:`AuthenticationError` is + raised. + +.. function:: Client(address[, family[, authenticate[, authkey]]]) + + Attempt to set up a connection to the listener which is using address + *address*, returning a :class:`Connection`. + + The type of the connection is determined by *family* argument, but this can + generally be omitted since it can usually be inferred from the format of + *address*. (See :ref:`multiprocessing-address-formats`) + + If *authentication* is ``True`` or *authkey* is a string then digest + authentication is used. The key used for authentication will be either + *authkey* or ``current_process().get_auth_key()`` if *authkey* is ``None``. + If authentication fails then :exc:`AuthenticationError` is raised. See + :ref:`multiprocessing-auth-keys`. + +.. class:: Listener([address[, family[, backlog[, authenticate[, authkey]]]]]) + + A wrapper for a bound socket or Windows named pipe which is 'listening' for + connections. + + *address* is the address to be used by the bound socket or named pipe of the + listener object. + + *family* is the type of socket (or named pipe) to use. This can be one of + the strings ``'AF_INET'`` (for a TCP socket), ``'AF_UNIX'`` (for a Unix + domain socket) or ``'AF_PIPE'`` (for a Windows named pipe). Of these only + the first is guaranteed to be available. If *family* is ``None`` then the + family is inferred from the format of *address*. If *address* is also + ``None`` then a default is chosen. This default is the family which is + assumed to be the fastest available. See + :ref:`multiprocessing-address-formats`. Note that if *family* is + ``'AF_UNIX'`` and address is ``None`` then the socket will be created in a + private temporary directory created using :func:`tempfile.mkstemp`. + + If the listener object uses a socket then *backlog* (1 by default) is passed + to the :meth:`listen` method of the socket once it has been bound. + + If *authenticate* is ``True`` (``False`` by default) or *authkey* is not + ``None`` then digest authentication is used. + + If *authkey* is a string then it will be used as the authentication key; + otherwise it must be *None*. + + If *authkey* is ``None`` and *authenticate* is ``True`` then + ``current_process().get_auth_key()`` is used as the authentication key. If + *authkey* is ``None`` and *authentication* is ``False`` then no + authentication is done. If authentication fails then + :exc:`AuthenticationError` is raised. See :ref:`multiprocessing-auth-keys`. + + .. method:: accept() + + Accept a connection on the bound socket or named pipe of the listener + object and return a :class:`Connection` object. If authentication is + attempted and fails, then :exc:`AuthenticationError` is raised. + + .. method:: close() + + Close the bound socket or named pipe of the listener object. This is + called automatically when the listener is garbage collected. However it + is advisable to call it explicitly. + + Listener objects have the following read-only properties: + + .. attribute:: address + + The address which is being used by the Listener object. + + .. attribute:: last_accepted + + The address from which the last accepted connection came. If this is + unavailable then it is ``None``. + + +The module defines two exceptions: + +.. exception:: AuthenticationError + + Exception raised when there is an authentication error. + +.. exception:: BufferTooShort + + Exception raise by the :meth:`Connection.recv_bytes_into` method of a + connection object when the supplied buffer object is too small for the + message read. + + If *e* is an instance of :exc:`BufferTooShort` then ``e.args[0]`` will give + the message as a byte string. + + +**Examples** + +The following server code creates a listener which uses ``'secret password'`` as +an authentication key. It then waits for a connection and sends some data to +the client:: + + from multiprocessing.connection import Listener + from array import array + + address = ('localhost', 6000) # family is deduced to be 'AF_INET' + listener = Listener(address, authkey='secret password') + + conn = listener.accept() + print 'connection accepted from', listener.last_accepted + + conn.send([2.25, None, 'junk', float]) + + conn.send_bytes('hello') + + conn.send_bytes(array('i', [42, 1729])) + + conn.close() + listener.close() + +The following code connects to the server and receives some data from the +server:: + + from multiprocessing.connection import Client + from array import array + + address = ('localhost', 6000) + conn = Client(address, authkey='secret password') + + print conn.recv() # => [2.25, None, 'junk', float] + + print conn.recv_bytes() # => 'hello' + + arr = array('i', [0, 0, 0, 0, 0]) + print conn.recv_bytes_into(arr) # => 8 + print arr # => array('i', [42, 1729, 0, 0, 0]) + + conn.close() + + +.. _multiprocessing-address-formats: + +Address Formats +>>>>>>>>>>>>>>> + +* An ``'AF_INET'`` address is a tuple of the form ``(hostname, port)``` where + *hostname* is a string and *port* is an integer. + +* An ``'AF_UNIX'``` address is a string representing a filename on the + filesystem. + +* An ``'AF_PIPE'`` address is a string of the form + ``r'\\\\.\\pipe\\PipeName'``. To use :func:`Client` to connect to a named + pipe on a remote computer called ServerName* one should use an address of the + form ``r'\\\\ServerName\\pipe\\PipeName'`` instead. + +Note that any string beginning with two backslashes is assumed by default to be +an ``'AF_PIPE'`` address rather than an ``'AF_UNIX'`` address. + + +.. _multiprocessing-auth-keys: + +Authentication keys +~~~~~~~~~~~~~~~~~~~ + +When one uses :meth:`Connection.recv`, the data received is automatically +unpickled. Unfortunately unpickling data from an untrusted source is a security +risk. Therefore :class:`Listener` and :func:`Client` use the :mod:`hmac` module +to provide digest authentication. + +An authentication key is a string which can be thought of as a password: once a +connection is established both ends will demand proof that the other knows the +authentication key. (Demonstrating that both ends are using the same key does +**not** involve sending the key over the connection.) + +If authentication is requested but do authentication key is specified then the +return value of ``current_process().get_auth_key`` is used (see +:class:`Process`). This value will automatically inherited by any +:class:`Process` object that the current process creates. This means that (by +default) all processes of a multi-process program will share a single +authentication key which can be used when setting up connections between the +themselves. + +Suitable authentication keys can also be generated by using :func:`os.urandom`. + + +Logging +~~~~~~~ + +Some support for logging is available. Note, however, that the :mod:`logging` +package does not use process shared locks so it is possible (depending on the +handler type) for messages from different processes to get mixed up. + +.. currentmodule:: multiprocessing +.. function:: get_logger() + + Returns the logger used by :mod:`multiprocessing`. If necessary, a new one + will be created. + + When first created the logger has level :data:`logging.NOTSET` and has a + handler which sends output to :data:`sys.stderr` using format + ``'[%(levelname)s/%(processName)s] %(message)s'``. (The logger allows use of + the non-standard ``'%(processName)s'`` format.) Message sent to this logger + will not by default propogate to the root logger. + + Note that on Windows child processes will only inherit the level of the + parent process's logger -- any other customization of the logger will not be + inherited. + +Below is an example session with logging turned on:: + + >>> import processing, logging + >>> logger = processing.getLogger() + >>> logger.setLevel(logging.INFO) + >>> logger.warning('doomed') + [WARNING/MainProcess] doomed + >>> m = processing.Manager() + [INFO/SyncManager-1] child process calling self.run() + [INFO/SyncManager-1] manager bound to '\\\\.\\pipe\\pyc-2776-0-lj0tfa' + >>> del m + [INFO/MainProcess] sending shutdown message to manager + [INFO/SyncManager-1] manager exiting with exitcode 0 + + +The :mod:`multiprocessing.dummy` module +~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ + +.. module:: multiprocessing.dummy + :synopsis: Dumb wrapper around threading. + +:mod:`multiprocessing.dummy` replicates the API of :mod:`multiprocessing` but is +no more than a wrapper around the `threading` module. + + +.. _multiprocessing-programming: + +Programming guidelines +---------------------- + +There are certain guidelines and idioms which should be adhered to when using +:mod:`multiprocessing`. + + +All platforms +~~~~~~~~~~~~~ + +Avoid shared state + + As far as possible one should try to avoid shifting large amounts of data + between processes. + + It is probably best to stick to using queues or pipes for communication + between processes rather than using the lower level synchronization + primitives from the :mod:`threading` module. + +Picklability + + Ensure that the arguments to the methods of proxies are picklable. + +Thread safety of proxies + + Do not use a proxy object from more than one thread unless you protect it + with a lock. + + (There is never a problem with different processes using the *same* proxy.) + +Joining zombie processes + + On Unix when a process finishes but has not been joined it becomes a zombie. + There should never be very many because each time a new process starts (or + :func:`active_children` is called) all completed processes which have not + yet been joined will be joined. Also calling a finished process's + :meth:`Process.is_alive` will join the process. Even so it is probably good + practice to explicitly join all the processes that you start. + +Better to inherit than pickle/unpickle + + On Windows many of types from :mod:`multiprocessing` need to be picklable so + that child processes can use them. However, one should generally avoid + sending shared objects to other processes using pipes or queues. Instead + you should arrange the program so that a process which need access to a + shared resource created elsewhere can inherit it from an ancestor process. + +Avoid terminating processes + + Using the :meth:`Process.terminate` method to stop a process is liable to + cause any shared resources (such as locks, semaphores, pipes and queues) + currently being used by the process to become broken or unavailable to other + processes. + + Therefore it is probably best to only consider using + :meth:`Process.terminate()` on processes which never use any shared + resources. + +Joining processes that use queues + + Bear in mind that a process that has put items in a queue will wait before + terminating until all the buffered items are fed by the "feeder" thread to + the underlying pipe. (The child process can call the + :meth:`Queue.cancel_join` method of the queue to avoid this behaviour.) + + This means that whenever you use a queue you need to make sure that all + items which have been put on the queue will eventually be removed before the + process is joined. Otherwise you cannot be sure that processes which have + put items on the queue will terminate. Remember also that non-daemonic + processes will be automatically be joined. + + An example which will deadlock is the following:: + + from multiprocessing import Process, Queue + + def f(q): + q.put('X' * 1000000) + + if __name__ == '__main__': + queue = Queue() + p = Process(target=f, args=(queue,)) + p.start() + p.join() # this deadlocks + obj = queue.get() + + A fix here would be to swap the last two lines round (or simply remove the + ``p.join()`` line). + +Explicity pass resources to child processes + + On Unix a child process can make use of a shared resource created in a + parent process using a global resource. However, it is better to pass the + object as an argument to the constructor for the child process. + + Apart from making the code (potentially) compatible with Windows this also + ensures that as long as the child process is still alive the object will not + be garbage collected in the parent process. This might be important if some + resource is freed when the object is garbage collected in the parent + process. + + So for instance :: + + from multiprocessing import Process, Lock + + def f(): + ... do something using "lock" ... + + if __name__ == '__main__': + lock = Lock() + for i in range(10): + Process(target=f).start() + + should be rewritten as :: + + from multiprocessing import Process, Lock + + def f(l): + ... do something using "l" ... + + if __name__ == '__main__': + lock = Lock() + for i in range(10): + Process(target=f, args=(lock,)).start() + + +Windows +~~~~~~~ + +Since Windows lacks :func:`os.fork` it has a few extra restrictions: + +More picklability + + Ensure that all arguments to :meth:`Process.__init__` are picklable. This + means, in particular, that bound or unbound methods cannot be used directly + as the ``target`` argument on Windows --- just define a function and use + that instead. + + Also, if you subclass :class:`Process` then make sure that instances will be + picklable when the :meth:`Process.start` method is called. + +Global variables + + Bear in mind that if code run in a child process tries to access a global + variable, then the value it sees (if any) may not be the same as the value + in the parent process at the time that :meth:`Process.start` was called. + + However, global variables which are just module level constants cause no + problems. + +Safe importing of main module + + Make sure that the main module can be safely imported by a new Python + interpreter without causing unintended side effects (such a starting a new + process). + + For example, under Windows running the following module would fail with a + :exc:`RuntimeError`:: + + from multiprocessing import Process + + def foo(): + print 'hello' + + p = Process(target=foo) + p.start() + + Instead one should protect the "entry point" of the program by using ``if + __name__ == '__main__':`` as follows:: + + from multiprocessing import Process, freeze_support + + def foo(): + print 'hello' + + if __name__ == '__main__': + freeze_support() + p = Process(target=foo) + p.start() + + (The :func:`freeze_support()` line can be omitted if the program will be run + normally instead of frozen.) + + This allows the newly spawned Python interpreter to safely import the module + and then run the module's ``foo()`` function. + + Similar restrictions apply if a pool or manager is created in the main + module. + + +.. _multiprocessing-examples: + +Examples +-------- + +Demonstration of how to create and use customized managers and proxies: + +.. literalinclude:: ../includes/mp_newtype.py + + +Using :class:`Pool`: + +.. literalinclude:: ../includes/mp_pool.py + + +Synchronization types like locks, conditions and queues: + +.. literalinclude:: ../includes/mp_synchronize.py + + +An showing how to use queues to feed tasks to a collection of worker process and +collect the results: + +.. literalinclude:: ../includes/mp_workers.py + + +An example of how a pool of worker processes can each run a +:class:`SimpleHTTPServer.HttpServer` instance while sharing a single listening +socket. + +.. literalinclude:: ../includes/mp_webserver.py + + +Some simple benchmarks comparing :mod:`multiprocessing` with :mod:`threading`: + +.. literalinclude:: ../includes/mp_benchmarks.py + +An example/demo of how to use the :class:`managers.SyncManager`, :class:`Process` +and others to build a system which can distribute processes and work via a +distributed queue to a "cluster" of machines on a network, accessible via SSH. +You will need to have private key authentication for all hosts configured for +this to work. + +.. literalinclude:: ../includes/mp_distributing.py
\ No newline at end of file |