diff options
author | Benjamin Peterson <benjamin@python.org> | 2008-06-11 16:44:04 (GMT) |
---|---|---|
committer | Benjamin Peterson <benjamin@python.org> | 2008-06-11 16:44:04 (GMT) |
commit | e711cafab13efc9c1fe6c5cd75826401445eb585 (patch) | |
tree | 091a6334fdf6ccdcb93027302c5e038570ca04a4 /Doc/includes/mp_pool.py | |
parent | eec3d7137929611b98dd593cd2f122cd91b723b2 (diff) | |
download | cpython-e711cafab13efc9c1fe6c5cd75826401445eb585.zip cpython-e711cafab13efc9c1fe6c5cd75826401445eb585.tar.gz cpython-e711cafab13efc9c1fe6c5cd75826401445eb585.tar.bz2 |
Merged revisions 64104,64117 via svnmerge from
svn+ssh://pythondev@svn.python.org/python/trunk
........
r64104 | benjamin.peterson | 2008-06-10 21:40:25 -0500 (Tue, 10 Jun 2008) | 2 lines
add the multiprocessing package to fulfill PEP 371
........
r64117 | benjamin.peterson | 2008-06-11 07:26:31 -0500 (Wed, 11 Jun 2008) | 2 lines
fix import of multiprocessing by juggling imports
........
Diffstat (limited to 'Doc/includes/mp_pool.py')
-rw-r--r-- | Doc/includes/mp_pool.py | 311 |
1 files changed, 311 insertions, 0 deletions
diff --git a/Doc/includes/mp_pool.py b/Doc/includes/mp_pool.py new file mode 100644 index 0000000..b937b86 --- /dev/null +++ b/Doc/includes/mp_pool.py @@ -0,0 +1,311 @@ +# +# A test of `multiprocessing.Pool` class +# + +import multiprocessing +import time +import random +import sys + +# +# Functions used by test code +# + +def calculate(func, args): + result = func(*args) + return '%s says that %s%s = %s' % ( + multiprocessing.current_process().get_name(), + func.__name__, args, result + ) + +def calculatestar(args): + return calculate(*args) + +def mul(a, b): + time.sleep(0.5*random.random()) + return a * b + +def plus(a, b): + time.sleep(0.5*random.random()) + return a + b + +def f(x): + return 1.0 / (x-5.0) + +def pow3(x): + return x**3 + +def noop(x): + pass + +# +# Test code +# + +def test(): + print 'cpu_count() = %d\n' % multiprocessing.cpu_count() + + # + # Create pool + # + + PROCESSES = 4 + print 'Creating pool with %d processes\n' % PROCESSES + pool = multiprocessing.Pool(PROCESSES) + print 'pool = %s' % pool + print + + # + # Tests + # + + TASKS = [(mul, (i, 7)) for i in range(10)] + \ + [(plus, (i, 8)) for i in range(10)] + + results = [pool.apply_async(calculate, t) for t in TASKS] + imap_it = pool.imap(calculatestar, TASKS) + imap_unordered_it = pool.imap_unordered(calculatestar, TASKS) + + print 'Ordered results using pool.apply_async():' + for r in results: + print '\t', r.get() + print + + print 'Ordered results using pool.imap():' + for x in imap_it: + print '\t', x + print + + print 'Unordered results using pool.imap_unordered():' + for x in imap_unordered_it: + print '\t', x + print + + print 'Ordered results using pool.map() --- will block till complete:' + for x in pool.map(calculatestar, TASKS): + print '\t', x + print + + # + # Simple benchmarks + # + + N = 100000 + print 'def pow3(x): return x**3' + + t = time.time() + A = map(pow3, xrange(N)) + print '\tmap(pow3, xrange(%d)):\n\t\t%s seconds' % \ + (N, time.time() - t) + + t = time.time() + B = pool.map(pow3, xrange(N)) + print '\tpool.map(pow3, xrange(%d)):\n\t\t%s seconds' % \ + (N, time.time() - t) + + t = time.time() + C = list(pool.imap(pow3, xrange(N), chunksize=N//8)) + print '\tlist(pool.imap(pow3, xrange(%d), chunksize=%d)):\n\t\t%s' \ + ' seconds' % (N, N//8, time.time() - t) + + assert A == B == C, (len(A), len(B), len(C)) + print + + L = [None] * 1000000 + print 'def noop(x): pass' + print 'L = [None] * 1000000' + + t = time.time() + A = map(noop, L) + print '\tmap(noop, L):\n\t\t%s seconds' % \ + (time.time() - t) + + t = time.time() + B = pool.map(noop, L) + print '\tpool.map(noop, L):\n\t\t%s seconds' % \ + (time.time() - t) + + t = time.time() + C = list(pool.imap(noop, L, chunksize=len(L)//8)) + print '\tlist(pool.imap(noop, L, chunksize=%d)):\n\t\t%s seconds' % \ + (len(L)//8, time.time() - t) + + assert A == B == C, (len(A), len(B), len(C)) + print + + del A, B, C, L + + # + # Test error handling + # + + print 'Testing error handling:' + + try: + print pool.apply(f, (5,)) + except ZeroDivisionError: + print '\tGot ZeroDivisionError as expected from pool.apply()' + else: + raise AssertionError, 'expected ZeroDivisionError' + + try: + print pool.map(f, range(10)) + except ZeroDivisionError: + print '\tGot ZeroDivisionError as expected from pool.map()' + else: + raise AssertionError, 'expected ZeroDivisionError' + + try: + print list(pool.imap(f, range(10))) + except ZeroDivisionError: + print '\tGot ZeroDivisionError as expected from list(pool.imap())' + else: + raise AssertionError, 'expected ZeroDivisionError' + + it = pool.imap(f, range(10)) + for i in range(10): + try: + x = it.next() + except ZeroDivisionError: + if i == 5: + pass + except StopIteration: + break + else: + if i == 5: + raise AssertionError, 'expected ZeroDivisionError' + + assert i == 9 + print '\tGot ZeroDivisionError as expected from IMapIterator.next()' + print + + # + # Testing timeouts + # + + print 'Testing ApplyResult.get() with timeout:', + res = pool.apply_async(calculate, TASKS[0]) + while 1: + sys.stdout.flush() + try: + sys.stdout.write('\n\t%s' % res.get(0.02)) + break + except multiprocessing.TimeoutError: + sys.stdout.write('.') + print + print + + print 'Testing IMapIterator.next() with timeout:', + it = pool.imap(calculatestar, TASKS) + while 1: + sys.stdout.flush() + try: + sys.stdout.write('\n\t%s' % it.next(0.02)) + except StopIteration: + break + except multiprocessing.TimeoutError: + sys.stdout.write('.') + print + print + + # + # Testing callback + # + + print 'Testing callback:' + + A = [] + B = [56, 0, 1, 8, 27, 64, 125, 216, 343, 512, 729] + + r = pool.apply_async(mul, (7, 8), callback=A.append) + r.wait() + + r = pool.map_async(pow3, range(10), callback=A.extend) + r.wait() + + if A == B: + print '\tcallbacks succeeded\n' + else: + print '\t*** callbacks failed\n\t\t%s != %s\n' % (A, B) + + # + # Check there are no outstanding tasks + # + + assert not pool._cache, 'cache = %r' % pool._cache + + # + # Check close() methods + # + + print 'Testing close():' + + for worker in pool._pool: + assert worker.is_alive() + + result = pool.apply_async(time.sleep, [0.5]) + pool.close() + pool.join() + + assert result.get() is None + + for worker in pool._pool: + assert not worker.is_alive() + + print '\tclose() succeeded\n' + + # + # Check terminate() method + # + + print 'Testing terminate():' + + pool = multiprocessing.Pool(2) + DELTA = 0.1 + ignore = pool.apply(pow3, [2]) + results = [pool.apply_async(time.sleep, [DELTA]) for i in range(100)] + pool.terminate() + pool.join() + + for worker in pool._pool: + assert not worker.is_alive() + + print '\tterminate() succeeded\n' + + # + # Check garbage collection + # + + print 'Testing garbage collection:' + + pool = multiprocessing.Pool(2) + DELTA = 0.1 + processes = pool._pool + ignore = pool.apply(pow3, [2]) + results = [pool.apply_async(time.sleep, [DELTA]) for i in range(100)] + + results = pool = None + + time.sleep(DELTA * 2) + + for worker in processes: + assert not worker.is_alive() + + print '\tgarbage collection succeeded\n' + + +if __name__ == '__main__': + multiprocessing.freeze_support() + + assert len(sys.argv) in (1, 2) + + if len(sys.argv) == 1 or sys.argv[1] == 'processes': + print ' Using processes '.center(79, '-') + elif sys.argv[1] == 'threads': + print ' Using threads '.center(79, '-') + import multiprocessing.dummy as multiprocessing + else: + print 'Usage:\n\t%s [processes | threads]' % sys.argv[0] + raise SystemExit(2) + + test() |