summaryrefslogtreecommitdiffstats
path: root/Doc/includes
diff options
context:
space:
mode:
authorBenjamin Peterson <benjamin@python.org>2008-06-11 16:44:04 (GMT)
committerBenjamin Peterson <benjamin@python.org>2008-06-11 16:44:04 (GMT)
commite711cafab13efc9c1fe6c5cd75826401445eb585 (patch)
tree091a6334fdf6ccdcb93027302c5e038570ca04a4 /Doc/includes
parenteec3d7137929611b98dd593cd2f122cd91b723b2 (diff)
downloadcpython-e711cafab13efc9c1fe6c5cd75826401445eb585.zip
cpython-e711cafab13efc9c1fe6c5cd75826401445eb585.tar.gz
cpython-e711cafab13efc9c1fe6c5cd75826401445eb585.tar.bz2
Merged revisions 64104,64117 via svnmerge from
svn+ssh://pythondev@svn.python.org/python/trunk ........ r64104 | benjamin.peterson | 2008-06-10 21:40:25 -0500 (Tue, 10 Jun 2008) | 2 lines add the multiprocessing package to fulfill PEP 371 ........ r64117 | benjamin.peterson | 2008-06-11 07:26:31 -0500 (Wed, 11 Jun 2008) | 2 lines fix import of multiprocessing by juggling imports ........
Diffstat (limited to 'Doc/includes')
-rw-r--r--Doc/includes/mp_benchmarks.py235
-rw-r--r--Doc/includes/mp_distributing.py362
-rw-r--r--Doc/includes/mp_newtype.py98
-rw-r--r--Doc/includes/mp_pool.py311
-rw-r--r--Doc/includes/mp_synchronize.py273
-rw-r--r--Doc/includes/mp_webserver.py67
-rw-r--r--Doc/includes/mp_workers.py87
7 files changed, 1433 insertions, 0 deletions
diff --git a/Doc/includes/mp_benchmarks.py b/Doc/includes/mp_benchmarks.py
new file mode 100644
index 0000000..425d6de
--- /dev/null
+++ b/Doc/includes/mp_benchmarks.py
@@ -0,0 +1,235 @@
+#
+# Simple benchmarks for the multiprocessing package
+#
+
+import time, sys, multiprocessing, threading, Queue, gc
+
+if sys.platform == 'win32':
+ _timer = time.clock
+else:
+ _timer = time.time
+
+delta = 1
+
+
+#### TEST_QUEUESPEED
+
+def queuespeed_func(q, c, iterations):
+ a = '0' * 256
+ c.acquire()
+ c.notify()
+ c.release()
+
+ for i in xrange(iterations):
+ q.put(a)
+
+ q.put('STOP')
+
+def test_queuespeed(Process, q, c):
+ elapsed = 0
+ iterations = 1
+
+ while elapsed < delta:
+ iterations *= 2
+
+ p = Process(target=queuespeed_func, args=(q, c, iterations))
+ c.acquire()
+ p.start()
+ c.wait()
+ c.release()
+
+ result = None
+ t = _timer()
+
+ while result != 'STOP':
+ result = q.get()
+
+ elapsed = _timer() - t
+
+ p.join()
+
+ print iterations, 'objects passed through the queue in', elapsed, 'seconds'
+ print 'average number/sec:', iterations/elapsed
+
+
+#### TEST_PIPESPEED
+
+def pipe_func(c, cond, iterations):
+ a = '0' * 256
+ cond.acquire()
+ cond.notify()
+ cond.release()
+
+ for i in xrange(iterations):
+ c.send(a)
+
+ c.send('STOP')
+
+def test_pipespeed():
+ c, d = multiprocessing.Pipe()
+ cond = multiprocessing.Condition()
+ elapsed = 0
+ iterations = 1
+
+ while elapsed < delta:
+ iterations *= 2
+
+ p = multiprocessing.Process(target=pipe_func,
+ args=(d, cond, iterations))
+ cond.acquire()
+ p.start()
+ cond.wait()
+ cond.release()
+
+ result = None
+ t = _timer()
+
+ while result != 'STOP':
+ result = c.recv()
+
+ elapsed = _timer() - t
+ p.join()
+
+ print iterations, 'objects passed through connection in',elapsed,'seconds'
+ print 'average number/sec:', iterations/elapsed
+
+
+#### TEST_SEQSPEED
+
+def test_seqspeed(seq):
+ elapsed = 0
+ iterations = 1
+
+ while elapsed < delta:
+ iterations *= 2
+
+ t = _timer()
+
+ for i in xrange(iterations):
+ a = seq[5]
+
+ elapsed = _timer()-t
+
+ print iterations, 'iterations in', elapsed, 'seconds'
+ print 'average number/sec:', iterations/elapsed
+
+
+#### TEST_LOCK
+
+def test_lockspeed(l):
+ elapsed = 0
+ iterations = 1
+
+ while elapsed < delta:
+ iterations *= 2
+
+ t = _timer()
+
+ for i in xrange(iterations):
+ l.acquire()
+ l.release()
+
+ elapsed = _timer()-t
+
+ print iterations, 'iterations in', elapsed, 'seconds'
+ print 'average number/sec:', iterations/elapsed
+
+
+#### TEST_CONDITION
+
+def conditionspeed_func(c, N):
+ c.acquire()
+ c.notify()
+
+ for i in xrange(N):
+ c.wait()
+ c.notify()
+
+ c.release()
+
+def test_conditionspeed(Process, c):
+ elapsed = 0
+ iterations = 1
+
+ while elapsed < delta:
+ iterations *= 2
+
+ c.acquire()
+ p = Process(target=conditionspeed_func, args=(c, iterations))
+ p.start()
+
+ c.wait()
+
+ t = _timer()
+
+ for i in xrange(iterations):
+ c.notify()
+ c.wait()
+
+ elapsed = _timer()-t
+
+ c.release()
+ p.join()
+
+ print iterations * 2, 'waits in', elapsed, 'seconds'
+ print 'average number/sec:', iterations * 2 / elapsed
+
+####
+
+def test():
+ manager = multiprocessing.Manager()
+
+ gc.disable()
+
+ print '\n\t######## testing Queue.Queue\n'
+ test_queuespeed(threading.Thread, Queue.Queue(),
+ threading.Condition())
+ print '\n\t######## testing multiprocessing.Queue\n'
+ test_queuespeed(multiprocessing.Process, multiprocessing.Queue(),
+ multiprocessing.Condition())
+ print '\n\t######## testing Queue managed by server process\n'
+ test_queuespeed(multiprocessing.Process, manager.Queue(),
+ manager.Condition())
+ print '\n\t######## testing multiprocessing.Pipe\n'
+ test_pipespeed()
+
+ print
+
+ print '\n\t######## testing list\n'
+ test_seqspeed(range(10))
+ print '\n\t######## testing list managed by server process\n'
+ test_seqspeed(manager.list(range(10)))
+ print '\n\t######## testing Array("i", ..., lock=False)\n'
+ test_seqspeed(multiprocessing.Array('i', range(10), lock=False))
+ print '\n\t######## testing Array("i", ..., lock=True)\n'
+ test_seqspeed(multiprocessing.Array('i', range(10), lock=True))
+
+ print
+
+ print '\n\t######## testing threading.Lock\n'
+ test_lockspeed(threading.Lock())
+ print '\n\t######## testing threading.RLock\n'
+ test_lockspeed(threading.RLock())
+ print '\n\t######## testing multiprocessing.Lock\n'
+ test_lockspeed(multiprocessing.Lock())
+ print '\n\t######## testing multiprocessing.RLock\n'
+ test_lockspeed(multiprocessing.RLock())
+ print '\n\t######## testing lock managed by server process\n'
+ test_lockspeed(manager.Lock())
+ print '\n\t######## testing rlock managed by server process\n'
+ test_lockspeed(manager.RLock())
+
+ print
+
+ print '\n\t######## testing threading.Condition\n'
+ test_conditionspeed(threading.Thread, threading.Condition())
+ print '\n\t######## testing multiprocessing.Condition\n'
+ test_conditionspeed(multiprocessing.Process, multiprocessing.Condition())
+ print '\n\t######## testing condition managed by a server process\n'
+ test_conditionspeed(multiprocessing.Process, manager.Condition())
+
+ gc.enable()
+
+if __name__ == '__main__':
+ multiprocessing.freeze_support()
+ test()
diff --git a/Doc/includes/mp_distributing.py b/Doc/includes/mp_distributing.py
new file mode 100644
index 0000000..24ae8f8
--- /dev/null
+++ b/Doc/includes/mp_distributing.py
@@ -0,0 +1,362 @@
+#
+# Module to allow spawning of processes on foreign host
+#
+# Depends on `multiprocessing` package -- tested with `processing-0.60`
+#
+
+__all__ = ['Cluster', 'Host', 'get_logger', 'current_process']
+
+#
+# Imports
+#
+
+import sys
+import os
+import tarfile
+import shutil
+import subprocess
+import logging
+import itertools
+import Queue
+
+try:
+ import cPickle as pickle
+except ImportError:
+ import pickle
+
+from multiprocessing import Process, current_process, cpu_count
+from multiprocessing import util, managers, connection, forking, pool
+
+#
+# Logging
+#
+
+def get_logger():
+ return _logger
+
+_logger = logging.getLogger('distributing')
+_logger.propogate = 0
+
+util.fix_up_logger(_logger)
+_formatter = logging.Formatter(util.DEFAULT_LOGGING_FORMAT)
+_handler = logging.StreamHandler()
+_handler.setFormatter(_formatter)
+_logger.addHandler(_handler)
+
+info = _logger.info
+debug = _logger.debug
+
+#
+# Get number of cpus
+#
+
+try:
+ slot_count = cpu_count()
+except NotImplemented:
+ slot_count = 1
+
+#
+# Manager type which spawns subprocesses
+#
+
+class HostManager(managers.SyncManager):
+ '''
+ Manager type used for spawning processes on a (presumably) foreign host
+ '''
+ def __init__(self, address, authkey):
+ managers.SyncManager.__init__(self, address, authkey)
+ self._name = 'Host-unknown'
+
+ def Process(self, group=None, target=None, name=None, args=(), kwargs={}):
+ if hasattr(sys.modules['__main__'], '__file__'):
+ main_path = os.path.basename(sys.modules['__main__'].__file__)
+ else:
+ main_path = None
+ data = pickle.dumps((target, args, kwargs))
+ p = self._RemoteProcess(data, main_path)
+ if name is None:
+ temp = self._name.split('Host-')[-1] + '/Process-%s'
+ name = temp % ':'.join(map(str, p.get_identity()))
+ p.set_name(name)
+ return p
+
+ @classmethod
+ def from_address(cls, address, authkey):
+ manager = cls(address, authkey)
+ managers.transact(address, authkey, 'dummy')
+ manager._state.value = managers.State.STARTED
+ manager._name = 'Host-%s:%s' % manager.address
+ manager.shutdown = util.Finalize(
+ manager, HostManager._finalize_host,
+ args=(manager._address, manager._authkey, manager._name),
+ exitpriority=-10
+ )
+ return manager
+
+ @staticmethod
+ def _finalize_host(address, authkey, name):
+ managers.transact(address, authkey, 'shutdown')
+
+ def __repr__(self):
+ return '<Host(%s)>' % self._name
+
+#
+# Process subclass representing a process on (possibly) a remote machine
+#
+
+class RemoteProcess(Process):
+ '''
+ Represents a process started on a remote host
+ '''
+ def __init__(self, data, main_path):
+ assert not main_path or os.path.basename(main_path) == main_path
+ Process.__init__(self)
+ self._data = data
+ self._main_path = main_path
+
+ def _bootstrap(self):
+ forking.prepare({'main_path': self._main_path})
+ self._target, self._args, self._kwargs = pickle.loads(self._data)
+ return Process._bootstrap(self)
+
+ def get_identity(self):
+ return self._identity
+
+HostManager.register('_RemoteProcess', RemoteProcess)
+
+#
+# A Pool class that uses a cluster
+#
+
+class DistributedPool(pool.Pool):
+
+ def __init__(self, cluster, processes=None, initializer=None, initargs=()):
+ self._cluster = cluster
+ self.Process = cluster.Process
+ pool.Pool.__init__(self, processes or len(cluster),
+ initializer, initargs)
+
+ def _setup_queues(self):
+ self._inqueue = self._cluster._SettableQueue()
+ self._outqueue = self._cluster._SettableQueue()
+ self._quick_put = self._inqueue.put
+ self._quick_get = self._outqueue.get
+
+ @staticmethod
+ def _help_stuff_finish(inqueue, task_handler, size):
+ inqueue.set_contents([None] * size)
+
+#
+# Manager type which starts host managers on other machines
+#
+
+def LocalProcess(**kwds):
+ p = Process(**kwds)
+ p.set_name('localhost/' + p.get_name())
+ return p
+
+class Cluster(managers.SyncManager):
+ '''
+ Represents collection of slots running on various hosts.
+
+ `Cluster` is a subclass of `SyncManager` so it allows creation of
+ various types of shared objects.
+ '''
+ def __init__(self, hostlist, modules):
+ managers.SyncManager.__init__(self, address=('localhost', 0))
+ self._hostlist = hostlist
+ self._modules = modules
+ if __name__ not in modules:
+ modules.append(__name__)
+ files = [sys.modules[name].__file__ for name in modules]
+ for i, file in enumerate(files):
+ if file.endswith('.pyc') or file.endswith('.pyo'):
+ files[i] = file[:-4] + '.py'
+ self._files = [os.path.abspath(file) for file in files]
+
+ def start(self):
+ managers.SyncManager.start(self)
+
+ l = connection.Listener(family='AF_INET', authkey=self._authkey)
+
+ for i, host in enumerate(self._hostlist):
+ host._start_manager(i, self._authkey, l.address, self._files)
+
+ for host in self._hostlist:
+ if host.hostname != 'localhost':
+ conn = l.accept()
+ i, address, cpus = conn.recv()
+ conn.close()
+ other_host = self._hostlist[i]
+ other_host.manager = HostManager.from_address(address,
+ self._authkey)
+ other_host.slots = other_host.slots or cpus
+ other_host.Process = other_host.manager.Process
+ else:
+ host.slots = host.slots or slot_count
+ host.Process = LocalProcess
+
+ self._slotlist = [
+ Slot(host) for host in self._hostlist for i in range(host.slots)
+ ]
+ self._slot_iterator = itertools.cycle(self._slotlist)
+ self._base_shutdown = self.shutdown
+ del self.shutdown
+
+ def shutdown(self):
+ for host in self._hostlist:
+ if host.hostname != 'localhost':
+ host.manager.shutdown()
+ self._base_shutdown()
+
+ def Process(self, group=None, target=None, name=None, args=(), kwargs={}):
+ slot = self._slot_iterator.next()
+ return slot.Process(
+ group=group, target=target, name=name, args=args, kwargs=kwargs
+ )
+
+ def Pool(self, processes=None, initializer=None, initargs=()):
+ return DistributedPool(self, processes, initializer, initargs)
+
+ def __getitem__(self, i):
+ return self._slotlist[i]
+
+ def __len__(self):
+ return len(self._slotlist)
+
+ def __iter__(self):
+ return iter(self._slotlist)
+
+#
+# Queue subclass used by distributed pool
+#
+
+class SettableQueue(Queue.Queue):
+ def empty(self):
+ return not self.queue
+ def full(self):
+ return self.maxsize > 0 and len(self.queue) == self.maxsize
+ def set_contents(self, contents):
+ # length of contents must be at least as large as the number of
+ # threads which have potentially called get()
+ self.not_empty.acquire()
+ try:
+ self.queue.clear()
+ self.queue.extend(contents)
+ self.not_empty.notifyAll()
+ finally:
+ self.not_empty.release()
+
+Cluster.register('_SettableQueue', SettableQueue)
+
+#
+# Class representing a notional cpu in the cluster
+#
+
+class Slot(object):
+ def __init__(self, host):
+ self.host = host
+ self.Process = host.Process
+
+#
+# Host
+#
+
+class Host(object):
+ '''
+ Represents a host to use as a node in a cluster.
+
+ `hostname` gives the name of the host. If hostname is not
+ "localhost" then ssh is used to log in to the host. To log in as
+ a different user use a host name of the form
+ "username@somewhere.org"
+
+ `slots` is used to specify the number of slots for processes on
+ the host. This affects how often processes will be allocated to
+ this host. Normally this should be equal to the number of cpus on
+ that host.
+ '''
+ def __init__(self, hostname, slots=None):
+ self.hostname = hostname
+ self.slots = slots
+
+ def _start_manager(self, index, authkey, address, files):
+ if self.hostname != 'localhost':
+ tempdir = copy_to_remote_temporary_directory(self.hostname, files)
+ debug('startup files copied to %s:%s', self.hostname, tempdir)
+ p = subprocess.Popen(
+ ['ssh', self.hostname, 'python', '-c',
+ '"import os; os.chdir(%r); '
+ 'from distributing import main; main()"' % tempdir],
+ stdin=subprocess.PIPE
+ )
+ data = dict(
+ name='BoostrappingHost', index=index,
+ dist_log_level=_logger.getEffectiveLevel(),
+ dir=tempdir, authkey=str(authkey), parent_address=address
+ )
+ pickle.dump(data, p.stdin, pickle.HIGHEST_PROTOCOL)
+ p.stdin.close()
+
+#
+# Copy files to remote directory, returning name of directory
+#
+
+unzip_code = '''"
+import tempfile, os, sys, tarfile
+tempdir = tempfile.mkdtemp(prefix='distrib-')
+os.chdir(tempdir)
+tf = tarfile.open(fileobj=sys.stdin, mode='r|gz')
+for ti in tf:
+ tf.extract(ti)
+print tempdir
+"'''
+
+def copy_to_remote_temporary_directory(host, files):
+ p = subprocess.Popen(
+ ['ssh', host, 'python', '-c', unzip_code],
+ stdout=subprocess.PIPE, stdin=subprocess.PIPE
+ )
+ tf = tarfile.open(fileobj=p.stdin, mode='w|gz')
+ for name in files:
+ tf.add(name, os.path.basename(name))
+ tf.close()
+ p.stdin.close()
+ return p.stdout.read().rstrip()
+
+#
+# Code which runs a host manager
+#
+
+def main():
+ # get data from parent over stdin
+ data = pickle.load(sys.stdin)
+ sys.stdin.close()
+
+ # set some stuff
+ _logger.setLevel(data['dist_log_level'])
+ forking.prepare(data)
+
+ # create server for a `HostManager` object
+ server = managers.Server(HostManager._registry, ('', 0), data['authkey'])
+ current_process()._server = server
+
+ # report server address and number of cpus back to parent
+ conn = connection.Client(data['parent_address'], authkey=data['authkey'])
+ conn.send((data['index'], server.address, slot_count))
+ conn.close()
+
+ # set name etc
+ current_process().set_name('Host-%s:%s' % server.address)
+ util._run_after_forkers()
+
+ # register a cleanup function
+ def cleanup(directory):
+ debug('removing directory %s', directory)
+ shutil.rmtree(directory)
+ debug('shutting down host manager')
+ util.Finalize(None, cleanup, args=[data['dir']], exitpriority=0)
+
+ # start host manager
+ debug('remote host manager starting in %s', data['dir'])
+ server.serve_forever()
diff --git a/Doc/includes/mp_newtype.py b/Doc/includes/mp_newtype.py
new file mode 100644
index 0000000..b9edc9e
--- /dev/null
+++ b/Doc/includes/mp_newtype.py
@@ -0,0 +1,98 @@
+#
+# This module shows how to use arbitrary callables with a subclass of
+# `BaseManager`.
+#
+
+from multiprocessing import freeze_support
+from multiprocessing.managers import BaseManager, BaseProxy
+import operator
+
+##
+
+class Foo(object):
+ def f(self):
+ print 'you called Foo.f()'
+ def g(self):
+ print 'you called Foo.g()'
+ def _h(self):
+ print 'you called Foo._h()'
+
+# A simple generator function
+def baz():
+ for i in xrange(10):
+ yield i*i
+
+# Proxy type for generator objects
+class GeneratorProxy(BaseProxy):
+ _exposed_ = ('next', '__next__')
+ def __iter__(self):
+ return self
+ def next(self):
+ return self._callmethod('next')
+ def __next__(self):
+ return self._callmethod('__next__')
+
+# Function to return the operator module
+def get_operator_module():
+ return operator
+
+##
+
+class MyManager(BaseManager):
+ pass
+
+# register the Foo class; make `f()` and `g()` accessible via proxy
+MyManager.register('Foo1', Foo)
+
+# register the Foo class; make `g()` and `_h()` accessible via proxy
+MyManager.register('Foo2', Foo, exposed=('g', '_h'))
+
+# register the generator function baz; use `GeneratorProxy` to make proxies
+MyManager.register('baz', baz, proxytype=GeneratorProxy)
+
+# register get_operator_module(); make public functions accessible via proxy
+MyManager.register('operator', get_operator_module)
+
+##
+
+def test():
+ manager = MyManager()
+ manager.start()
+
+ print '-' * 20
+
+ f1 = manager.Foo1()
+ f1.f()
+ f1.g()
+ assert not hasattr(f1, '_h')
+ assert sorted(f1._exposed_) == sorted(['f', 'g'])
+
+ print '-' * 20
+
+ f2 = manager.Foo2()
+ f2.g()
+ f2._h()
+ assert not hasattr(f2, 'f')
+ assert sorted(f2._exposed_) == sorted(['g', '_h'])
+
+ print '-' * 20
+
+ it = manager.baz()
+ for i in it:
+ print '<%d>' % i,
+ print
+
+ print '-' * 20
+
+ op = manager.operator()
+ print 'op.add(23, 45) =', op.add(23, 45)
+ print 'op.pow(2, 94) =', op.pow(2, 94)
+ print 'op.getslice(range(10), 2, 6) =', op.getslice(range(10), 2, 6)
+ print 'op.repeat(range(5), 3) =', op.repeat(range(5), 3)
+ print 'op._exposed_ =', op._exposed_
+
+##
+
+if __name__ == '__main__':
+ freeze_support()
+ test()
diff --git a/Doc/includes/mp_pool.py b/Doc/includes/mp_pool.py
new file mode 100644
index 0000000..b937b86
--- /dev/null
+++ b/Doc/includes/mp_pool.py
@@ -0,0 +1,311 @@
+#
+# A test of `multiprocessing.Pool` class
+#
+
+import multiprocessing
+import time
+import random
+import sys
+
+#
+# Functions used by test code
+#
+
+def calculate(func, args):
+ result = func(*args)
+ return '%s says that %s%s = %s' % (
+ multiprocessing.current_process().get_name(),
+ func.__name__, args, result
+ )
+
+def calculatestar(args):
+ return calculate(*args)
+
+def mul(a, b):
+ time.sleep(0.5*random.random())
+ return a * b
+
+def plus(a, b):
+ time.sleep(0.5*random.random())
+ return a + b
+
+def f(x):
+ return 1.0 / (x-5.0)
+
+def pow3(x):
+ return x**3
+
+def noop(x):
+ pass
+
+#
+# Test code
+#
+
+def test():
+ print 'cpu_count() = %d\n' % multiprocessing.cpu_count()
+
+ #
+ # Create pool
+ #
+
+ PROCESSES = 4
+ print 'Creating pool with %d processes\n' % PROCESSES
+ pool = multiprocessing.Pool(PROCESSES)
+ print 'pool = %s' % pool
+ print
+
+ #
+ # Tests
+ #
+
+ TASKS = [(mul, (i, 7)) for i in range(10)] + \
+ [(plus, (i, 8)) for i in range(10)]
+
+ results = [pool.apply_async(calculate, t) for t in TASKS]
+ imap_it = pool.imap(calculatestar, TASKS)
+ imap_unordered_it = pool.imap_unordered(calculatestar, TASKS)
+
+ print 'Ordered results using pool.apply_async():'
+ for r in results:
+ print '\t', r.get()
+ print
+
+ print 'Ordered results using pool.imap():'
+ for x in imap_it:
+ print '\t', x
+ print
+
+ print 'Unordered results using pool.imap_unordered():'
+ for x in imap_unordered_it:
+ print '\t', x
+ print
+
+ print 'Ordered results using pool.map() --- will block till complete:'
+ for x in pool.map(calculatestar, TASKS):
+ print '\t', x
+ print
+
+ #
+ # Simple benchmarks
+ #
+
+ N = 100000
+ print 'def pow3(x): return x**3'
+
+ t = time.time()
+ A = map(pow3, xrange(N))
+ print '\tmap(pow3, xrange(%d)):\n\t\t%s seconds' % \
+ (N, time.time() - t)
+
+ t = time.time()
+ B = pool.map(pow3, xrange(N))
+ print '\tpool.map(pow3, xrange(%d)):\n\t\t%s seconds' % \
+ (N, time.time() - t)
+
+ t = time.time()
+ C = list(pool.imap(pow3, xrange(N), chunksize=N//8))
+ print '\tlist(pool.imap(pow3, xrange(%d), chunksize=%d)):\n\t\t%s' \
+ ' seconds' % (N, N//8, time.time() - t)
+
+ assert A == B == C, (len(A), len(B), len(C))
+ print
+
+ L = [None] * 1000000
+ print 'def noop(x): pass'
+ print 'L = [None] * 1000000'
+
+ t = time.time()
+ A = map(noop, L)
+ print '\tmap(noop, L):\n\t\t%s seconds' % \
+ (time.time() - t)
+
+ t = time.time()
+ B = pool.map(noop, L)
+ print '\tpool.map(noop, L):\n\t\t%s seconds' % \
+ (time.time() - t)
+
+ t = time.time()
+ C = list(pool.imap(noop, L, chunksize=len(L)//8))
+ print '\tlist(pool.imap(noop, L, chunksize=%d)):\n\t\t%s seconds' % \
+ (len(L)//8, time.time() - t)
+
+ assert A == B == C, (len(A), len(B), len(C))
+ print
+
+ del A, B, C, L
+
+ #
+ # Test error handling
+ #
+
+ print 'Testing error handling:'
+
+ try:
+ print pool.apply(f, (5,))
+ except ZeroDivisionError:
+ print '\tGot ZeroDivisionError as expected from pool.apply()'
+ else:
+ raise AssertionError, 'expected ZeroDivisionError'
+
+ try:
+ print pool.map(f, range(10))
+ except ZeroDivisionError:
+ print '\tGot ZeroDivisionError as expected from pool.map()'
+ else:
+ raise AssertionError, 'expected ZeroDivisionError'
+
+ try:
+ print list(pool.imap(f, range(10)))
+ except ZeroDivisionError:
+ print '\tGot ZeroDivisionError as expected from list(pool.imap())'
+ else:
+ raise AssertionError, 'expected ZeroDivisionError'
+
+ it = pool.imap(f, range(10))
+ for i in range(10):
+ try:
+ x = it.next()
+ except ZeroDivisionError:
+ if i == 5:
+ pass
+ except StopIteration:
+ break
+ else:
+ if i == 5:
+ raise AssertionError, 'expected ZeroDivisionError'
+
+ assert i == 9
+ print '\tGot ZeroDivisionError as expected from IMapIterator.next()'
+ print
+
+ #
+ # Testing timeouts
+ #
+
+ print 'Testing ApplyResult.get() with timeout:',
+ res = pool.apply_async(calculate, TASKS[0])
+ while 1:
+ sys.stdout.flush()
+ try:
+ sys.stdout.write('\n\t%s' % res.get(0.02))
+ break
+ except multiprocessing.TimeoutError:
+ sys.stdout.write('.')
+ print
+ print
+
+ print 'Testing IMapIterator.next() with timeout:',
+ it = pool.imap(calculatestar, TASKS)
+ while 1:
+ sys.stdout.flush()
+ try:
+ sys.stdout.write('\n\t%s' % it.next(0.02))
+ except StopIteration:
+ break
+ except multiprocessing.TimeoutError:
+ sys.stdout.write('.')
+ print
+ print
+
+ #
+ # Testing callback
+ #
+
+ print 'Testing callback:'
+
+ A = []
+ B = [56, 0, 1, 8, 27, 64, 125, 216, 343, 512, 729]
+
+ r = pool.apply_async(mul, (7, 8), callback=A.append)
+ r.wait()
+
+ r = pool.map_async(pow3, range(10), callback=A.extend)
+ r.wait()
+
+ if A == B:
+ print '\tcallbacks succeeded\n'
+ else:
+ print '\t*** callbacks failed\n\t\t%s != %s\n' % (A, B)
+
+ #
+ # Check there are no outstanding tasks
+ #
+
+ assert not pool._cache, 'cache = %r' % pool._cache
+
+ #
+ # Check close() methods
+ #
+
+ print 'Testing close():'
+
+ for worker in pool._pool:
+ assert worker.is_alive()
+
+ result = pool.apply_async(time.sleep, [0.5])
+ pool.close()
+ pool.join()
+
+ assert result.get() is None
+
+ for worker in pool._pool:
+ assert not worker.is_alive()
+
+ print '\tclose() succeeded\n'
+
+ #
+ # Check terminate() method
+ #
+
+ print 'Testing terminate():'
+
+ pool = multiprocessing.Pool(2)
+ DELTA = 0.1
+ ignore = pool.apply(pow3, [2])
+ results = [pool.apply_async(time.sleep, [DELTA]) for i in range(100)]
+ pool.terminate()
+ pool.join()
+
+ for worker in pool._pool:
+ assert not worker.is_alive()
+
+ print '\tterminate() succeeded\n'
+
+ #
+ # Check garbage collection
+ #
+
+ print 'Testing garbage collection:'
+
+ pool = multiprocessing.Pool(2)
+ DELTA = 0.1
+ processes = pool._pool
+ ignore = pool.apply(pow3, [2])
+ results = [pool.apply_async(time.sleep, [DELTA]) for i in range(100)]
+
+ results = pool = None
+
+ time.sleep(DELTA * 2)
+
+ for worker in processes:
+ assert not worker.is_alive()
+
+ print '\tgarbage collection succeeded\n'
+
+
+if __name__ == '__main__':
+ multiprocessing.freeze_support()
+
+ assert len(sys.argv) in (1, 2)
+
+ if len(sys.argv) == 1 or sys.argv[1] == 'processes':
+ print ' Using processes '.center(79, '-')
+ elif sys.argv[1] == 'threads':
+ print ' Using threads '.center(79, '-')
+ import multiprocessing.dummy as multiprocessing
+ else:
+ print 'Usage:\n\t%s [processes | threads]' % sys.argv[0]
+ raise SystemExit(2)
+
+ test()
diff --git a/Doc/includes/mp_synchronize.py b/Doc/includes/mp_synchronize.py
new file mode 100644
index 0000000..8cf11bd
--- /dev/null
+++ b/Doc/includes/mp_synchronize.py
@@ -0,0 +1,273 @@
+#
+# A test file for the `multiprocessing` package
+#
+
+import time, sys, random
+from Queue import Empty
+
+import multiprocessing # may get overwritten
+
+
+#### TEST_VALUE
+
+def value_func(running, mutex):
+ random.seed()
+ time.sleep(random.random()*4)
+
+ mutex.acquire()
+ print '\n\t\t\t' + str(multiprocessing.current_process()) + ' has finished'
+ running.value -= 1
+ mutex.release()
+
+def test_value():
+ TASKS = 10
+ running = multiprocessing.Value('i', TASKS)
+ mutex = multiprocessing.Lock()
+
+ for i in range(TASKS):
+ p = multiprocessing.Process(target=value_func, args=(running, mutex))
+ p.start()
+
+ while running.value > 0:
+ time.sleep(0.08)
+ mutex.acquire()
+ print running.value,
+ sys.stdout.flush()
+ mutex.release()
+
+ print
+ print 'No more running processes'
+
+
+#### TEST_QUEUE
+
+def queue_func(queue):
+ for i in range(30):
+ time.sleep(0.5 * random.random())
+ queue.put(i*i)
+ queue.put('STOP')
+
+def test_queue():
+ q = multiprocessing.Queue()
+
+ p = multiprocessing.Process(target=queue_func, args=(q,))
+ p.start()
+
+ o = None
+ while o != 'STOP':
+ try:
+ o = q.get(timeout=0.3)
+ print o,
+ sys.stdout.flush()
+ except Empty:
+ print 'TIMEOUT'
+
+ print
+
+
+#### TEST_CONDITION
+
+def condition_func(cond):
+ cond.acquire()
+ print '\t' + str(cond)
+ time.sleep(2)
+ print '\tchild is notifying'
+ print '\t' + str(cond)
+ cond.notify()
+ cond.release()
+
+def test_condition():
+ cond = multiprocessing.Condition()
+
+ p = multiprocessing.Process(target=condition_func, args=(cond,))
+ print cond
+
+ cond.acquire()
+ print cond
+ cond.acquire()
+ print cond
+
+ p.start()
+
+ print 'main is waiting'
+ cond.wait()
+ print 'main has woken up'
+
+ print cond
+ cond.release()
+ print cond
+ cond.release()
+
+ p.join()
+ print cond
+
+
+#### TEST_SEMAPHORE
+
+def semaphore_func(sema, mutex, running):
+ sema.acquire()
+
+ mutex.acquire()
+ running.value += 1
+ print running.value, 'tasks are running'
+ mutex.release()
+
+ random.seed()
+ time.sleep(random.random()*2)
+
+ mutex.acquire()
+ running.value -= 1
+ print '%s has finished' % multiprocessing.current_process()
+ mutex.release()
+
+ sema.release()
+
+def test_semaphore():
+ sema = multiprocessing.Semaphore(3)
+ mutex = multiprocessing.RLock()
+ running = multiprocessing.Value('i', 0)
+
+ processes = [
+ multiprocessing.Process(target=semaphore_func,
+ args=(sema, mutex, running))
+ for i in range(10)
+ ]
+
+ for p in processes:
+ p.start()
+
+ for p in processes:
+ p.join()
+
+
+#### TEST_JOIN_TIMEOUT
+
+def join_timeout_func():
+ print '\tchild sleeping'
+ time.sleep(5.5)
+ print '\n\tchild terminating'
+
+def test_join_timeout():
+ p = multiprocessing.Process(target=join_timeout_func)
+ p.start()
+
+ print 'waiting for process to finish'
+
+ while 1:
+ p.join(timeout=1)
+ if not p.is_alive():
+ break
+ print '.',
+ sys.stdout.flush()
+
+
+#### TEST_EVENT
+
+def event_func(event):
+ print '\t%r is waiting' % multiprocessing.current_process()
+ event.wait()
+ print '\t%r has woken up' % multiprocessing.current_process()
+
+def test_event():
+ event = multiprocessing.Event()
+
+ processes = [multiprocessing.Process(target=event_func, args=(event,))
+ for i in range(5)]
+
+ for p in processes:
+ p.start()
+
+ print 'main is sleeping'
+ time.sleep(2)
+
+ print 'main is setting event'
+ event.set()
+
+ for p in processes:
+ p.join()
+
+
+#### TEST_SHAREDVALUES
+
+def sharedvalues_func(values, arrays, shared_values, shared_arrays):
+ for i in range(len(values)):
+ v = values[i][1]
+ sv = shared_values[i].value
+ assert v == sv
+
+ for i in range(len(values)):
+ a = arrays[i][1]
+ sa = list(shared_arrays[i][:])
+ assert a == sa
+
+ print 'Tests passed'
+
+def test_sharedvalues():
+ values = [
+ ('i', 10),
+ ('h', -2),
+ ('d', 1.25)
+ ]
+ arrays = [
+ ('i', range(100)),
+ ('d', [0.25 * i for i in range(100)]),
+ ('H', range(1000))
+ ]
+
+ shared_values = [multiprocessing.Value(id, v) for id, v in values]
+ shared_arrays = [multiprocessing.Array(id, a) for id, a in arrays]
+
+ p = multiprocessing.Process(
+ target=sharedvalues_func,
+ args=(values, arrays, shared_values, shared_arrays)
+ )
+ p.start()
+ p.join()
+
+ assert p.get_exitcode() == 0
+
+
+####
+
+def test(namespace=multiprocessing):
+ global multiprocessing
+
+ multiprocessing = namespace
+
+ for func in [ test_value, test_queue, test_condition,
+ test_semaphore, test_join_timeout, test_event,
+ test_sharedvalues ]:
+
+ print '\n\t######## %s\n' % func.__name__
+ func()
+
+ ignore = multiprocessing.active_children() # cleanup any old processes
+ if hasattr(multiprocessing, '_debug_info'):
+ info = multiprocessing._debug_info()
+ if info:
+ print info
+ raise ValueError, 'there should be no positive refcounts left'
+
+
+if __name__ == '__main__':
+ multiprocessing.freeze_support()
+
+ assert len(sys.argv) in (1, 2)
+
+ if len(sys.argv) == 1 or sys.argv[1] == 'processes':
+ print ' Using processes '.center(79, '-')
+ namespace = multiprocessing
+ elif sys.argv[1] == 'manager':
+ print ' Using processes and a manager '.center(79, '-')
+ namespace = multiprocessing.Manager()
+ namespace.Process = multiprocessing.Process
+ namespace.current_process = multiprocessing.current_process
+ namespace.active_children = multiprocessing.active_children
+ elif sys.argv[1] == 'threads':
+ print ' Using threads '.center(79, '-')
+ import multiprocessing.dummy as namespace
+ else:
+ print 'Usage:\n\t%s [processes | manager | threads]' % sys.argv[0]
+ raise SystemExit, 2
+
+ test(namespace)
diff --git a/Doc/includes/mp_webserver.py b/Doc/includes/mp_webserver.py
new file mode 100644
index 0000000..15d2b6b
--- /dev/null
+++ b/Doc/includes/mp_webserver.py
@@ -0,0 +1,67 @@
+#
+# Example where a pool of http servers share a single listening socket
+#
+# On Windows this module depends on the ability to pickle a socket
+# object so that the worker processes can inherit a copy of the server
+# object. (We import `multiprocessing.reduction` to enable this pickling.)
+#
+# Not sure if we should synchronize access to `socket.accept()` method by
+# using a process-shared lock -- does not seem to be necessary.
+#
+
+import os
+import sys
+
+from multiprocessing import Process, current_process, freeze_support
+from BaseHTTPServer import HTTPServer
+from SimpleHTTPServer import SimpleHTTPRequestHandler
+
+if sys.platform == 'win32':
+ import multiprocessing.reduction # make sockets pickable/inheritable
+
+
+def note(format, *args):
+ sys.stderr.write('[%s]\t%s\n' % (current_process().get_name(),format%args))
+
+
+class RequestHandler(SimpleHTTPRequestHandler):
+ # we override log_message() to show which process is handling the request
+ def log_message(self, format, *args):
+ note(format, *args)
+
+def serve_forever(server):
+ note('starting server')
+ try:
+ server.serve_forever()
+ except KeyboardInterrupt:
+ pass
+
+
+def runpool(address, number_of_processes):
+ # create a single server object -- children will each inherit a copy
+ server = HTTPServer(address, RequestHandler)
+
+ # create child processes to act as workers
+ for i in range(number_of_processes-1):
+ Process(target=serve_forever, args=(server,)).start()
+
+ # main process also acts as a worker
+ serve_forever(server)
+
+
+def test():
+ DIR = os.path.join(os.path.dirname(__file__), '..')
+ ADDRESS = ('localhost', 8000)
+ NUMBER_OF_PROCESSES = 4
+
+ print 'Serving at http://%s:%d using %d worker processes' % \
+ (ADDRESS[0], ADDRESS[1], NUMBER_OF_PROCESSES)
+ print 'To exit press Ctrl-' + ['C', 'Break'][sys.platform=='win32']
+
+ os.chdir(DIR)
+ runpool(ADDRESS, NUMBER_OF_PROCESSES)
+
+
+if __name__ == '__main__':
+ freeze_support()
+ test()
diff --git a/Doc/includes/mp_workers.py b/Doc/includes/mp_workers.py
new file mode 100644
index 0000000..795e6cb
--- /dev/null
+++ b/Doc/includes/mp_workers.py
@@ -0,0 +1,87 @@
+#
+# Simple example which uses a pool of workers to carry out some tasks.
+#
+# Notice that the results will probably not come out of the output
+# queue in the same in the same order as the corresponding tasks were
+# put on the input queue. If it is important to get the results back
+# in the original order then consider using `Pool.map()` or
+# `Pool.imap()` (which will save on the amount of code needed anyway).
+#
+
+import time
+import random
+
+from multiprocessing import Process, Queue, current_process, freeze_support
+
+#
+# Function run by worker processes
+#
+
+def worker(input, output):
+ for func, args in iter(input.get, 'STOP'):
+ result = calculate(func, args)
+ output.put(result)
+
+#
+# Function used to calculate result
+#
+
+def calculate(func, args):
+ result = func(*args)
+ return '%s says that %s%s = %s' % \
+ (current_process().get_name(), func.__name__, args, result)
+
+#
+# Functions referenced by tasks
+#
+
+def mul(a, b):
+ time.sleep(0.5*random.random())
+ return a * b
+
+def plus(a, b):
+ time.sleep(0.5*random.random())
+ return a + b
+
+#
+#
+#
+
+def test():
+ NUMBER_OF_PROCESSES = 4
+ TASKS1 = [(mul, (i, 7)) for i in range(20)]
+ TASKS2 = [(plus, (i, 8)) for i in range(10)]
+
+ # Create queues
+ task_queue = Queue()
+ done_queue = Queue()
+
+ # Submit tasks
+ for task in TASKS1:
+ task_queue.put(task)
+
+ # Start worker processes
+ for i in range(NUMBER_OF_PROCESSES):
+ Process(target=worker, args=(task_queue, done_queue)).start()
+
+ # Get and print results
+ print 'Unordered results:'
+ for i in range(len(TASKS1)):
+ print '\t', done_queue.get()
+
+ # Add more tasks using `put()`
+ for task in TASKS2:
+ task_queue.put(task)
+
+ # Get and print some more results
+ for i in range(len(TASKS2)):
+ print '\t', done_queue.get()
+
+ # Tell child processes to stop
+ for i in range(NUMBER_OF_PROCESSES):
+ task_queue.put('STOP')
+
+
+if __name__ == '__main__':
+ freeze_support()
+ test()