| field | value | date |
|---|---|---|
| author | Benjamin Peterson <benjamin@python.org> | 2008-06-11 16:44:04 (GMT) |
| committer | Benjamin Peterson <benjamin@python.org> | 2008-06-11 16:44:04 (GMT) |
| commit | e711cafab13efc9c1fe6c5cd75826401445eb585 (patch) | |
| tree | 091a6334fdf6ccdcb93027302c5e038570ca04a4 /Doc/includes | |
| parent | eec3d7137929611b98dd593cd2f122cd91b723b2 (diff) | |
| download | cpython-e711cafab13efc9c1fe6c5cd75826401445eb585.zip cpython-e711cafab13efc9c1fe6c5cd75826401445eb585.tar.gz cpython-e711cafab13efc9c1fe6c5cd75826401445eb585.tar.bz2 | |
Merged revisions 64104,64117 via svnmerge from
svn+ssh://pythondev@svn.python.org/python/trunk
........
r64104 | benjamin.peterson | 2008-06-10 21:40:25 -0500 (Tue, 10 Jun 2008) | 2 lines
add the multiprocessing package to fulfill PEP 371
........
r64117 | benjamin.peterson | 2008-06-11 07:26:31 -0500 (Wed, 11 Jun 2008) | 2 lines
fix import of multiprocessing by juggling imports
........
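
For context: PEP 371 brings the externally maintained `processing` package into the standard library as `multiprocessing`. As a point of reference only (this sketch is not part of the commit), the package's core API mirrors `threading` but runs work in separate processes; the example below is written in the same Python 2 style as the files added under `Doc/includes`:

```python
# Minimal illustration of the new package's Process/Queue API -- not one of
# the files added by this commit.
from multiprocessing import Process, Queue

def worker(q):
    # executes in a child process rather than a thread
    q.put('hello from the worker process')

if __name__ == '__main__':
    q = Queue()
    p = Process(target=worker, args=(q,))
    p.start()
    print q.get()    # blocks until the child puts its result
    p.join()
```
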
Diffstat (limited to 'Doc/includes')
| mode | file | lines added |
|---|---|---|
| -rw-r--r-- | Doc/includes/mp_benchmarks.py | 235 |
| -rw-r--r-- | Doc/includes/mp_distributing.py | 362 |
| -rw-r--r-- | Doc/includes/mp_newtype.py | 98 |
| -rw-r--r-- | Doc/includes/mp_pool.py | 311 |
| -rw-r--r-- | Doc/includes/mp_synchronize.py | 273 |
| -rw-r--r-- | Doc/includes/mp_webserver.py | 67 |
| -rw-r--r-- | Doc/includes/mp_workers.py | 87 |
7 files changed, 1433 insertions, 0 deletions
diff --git a/Doc/includes/mp_benchmarks.py b/Doc/includes/mp_benchmarks.py new file mode 100644 index 0000000..425d6de --- /dev/null +++ b/Doc/includes/mp_benchmarks.py @@ -0,0 +1,235 @@ +# +# Simple benchmarks for the multiprocessing package +# + +import time, sys, multiprocessing, threading, Queue, gc + +if sys.platform == 'win32': + _timer = time.clock +else: + _timer = time.time + +delta = 1 + + +#### TEST_QUEUESPEED + +def queuespeed_func(q, c, iterations): + a = '0' * 256 + c.acquire() + c.notify() + c.release() + + for i in xrange(iterations): + q.put(a) + + q.put('STOP') + +def test_queuespeed(Process, q, c): + elapsed = 0 + iterations = 1 + + while elapsed < delta: + iterations *= 2 + + p = Process(target=queuespeed_func, args=(q, c, iterations)) + c.acquire() + p.start() + c.wait() + c.release() + + result = None + t = _timer() + + while result != 'STOP': + result = q.get() + + elapsed = _timer() - t + + p.join() + + print iterations, 'objects passed through the queue in', elapsed, 'seconds' + print 'average number/sec:', iterations/elapsed + + +#### TEST_PIPESPEED + +def pipe_func(c, cond, iterations): + a = '0' * 256 + cond.acquire() + cond.notify() + cond.release() + + for i in xrange(iterations): + c.send(a) + + c.send('STOP') + +def test_pipespeed(): + c, d = multiprocessing.Pipe() + cond = multiprocessing.Condition() + elapsed = 0 + iterations = 1 + + while elapsed < delta: + iterations *= 2 + + p = multiprocessing.Process(target=pipe_func, + args=(d, cond, iterations)) + cond.acquire() + p.start() + cond.wait() + cond.release() + + result = None + t = _timer() + + while result != 'STOP': + result = c.recv() + + elapsed = _timer() - t + p.join() + + print iterations, 'objects passed through connection in',elapsed,'seconds' + print 'average number/sec:', iterations/elapsed + + +#### TEST_SEQSPEED + +def test_seqspeed(seq): + elapsed = 0 + iterations = 1 + + while elapsed < delta: + iterations *= 2 + + t = _timer() + + for i in xrange(iterations): + a = seq[5] + + elapsed = _timer()-t + + print iterations, 'iterations in', elapsed, 'seconds' + print 'average number/sec:', iterations/elapsed + + +#### TEST_LOCK + +def test_lockspeed(l): + elapsed = 0 + iterations = 1 + + while elapsed < delta: + iterations *= 2 + + t = _timer() + + for i in xrange(iterations): + l.acquire() + l.release() + + elapsed = _timer()-t + + print iterations, 'iterations in', elapsed, 'seconds' + print 'average number/sec:', iterations/elapsed + + +#### TEST_CONDITION + +def conditionspeed_func(c, N): + c.acquire() + c.notify() + + for i in xrange(N): + c.wait() + c.notify() + + c.release() + +def test_conditionspeed(Process, c): + elapsed = 0 + iterations = 1 + + while elapsed < delta: + iterations *= 2 + + c.acquire() + p = Process(target=conditionspeed_func, args=(c, iterations)) + p.start() + + c.wait() + + t = _timer() + + for i in xrange(iterations): + c.notify() + c.wait() + + elapsed = _timer()-t + + c.release() + p.join() + + print iterations * 2, 'waits in', elapsed, 'seconds' + print 'average number/sec:', iterations * 2 / elapsed + +#### + +def test(): + manager = multiprocessing.Manager() + + gc.disable() + + print '\n\t######## testing Queue.Queue\n' + test_queuespeed(threading.Thread, Queue.Queue(), + threading.Condition()) + print '\n\t######## testing multiprocessing.Queue\n' + test_queuespeed(multiprocessing.Process, multiprocessing.Queue(), + multiprocessing.Condition()) + print '\n\t######## testing Queue managed by server process\n' + test_queuespeed(multiprocessing.Process, 
manager.Queue(), + manager.Condition()) + print '\n\t######## testing multiprocessing.Pipe\n' + test_pipespeed() + + print + + print '\n\t######## testing list\n' + test_seqspeed(range(10)) + print '\n\t######## testing list managed by server process\n' + test_seqspeed(manager.list(range(10))) + print '\n\t######## testing Array("i", ..., lock=False)\n' + test_seqspeed(multiprocessing.Array('i', range(10), lock=False)) + print '\n\t######## testing Array("i", ..., lock=True)\n' + test_seqspeed(multiprocessing.Array('i', range(10), lock=True)) + + print + + print '\n\t######## testing threading.Lock\n' + test_lockspeed(threading.Lock()) + print '\n\t######## testing threading.RLock\n' + test_lockspeed(threading.RLock()) + print '\n\t######## testing multiprocessing.Lock\n' + test_lockspeed(multiprocessing.Lock()) + print '\n\t######## testing multiprocessing.RLock\n' + test_lockspeed(multiprocessing.RLock()) + print '\n\t######## testing lock managed by server process\n' + test_lockspeed(manager.Lock()) + print '\n\t######## testing rlock managed by server process\n' + test_lockspeed(manager.RLock()) + + print + + print '\n\t######## testing threading.Condition\n' + test_conditionspeed(threading.Thread, threading.Condition()) + print '\n\t######## testing multiprocessing.Condition\n' + test_conditionspeed(multiprocessing.Process, multiprocessing.Condition()) + print '\n\t######## testing condition managed by a server process\n' + test_conditionspeed(multiprocessing.Process, manager.Condition()) + + gc.enable() + +if __name__ == '__main__': + multiprocessing.freeze_support() + test() diff --git a/Doc/includes/mp_distributing.py b/Doc/includes/mp_distributing.py new file mode 100644 index 0000000..24ae8f8 --- /dev/null +++ b/Doc/includes/mp_distributing.py @@ -0,0 +1,362 @@ +# +# Module to allow spawning of processes on foreign host +# +# Depends on `multiprocessing` package -- tested with `processing-0.60` +# + +__all__ = ['Cluster', 'Host', 'get_logger', 'current_process'] + +# +# Imports +# + +import sys +import os +import tarfile +import shutil +import subprocess +import logging +import itertools +import Queue + +try: + import cPickle as pickle +except ImportError: + import pickle + +from multiprocessing import Process, current_process, cpu_count +from multiprocessing import util, managers, connection, forking, pool + +# +# Logging +# + +def get_logger(): + return _logger + +_logger = logging.getLogger('distributing') +_logger.propogate = 0 + +util.fix_up_logger(_logger) +_formatter = logging.Formatter(util.DEFAULT_LOGGING_FORMAT) +_handler = logging.StreamHandler() +_handler.setFormatter(_formatter) +_logger.addHandler(_handler) + +info = _logger.info +debug = _logger.debug + +# +# Get number of cpus +# + +try: + slot_count = cpu_count() +except NotImplemented: + slot_count = 1 + +# +# Manager type which spawns subprocesses +# + +class HostManager(managers.SyncManager): + ''' + Manager type used for spawning processes on a (presumably) foreign host + ''' + def __init__(self, address, authkey): + managers.SyncManager.__init__(self, address, authkey) + self._name = 'Host-unknown' + + def Process(self, group=None, target=None, name=None, args=(), kwargs={}): + if hasattr(sys.modules['__main__'], '__file__'): + main_path = os.path.basename(sys.modules['__main__'].__file__) + else: + main_path = None + data = pickle.dumps((target, args, kwargs)) + p = self._RemoteProcess(data, main_path) + if name is None: + temp = self._name.split('Host-')[-1] + '/Process-%s' + name = temp % 
':'.join(map(str, p.get_identity())) + p.set_name(name) + return p + + @classmethod + def from_address(cls, address, authkey): + manager = cls(address, authkey) + managers.transact(address, authkey, 'dummy') + manager._state.value = managers.State.STARTED + manager._name = 'Host-%s:%s' % manager.address + manager.shutdown = util.Finalize( + manager, HostManager._finalize_host, + args=(manager._address, manager._authkey, manager._name), + exitpriority=-10 + ) + return manager + + @staticmethod + def _finalize_host(address, authkey, name): + managers.transact(address, authkey, 'shutdown') + + def __repr__(self): + return '<Host(%s)>' % self._name + +# +# Process subclass representing a process on (possibly) a remote machine +# + +class RemoteProcess(Process): + ''' + Represents a process started on a remote host + ''' + def __init__(self, data, main_path): + assert not main_path or os.path.basename(main_path) == main_path + Process.__init__(self) + self._data = data + self._main_path = main_path + + def _bootstrap(self): + forking.prepare({'main_path': self._main_path}) + self._target, self._args, self._kwargs = pickle.loads(self._data) + return Process._bootstrap(self) + + def get_identity(self): + return self._identity + +HostManager.register('_RemoteProcess', RemoteProcess) + +# +# A Pool class that uses a cluster +# + +class DistributedPool(pool.Pool): + + def __init__(self, cluster, processes=None, initializer=None, initargs=()): + self._cluster = cluster + self.Process = cluster.Process + pool.Pool.__init__(self, processes or len(cluster), + initializer, initargs) + + def _setup_queues(self): + self._inqueue = self._cluster._SettableQueue() + self._outqueue = self._cluster._SettableQueue() + self._quick_put = self._inqueue.put + self._quick_get = self._outqueue.get + + @staticmethod + def _help_stuff_finish(inqueue, task_handler, size): + inqueue.set_contents([None] * size) + +# +# Manager type which starts host managers on other machines +# + +def LocalProcess(**kwds): + p = Process(**kwds) + p.set_name('localhost/' + p.get_name()) + return p + +class Cluster(managers.SyncManager): + ''' + Represents collection of slots running on various hosts. + + `Cluster` is a subclass of `SyncManager` so it allows creation of + various types of shared objects. 
+ ''' + def __init__(self, hostlist, modules): + managers.SyncManager.__init__(self, address=('localhost', 0)) + self._hostlist = hostlist + self._modules = modules + if __name__ not in modules: + modules.append(__name__) + files = [sys.modules[name].__file__ for name in modules] + for i, file in enumerate(files): + if file.endswith('.pyc') or file.endswith('.pyo'): + files[i] = file[:-4] + '.py' + self._files = [os.path.abspath(file) for file in files] + + def start(self): + managers.SyncManager.start(self) + + l = connection.Listener(family='AF_INET', authkey=self._authkey) + + for i, host in enumerate(self._hostlist): + host._start_manager(i, self._authkey, l.address, self._files) + + for host in self._hostlist: + if host.hostname != 'localhost': + conn = l.accept() + i, address, cpus = conn.recv() + conn.close() + other_host = self._hostlist[i] + other_host.manager = HostManager.from_address(address, + self._authkey) + other_host.slots = other_host.slots or cpus + other_host.Process = other_host.manager.Process + else: + host.slots = host.slots or slot_count + host.Process = LocalProcess + + self._slotlist = [ + Slot(host) for host in self._hostlist for i in range(host.slots) + ] + self._slot_iterator = itertools.cycle(self._slotlist) + self._base_shutdown = self.shutdown + del self.shutdown + + def shutdown(self): + for host in self._hostlist: + if host.hostname != 'localhost': + host.manager.shutdown() + self._base_shutdown() + + def Process(self, group=None, target=None, name=None, args=(), kwargs={}): + slot = self._slot_iterator.next() + return slot.Process( + group=group, target=target, name=name, args=args, kwargs=kwargs + ) + + def Pool(self, processes=None, initializer=None, initargs=()): + return DistributedPool(self, processes, initializer, initargs) + + def __getitem__(self, i): + return self._slotlist[i] + + def __len__(self): + return len(self._slotlist) + + def __iter__(self): + return iter(self._slotlist) + +# +# Queue subclass used by distributed pool +# + +class SettableQueue(Queue.Queue): + def empty(self): + return not self.queue + def full(self): + return self.maxsize > 0 and len(self.queue) == self.maxsize + def set_contents(self, contents): + # length of contents must be at least as large as the number of + # threads which have potentially called get() + self.not_empty.acquire() + try: + self.queue.clear() + self.queue.extend(contents) + self.not_empty.notifyAll() + finally: + self.not_empty.release() + +Cluster.register('_SettableQueue', SettableQueue) + +# +# Class representing a notional cpu in the cluster +# + +class Slot(object): + def __init__(self, host): + self.host = host + self.Process = host.Process + +# +# Host +# + +class Host(object): + ''' + Represents a host to use as a node in a cluster. + + `hostname` gives the name of the host. If hostname is not + "localhost" then ssh is used to log in to the host. To log in as + a different user use a host name of the form + "username@somewhere.org" + + `slots` is used to specify the number of slots for processes on + the host. This affects how often processes will be allocated to + this host. Normally this should be equal to the number of cpus on + that host. 
+ ''' + def __init__(self, hostname, slots=None): + self.hostname = hostname + self.slots = slots + + def _start_manager(self, index, authkey, address, files): + if self.hostname != 'localhost': + tempdir = copy_to_remote_temporary_directory(self.hostname, files) + debug('startup files copied to %s:%s', self.hostname, tempdir) + p = subprocess.Popen( + ['ssh', self.hostname, 'python', '-c', + '"import os; os.chdir(%r); ' + 'from distributing import main; main()"' % tempdir], + stdin=subprocess.PIPE + ) + data = dict( + name='BoostrappingHost', index=index, + dist_log_level=_logger.getEffectiveLevel(), + dir=tempdir, authkey=str(authkey), parent_address=address + ) + pickle.dump(data, p.stdin, pickle.HIGHEST_PROTOCOL) + p.stdin.close() + +# +# Copy files to remote directory, returning name of directory +# + +unzip_code = '''" +import tempfile, os, sys, tarfile +tempdir = tempfile.mkdtemp(prefix='distrib-') +os.chdir(tempdir) +tf = tarfile.open(fileobj=sys.stdin, mode='r|gz') +for ti in tf: + tf.extract(ti) +print tempdir +"''' + +def copy_to_remote_temporary_directory(host, files): + p = subprocess.Popen( + ['ssh', host, 'python', '-c', unzip_code], + stdout=subprocess.PIPE, stdin=subprocess.PIPE + ) + tf = tarfile.open(fileobj=p.stdin, mode='w|gz') + for name in files: + tf.add(name, os.path.basename(name)) + tf.close() + p.stdin.close() + return p.stdout.read().rstrip() + +# +# Code which runs a host manager +# + +def main(): + # get data from parent over stdin + data = pickle.load(sys.stdin) + sys.stdin.close() + + # set some stuff + _logger.setLevel(data['dist_log_level']) + forking.prepare(data) + + # create server for a `HostManager` object + server = managers.Server(HostManager._registry, ('', 0), data['authkey']) + current_process()._server = server + + # report server address and number of cpus back to parent + conn = connection.Client(data['parent_address'], authkey=data['authkey']) + conn.send((data['index'], server.address, slot_count)) + conn.close() + + # set name etc + current_process().set_name('Host-%s:%s' % server.address) + util._run_after_forkers() + + # register a cleanup function + def cleanup(directory): + debug('removing directory %s', directory) + shutil.rmtree(directory) + debug('shutting down host manager') + util.Finalize(None, cleanup, args=[data['dir']], exitpriority=0) + + # start host manager + debug('remote host manager starting in %s', data['dir']) + server.serve_forever() diff --git a/Doc/includes/mp_newtype.py b/Doc/includes/mp_newtype.py new file mode 100644 index 0000000..b9edc9e --- /dev/null +++ b/Doc/includes/mp_newtype.py @@ -0,0 +1,98 @@ +# +# This module shows how to use arbitrary callables with a subclass of +# `BaseManager`. 
+# + +from multiprocessing import freeze_support +from multiprocessing.managers import BaseManager, BaseProxy +import operator + +## + +class Foo(object): + def f(self): + print 'you called Foo.f()' + def g(self): + print 'you called Foo.g()' + def _h(self): + print 'you called Foo._h()' + +# A simple generator function +def baz(): + for i in xrange(10): + yield i*i + +# Proxy type for generator objects +class GeneratorProxy(BaseProxy): + _exposed_ = ('next', '__next__') + def __iter__(self): + return self + def next(self): + return self._callmethod('next') + def __next__(self): + return self._callmethod('__next__') + +# Function to return the operator module +def get_operator_module(): + return operator + +## + +class MyManager(BaseManager): + pass + +# register the Foo class; make `f()` and `g()` accessible via proxy +MyManager.register('Foo1', Foo) + +# register the Foo class; make `g()` and `_h()` accessible via proxy +MyManager.register('Foo2', Foo, exposed=('g', '_h')) + +# register the generator function baz; use `GeneratorProxy` to make proxies +MyManager.register('baz', baz, proxytype=GeneratorProxy) + +# register get_operator_module(); make public functions accessible via proxy +MyManager.register('operator', get_operator_module) + +## + +def test(): + manager = MyManager() + manager.start() + + print '-' * 20 + + f1 = manager.Foo1() + f1.f() + f1.g() + assert not hasattr(f1, '_h') + assert sorted(f1._exposed_) == sorted(['f', 'g']) + + print '-' * 20 + + f2 = manager.Foo2() + f2.g() + f2._h() + assert not hasattr(f2, 'f') + assert sorted(f2._exposed_) == sorted(['g', '_h']) + + print '-' * 20 + + it = manager.baz() + for i in it: + print '<%d>' % i, + print + + print '-' * 20 + + op = manager.operator() + print 'op.add(23, 45) =', op.add(23, 45) + print 'op.pow(2, 94) =', op.pow(2, 94) + print 'op.getslice(range(10), 2, 6) =', op.getslice(range(10), 2, 6) + print 'op.repeat(range(5), 3) =', op.repeat(range(5), 3) + print 'op._exposed_ =', op._exposed_ + +## + +if __name__ == '__main__': + freeze_support() + test() diff --git a/Doc/includes/mp_pool.py b/Doc/includes/mp_pool.py new file mode 100644 index 0000000..b937b86 --- /dev/null +++ b/Doc/includes/mp_pool.py @@ -0,0 +1,311 @@ +# +# A test of `multiprocessing.Pool` class +# + +import multiprocessing +import time +import random +import sys + +# +# Functions used by test code +# + +def calculate(func, args): + result = func(*args) + return '%s says that %s%s = %s' % ( + multiprocessing.current_process().get_name(), + func.__name__, args, result + ) + +def calculatestar(args): + return calculate(*args) + +def mul(a, b): + time.sleep(0.5*random.random()) + return a * b + +def plus(a, b): + time.sleep(0.5*random.random()) + return a + b + +def f(x): + return 1.0 / (x-5.0) + +def pow3(x): + return x**3 + +def noop(x): + pass + +# +# Test code +# + +def test(): + print 'cpu_count() = %d\n' % multiprocessing.cpu_count() + + # + # Create pool + # + + PROCESSES = 4 + print 'Creating pool with %d processes\n' % PROCESSES + pool = multiprocessing.Pool(PROCESSES) + print 'pool = %s' % pool + print + + # + # Tests + # + + TASKS = [(mul, (i, 7)) for i in range(10)] + \ + [(plus, (i, 8)) for i in range(10)] + + results = [pool.apply_async(calculate, t) for t in TASKS] + imap_it = pool.imap(calculatestar, TASKS) + imap_unordered_it = pool.imap_unordered(calculatestar, TASKS) + + print 'Ordered results using pool.apply_async():' + for r in results: + print '\t', r.get() + print + + print 'Ordered results using pool.imap():' + for x in imap_it: 
+ print '\t', x + print + + print 'Unordered results using pool.imap_unordered():' + for x in imap_unordered_it: + print '\t', x + print + + print 'Ordered results using pool.map() --- will block till complete:' + for x in pool.map(calculatestar, TASKS): + print '\t', x + print + + # + # Simple benchmarks + # + + N = 100000 + print 'def pow3(x): return x**3' + + t = time.time() + A = map(pow3, xrange(N)) + print '\tmap(pow3, xrange(%d)):\n\t\t%s seconds' % \ + (N, time.time() - t) + + t = time.time() + B = pool.map(pow3, xrange(N)) + print '\tpool.map(pow3, xrange(%d)):\n\t\t%s seconds' % \ + (N, time.time() - t) + + t = time.time() + C = list(pool.imap(pow3, xrange(N), chunksize=N//8)) + print '\tlist(pool.imap(pow3, xrange(%d), chunksize=%d)):\n\t\t%s' \ + ' seconds' % (N, N//8, time.time() - t) + + assert A == B == C, (len(A), len(B), len(C)) + print + + L = [None] * 1000000 + print 'def noop(x): pass' + print 'L = [None] * 1000000' + + t = time.time() + A = map(noop, L) + print '\tmap(noop, L):\n\t\t%s seconds' % \ + (time.time() - t) + + t = time.time() + B = pool.map(noop, L) + print '\tpool.map(noop, L):\n\t\t%s seconds' % \ + (time.time() - t) + + t = time.time() + C = list(pool.imap(noop, L, chunksize=len(L)//8)) + print '\tlist(pool.imap(noop, L, chunksize=%d)):\n\t\t%s seconds' % \ + (len(L)//8, time.time() - t) + + assert A == B == C, (len(A), len(B), len(C)) + print + + del A, B, C, L + + # + # Test error handling + # + + print 'Testing error handling:' + + try: + print pool.apply(f, (5,)) + except ZeroDivisionError: + print '\tGot ZeroDivisionError as expected from pool.apply()' + else: + raise AssertionError, 'expected ZeroDivisionError' + + try: + print pool.map(f, range(10)) + except ZeroDivisionError: + print '\tGot ZeroDivisionError as expected from pool.map()' + else: + raise AssertionError, 'expected ZeroDivisionError' + + try: + print list(pool.imap(f, range(10))) + except ZeroDivisionError: + print '\tGot ZeroDivisionError as expected from list(pool.imap())' + else: + raise AssertionError, 'expected ZeroDivisionError' + + it = pool.imap(f, range(10)) + for i in range(10): + try: + x = it.next() + except ZeroDivisionError: + if i == 5: + pass + except StopIteration: + break + else: + if i == 5: + raise AssertionError, 'expected ZeroDivisionError' + + assert i == 9 + print '\tGot ZeroDivisionError as expected from IMapIterator.next()' + print + + # + # Testing timeouts + # + + print 'Testing ApplyResult.get() with timeout:', + res = pool.apply_async(calculate, TASKS[0]) + while 1: + sys.stdout.flush() + try: + sys.stdout.write('\n\t%s' % res.get(0.02)) + break + except multiprocessing.TimeoutError: + sys.stdout.write('.') + print + print + + print 'Testing IMapIterator.next() with timeout:', + it = pool.imap(calculatestar, TASKS) + while 1: + sys.stdout.flush() + try: + sys.stdout.write('\n\t%s' % it.next(0.02)) + except StopIteration: + break + except multiprocessing.TimeoutError: + sys.stdout.write('.') + print + print + + # + # Testing callback + # + + print 'Testing callback:' + + A = [] + B = [56, 0, 1, 8, 27, 64, 125, 216, 343, 512, 729] + + r = pool.apply_async(mul, (7, 8), callback=A.append) + r.wait() + + r = pool.map_async(pow3, range(10), callback=A.extend) + r.wait() + + if A == B: + print '\tcallbacks succeeded\n' + else: + print '\t*** callbacks failed\n\t\t%s != %s\n' % (A, B) + + # + # Check there are no outstanding tasks + # + + assert not pool._cache, 'cache = %r' % pool._cache + + # + # Check close() methods + # + + print 'Testing close():' + + for 
worker in pool._pool: + assert worker.is_alive() + + result = pool.apply_async(time.sleep, [0.5]) + pool.close() + pool.join() + + assert result.get() is None + + for worker in pool._pool: + assert not worker.is_alive() + + print '\tclose() succeeded\n' + + # + # Check terminate() method + # + + print 'Testing terminate():' + + pool = multiprocessing.Pool(2) + DELTA = 0.1 + ignore = pool.apply(pow3, [2]) + results = [pool.apply_async(time.sleep, [DELTA]) for i in range(100)] + pool.terminate() + pool.join() + + for worker in pool._pool: + assert not worker.is_alive() + + print '\tterminate() succeeded\n' + + # + # Check garbage collection + # + + print 'Testing garbage collection:' + + pool = multiprocessing.Pool(2) + DELTA = 0.1 + processes = pool._pool + ignore = pool.apply(pow3, [2]) + results = [pool.apply_async(time.sleep, [DELTA]) for i in range(100)] + + results = pool = None + + time.sleep(DELTA * 2) + + for worker in processes: + assert not worker.is_alive() + + print '\tgarbage collection succeeded\n' + + +if __name__ == '__main__': + multiprocessing.freeze_support() + + assert len(sys.argv) in (1, 2) + + if len(sys.argv) == 1 or sys.argv[1] == 'processes': + print ' Using processes '.center(79, '-') + elif sys.argv[1] == 'threads': + print ' Using threads '.center(79, '-') + import multiprocessing.dummy as multiprocessing + else: + print 'Usage:\n\t%s [processes | threads]' % sys.argv[0] + raise SystemExit(2) + + test() diff --git a/Doc/includes/mp_synchronize.py b/Doc/includes/mp_synchronize.py new file mode 100644 index 0000000..8cf11bd --- /dev/null +++ b/Doc/includes/mp_synchronize.py @@ -0,0 +1,273 @@ +# +# A test file for the `multiprocessing` package +# + +import time, sys, random +from Queue import Empty + +import multiprocessing # may get overwritten + + +#### TEST_VALUE + +def value_func(running, mutex): + random.seed() + time.sleep(random.random()*4) + + mutex.acquire() + print '\n\t\t\t' + str(multiprocessing.current_process()) + ' has finished' + running.value -= 1 + mutex.release() + +def test_value(): + TASKS = 10 + running = multiprocessing.Value('i', TASKS) + mutex = multiprocessing.Lock() + + for i in range(TASKS): + p = multiprocessing.Process(target=value_func, args=(running, mutex)) + p.start() + + while running.value > 0: + time.sleep(0.08) + mutex.acquire() + print running.value, + sys.stdout.flush() + mutex.release() + + print + print 'No more running processes' + + +#### TEST_QUEUE + +def queue_func(queue): + for i in range(30): + time.sleep(0.5 * random.random()) + queue.put(i*i) + queue.put('STOP') + +def test_queue(): + q = multiprocessing.Queue() + + p = multiprocessing.Process(target=queue_func, args=(q,)) + p.start() + + o = None + while o != 'STOP': + try: + o = q.get(timeout=0.3) + print o, + sys.stdout.flush() + except Empty: + print 'TIMEOUT' + + print + + +#### TEST_CONDITION + +def condition_func(cond): + cond.acquire() + print '\t' + str(cond) + time.sleep(2) + print '\tchild is notifying' + print '\t' + str(cond) + cond.notify() + cond.release() + +def test_condition(): + cond = multiprocessing.Condition() + + p = multiprocessing.Process(target=condition_func, args=(cond,)) + print cond + + cond.acquire() + print cond + cond.acquire() + print cond + + p.start() + + print 'main is waiting' + cond.wait() + print 'main has woken up' + + print cond + cond.release() + print cond + cond.release() + + p.join() + print cond + + +#### TEST_SEMAPHORE + +def semaphore_func(sema, mutex, running): + sema.acquire() + + mutex.acquire() + running.value += 
1 + print running.value, 'tasks are running' + mutex.release() + + random.seed() + time.sleep(random.random()*2) + + mutex.acquire() + running.value -= 1 + print '%s has finished' % multiprocessing.current_process() + mutex.release() + + sema.release() + +def test_semaphore(): + sema = multiprocessing.Semaphore(3) + mutex = multiprocessing.RLock() + running = multiprocessing.Value('i', 0) + + processes = [ + multiprocessing.Process(target=semaphore_func, + args=(sema, mutex, running)) + for i in range(10) + ] + + for p in processes: + p.start() + + for p in processes: + p.join() + + +#### TEST_JOIN_TIMEOUT + +def join_timeout_func(): + print '\tchild sleeping' + time.sleep(5.5) + print '\n\tchild terminating' + +def test_join_timeout(): + p = multiprocessing.Process(target=join_timeout_func) + p.start() + + print 'waiting for process to finish' + + while 1: + p.join(timeout=1) + if not p.is_alive(): + break + print '.', + sys.stdout.flush() + + +#### TEST_EVENT + +def event_func(event): + print '\t%r is waiting' % multiprocessing.current_process() + event.wait() + print '\t%r has woken up' % multiprocessing.current_process() + +def test_event(): + event = multiprocessing.Event() + + processes = [multiprocessing.Process(target=event_func, args=(event,)) + for i in range(5)] + + for p in processes: + p.start() + + print 'main is sleeping' + time.sleep(2) + + print 'main is setting event' + event.set() + + for p in processes: + p.join() + + +#### TEST_SHAREDVALUES + +def sharedvalues_func(values, arrays, shared_values, shared_arrays): + for i in range(len(values)): + v = values[i][1] + sv = shared_values[i].value + assert v == sv + + for i in range(len(values)): + a = arrays[i][1] + sa = list(shared_arrays[i][:]) + assert a == sa + + print 'Tests passed' + +def test_sharedvalues(): + values = [ + ('i', 10), + ('h', -2), + ('d', 1.25) + ] + arrays = [ + ('i', range(100)), + ('d', [0.25 * i for i in range(100)]), + ('H', range(1000)) + ] + + shared_values = [multiprocessing.Value(id, v) for id, v in values] + shared_arrays = [multiprocessing.Array(id, a) for id, a in arrays] + + p = multiprocessing.Process( + target=sharedvalues_func, + args=(values, arrays, shared_values, shared_arrays) + ) + p.start() + p.join() + + assert p.get_exitcode() == 0 + + +#### + +def test(namespace=multiprocessing): + global multiprocessing + + multiprocessing = namespace + + for func in [ test_value, test_queue, test_condition, + test_semaphore, test_join_timeout, test_event, + test_sharedvalues ]: + + print '\n\t######## %s\n' % func.__name__ + func() + + ignore = multiprocessing.active_children() # cleanup any old processes + if hasattr(multiprocessing, '_debug_info'): + info = multiprocessing._debug_info() + if info: + print info + raise ValueError, 'there should be no positive refcounts left' + + +if __name__ == '__main__': + multiprocessing.freeze_support() + + assert len(sys.argv) in (1, 2) + + if len(sys.argv) == 1 or sys.argv[1] == 'processes': + print ' Using processes '.center(79, '-') + namespace = multiprocessing + elif sys.argv[1] == 'manager': + print ' Using processes and a manager '.center(79, '-') + namespace = multiprocessing.Manager() + namespace.Process = multiprocessing.Process + namespace.current_process = multiprocessing.current_process + namespace.active_children = multiprocessing.active_children + elif sys.argv[1] == 'threads': + print ' Using threads '.center(79, '-') + import multiprocessing.dummy as namespace + else: + print 'Usage:\n\t%s [processes | manager | threads]' % sys.argv[0] + 
raise SystemExit, 2 + + test(namespace) diff --git a/Doc/includes/mp_webserver.py b/Doc/includes/mp_webserver.py new file mode 100644 index 0000000..15d2b6b --- /dev/null +++ b/Doc/includes/mp_webserver.py @@ -0,0 +1,67 @@ +# +# Example where a pool of http servers share a single listening socket +# +# On Windows this module depends on the ability to pickle a socket +# object so that the worker processes can inherit a copy of the server +# object. (We import `multiprocessing.reduction` to enable this pickling.) +# +# Not sure if we should synchronize access to `socket.accept()` method by +# using a process-shared lock -- does not seem to be necessary. +# + +import os +import sys + +from multiprocessing import Process, current_process, freeze_support +from BaseHTTPServer import HTTPServer +from SimpleHTTPServer import SimpleHTTPRequestHandler + +if sys.platform == 'win32': + import multiprocessing.reduction # make sockets pickable/inheritable + + +def note(format, *args): + sys.stderr.write('[%s]\t%s\n' % (current_process().get_name(),format%args)) + + +class RequestHandler(SimpleHTTPRequestHandler): + # we override log_message() to show which process is handling the request + def log_message(self, format, *args): + note(format, *args) + +def serve_forever(server): + note('starting server') + try: + server.serve_forever() + except KeyboardInterrupt: + pass + + +def runpool(address, number_of_processes): + # create a single server object -- children will each inherit a copy + server = HTTPServer(address, RequestHandler) + + # create child processes to act as workers + for i in range(number_of_processes-1): + Process(target=serve_forever, args=(server,)).start() + + # main process also acts as a worker + serve_forever(server) + + +def test(): + DIR = os.path.join(os.path.dirname(__file__), '..') + ADDRESS = ('localhost', 8000) + NUMBER_OF_PROCESSES = 4 + + print 'Serving at http://%s:%d using %d worker processes' % \ + (ADDRESS[0], ADDRESS[1], NUMBER_OF_PROCESSES) + print 'To exit press Ctrl-' + ['C', 'Break'][sys.platform=='win32'] + + os.chdir(DIR) + runpool(ADDRESS, NUMBER_OF_PROCESSES) + + +if __name__ == '__main__': + freeze_support() + test() diff --git a/Doc/includes/mp_workers.py b/Doc/includes/mp_workers.py new file mode 100644 index 0000000..795e6cb --- /dev/null +++ b/Doc/includes/mp_workers.py @@ -0,0 +1,87 @@ +# +# Simple example which uses a pool of workers to carry out some tasks. +# +# Notice that the results will probably not come out of the output +# queue in the same in the same order as the corresponding tasks were +# put on the input queue. If it is important to get the results back +# in the original order then consider using `Pool.map()` or +# `Pool.imap()` (which will save on the amount of code needed anyway). 
+# + +import time +import random + +from multiprocessing import Process, Queue, current_process, freeze_support + +# +# Function run by worker processes +# + +def worker(input, output): + for func, args in iter(input.get, 'STOP'): + result = calculate(func, args) + output.put(result) + +# +# Function used to calculate result +# + +def calculate(func, args): + result = func(*args) + return '%s says that %s%s = %s' % \ + (current_process().get_name(), func.__name__, args, result) + +# +# Functions referenced by tasks +# + +def mul(a, b): + time.sleep(0.5*random.random()) + return a * b + +def plus(a, b): + time.sleep(0.5*random.random()) + return a + b + +# +# +# + +def test(): + NUMBER_OF_PROCESSES = 4 + TASKS1 = [(mul, (i, 7)) for i in range(20)] + TASKS2 = [(plus, (i, 8)) for i in range(10)] + + # Create queues + task_queue = Queue() + done_queue = Queue() + + # Submit tasks + for task in TASKS1: + task_queue.put(task) + + # Start worker processes + for i in range(NUMBER_OF_PROCESSES): + Process(target=worker, args=(task_queue, done_queue)).start() + + # Get and print results + print 'Unordered results:' + for i in range(len(TASKS1)): + print '\t', done_queue.get() + + # Add more tasks using `put()` + for task in TASKS2: + task_queue.put(task) + + # Get and print some more results + for i in range(len(TASKS2)): + print '\t', done_queue.get() + + # Tell child processes to stop + for i in range(NUMBER_OF_PROCESSES): + task_queue.put('STOP') + + +if __name__ == '__main__': + freeze_support() + test() |
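
The header comment of `mp_workers.py` above notes that results may come off the output queue out of order, and points at `Pool.map()` / `Pool.imap()` as the simpler alternative when ordering matters. A minimal sketch of that alternative (illustrative only, not part of this commit), again in Python 2 style:

```python
# Sketch of the Pool.map() alternative suggested in mp_workers.py's header
# comment -- not one of the files added by this commit.
import time
import random
from multiprocessing import Pool, freeze_support

def mul(args):
    a, b = args
    time.sleep(0.5 * random.random())
    return a * b

if __name__ == '__main__':
    freeze_support()
    pool = Pool(4)
    # map() blocks and returns the results in the same order as the tasks
    print pool.map(mul, [(i, 7) for i in range(20)])
    pool.close()
    pool.join()
```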